diff --git a/src/Appwrite/Platform/Workers/Builds.php b/src/Appwrite/Platform/Workers/Builds.php index e7cbbd5088..f3f733f5eb 100644 --- a/src/Appwrite/Platform/Workers/Builds.php +++ b/src/Appwrite/Platform/Workers/Builds.php @@ -5,8 +5,8 @@ namespace Appwrite\Platform\Workers; use Ahc\Jwt\JWT; use Appwrite\Event\Event; use Appwrite\Event\Func; +use Appwrite\Event\Realtime; use Appwrite\Event\StatsUsage; -use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Utopia\Response\Model\Deployment; use Appwrite\Vcs\Comment; use Exception; @@ -50,12 +50,13 @@ class Builds extends Action ->inject('dbForPlatform') ->inject('queueForEvents') ->inject('queueForFunctions') + ->inject('queueForRealtime') ->inject('queueForStatsUsage') ->inject('cache') ->inject('dbForProject') ->inject('deviceForFunctions') ->inject('log') - ->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, StatsUsage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log) => $this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $deviceForFunctions, $log)); + ->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log) => $this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForFunctions, $queueForRealtime, $usage, $cache, $dbForProject, $deviceForFunctions, $log)); } /** @@ -64,6 +65,7 @@ class Builds extends Action * @param Database $dbForPlatform * @param Event $queueForEvents * @param Func $queueForFunctions + * @param Realtime $queueForRealtime * @param StatsUsage $queueForStatsUsage * @param Cache $cache * @param Database $dbForProject @@ -72,7 +74,7 @@ class Builds extends Action * @return void * @throws \Utopia\Database\Exception */ - public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, StatsUsage $queueForStatsUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log): void + public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log): void { $payload = $message->getPayload() ?? []; @@ -93,7 +95,7 @@ class Builds extends Action case BUILD_TYPE_RETRY: Console::info('Creating build for deployment: ' . $deployment->getId()); $github = new GitHub($cache); - $this->buildDeployment($deviceForFunctions, $queueForFunctions, $queueForEvents, $queueForStatsUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $log); + $this->buildDeployment($deviceForFunctions, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $log); break; default: @@ -104,6 +106,7 @@ class Builds extends Action /** * @param Device $deviceForFunctions * @param Func $queueForFunctions + * @param Realtime $queueForRealtime * @param Event $queueForEvents * @param StatsUsage $queueForStatsUsage * @param Database $dbForPlatform @@ -118,7 +121,7 @@ class Builds extends Action * @throws \Utopia\Database\Exception * @throws Exception */ - protected function buildDeployment(Device $deviceForFunctions, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, Log $log): void + protected function buildDeployment(Device $deviceForFunctions, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, Log $log): void { $executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST')); @@ -152,10 +155,7 @@ class Builds extends Action } // Realtime preparation - $allEvents = Event::generateEvents('functions.[functionId].deployments.[deploymentId].update', [ - 'functionId' => $function->getId(), - 'deploymentId' => $deployment->getId() - ]); + $event = "functions.[functionId].deployments.[deploymentId].update"; $startTime = DateTime::now(); $durationStart = \microtime(true); @@ -369,21 +369,16 @@ class Builds extends Action $deployment = $dbForProject->updateDocument('deployments', $deployment->getId(), $deployment); /** - * Send realtime Event + * Trigger Realtime Event */ - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $build, - project: $project - ); - Realtime::send( - projectId: 'console', - payload: $build->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); + $queueForRealtime + ->setProject($project) + ->setProjectId('console') + ->setEvent($event) + ->setParam('functionId', $function->getId()) + ->setParam('deploymentId', $deployment->getId()) + ->setPayload($build->getArrayCopy()) + ->trigger(); } $tmpPath = '/tmp/builds/' . $buildId; @@ -449,21 +444,15 @@ class Builds extends Action ->from($deploymentUpdate) ->trigger(); - /** Trigger Realtime */ - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $build, - project: $project - ); - - Realtime::send( - projectId: 'console', - payload: $build->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); + /** Trigger Realtime Event */ + $queueForRealtime + ->setProject($project) + ->setProjectId('console') + ->setEvent($event) + ->setParam('functionId', $function->getId()) + ->setParam('deploymentId', $deployment->getId()) + ->setPayload($build->getArrayCopy()) + ->trigger(); $vars = []; @@ -556,12 +545,12 @@ class Builds extends Action $err = $error; } }), - Co\go(function () use ($executor, $project, $deployment, &$response, &$build, $dbForProject, $allEvents, &$err, &$isCanceled) { + Co\go(function () use ($executor, $project, $function, $deployment, &$response, &$build, $dbForProject, $event, &$err, $queueForRealtime, &$isCanceled) { try { $executor->getLogs( deploymentId: $deployment->getId(), projectId: $project->getId(), - callback: function ($logs) use (&$response, &$err, &$build, $dbForProject, $allEvents, $project, &$isCanceled) { + callback: function ($logs) use (&$response, &$err, &$build, $dbForProject, $event, $project, $function, $deployment, $queueForRealtime, &$isCanceled) { if ($isCanceled) { return; } @@ -586,21 +575,16 @@ class Builds extends Action $build = $dbForProject->updateDocument('builds', $build->getId(), $build); /** - * Send realtime Event + * Trigger Realtime Event */ - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $build, - project: $project - ); - Realtime::send( - projectId: 'console', - payload: $build->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); + $queueForRealtime + ->setProject($project) + ->setProjectId('console') + ->setEvent($event) + ->setParam('functionId', $function->getId()) + ->setParam('deploymentId', $deployment->getId()) + ->setPayload($build->getArrayCopy()) + ->trigger(); } } ); @@ -688,21 +672,16 @@ class Builds extends Action } } finally { /** - * Send realtime Event + * Trigger Realtime Event */ - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $build, - project: $project - ); - Realtime::send( - projectId: 'console', - payload: $build->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); + $queueForRealtime + ->setProject($project) + ->setProjectId('console') + ->setEvent($event) + ->setParam('functionId', $function->getId()) + ->setParam('deploymentId', $deployment->getId()) + ->setPayload($build->getArrayCopy()) + ->trigger(); /** Trigger usage queue */ if ($build->getAttribute('status') === 'ready') {