Merge pull request #9325 from appwrite/pla-1883

refactor: migrate Realtime::send calls to queueForRealtime
This commit is contained in:
Jake Barnby 2025-03-03 21:23:55 +13:00 committed by GitHub
commit d85b28aabc
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 271 additions and 308 deletions

View file

@ -6,13 +6,14 @@ use Appwrite\Event\Build;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Realtime;
use Appwrite\Event\StatsUsage;
use Appwrite\Event\Validator\FunctionEvent;
use Appwrite\Event\Webhook;
use Appwrite\Extend\Exception;
use Appwrite\Extend\Exception as AppwriteException;
use Appwrite\Functions\Validator\Headers;
use Appwrite\Functions\Validator\RuntimeSpecification;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Platform\Tasks\ScheduleExecutions;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
@ -194,9 +195,12 @@ App::post('/v1/functions')
->inject('user')
->inject('queueForEvents')
->inject('queueForBuilds')
->inject('queueForWebhooks')
->inject('queueForFunctions')
->inject('queueForRealtime')
->inject('dbForPlatform')
->inject('gitHub')
->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $templateRepository, string $templateOwner, string $templateRootDirectory, string $templateVersion, string $specification, Request $request, Response $response, Database $dbForProject, callable $timelimit, Document $project, Document $user, Event $queueForEvents, Build $queueForBuilds, Database $dbForPlatform, GitHub $github) use ($redeployVcs) {
->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $templateRepository, string $templateOwner, string $templateRootDirectory, string $templateVersion, string $specification, Request $request, Response $response, Database $dbForProject, callable $timelimit, Document $project, Document $user, Event $queueForEvents, Build $queueForBuilds, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Database $dbForPlatform, GitHub $github) use ($redeployVcs) {
$functionId = ($functionId == 'unique()') ? ID::unique() : $functionId;
// Temporary abuse check
@ -386,51 +390,29 @@ App::post('/v1/functions')
]))
);
/** Trigger Webhook */
$ruleModel = new Rule();
$ruleCreate =
$queueForEvents
->setClass(Event::WEBHOOK_CLASS_NAME)
->setQueue(Event::WEBHOOK_QUEUE_NAME);
->setProject($project)
->setEvent('rules.[ruleId].create')
->setParam('ruleId', $rule->getId())
->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules())));
$ruleCreate
->setProject($project)
->setEvent('rules.[ruleId].create')
->setParam('ruleId', $rule->getId())
->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules())))
/** Trigger Webhook */
$queueForWebhooks
->from($ruleCreate)
->trigger();
/** Trigger Functions */
$ruleCreate
->setClass(Event::FUNCTIONS_CLASS_NAME)
->setQueue(Event::FUNCTIONS_QUEUE_NAME)
$queueForFunctions
->from($ruleCreate)
->trigger();
/** Trigger realtime event */
$allEvents = Event::generateEvents('rules.[ruleId].create', [
'ruleId' => $rule->getId(),
]);
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $rule,
project: $project
);
Realtime::send(
projectId: 'console',
payload: $rule->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
Realtime::send(
projectId: $project->getId(),
payload: $rule->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
/** Trigger Realtime Events */
$queueForRealtime
->from($ruleCreate)
->setSubscribers(['console', $project->getId()])
->trigger();
}
$queueForEvents->setParam('functionId', $function->getId());

View file

@ -13,12 +13,14 @@ use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Event\Messaging;
use Appwrite\Event\Migration;
use Appwrite\Event\Realtime;
use Appwrite\Event\StatsUsage;
use Appwrite\Event\StatsUsageDump;
/** remove */
use Appwrite\Event\Usage;
use Appwrite\Event\UsageDump;
/** /remove */
use Appwrite\Event\Webhook;
use Appwrite\Platform\Appwrite;
use Swoole\Runtime;
use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis;
@ -313,10 +315,18 @@ Server::setResource('queueForAudits', function (Publisher $publisher) {
return new Audit($publisher);
}, ['publisher']);
Server::setResource('queueForWebhooks', function (Publisher $publisher) {
return new Webhook($publisher);
}, ['publisher']);
Server::setResource('queueForFunctions', function (Publisher $publisher) {
return new Func($publisher);
}, ['publisher']);
Server::setResource('queueForRealtime', function () {
return new Realtime();
}, []);
Server::setResource('queueForCertificates', function (Publisher $publisher) {
return new Certificate($publisher);
}, ['publisher']);

View file

@ -7,10 +7,17 @@ use Utopia\Database\Document;
class Realtime extends Event
{
protected array $subscribers = [];
public function __construct()
{
}
/**
* Get Realtime payload for this event.
*
* @return array
*/
public function getRealtimePayload(): array
{
$payload = [];
@ -24,6 +31,28 @@ class Realtime extends Event
return $payload;
}
/**
* Set subscribers for this realtime event.
*
* @param array $subscribers
* @return array
*/
public function setSubscribers(array $subscribers): self
{
$this->subscribers = $subscribers;
return $this;
}
/**
* Get subscribers for this realtime event.
*
* @return array
*/
public function getSubscribers(): array
{
return $this->subscribers;
}
/**
* Execute Event.
*
@ -53,17 +82,23 @@ class Realtime extends Event
bucket: $bucket,
);
RealtimeAdapter::send(
projectId: $target['projectId'] ?? $this->getProject()->getId(),
payload: $this->getRealtimePayload(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles'],
options: [
'permissionsChanged' => $target['permissionsChanged'],
'userId' => $this->getParam('userId')
]
);
$projectIds = !empty($this->getSubscribers())
? $this->getSubscribers()
: [$target['projectId'] ?? $this->getProject()->getId()];
foreach ($projectIds as $projectId) {
RealtimeAdapter::send(
projectId: $projectId,
payload: $this->getRealtimePayload(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles'],
options: [
'permissionsChanged' => $target['permissionsChanged'],
'userId' => $this->getParam('userId')
]
);
}
return true;
}

View file

@ -5,8 +5,9 @@ namespace Appwrite\Platform\Workers;
use Ahc\Jwt\JWT;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Realtime;
use Appwrite\Event\StatsUsage;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Event\Webhook;
use Appwrite\Utopia\Response\Model\Deployment;
use Appwrite\Vcs\Comment;
use Exception;
@ -49,15 +50,17 @@ class Builds extends Action
->inject('project')
->inject('dbForPlatform')
->inject('queueForEvents')
->inject('queueForWebhooks')
->inject('queueForFunctions')
->inject('queueForRealtime')
->inject('queueForStatsUsage')
->inject('cache')
->inject('dbForProject')
->inject('deviceForFunctions')
->inject('isResourceBlocked')
->inject('log')
->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, StatsUsage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log) =>
$this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $deviceForFunctions, $isResourceBlocked, $log));
->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log) =>
$this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $usage, $cache, $dbForProject, $deviceForFunctions, $isResourceBlocked, $log));
}
/**
@ -65,7 +68,9 @@ class Builds extends Action
* @param Document $project
* @param Database $dbForPlatform
* @param Event $queueForEvents
* @param Webhook $queueForWebhooks
* @param Func $queueForFunctions
* @param Realtime $queueForRealtime
* @param StatsUsage $queueForStatsUsage
* @param Cache $cache
* @param Database $dbForProject
@ -74,7 +79,7 @@ class Builds extends Action
* @return void
* @throws \Utopia\Database\Exception
*/
public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, StatsUsage $queueForStatsUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log): void
public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log): void
{
$payload = $message->getPayload() ?? [];
@ -95,7 +100,7 @@ class Builds extends Action
case BUILD_TYPE_RETRY:
Console::info('Creating build for deployment: ' . $deployment->getId());
$github = new GitHub($cache);
$this->buildDeployment($deviceForFunctions, $queueForFunctions, $queueForEvents, $queueForStatsUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $isResourceBlocked, $log);
$this->buildDeployment($deviceForFunctions, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $isResourceBlocked, $log);
break;
default:
@ -105,7 +110,9 @@ class Builds extends Action
/**
* @param Device $deviceForFunctions
* @param Webhook $queueForWebhooks
* @param Func $queueForFunctions
* @param Realtime $queueForRealtime
* @param Event $queueForEvents
* @param StatsUsage $queueForStatsUsage
* @param Database $dbForPlatform
@ -120,7 +127,7 @@ class Builds extends Action
* @throws \Utopia\Database\Exception
* @throws Exception
*/
protected function buildDeployment(Device $deviceForFunctions, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, callable $isResourceBlocked, Log $log): void
protected function buildDeployment(Device $deviceForFunctions, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, callable $isResourceBlocked, Log $log): void
{
$executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST'));
@ -158,10 +165,7 @@ class Builds extends Action
}
// Realtime preparation
$allEvents = Event::generateEvents('functions.[functionId].deployments.[deploymentId].update', [
'functionId' => $function->getId(),
'deploymentId' => $deployment->getId()
]);
$event = "functions.[functionId].deployments.[deploymentId].update";
$startTime = DateTime::now();
$durationStart = \microtime(true);
@ -375,21 +379,16 @@ class Builds extends Action
$deployment = $dbForProject->updateDocument('deployments', $deployment->getId(), $deployment);
/**
* Send realtime Event
* Trigger Realtime Event
*/
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $build,
project: $project
);
Realtime::send(
projectId: 'console',
payload: $build->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
$queueForRealtime
->setProject($project)
->setSubscribers(['console'])
->setEvent($event)
->setParam('functionId', $function->getId())
->setParam('deploymentId', $deployment->getId())
->setPayload($build->getArrayCopy())
->trigger();
}
$tmpPath = '/tmp/builds/' . $buildId;
@ -436,40 +435,34 @@ class Builds extends Action
$this->runGitAction('building', $github, $providerCommitHash, $owner, $repositoryName, $project, $function, $deployment->getId(), $dbForProject, $dbForPlatform);
}
/** Trigger Webhook */
$deploymentModel = new Deployment();
$deploymentUpdate =
$queueForEvents
->setQueue(Event::WEBHOOK_QUEUE_NAME)
->setClass(Event::WEBHOOK_CLASS_NAME)
->setProject($project)
->setEvent('functions.[functionId].deployments.[deploymentId].update')
->setParam('functionId', $function->getId())
->setParam('deploymentId', $deployment->getId())
->setPayload($deployment->getArrayCopy(array_keys($deploymentModel->getRules())));
$deploymentUpdate->trigger();
/** Trigger Webhook */
$queueForWebhooks
->from($deploymentUpdate)
->trigger();
/** Trigger Functions */
$queueForFunctions
->from($deploymentUpdate)
->trigger();
/** Trigger Realtime */
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $build,
project: $project
);
Realtime::send(
projectId: 'console',
payload: $build->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
/** Trigger Realtime Event */
$queueForRealtime
->setProject($project)
->setSubscribers(['console'])
->setEvent($event)
->setParam('functionId', $function->getId())
->setParam('deploymentId', $deployment->getId())
->setPayload($build->getArrayCopy())
->trigger();
$vars = [];
@ -562,12 +555,12 @@ class Builds extends Action
$err = $error;
}
}),
Co\go(function () use ($executor, $project, $deployment, &$response, &$build, $dbForProject, $allEvents, &$err, &$isCanceled) {
Co\go(function () use ($executor, $project, $function, $deployment, &$response, &$build, $dbForProject, $event, &$err, $queueForRealtime, &$isCanceled) {
try {
$executor->getLogs(
deploymentId: $deployment->getId(),
projectId: $project->getId(),
callback: function ($logs) use (&$response, &$err, &$build, $dbForProject, $allEvents, $project, &$isCanceled) {
callback: function ($logs) use (&$response, &$err, &$build, $dbForProject, $event, $project, $function, $deployment, $queueForRealtime, &$isCanceled) {
if ($isCanceled) {
return;
}
@ -592,21 +585,16 @@ class Builds extends Action
$build = $dbForProject->updateDocument('builds', $build->getId(), $build);
/**
* Send realtime Event
* Trigger Realtime Event
*/
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $build,
project: $project
);
Realtime::send(
projectId: 'console',
payload: $build->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
$queueForRealtime
->setProject($project)
->setSubscribers(['console'])
->setEvent($event)
->setParam('functionId', $function->getId())
->setParam('deploymentId', $deployment->getId())
->setPayload($build->getArrayCopy())
->trigger();
}
}
);
@ -694,21 +682,16 @@ class Builds extends Action
}
} finally {
/**
* Send realtime Event
* Trigger Realtime Event
*/
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $build,
project: $project
);
Realtime::send(
projectId: 'console',
payload: $build->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
$queueForRealtime
->setProject($project)
->setSubscribers(['console'])
->setEvent($event)
->setParam('functionId', $function->getId())
->setParam('deploymentId', $deployment->getId())
->setPayload($build->getArrayCopy())
->trigger();
/** Trigger usage queue */
if ($build->getAttribute('status') === 'ready') {

View file

@ -6,7 +6,8 @@ use Appwrite\Certificates\Adapter as CertificatesAdapter;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Event\Realtime;
use Appwrite\Event\Webhook;
use Appwrite\Network\Validator\CNAME;
use Appwrite\Template\Template;
use Appwrite\Utopia\Response\Model\Rule;
@ -46,12 +47,14 @@ class Certificates extends Action
->inject('dbForPlatform')
->inject('queueForMails')
->inject('queueForEvents')
->inject('queueForWebhooks')
->inject('queueForFunctions')
->inject('queueForRealtime')
->inject('log')
->inject('certificates')
->callback(
fn (Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, CertificatesAdapter $certificates) =>
$this->action($message, $dbForPlatform, $queueForMails, $queueForEvents, $queueForFunctions, $log, $certificates)
fn (Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates) =>
$this->action($message, $dbForPlatform, $queueForMails, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $log, $certificates)
);
}
@ -60,14 +63,16 @@ class Certificates extends Action
* @param Database $dbForPlatform
* @param Mail $queueForMails
* @param Event $queueForEvents
* @param Webhook $queueForWebhooks
* @param Func $queueForFunctions
* @param Realtime $queueForRealtime
* @param Log $log
* @param CertificatesAdapter $certificates
* @return void
* @throws Throwable
* @throws \Utopia\Database\Exception
*/
public function action(Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, CertificatesAdapter $certificates): void
public function action(Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates): void
{
$payload = $message->getPayload() ?? [];
@ -81,7 +86,7 @@ class Certificates extends Action
$log->addTag('domain', $domain->get());
$this->execute($domain, $dbForPlatform, $queueForMails, $queueForEvents, $queueForFunctions, $log, $certificates, $skipRenewCheck);
$this->execute($domain, $dbForPlatform, $queueForMails, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $log, $certificates, $skipRenewCheck);
}
/**
@ -90,13 +95,14 @@ class Certificates extends Action
* @param Mail $queueForMails
* @param Event $queueForEvents
* @param Func $queueForFunctions
* @param Realtime $queueForRealtime
* @param CertificatesAdapter $certificates
* @param bool $skipRenewCheck
* @return void
* @throws Throwable
* @throws \Utopia\Database\Exception
*/
private function execute(Domain $domain, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, CertificatesAdapter $certificates, bool $skipRenewCheck = false): void
private function execute(Domain $domain, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates, bool $skipRenewCheck = false): void
{
/**
* 1. Read arguments and validate domain
@ -186,7 +192,7 @@ class Certificates extends Action
$certificate->setAttribute('updated', DateTime::now());
// Save all changes we made to certificate document into database
$this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForPlatform, $queueForEvents, $queueForFunctions);
$this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime);
}
}
@ -199,13 +205,14 @@ class Certificates extends Action
* @param Database $dbForPlatform Database connection for console
* @param Event $queueForEvents
* @param Func $queueForFunctions
* @param Realtime $queueForRealtime
* @return void
* @throws \Utopia\Database\Exception
* @throws Authorization
* @throws Conflict
* @throws Structure
*/
private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions): void
private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime): void
{
// Check if update or insert required
$certificateDocument = $dbForPlatform->findOne('certificates', [Query::equal('domain', [$domain])]);
@ -219,7 +226,7 @@ class Certificates extends Action
}
$certificateId = $certificate->getId();
$this->updateDomainDocuments($certificateId, $domain, $success, $dbForPlatform, $queueForEvents, $queueForFunctions);
$this->updateDomainDocuments($certificateId, $domain, $success, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime);
}
/**
@ -338,7 +345,7 @@ class Certificates extends Action
*
* @return void
*/
private function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions): void
private function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime): void
{
// TODO: @christyjacob remove once we migrate the rules in 1.7.x
if (System::getEnv('_APP_RULES_FORMAT') === 'md5') {
@ -367,50 +374,28 @@ class Certificates extends Action
return;
}
/** Trigger Webhook */
$ruleModel = new Rule();
$queueForEvents
->setQueue(Event::WEBHOOK_QUEUE_NAME)
->setClass(Event::WEBHOOK_CLASS_NAME)
->setProject($project)
->setEvent('rules.[ruleId].update')
->setParam('ruleId', $rule->getId())
->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules())))
->trigger();
->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules())));
/** Trigger Webhook */
$queueForWebhooks
->from($queueForEvents)
->trigger();
/** Trigger Functions */
$queueForFunctions
->setProject($project)
->setEvent('rules.[ruleId].update')
->setParam('ruleId', $rule->getId())
->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules())))
->from($queueForEvents)
->trigger();
/** Trigger realtime event */
$allEvents = Event::generateEvents('rules.[ruleId].update', [
'ruleId' => $rule->getId(),
]);
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $rule,
project: $project
);
Realtime::send(
projectId: 'console',
payload: $rule->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
Realtime::send(
projectId: $project->getId(),
payload: $rule->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
/** Trigger Realtime Events */
$queueForRealtime
->from($queueForEvents)
->setSubscribers(['console', $projectId])
->trigger();
}
}
}

View file

@ -2,8 +2,7 @@
namespace Appwrite\Platform\Workers;
use Appwrite\Event\Event;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Event\Realtime;
use Exception;
use Utopia\CLI\Console;
use Utopia\Database\Database;
@ -37,8 +36,9 @@ class Databases extends Action
->inject('project')
->inject('dbForPlatform')
->inject('dbForProject')
->inject('queueForRealtime')
->inject('log')
->callback(fn (Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log) => $this->action($message, $project, $dbForPlatform, $dbForProject, $log));
->callback(fn (Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime, Log $log) => $this->action($message, $project, $dbForPlatform, $dbForProject, $queueForRealtime, $log));
}
/**
@ -46,16 +46,17 @@ class Databases extends Action
* @param Document $project
* @param Database $dbForPlatform
* @param Database $dbForProject
* @param Realtime $queueForRealtime
* @param Log $log
* @return void
* @throws \Exception
*/
public function action(Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log): void
public function action(Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime, Log $log): void
{
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
throw new \Exception('Missing payload');
throw new Exception('Missing payload');
}
$type = $payload['type'];
@ -75,11 +76,11 @@ class Databases extends Action
match (\strval($type)) {
DATABASE_TYPE_DELETE_DATABASE => $this->deleteDatabase($database, $project, $dbForProject),
DATABASE_TYPE_DELETE_COLLECTION => $this->deleteCollection($database, $collection, $project, $dbForProject),
DATABASE_TYPE_CREATE_ATTRIBUTE => $this->createAttribute($database, $collection, $document, $project, $dbForPlatform, $dbForProject),
DATABASE_TYPE_DELETE_ATTRIBUTE => $this->deleteAttribute($database, $collection, $document, $project, $dbForPlatform, $dbForProject),
DATABASE_TYPE_CREATE_INDEX => $this->createIndex($database, $collection, $document, $project, $dbForPlatform, $dbForProject),
DATABASE_TYPE_DELETE_INDEX => $this->deleteIndex($database, $collection, $document, $project, $dbForPlatform, $dbForProject),
default => throw new \Exception('No database operation for type: ' . \strval($type)),
DATABASE_TYPE_CREATE_ATTRIBUTE => $this->createAttribute($database, $collection, $document, $project, $dbForPlatform, $dbForProject, $queueForRealtime),
DATABASE_TYPE_DELETE_ATTRIBUTE => $this->deleteAttribute($database, $collection, $document, $project, $dbForPlatform, $dbForProject, $queueForRealtime),
DATABASE_TYPE_CREATE_INDEX => $this->createIndex($database, $collection, $document, $project, $dbForPlatform, $dbForProject, $queueForRealtime),
DATABASE_TYPE_DELETE_INDEX => $this->deleteIndex($database, $collection, $document, $project, $dbForPlatform, $dbForProject, $queueForRealtime),
default => throw new Exception('No database operation for type: ' . \strval($type)),
};
}
@ -90,13 +91,14 @@ class Databases extends Action
* @param Document $project
* @param Database $dbForPlatform
* @param Database $dbForProject
* @param Realtime $queueForRealtime
* @return void
* @throws Authorization
* @throws Conflict
* @throws \Exception
* @throws \Throwable
*/
private function createAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForPlatform, Database $dbForProject): void
private function createAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime): void
{
if ($collection->isEmpty()) {
throw new Exception('Missing collection');
@ -106,12 +108,7 @@ class Databases extends Action
}
$projectId = $project->getId();
$events = Event::generateEvents('databases.[databaseId].collections.[collectionId].attributes.[attributeId].update', [
'databaseId' => $database->getId(),
'collectionId' => $collection->getId(),
'attributeId' => $attribute->getId()
]);
$event = "databases.[databaseId].collections.[collectionId].attributes.[attributeId].update";
/**
* TODO @christyjacob4 verify if this is still the case
* Fetch attribute from the database, since with Resque float values are loosing informations.
@ -169,7 +166,7 @@ class Databases extends Action
break;
default:
if (!$dbForProject->createAttribute('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key, $type, $size, $required, $default, $signed, $array, $format, $formatOptions, $filters)) {
throw new \Exception('Failed to create Attribute');
throw new Exception('Failed to create Attribute');
}
}
@ -200,7 +197,7 @@ class Databases extends Action
throw $e;
} finally {
$this->trigger($database, $collection, $attribute, $project, $projectId, $events);
$this->trigger($database, $collection, $project, $event, $queueForRealtime, $attribute);
if (! $relatedCollection->isEmpty()) {
$dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $relatedCollection->getId());
@ -217,13 +214,14 @@ class Databases extends Action
* @param Document $project
* @param Database $dbForPlatform
* @param Database $dbForProject
* @param Realtime $queueForRealtime
* @return void
* @throws Authorization
* @throws Conflict
* @throws \Exception
* @throws \Throwable
**/
private function deleteAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForPlatform, Database $dbForProject): void
private function deleteAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime): void
{
if ($collection->isEmpty()) {
throw new Exception('Missing collection');
@ -233,15 +231,9 @@ class Databases extends Action
}
$projectId = $project->getId();
$events = Event::generateEvents('databases.[databaseId].collections.[collectionId].attributes.[attributeId].delete', [
'databaseId' => $database->getId(),
'collectionId' => $collection->getId(),
'attributeId' => $attribute->getId()
]);
$event = 'databases.[databaseId].collections.[collectionId].attributes.[attributeId].delete';
$collectionId = $collection->getId();
$key = $attribute->getAttribute('key', '');
$status = $attribute->getAttribute('status', '');
$type = $attribute->getAttribute('type', '');
$project = $dbForPlatform->getDocument('projects', $projectId);
$options = $attribute->getAttribute('options', []);
@ -312,7 +304,7 @@ class Databases extends Action
throw $e;
} finally {
$this->trigger($database, $collection, $attribute, $project, $projectId, $events);
$this->trigger($database, $collection, $project, $event, $queueForRealtime, $attribute);
}
// The underlying database removes/rebuilds indexes when attribute is removed
@ -358,7 +350,7 @@ class Databases extends Action
}
if ($exists) { // Delete the duplicate if created, else update in db
$this->deleteIndex($database, $collection, $index, $project, $dbForPlatform, $dbForProject);
$this->deleteIndex($database, $collection, $index, $project, $dbForPlatform, $dbForProject, $queueForRealtime);
} else {
$dbForProject->updateDocument('indexes', $index->getId(), $index);
}
@ -381,6 +373,7 @@ class Databases extends Action
* @param Document $project
* @param Database $dbForPlatform
* @param Database $dbForProject
* @param Realtime $queueForRealtime
* @return void
* @throws Authorization
* @throws Conflict
@ -388,7 +381,7 @@ class Databases extends Action
* @throws DatabaseException
* @throws \Throwable
*/
private function createIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForPlatform, Database $dbForProject): void
private function createIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime): void
{
if ($collection->isEmpty()) {
throw new Exception('Missing collection');
@ -398,12 +391,7 @@ class Databases extends Action
}
$projectId = $project->getId();
$events = Event::generateEvents('databases.[databaseId].collections.[collectionId].indexes.[indexId].update', [
'databaseId' => $database->getId(),
'collectionId' => $collection->getId(),
'indexId' => $index->getId()
]);
$event = 'databases.[databaseId].collections.[collectionId].indexes.[indexId].update';
$collectionId = $collection->getId();
$key = $index->getAttribute('key', '');
$type = $index->getAttribute('type', '');
@ -430,7 +418,7 @@ class Databases extends Action
throw $e;
} finally {
$this->trigger($database, $collection, $index, $project, $projectId, $events);
$this->trigger($database, $collection, $project, $event, $queueForRealtime, null, $index);
$dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collectionId);
}
}
@ -442,6 +430,7 @@ class Databases extends Action
* @param Document $project
* @param Database $dbForPlatform
* @param Database $dbForProject
* @param Realtime $queueForRealtime
* @return void
* @throws Authorization
* @throws Conflict
@ -449,7 +438,7 @@ class Databases extends Action
* @throws DatabaseException
* @throws \Throwable
*/
private function deleteIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForPlatform, Database $dbForProject): void
private function deleteIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime): void
{
if ($collection->isEmpty()) {
throw new Exception('Missing collection');
@ -459,12 +448,7 @@ class Databases extends Action
}
$projectId = $project->getId();
$events = Event::generateEvents('databases.[databaseId].collections.[collectionId].indexes.[indexId].delete', [
'databaseId' => $database->getId(),
'collectionId' => $collection->getId(),
'indexId' => $index->getId()
]);
$event = 'databases.[databaseId].collections.[collectionId].indexes.[indexId].delete';
$key = $index->getAttribute('key');
$status = $index->getAttribute('status', '');
$project = $dbForPlatform->getDocument('projects', $projectId);
@ -490,7 +474,7 @@ class Databases extends Action
throw $e;
} finally {
$this->trigger($database, $collection, $index, $project, $projectId, $events);
$this->trigger($database, $collection, $project, $event, $queueForRealtime, null, $index);
$dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collection->getId());
}
}
@ -532,7 +516,6 @@ class Databases extends Action
$collectionId = $collection->getId();
$collectionInternalId = $collection->getInternalId();
$databaseId = $database->getId();
$databaseInternalId = $database->getInternalId();
$dbForProject->deleteCollection('database_' . $databaseInternalId . '_collection_' . $collection->getInternalId());
@ -568,21 +551,21 @@ class Databases extends Action
/**
* @param string $collection collectionID
* @param string $collectionId
* @param array $queries
* @param Database $database
* @param callable|null $callback
* @return void
* @throws Exception
*/
protected function deleteByGroup(string $collection, array $queries, Database $database, callable $callback = null): void
protected function deleteByGroup(string $collectionId, array $queries, Database $database, callable $callback = null): void
{
$start = \microtime(true);
try {
$documents = $database->deleteDocuments($collection, $queries);
$documents = $database->deleteDocuments($collectionId, $queries);
} catch (\Throwable $th) {
Console::error('Failed to delete documents for collection ' . $collection . ': ' . $th->getMessage());
Console::error('Failed to delete documents for collection ' . $collectionId . ': ' . $th->getMessage());
return;
}
@ -598,31 +581,42 @@ class Databases extends Action
Console::info("Deleted {$count} documents by group in " . ($end - $start) . " seconds");
}
/**
* @param Document $database
* @param Document $collection
* @param Document $project
* @param Realtime $queueForRealtime
* @param Document|null $attribute
* @param Document|null $index
* @return void
*/
protected function trigger(
Document $database,
Document $collection,
Document $attribute,
Document $project,
string $projectId,
array $events
string $event,
Realtime $queueForRealtime,
Document|null $attribute = null,
Document|null $index = null,
): void {
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $events[0],
payload: $attribute,
project: $project,
);
Realtime::send(
projectId: 'console',
payload: $attribute->getArrayCopy(),
events: $events,
channels: $target['channels'],
roles: $target['roles'],
options: [
'projectId' => $projectId,
'databaseId' => $database->getId(),
'collectionId' => $collection->getId()
]
);
$queueForRealtime
->setProject($project)
->setSubscribers(['console'])
->setEvent($event)
->setParam('databaseId', $database->getId())
->setParam('collectionId', $collection->getId());
if ($attribute !== null && !empty($attribute)) {
$queueForRealtime
->setParam('attributeId', $attribute->getId())
->setPayload($attribute->getArrayCopy());
}
if ($index !== null && !empty($index)) {
$queueForRealtime
->setParam('indexId', $index->getId())
->setPayload($index->getArrayCopy());
}
$queueForRealtime->trigger();
}
}

View file

@ -5,8 +5,9 @@ namespace Appwrite\Platform\Workers;
use Ahc\Jwt\JWT;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Realtime;
use Appwrite\Event\StatsUsage;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Event\Webhook;
use Appwrite\Utopia\Response\Model\Execution;
use Exception;
use Executor\Executor;
@ -44,15 +45,17 @@ class Functions extends Action
->inject('project')
->inject('message')
->inject('dbForProject')
->inject('queueForWebhooks')
->inject('queueForFunctions')
->inject('queueForRealtime')
->inject('queueForEvents')
->inject('queueForStatsUsage')
->inject('log')
->inject('isResourceBlocked')
->callback(fn (Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForStatsUsage, $log, $isResourceBlocked));
->callback(fn (Document $project, Message $message, Database $dbForProject, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $log, $isResourceBlocked));
}
public function action(Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked): void
public function action(Document $project, Message $message, Database $dbForProject, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked): void
{
$payload = $message->getPayload() ?? [];
@ -136,7 +139,9 @@ class Functions extends Action
$this->execute(
log: $log,
dbForProject: $dbForProject,
queueForWebhooks: $queueForWebhooks,
queueForFunctions: $queueForFunctions,
queueForRealtime: $queueForRealtime,
queueForStatsUsage: $queueForStatsUsage,
queueForEvents: $queueForEvents,
project: $project,
@ -176,7 +181,9 @@ class Functions extends Action
$this->execute(
log: $log,
dbForProject: $dbForProject,
queueForWebhooks: $queueForWebhooks,
queueForFunctions: $queueForFunctions,
queueForRealtime: $queueForRealtime,
queueForStatsUsage: $queueForStatsUsage,
queueForEvents: $queueForEvents,
project: $project,
@ -198,7 +205,9 @@ class Functions extends Action
$this->execute(
log: $log,
dbForProject: $dbForProject,
queueForWebhooks: $queueForWebhooks,
queueForFunctions: $queueForFunctions,
queueForRealtime: $queueForRealtime,
queueForStatsUsage: $queueForStatsUsage,
queueForEvents: $queueForEvents,
project: $project,
@ -284,6 +293,7 @@ class Functions extends Action
* @param Log $log
* @param Database $dbForProject
* @param Func $queueForFunctions
* @param Realtime $queueForRealtime
* @param StatsUsage $queueForStatsUsage
* @param Event $queueForEvents
* @param Document $project
@ -307,7 +317,9 @@ class Functions extends Action
private function execute(
Log $log,
Database $dbForProject,
Webhook $queueForWebhooks,
Func $queueForFunctions,
Realtime $queueForRealtime,
StatsUsage $queueForStatsUsage,
Event $queueForEvents,
Document $project,
@ -564,20 +576,20 @@ class Functions extends Action
;
}
$execution = $dbForProject->updateDocument('executions', $executionId, $execution);
/** Trigger Webhook */
$executionModel = new Execution();
$queueForEvents
->setQueue(Event::WEBHOOK_QUEUE_NAME)
->setClass(Event::WEBHOOK_CLASS_NAME)
->setProject($project)
->setUser($user)
->setEvent('functions.[functionId].executions.[executionId].update')
->setParam('functionId', $function->getId())
->setParam('executionId', $execution->getId())
->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules())))
->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules())));
/** Trigger Webhook */
$queueForWebhooks
->from($queueForEvents)
->trigger();
/** Trigger Functions */
@ -585,31 +597,11 @@ class Functions extends Action
->from($queueForEvents)
->trigger();
/** Trigger realtime event */
$allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [
'functionId' => $function->getId(),
'executionId' => $execution->getId()
]);
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $execution,
project: $project
);
Realtime::send(
projectId: 'console',
payload: $execution->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
Realtime::send(
projectId: $project->getId(),
payload: $execution->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
/** Trigger Realtime Events */
$queueForRealtime
->from($queueForEvents)
->setSubscribers(['console', $project->getId()])
->trigger();
if (!empty($error)) {
throw new Exception($error, $errorCode);

View file

@ -3,8 +3,7 @@
namespace Appwrite\Platform\Workers;
use Ahc\Jwt\JWT;
use Appwrite\Event\Event;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Event\Realtime;
use Exception;
use Utopia\CLI\Console;
use Utopia\Config\Config;
@ -54,13 +53,14 @@ class Migrations extends Action
->inject('dbForProject')
->inject('dbForPlatform')
->inject('logError')
->callback(fn (Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError) => $this->action($message, $project, $dbForProject, $dbForPlatform, $logError));
->inject('queueForRealtime')
->callback(fn (Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError, Realtime $queueForRealtime) => $this->action($message, $project, $dbForProject, $dbForPlatform, $logError, $queueForRealtime));
}
/**
* @throws Exception
*/
public function action(Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError): void
public function action(Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError, Realtime $queueForRealtime): void
{
$payload = $message->getPayload() ?? [];
@ -87,7 +87,7 @@ class Migrations extends Action
return;
}
$this->processMigration($migration);
$this->processMigration($migration, $queueForRealtime);
}
/**
@ -155,34 +155,16 @@ class Migrations extends Action
* @throws \Utopia\Database\Exception
* @throws Exception
*/
protected function updateMigrationDocument(Document $migration, Document $project): Document
protected function updateMigrationDocument(Document $migration, Document $project, Realtime $queueForRealtime): Document
{
/** Trigger Realtime */
$allEvents = Event::generateEvents('migrations.[migrationId].update', [
'migrationId' => $migration->getId(),
]);
$target = Realtime::fromPayload(
event: $allEvents[0],
payload: $migration,
project: $project
);
Realtime::send(
projectId: 'console',
payload: $migration->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles'],
);
Realtime::send(
projectId: $project->getId(),
payload: $migration->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles'],
);
/** Trigger Realtime Events */
$queueForRealtime
->setProject($project)
->setSubscribers(['console', $project->getId()])
->setEvent('migrations.[migrationId].update')
->setParam('migrationId', $migration->getId())
->setPayload($migration->getArrayCopy())
->trigger();
return $this->dbForProject->updateDocument('migrations', $migration->getId(), $migration);
}
@ -231,7 +213,7 @@ class Migrations extends Action
* @throws \Utopia\Database\Exception
* @throws Exception
*/
protected function processMigration(Document $migration): void
protected function processMigration(Document $migration, Realtime $queueForRealtime): void
{
$project = $this->project;
$projectDocument = $this->dbForPlatform->getDocument('projects', $project->getId());
@ -255,7 +237,7 @@ class Migrations extends Action
$migration->setAttribute('stage', 'processing');
$migration->setAttribute('status', 'processing');
$this->updateMigrationDocument($migration, $projectDocument);
$this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime);
$source = $this->processSource($migration);
$destination = $this->processDestination($migration, $tempAPIKey);
@ -269,14 +251,14 @@ class Migrations extends Action
/** Start Transfer */
$migration->setAttribute('stage', 'migrating');
$this->updateMigrationDocument($migration, $projectDocument);
$this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime);
$transfer->run(
$migration->getAttribute('resources'),
function () use ($migration, $transfer, $projectDocument) {
function () use ($migration, $transfer, $projectDocument, $queueForRealtime) {
$migration->setAttribute('resourceData', json_encode($transfer->getCache()));
$migration->setAttribute('statusCounters', json_encode($transfer->getStatusCounters()));
$this->updateMigrationDocument($migration, $projectDocument);
$this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime);
},
$migration->getAttribute('resourceId'),
$migration->getAttribute('resourceType')
@ -313,7 +295,7 @@ class Migrations extends Action
}
$migration->setAttribute('errors', $errorMessages);
$this->updateMigrationDocument($migration, $projectDocument);
$this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime);
return;
}
@ -354,7 +336,7 @@ class Migrations extends Action
$migration->setAttribute('errors', $errorMessages);
}
} finally {
$this->updateMigrationDocument($migration, $projectDocument);
$this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime);
if ($migration->getAttribute('status', '') === 'failed') {
Console::error('Migration('.$migration->getInternalId().':'.$migration->getId().') failed, Project('.$this->project->getInternalId().':'.$this->project->getId().')');