diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index cdaf062788..583468f6c1 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -6,13 +6,14 @@ use Appwrite\Event\Build; use Appwrite\Event\Delete; use Appwrite\Event\Event; use Appwrite\Event\Func; +use Appwrite\Event\Realtime; use Appwrite\Event\StatsUsage; use Appwrite\Event\Validator\FunctionEvent; +use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; use Appwrite\Extend\Exception as AppwriteException; use Appwrite\Functions\Validator\Headers; use Appwrite\Functions\Validator\RuntimeSpecification; -use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Platform\Tasks\ScheduleExecutions; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -194,9 +195,12 @@ App::post('/v1/functions') ->inject('user') ->inject('queueForEvents') ->inject('queueForBuilds') + ->inject('queueForWebhooks') + ->inject('queueForFunctions') + ->inject('queueForRealtime') ->inject('dbForPlatform') ->inject('gitHub') - ->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $templateRepository, string $templateOwner, string $templateRootDirectory, string $templateVersion, string $specification, Request $request, Response $response, Database $dbForProject, callable $timelimit, Document $project, Document $user, Event $queueForEvents, Build $queueForBuilds, Database $dbForPlatform, GitHub $github) use ($redeployVcs) { + ->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $templateRepository, string $templateOwner, string $templateRootDirectory, string $templateVersion, string $specification, Request $request, Response $response, Database $dbForProject, callable $timelimit, Document $project, Document $user, Event $queueForEvents, Build $queueForBuilds, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Database $dbForPlatform, GitHub $github) use ($redeployVcs) { $functionId = ($functionId == 'unique()') ? ID::unique() : $functionId; // Temporary abuse check @@ -386,51 +390,29 @@ App::post('/v1/functions') ])) ); - /** Trigger Webhook */ $ruleModel = new Rule(); $ruleCreate = $queueForEvents - ->setClass(Event::WEBHOOK_CLASS_NAME) - ->setQueue(Event::WEBHOOK_QUEUE_NAME); + ->setProject($project) + ->setEvent('rules.[ruleId].create') + ->setParam('ruleId', $rule->getId()) + ->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules()))); - $ruleCreate - ->setProject($project) - ->setEvent('rules.[ruleId].create') - ->setParam('ruleId', $rule->getId()) - ->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules()))) + /** Trigger Webhook */ + $queueForWebhooks + ->from($ruleCreate) ->trigger(); /** Trigger Functions */ - $ruleCreate - ->setClass(Event::FUNCTIONS_CLASS_NAME) - ->setQueue(Event::FUNCTIONS_QUEUE_NAME) + $queueForFunctions + ->from($ruleCreate) ->trigger(); - /** Trigger realtime event */ - $allEvents = Event::generateEvents('rules.[ruleId].create', [ - 'ruleId' => $rule->getId(), - ]); - - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $rule, - project: $project - ); - Realtime::send( - projectId: 'console', - payload: $rule->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); - Realtime::send( - projectId: $project->getId(), - payload: $rule->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); + /** Trigger Realtime Events */ + $queueForRealtime + ->from($ruleCreate) + ->setSubscribers(['console', $project->getId()]) + ->trigger(); } $queueForEvents->setParam('functionId', $function->getId()); diff --git a/app/worker.php b/app/worker.php index 605474e9f1..1088a9b6db 100644 --- a/app/worker.php +++ b/app/worker.php @@ -13,12 +13,14 @@ use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; use Appwrite\Event\Migration; +use Appwrite\Event\Realtime; use Appwrite\Event\StatsUsage; use Appwrite\Event\StatsUsageDump; /** remove */ use Appwrite\Event\Usage; use Appwrite\Event\UsageDump; /** /remove */ +use Appwrite\Event\Webhook; use Appwrite\Platform\Appwrite; use Swoole\Runtime; use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis; @@ -313,10 +315,18 @@ Server::setResource('queueForAudits', function (Publisher $publisher) { return new Audit($publisher); }, ['publisher']); +Server::setResource('queueForWebhooks', function (Publisher $publisher) { + return new Webhook($publisher); +}, ['publisher']); + Server::setResource('queueForFunctions', function (Publisher $publisher) { return new Func($publisher); }, ['publisher']); +Server::setResource('queueForRealtime', function () { + return new Realtime(); +}, []); + Server::setResource('queueForCertificates', function (Publisher $publisher) { return new Certificate($publisher); }, ['publisher']); diff --git a/src/Appwrite/Event/Realtime.php b/src/Appwrite/Event/Realtime.php index f4f00b59d4..28a1bb6a6d 100644 --- a/src/Appwrite/Event/Realtime.php +++ b/src/Appwrite/Event/Realtime.php @@ -7,10 +7,17 @@ use Utopia\Database\Document; class Realtime extends Event { + protected array $subscribers = []; + public function __construct() { } + /** + * Get Realtime payload for this event. + * + * @return array + */ public function getRealtimePayload(): array { $payload = []; @@ -24,6 +31,28 @@ class Realtime extends Event return $payload; } + /** + * Set subscribers for this realtime event. + * + * @param array $subscribers + * @return array + */ + public function setSubscribers(array $subscribers): self + { + $this->subscribers = $subscribers; + return $this; + } + + /** + * Get subscribers for this realtime event. + * + * @return array + */ + public function getSubscribers(): array + { + return $this->subscribers; + } + /** * Execute Event. * @@ -53,17 +82,23 @@ class Realtime extends Event bucket: $bucket, ); - RealtimeAdapter::send( - projectId: $target['projectId'] ?? $this->getProject()->getId(), - payload: $this->getRealtimePayload(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'], - options: [ - 'permissionsChanged' => $target['permissionsChanged'], - 'userId' => $this->getParam('userId') - ] - ); + $projectIds = !empty($this->getSubscribers()) + ? $this->getSubscribers() + : [$target['projectId'] ?? $this->getProject()->getId()]; + + foreach ($projectIds as $projectId) { + RealtimeAdapter::send( + projectId: $projectId, + payload: $this->getRealtimePayload(), + events: $allEvents, + channels: $target['channels'], + roles: $target['roles'], + options: [ + 'permissionsChanged' => $target['permissionsChanged'], + 'userId' => $this->getParam('userId') + ] + ); + } return true; } diff --git a/src/Appwrite/Platform/Workers/Builds.php b/src/Appwrite/Platform/Workers/Builds.php index c9833adcfb..6f26e9a80c 100644 --- a/src/Appwrite/Platform/Workers/Builds.php +++ b/src/Appwrite/Platform/Workers/Builds.php @@ -5,8 +5,9 @@ namespace Appwrite\Platform\Workers; use Ahc\Jwt\JWT; use Appwrite\Event\Event; use Appwrite\Event\Func; +use Appwrite\Event\Realtime; use Appwrite\Event\StatsUsage; -use Appwrite\Messaging\Adapter\Realtime; +use Appwrite\Event\Webhook; use Appwrite\Utopia\Response\Model\Deployment; use Appwrite\Vcs\Comment; use Exception; @@ -49,15 +50,17 @@ class Builds extends Action ->inject('project') ->inject('dbForPlatform') ->inject('queueForEvents') + ->inject('queueForWebhooks') ->inject('queueForFunctions') + ->inject('queueForRealtime') ->inject('queueForStatsUsage') ->inject('cache') ->inject('dbForProject') ->inject('deviceForFunctions') ->inject('isResourceBlocked') ->inject('log') - ->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, StatsUsage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log) => - $this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $deviceForFunctions, $isResourceBlocked, $log)); + ->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log) => + $this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $usage, $cache, $dbForProject, $deviceForFunctions, $isResourceBlocked, $log)); } /** @@ -65,7 +68,9 @@ class Builds extends Action * @param Document $project * @param Database $dbForPlatform * @param Event $queueForEvents + * @param Webhook $queueForWebhooks * @param Func $queueForFunctions + * @param Realtime $queueForRealtime * @param StatsUsage $queueForStatsUsage * @param Cache $cache * @param Database $dbForProject @@ -74,7 +79,7 @@ class Builds extends Action * @return void * @throws \Utopia\Database\Exception */ - public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, StatsUsage $queueForStatsUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log): void + public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log): void { $payload = $message->getPayload() ?? []; @@ -95,7 +100,7 @@ class Builds extends Action case BUILD_TYPE_RETRY: Console::info('Creating build for deployment: ' . $deployment->getId()); $github = new GitHub($cache); - $this->buildDeployment($deviceForFunctions, $queueForFunctions, $queueForEvents, $queueForStatsUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $isResourceBlocked, $log); + $this->buildDeployment($deviceForFunctions, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $isResourceBlocked, $log); break; default: @@ -105,7 +110,9 @@ class Builds extends Action /** * @param Device $deviceForFunctions + * @param Webhook $queueForWebhooks * @param Func $queueForFunctions + * @param Realtime $queueForRealtime * @param Event $queueForEvents * @param StatsUsage $queueForStatsUsage * @param Database $dbForPlatform @@ -120,7 +127,7 @@ class Builds extends Action * @throws \Utopia\Database\Exception * @throws Exception */ - protected function buildDeployment(Device $deviceForFunctions, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, callable $isResourceBlocked, Log $log): void + protected function buildDeployment(Device $deviceForFunctions, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, callable $isResourceBlocked, Log $log): void { $executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST')); @@ -158,10 +165,7 @@ class Builds extends Action } // Realtime preparation - $allEvents = Event::generateEvents('functions.[functionId].deployments.[deploymentId].update', [ - 'functionId' => $function->getId(), - 'deploymentId' => $deployment->getId() - ]); + $event = "functions.[functionId].deployments.[deploymentId].update"; $startTime = DateTime::now(); $durationStart = \microtime(true); @@ -375,21 +379,16 @@ class Builds extends Action $deployment = $dbForProject->updateDocument('deployments', $deployment->getId(), $deployment); /** - * Send realtime Event + * Trigger Realtime Event */ - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $build, - project: $project - ); - Realtime::send( - projectId: 'console', - payload: $build->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); + $queueForRealtime + ->setProject($project) + ->setSubscribers(['console']) + ->setEvent($event) + ->setParam('functionId', $function->getId()) + ->setParam('deploymentId', $deployment->getId()) + ->setPayload($build->getArrayCopy()) + ->trigger(); } $tmpPath = '/tmp/builds/' . $buildId; @@ -436,40 +435,34 @@ class Builds extends Action $this->runGitAction('building', $github, $providerCommitHash, $owner, $repositoryName, $project, $function, $deployment->getId(), $dbForProject, $dbForPlatform); } - /** Trigger Webhook */ $deploymentModel = new Deployment(); $deploymentUpdate = $queueForEvents - ->setQueue(Event::WEBHOOK_QUEUE_NAME) - ->setClass(Event::WEBHOOK_CLASS_NAME) ->setProject($project) ->setEvent('functions.[functionId].deployments.[deploymentId].update') ->setParam('functionId', $function->getId()) ->setParam('deploymentId', $deployment->getId()) ->setPayload($deployment->getArrayCopy(array_keys($deploymentModel->getRules()))); - $deploymentUpdate->trigger(); + /** Trigger Webhook */ + $queueForWebhooks + ->from($deploymentUpdate) + ->trigger(); /** Trigger Functions */ $queueForFunctions ->from($deploymentUpdate) ->trigger(); - /** Trigger Realtime */ - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $build, - project: $project - ); - - Realtime::send( - projectId: 'console', - payload: $build->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); + /** Trigger Realtime Event */ + $queueForRealtime + ->setProject($project) + ->setSubscribers(['console']) + ->setEvent($event) + ->setParam('functionId', $function->getId()) + ->setParam('deploymentId', $deployment->getId()) + ->setPayload($build->getArrayCopy()) + ->trigger(); $vars = []; @@ -562,12 +555,12 @@ class Builds extends Action $err = $error; } }), - Co\go(function () use ($executor, $project, $deployment, &$response, &$build, $dbForProject, $allEvents, &$err, &$isCanceled) { + Co\go(function () use ($executor, $project, $function, $deployment, &$response, &$build, $dbForProject, $event, &$err, $queueForRealtime, &$isCanceled) { try { $executor->getLogs( deploymentId: $deployment->getId(), projectId: $project->getId(), - callback: function ($logs) use (&$response, &$err, &$build, $dbForProject, $allEvents, $project, &$isCanceled) { + callback: function ($logs) use (&$response, &$err, &$build, $dbForProject, $event, $project, $function, $deployment, $queueForRealtime, &$isCanceled) { if ($isCanceled) { return; } @@ -592,21 +585,16 @@ class Builds extends Action $build = $dbForProject->updateDocument('builds', $build->getId(), $build); /** - * Send realtime Event + * Trigger Realtime Event */ - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $build, - project: $project - ); - Realtime::send( - projectId: 'console', - payload: $build->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); + $queueForRealtime + ->setProject($project) + ->setSubscribers(['console']) + ->setEvent($event) + ->setParam('functionId', $function->getId()) + ->setParam('deploymentId', $deployment->getId()) + ->setPayload($build->getArrayCopy()) + ->trigger(); } } ); @@ -694,21 +682,16 @@ class Builds extends Action } } finally { /** - * Send realtime Event + * Trigger Realtime Event */ - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $build, - project: $project - ); - Realtime::send( - projectId: 'console', - payload: $build->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); + $queueForRealtime + ->setProject($project) + ->setSubscribers(['console']) + ->setEvent($event) + ->setParam('functionId', $function->getId()) + ->setParam('deploymentId', $deployment->getId()) + ->setPayload($build->getArrayCopy()) + ->trigger(); /** Trigger usage queue */ if ($build->getAttribute('status') === 'ready') { diff --git a/src/Appwrite/Platform/Workers/Certificates.php b/src/Appwrite/Platform/Workers/Certificates.php index 7e220b2734..093e6fda5a 100644 --- a/src/Appwrite/Platform/Workers/Certificates.php +++ b/src/Appwrite/Platform/Workers/Certificates.php @@ -6,7 +6,8 @@ use Appwrite\Certificates\Adapter as CertificatesAdapter; use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Mail; -use Appwrite\Messaging\Adapter\Realtime; +use Appwrite\Event\Realtime; +use Appwrite\Event\Webhook; use Appwrite\Network\Validator\CNAME; use Appwrite\Template\Template; use Appwrite\Utopia\Response\Model\Rule; @@ -46,12 +47,14 @@ class Certificates extends Action ->inject('dbForPlatform') ->inject('queueForMails') ->inject('queueForEvents') + ->inject('queueForWebhooks') ->inject('queueForFunctions') + ->inject('queueForRealtime') ->inject('log') ->inject('certificates') ->callback( - fn (Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, CertificatesAdapter $certificates) => - $this->action($message, $dbForPlatform, $queueForMails, $queueForEvents, $queueForFunctions, $log, $certificates) + fn (Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates) => + $this->action($message, $dbForPlatform, $queueForMails, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $log, $certificates) ); } @@ -60,14 +63,16 @@ class Certificates extends Action * @param Database $dbForPlatform * @param Mail $queueForMails * @param Event $queueForEvents + * @param Webhook $queueForWebhooks * @param Func $queueForFunctions + * @param Realtime $queueForRealtime * @param Log $log * @param CertificatesAdapter $certificates * @return void * @throws Throwable * @throws \Utopia\Database\Exception */ - public function action(Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, CertificatesAdapter $certificates): void + public function action(Message $message, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates): void { $payload = $message->getPayload() ?? []; @@ -81,7 +86,7 @@ class Certificates extends Action $log->addTag('domain', $domain->get()); - $this->execute($domain, $dbForPlatform, $queueForMails, $queueForEvents, $queueForFunctions, $log, $certificates, $skipRenewCheck); + $this->execute($domain, $dbForPlatform, $queueForMails, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $log, $certificates, $skipRenewCheck); } /** @@ -90,13 +95,14 @@ class Certificates extends Action * @param Mail $queueForMails * @param Event $queueForEvents * @param Func $queueForFunctions + * @param Realtime $queueForRealtime * @param CertificatesAdapter $certificates * @param bool $skipRenewCheck * @return void * @throws Throwable * @throws \Utopia\Database\Exception */ - private function execute(Domain $domain, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, CertificatesAdapter $certificates, bool $skipRenewCheck = false): void + private function execute(Domain $domain, Database $dbForPlatform, Mail $queueForMails, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Log $log, CertificatesAdapter $certificates, bool $skipRenewCheck = false): void { /** * 1. Read arguments and validate domain @@ -186,7 +192,7 @@ class Certificates extends Action $certificate->setAttribute('updated', DateTime::now()); // Save all changes we made to certificate document into database - $this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForPlatform, $queueForEvents, $queueForFunctions); + $this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime); } } @@ -199,13 +205,14 @@ class Certificates extends Action * @param Database $dbForPlatform Database connection for console * @param Event $queueForEvents * @param Func $queueForFunctions + * @param Realtime $queueForRealtime * @return void * @throws \Utopia\Database\Exception * @throws Authorization * @throws Conflict * @throws Structure */ - private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions): void + private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime): void { // Check if update or insert required $certificateDocument = $dbForPlatform->findOne('certificates', [Query::equal('domain', [$domain])]); @@ -219,7 +226,7 @@ class Certificates extends Action } $certificateId = $certificate->getId(); - $this->updateDomainDocuments($certificateId, $domain, $success, $dbForPlatform, $queueForEvents, $queueForFunctions); + $this->updateDomainDocuments($certificateId, $domain, $success, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime); } /** @@ -338,7 +345,7 @@ class Certificates extends Action * * @return void */ - private function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions): void + private function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime): void { // TODO: @christyjacob remove once we migrate the rules in 1.7.x if (System::getEnv('_APP_RULES_FORMAT') === 'md5') { @@ -367,50 +374,28 @@ class Certificates extends Action return; } - /** Trigger Webhook */ $ruleModel = new Rule(); $queueForEvents - ->setQueue(Event::WEBHOOK_QUEUE_NAME) - ->setClass(Event::WEBHOOK_CLASS_NAME) ->setProject($project) ->setEvent('rules.[ruleId].update') ->setParam('ruleId', $rule->getId()) - ->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules()))) - ->trigger(); + ->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules()))); + /** Trigger Webhook */ + $queueForWebhooks + ->from($queueForEvents) + ->trigger(); /** Trigger Functions */ $queueForFunctions - ->setProject($project) - ->setEvent('rules.[ruleId].update') - ->setParam('ruleId', $rule->getId()) - ->setPayload($rule->getArrayCopy(array_keys($ruleModel->getRules()))) + ->from($queueForEvents) ->trigger(); - /** Trigger realtime event */ - $allEvents = Event::generateEvents('rules.[ruleId].update', [ - 'ruleId' => $rule->getId(), - ]); - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $rule, - project: $project - ); - Realtime::send( - projectId: 'console', - payload: $rule->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); - Realtime::send( - projectId: $project->getId(), - payload: $rule->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); + /** Trigger Realtime Events */ + $queueForRealtime + ->from($queueForEvents) + ->setSubscribers(['console', $projectId]) + ->trigger(); } } } diff --git a/src/Appwrite/Platform/Workers/Databases.php b/src/Appwrite/Platform/Workers/Databases.php index 0f400e0107..4abd035599 100644 --- a/src/Appwrite/Platform/Workers/Databases.php +++ b/src/Appwrite/Platform/Workers/Databases.php @@ -2,8 +2,7 @@ namespace Appwrite\Platform\Workers; -use Appwrite\Event\Event; -use Appwrite\Messaging\Adapter\Realtime; +use Appwrite\Event\Realtime; use Exception; use Utopia\CLI\Console; use Utopia\Database\Database; @@ -37,8 +36,9 @@ class Databases extends Action ->inject('project') ->inject('dbForPlatform') ->inject('dbForProject') + ->inject('queueForRealtime') ->inject('log') - ->callback(fn (Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log) => $this->action($message, $project, $dbForPlatform, $dbForProject, $log)); + ->callback(fn (Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime, Log $log) => $this->action($message, $project, $dbForPlatform, $dbForProject, $queueForRealtime, $log)); } /** @@ -46,16 +46,17 @@ class Databases extends Action * @param Document $project * @param Database $dbForPlatform * @param Database $dbForProject + * @param Realtime $queueForRealtime * @param Log $log * @return void * @throws \Exception */ - public function action(Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log): void + public function action(Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime, Log $log): void { $payload = $message->getPayload() ?? []; if (empty($payload)) { - throw new \Exception('Missing payload'); + throw new Exception('Missing payload'); } $type = $payload['type']; @@ -75,11 +76,11 @@ class Databases extends Action match (\strval($type)) { DATABASE_TYPE_DELETE_DATABASE => $this->deleteDatabase($database, $project, $dbForProject), DATABASE_TYPE_DELETE_COLLECTION => $this->deleteCollection($database, $collection, $project, $dbForProject), - DATABASE_TYPE_CREATE_ATTRIBUTE => $this->createAttribute($database, $collection, $document, $project, $dbForPlatform, $dbForProject), - DATABASE_TYPE_DELETE_ATTRIBUTE => $this->deleteAttribute($database, $collection, $document, $project, $dbForPlatform, $dbForProject), - DATABASE_TYPE_CREATE_INDEX => $this->createIndex($database, $collection, $document, $project, $dbForPlatform, $dbForProject), - DATABASE_TYPE_DELETE_INDEX => $this->deleteIndex($database, $collection, $document, $project, $dbForPlatform, $dbForProject), - default => throw new \Exception('No database operation for type: ' . \strval($type)), + DATABASE_TYPE_CREATE_ATTRIBUTE => $this->createAttribute($database, $collection, $document, $project, $dbForPlatform, $dbForProject, $queueForRealtime), + DATABASE_TYPE_DELETE_ATTRIBUTE => $this->deleteAttribute($database, $collection, $document, $project, $dbForPlatform, $dbForProject, $queueForRealtime), + DATABASE_TYPE_CREATE_INDEX => $this->createIndex($database, $collection, $document, $project, $dbForPlatform, $dbForProject, $queueForRealtime), + DATABASE_TYPE_DELETE_INDEX => $this->deleteIndex($database, $collection, $document, $project, $dbForPlatform, $dbForProject, $queueForRealtime), + default => throw new Exception('No database operation for type: ' . \strval($type)), }; } @@ -90,13 +91,14 @@ class Databases extends Action * @param Document $project * @param Database $dbForPlatform * @param Database $dbForProject + * @param Realtime $queueForRealtime * @return void * @throws Authorization * @throws Conflict * @throws \Exception * @throws \Throwable */ - private function createAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForPlatform, Database $dbForProject): void + private function createAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime): void { if ($collection->isEmpty()) { throw new Exception('Missing collection'); @@ -106,12 +108,7 @@ class Databases extends Action } $projectId = $project->getId(); - - $events = Event::generateEvents('databases.[databaseId].collections.[collectionId].attributes.[attributeId].update', [ - 'databaseId' => $database->getId(), - 'collectionId' => $collection->getId(), - 'attributeId' => $attribute->getId() - ]); + $event = "databases.[databaseId].collections.[collectionId].attributes.[attributeId].update"; /** * TODO @christyjacob4 verify if this is still the case * Fetch attribute from the database, since with Resque float values are loosing informations. @@ -169,7 +166,7 @@ class Databases extends Action break; default: if (!$dbForProject->createAttribute('database_' . $database->getInternalId() . '_collection_' . $collection->getInternalId(), $key, $type, $size, $required, $default, $signed, $array, $format, $formatOptions, $filters)) { - throw new \Exception('Failed to create Attribute'); + throw new Exception('Failed to create Attribute'); } } @@ -200,7 +197,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $attribute, $project, $projectId, $events); + $this->trigger($database, $collection, $project, $event, $queueForRealtime, $attribute); if (! $relatedCollection->isEmpty()) { $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $relatedCollection->getId()); @@ -217,13 +214,14 @@ class Databases extends Action * @param Document $project * @param Database $dbForPlatform * @param Database $dbForProject + * @param Realtime $queueForRealtime * @return void * @throws Authorization * @throws Conflict * @throws \Exception * @throws \Throwable **/ - private function deleteAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForPlatform, Database $dbForProject): void + private function deleteAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime): void { if ($collection->isEmpty()) { throw new Exception('Missing collection'); @@ -233,15 +231,9 @@ class Databases extends Action } $projectId = $project->getId(); - - $events = Event::generateEvents('databases.[databaseId].collections.[collectionId].attributes.[attributeId].delete', [ - 'databaseId' => $database->getId(), - 'collectionId' => $collection->getId(), - 'attributeId' => $attribute->getId() - ]); + $event = 'databases.[databaseId].collections.[collectionId].attributes.[attributeId].delete'; $collectionId = $collection->getId(); $key = $attribute->getAttribute('key', ''); - $status = $attribute->getAttribute('status', ''); $type = $attribute->getAttribute('type', ''); $project = $dbForPlatform->getDocument('projects', $projectId); $options = $attribute->getAttribute('options', []); @@ -312,7 +304,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $attribute, $project, $projectId, $events); + $this->trigger($database, $collection, $project, $event, $queueForRealtime, $attribute); } // The underlying database removes/rebuilds indexes when attribute is removed @@ -358,7 +350,7 @@ class Databases extends Action } if ($exists) { // Delete the duplicate if created, else update in db - $this->deleteIndex($database, $collection, $index, $project, $dbForPlatform, $dbForProject); + $this->deleteIndex($database, $collection, $index, $project, $dbForPlatform, $dbForProject, $queueForRealtime); } else { $dbForProject->updateDocument('indexes', $index->getId(), $index); } @@ -381,6 +373,7 @@ class Databases extends Action * @param Document $project * @param Database $dbForPlatform * @param Database $dbForProject + * @param Realtime $queueForRealtime * @return void * @throws Authorization * @throws Conflict @@ -388,7 +381,7 @@ class Databases extends Action * @throws DatabaseException * @throws \Throwable */ - private function createIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForPlatform, Database $dbForProject): void + private function createIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime): void { if ($collection->isEmpty()) { throw new Exception('Missing collection'); @@ -398,12 +391,7 @@ class Databases extends Action } $projectId = $project->getId(); - - $events = Event::generateEvents('databases.[databaseId].collections.[collectionId].indexes.[indexId].update', [ - 'databaseId' => $database->getId(), - 'collectionId' => $collection->getId(), - 'indexId' => $index->getId() - ]); + $event = 'databases.[databaseId].collections.[collectionId].indexes.[indexId].update'; $collectionId = $collection->getId(); $key = $index->getAttribute('key', ''); $type = $index->getAttribute('type', ''); @@ -430,7 +418,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $index, $project, $projectId, $events); + $this->trigger($database, $collection, $project, $event, $queueForRealtime, null, $index); $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collectionId); } } @@ -442,6 +430,7 @@ class Databases extends Action * @param Document $project * @param Database $dbForPlatform * @param Database $dbForProject + * @param Realtime $queueForRealtime * @return void * @throws Authorization * @throws Conflict @@ -449,7 +438,7 @@ class Databases extends Action * @throws DatabaseException * @throws \Throwable */ - private function deleteIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForPlatform, Database $dbForProject): void + private function deleteIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForPlatform, Database $dbForProject, Realtime $queueForRealtime): void { if ($collection->isEmpty()) { throw new Exception('Missing collection'); @@ -459,12 +448,7 @@ class Databases extends Action } $projectId = $project->getId(); - - $events = Event::generateEvents('databases.[databaseId].collections.[collectionId].indexes.[indexId].delete', [ - 'databaseId' => $database->getId(), - 'collectionId' => $collection->getId(), - 'indexId' => $index->getId() - ]); + $event = 'databases.[databaseId].collections.[collectionId].indexes.[indexId].delete'; $key = $index->getAttribute('key'); $status = $index->getAttribute('status', ''); $project = $dbForPlatform->getDocument('projects', $projectId); @@ -490,7 +474,7 @@ class Databases extends Action throw $e; } finally { - $this->trigger($database, $collection, $index, $project, $projectId, $events); + $this->trigger($database, $collection, $project, $event, $queueForRealtime, null, $index); $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collection->getId()); } } @@ -532,7 +516,6 @@ class Databases extends Action $collectionId = $collection->getId(); $collectionInternalId = $collection->getInternalId(); - $databaseId = $database->getId(); $databaseInternalId = $database->getInternalId(); $dbForProject->deleteCollection('database_' . $databaseInternalId . '_collection_' . $collection->getInternalId()); @@ -568,21 +551,21 @@ class Databases extends Action /** - * @param string $collection collectionID + * @param string $collectionId * @param array $queries * @param Database $database * @param callable|null $callback * @return void * @throws Exception */ - protected function deleteByGroup(string $collection, array $queries, Database $database, callable $callback = null): void + protected function deleteByGroup(string $collectionId, array $queries, Database $database, callable $callback = null): void { $start = \microtime(true); try { - $documents = $database->deleteDocuments($collection, $queries); + $documents = $database->deleteDocuments($collectionId, $queries); } catch (\Throwable $th) { - Console::error('Failed to delete documents for collection ' . $collection . ': ' . $th->getMessage()); + Console::error('Failed to delete documents for collection ' . $collectionId . ': ' . $th->getMessage()); return; } @@ -598,31 +581,42 @@ class Databases extends Action Console::info("Deleted {$count} documents by group in " . ($end - $start) . " seconds"); } + /** + * @param Document $database + * @param Document $collection + * @param Document $project + * @param Realtime $queueForRealtime + * @param Document|null $attribute + * @param Document|null $index + * @return void + */ protected function trigger( Document $database, Document $collection, - Document $attribute, Document $project, - string $projectId, - array $events + string $event, + Realtime $queueForRealtime, + Document|null $attribute = null, + Document|null $index = null, ): void { - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $events[0], - payload: $attribute, - project: $project, - ); - Realtime::send( - projectId: 'console', - payload: $attribute->getArrayCopy(), - events: $events, - channels: $target['channels'], - roles: $target['roles'], - options: [ - 'projectId' => $projectId, - 'databaseId' => $database->getId(), - 'collectionId' => $collection->getId() - ] - ); + $queueForRealtime + ->setProject($project) + ->setSubscribers(['console']) + ->setEvent($event) + ->setParam('databaseId', $database->getId()) + ->setParam('collectionId', $collection->getId()); + + if ($attribute !== null && !empty($attribute)) { + $queueForRealtime + ->setParam('attributeId', $attribute->getId()) + ->setPayload($attribute->getArrayCopy()); + } + if ($index !== null && !empty($index)) { + $queueForRealtime + ->setParam('indexId', $index->getId()) + ->setPayload($index->getArrayCopy()); + } + + $queueForRealtime->trigger(); } } diff --git a/src/Appwrite/Platform/Workers/Functions.php b/src/Appwrite/Platform/Workers/Functions.php index 0a7c39c02f..a7caa3207f 100644 --- a/src/Appwrite/Platform/Workers/Functions.php +++ b/src/Appwrite/Platform/Workers/Functions.php @@ -5,8 +5,9 @@ namespace Appwrite\Platform\Workers; use Ahc\Jwt\JWT; use Appwrite\Event\Event; use Appwrite\Event\Func; +use Appwrite\Event\Realtime; use Appwrite\Event\StatsUsage; -use Appwrite\Messaging\Adapter\Realtime; +use Appwrite\Event\Webhook; use Appwrite\Utopia\Response\Model\Execution; use Exception; use Executor\Executor; @@ -44,15 +45,17 @@ class Functions extends Action ->inject('project') ->inject('message') ->inject('dbForProject') + ->inject('queueForWebhooks') ->inject('queueForFunctions') + ->inject('queueForRealtime') ->inject('queueForEvents') ->inject('queueForStatsUsage') ->inject('log') ->inject('isResourceBlocked') - ->callback(fn (Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForStatsUsage, $log, $isResourceBlocked)); + ->callback(fn (Document $project, Message $message, Database $dbForProject, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $log, $isResourceBlocked)); } - public function action(Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked): void + public function action(Document $project, Message $message, Database $dbForProject, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked): void { $payload = $message->getPayload() ?? []; @@ -136,7 +139,9 @@ class Functions extends Action $this->execute( log: $log, dbForProject: $dbForProject, + queueForWebhooks: $queueForWebhooks, queueForFunctions: $queueForFunctions, + queueForRealtime: $queueForRealtime, queueForStatsUsage: $queueForStatsUsage, queueForEvents: $queueForEvents, project: $project, @@ -176,7 +181,9 @@ class Functions extends Action $this->execute( log: $log, dbForProject: $dbForProject, + queueForWebhooks: $queueForWebhooks, queueForFunctions: $queueForFunctions, + queueForRealtime: $queueForRealtime, queueForStatsUsage: $queueForStatsUsage, queueForEvents: $queueForEvents, project: $project, @@ -198,7 +205,9 @@ class Functions extends Action $this->execute( log: $log, dbForProject: $dbForProject, + queueForWebhooks: $queueForWebhooks, queueForFunctions: $queueForFunctions, + queueForRealtime: $queueForRealtime, queueForStatsUsage: $queueForStatsUsage, queueForEvents: $queueForEvents, project: $project, @@ -284,6 +293,7 @@ class Functions extends Action * @param Log $log * @param Database $dbForProject * @param Func $queueForFunctions + * @param Realtime $queueForRealtime * @param StatsUsage $queueForStatsUsage * @param Event $queueForEvents * @param Document $project @@ -307,7 +317,9 @@ class Functions extends Action private function execute( Log $log, Database $dbForProject, + Webhook $queueForWebhooks, Func $queueForFunctions, + Realtime $queueForRealtime, StatsUsage $queueForStatsUsage, Event $queueForEvents, Document $project, @@ -564,20 +576,20 @@ class Functions extends Action ; } - $execution = $dbForProject->updateDocument('executions', $executionId, $execution); - /** Trigger Webhook */ $executionModel = new Execution(); $queueForEvents - ->setQueue(Event::WEBHOOK_QUEUE_NAME) - ->setClass(Event::WEBHOOK_CLASS_NAME) ->setProject($project) ->setUser($user) ->setEvent('functions.[functionId].executions.[executionId].update') ->setParam('functionId', $function->getId()) ->setParam('executionId', $execution->getId()) - ->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules()))) + ->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules()))); + + /** Trigger Webhook */ + $queueForWebhooks + ->from($queueForEvents) ->trigger(); /** Trigger Functions */ @@ -585,31 +597,11 @@ class Functions extends Action ->from($queueForEvents) ->trigger(); - /** Trigger realtime event */ - $allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [ - 'functionId' => $function->getId(), - 'executionId' => $execution->getId() - ]); - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $execution, - project: $project - ); - Realtime::send( - projectId: 'console', - payload: $execution->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); - Realtime::send( - projectId: $project->getId(), - payload: $execution->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); + /** Trigger Realtime Events */ + $queueForRealtime + ->from($queueForEvents) + ->setSubscribers(['console', $project->getId()]) + ->trigger(); if (!empty($error)) { throw new Exception($error, $errorCode); diff --git a/src/Appwrite/Platform/Workers/Migrations.php b/src/Appwrite/Platform/Workers/Migrations.php index 3bf8e06356..f21a846a0d 100644 --- a/src/Appwrite/Platform/Workers/Migrations.php +++ b/src/Appwrite/Platform/Workers/Migrations.php @@ -3,8 +3,7 @@ namespace Appwrite\Platform\Workers; use Ahc\Jwt\JWT; -use Appwrite\Event\Event; -use Appwrite\Messaging\Adapter\Realtime; +use Appwrite\Event\Realtime; use Exception; use Utopia\CLI\Console; use Utopia\Config\Config; @@ -54,13 +53,14 @@ class Migrations extends Action ->inject('dbForProject') ->inject('dbForPlatform') ->inject('logError') - ->callback(fn (Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError) => $this->action($message, $project, $dbForProject, $dbForPlatform, $logError)); + ->inject('queueForRealtime') + ->callback(fn (Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError, Realtime $queueForRealtime) => $this->action($message, $project, $dbForProject, $dbForPlatform, $logError, $queueForRealtime)); } /** * @throws Exception */ - public function action(Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError): void + public function action(Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError, Realtime $queueForRealtime): void { $payload = $message->getPayload() ?? []; @@ -87,7 +87,7 @@ class Migrations extends Action return; } - $this->processMigration($migration); + $this->processMigration($migration, $queueForRealtime); } /** @@ -155,34 +155,16 @@ class Migrations extends Action * @throws \Utopia\Database\Exception * @throws Exception */ - protected function updateMigrationDocument(Document $migration, Document $project): Document + protected function updateMigrationDocument(Document $migration, Document $project, Realtime $queueForRealtime): Document { - /** Trigger Realtime */ - $allEvents = Event::generateEvents('migrations.[migrationId].update', [ - 'migrationId' => $migration->getId(), - ]); - - $target = Realtime::fromPayload( - event: $allEvents[0], - payload: $migration, - project: $project - ); - - Realtime::send( - projectId: 'console', - payload: $migration->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'], - ); - - Realtime::send( - projectId: $project->getId(), - payload: $migration->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'], - ); + /** Trigger Realtime Events */ + $queueForRealtime + ->setProject($project) + ->setSubscribers(['console', $project->getId()]) + ->setEvent('migrations.[migrationId].update') + ->setParam('migrationId', $migration->getId()) + ->setPayload($migration->getArrayCopy()) + ->trigger(); return $this->dbForProject->updateDocument('migrations', $migration->getId(), $migration); } @@ -231,7 +213,7 @@ class Migrations extends Action * @throws \Utopia\Database\Exception * @throws Exception */ - protected function processMigration(Document $migration): void + protected function processMigration(Document $migration, Realtime $queueForRealtime): void { $project = $this->project; $projectDocument = $this->dbForPlatform->getDocument('projects', $project->getId()); @@ -255,7 +237,7 @@ class Migrations extends Action $migration->setAttribute('stage', 'processing'); $migration->setAttribute('status', 'processing'); - $this->updateMigrationDocument($migration, $projectDocument); + $this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime); $source = $this->processSource($migration); $destination = $this->processDestination($migration, $tempAPIKey); @@ -269,14 +251,14 @@ class Migrations extends Action /** Start Transfer */ $migration->setAttribute('stage', 'migrating'); - $this->updateMigrationDocument($migration, $projectDocument); + $this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime); $transfer->run( $migration->getAttribute('resources'), - function () use ($migration, $transfer, $projectDocument) { + function () use ($migration, $transfer, $projectDocument, $queueForRealtime) { $migration->setAttribute('resourceData', json_encode($transfer->getCache())); $migration->setAttribute('statusCounters', json_encode($transfer->getStatusCounters())); - $this->updateMigrationDocument($migration, $projectDocument); + $this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime); }, $migration->getAttribute('resourceId'), $migration->getAttribute('resourceType') @@ -313,7 +295,7 @@ class Migrations extends Action } $migration->setAttribute('errors', $errorMessages); - $this->updateMigrationDocument($migration, $projectDocument); + $this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime); return; } @@ -354,7 +336,7 @@ class Migrations extends Action $migration->setAttribute('errors', $errorMessages); } } finally { - $this->updateMigrationDocument($migration, $projectDocument); + $this->updateMigrationDocument($migration, $projectDocument, $queueForRealtime); if ($migration->getAttribute('status', '') === 'failed') { Console::error('Migration('.$migration->getInternalId().':'.$migration->getId().') failed, Project('.$this->project->getInternalId().':'.$this->project->getId().')');