chore: refactor realtime queueing in builds worker

This commit is contained in:
Chirag Aggarwal 2025-02-11 06:06:43 +00:00
parent 532160705f
commit 5076303c33

View file

@ -5,8 +5,8 @@ namespace Appwrite\Platform\Workers;
use Ahc\Jwt\JWT;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Realtime;
use Appwrite\Event\StatsUsage;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Utopia\Response\Model\Deployment;
use Appwrite\Vcs\Comment;
use Exception;
@ -50,12 +50,13 @@ class Builds extends Action
->inject('dbForPlatform')
->inject('queueForEvents')
->inject('queueForFunctions')
->inject('queueForRealtime')
->inject('queueForStatsUsage')
->inject('cache')
->inject('dbForProject')
->inject('deviceForFunctions')
->inject('log')
->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, StatsUsage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log) => $this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $deviceForFunctions, $log));
->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log) => $this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForFunctions, $queueForRealtime, $usage, $cache, $dbForProject, $deviceForFunctions, $log));
}
/**
@ -64,6 +65,7 @@ class Builds extends Action
* @param Database $dbForPlatform
* @param Event $queueForEvents
* @param Func $queueForFunctions
* @param Realtime $queueForRealtime
* @param StatsUsage $queueForStatsUsage
* @param Cache $cache
* @param Database $dbForProject
@ -72,7 +74,7 @@ class Builds extends Action
* @return void
* @throws \Utopia\Database\Exception
*/
public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, StatsUsage $queueForStatsUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log): void
public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log): void
{
$payload = $message->getPayload() ?? [];
@ -93,7 +95,7 @@ class Builds extends Action
case BUILD_TYPE_RETRY:
Console::info('Creating build for deployment: ' . $deployment->getId());
$github = new GitHub($cache);
$this->buildDeployment($deviceForFunctions, $queueForFunctions, $queueForEvents, $queueForStatsUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $log);
$this->buildDeployment($deviceForFunctions, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $log);
break;
default:
@ -104,6 +106,7 @@ class Builds extends Action
/**
* @param Device $deviceForFunctions
* @param Func $queueForFunctions
* @param Realtime $queueForRealtime
* @param Event $queueForEvents
* @param StatsUsage $queueForStatsUsage
* @param Database $dbForPlatform
@ -118,7 +121,7 @@ class Builds extends Action
* @throws \Utopia\Database\Exception
* @throws Exception
*/
protected function buildDeployment(Device $deviceForFunctions, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, Log $log): void
protected function buildDeployment(Device $deviceForFunctions, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, Log $log): void
{
$executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST'));
@ -152,10 +155,7 @@ class Builds extends Action
}
// Realtime preparation
$allEvents = Event::generateEvents('functions.[functionId].deployments.[deploymentId].update', [
'functionId' => $function->getId(),
'deploymentId' => $deployment->getId()
]);
$event = "functions.[functionId].deployments.[deploymentId].update";
$startTime = DateTime::now();
$durationStart = \microtime(true);
@ -369,21 +369,16 @@ class Builds extends Action
$deployment = $dbForProject->updateDocument('deployments', $deployment->getId(), $deployment);
/**
* Send realtime Event
* Trigger Realtime Event
*/
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $build,
project: $project
);
Realtime::send(
projectId: 'console',
payload: $build->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
$queueForRealtime
->setProject($project)
->setProjectId('console')
->setEvent($event)
->setParam('functionId', $function->getId())
->setParam('deploymentId', $deployment->getId())
->setPayload($build->getArrayCopy())
->trigger();
}
$tmpPath = '/tmp/builds/' . $buildId;
@ -449,21 +444,15 @@ class Builds extends Action
->from($deploymentUpdate)
->trigger();
/** Trigger Realtime */
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $build,
project: $project
);
Realtime::send(
projectId: 'console',
payload: $build->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
/** Trigger Realtime Event */
$queueForRealtime
->setProject($project)
->setProjectId('console')
->setEvent($event)
->setParam('functionId', $function->getId())
->setParam('deploymentId', $deployment->getId())
->setPayload($build->getArrayCopy())
->trigger();
$vars = [];
@ -556,12 +545,12 @@ class Builds extends Action
$err = $error;
}
}),
Co\go(function () use ($executor, $project, $deployment, &$response, &$build, $dbForProject, $allEvents, &$err, &$isCanceled) {
Co\go(function () use ($executor, $project, $function, $deployment, &$response, &$build, $dbForProject, $event, &$err, $queueForRealtime, &$isCanceled) {
try {
$executor->getLogs(
deploymentId: $deployment->getId(),
projectId: $project->getId(),
callback: function ($logs) use (&$response, &$err, &$build, $dbForProject, $allEvents, $project, &$isCanceled) {
callback: function ($logs) use (&$response, &$err, &$build, $dbForProject, $event, $project, $function, $deployment, $queueForRealtime, &$isCanceled) {
if ($isCanceled) {
return;
}
@ -586,21 +575,16 @@ class Builds extends Action
$build = $dbForProject->updateDocument('builds', $build->getId(), $build);
/**
* Send realtime Event
* Trigger Realtime Event
*/
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $build,
project: $project
);
Realtime::send(
projectId: 'console',
payload: $build->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
$queueForRealtime
->setProject($project)
->setProjectId('console')
->setEvent($event)
->setParam('functionId', $function->getId())
->setParam('deploymentId', $deployment->getId())
->setPayload($build->getArrayCopy())
->trigger();
}
}
);
@ -688,21 +672,16 @@ class Builds extends Action
}
} finally {
/**
* Send realtime Event
* Trigger Realtime Event
*/
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $build,
project: $project
);
Realtime::send(
projectId: 'console',
payload: $build->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
$queueForRealtime
->setProject($project)
->setProjectId('console')
->setEvent($event)
->setParam('functionId', $function->getId())
->setParam('deploymentId', $deployment->getId())
->setPayload($build->getArrayCopy())
->trigger();
/** Trigger usage queue */
if ($build->getAttribute('status') === 'ready') {