chore: refactor realtime queueing in functions worker

This commit is contained in:
Chirag Aggarwal 2025-02-11 05:50:30 +00:00
parent f666e18154
commit 532160705f

View file

@ -5,8 +5,8 @@ namespace Appwrite\Platform\Workers;
use Ahc\Jwt\JWT; use Ahc\Jwt\JWT;
use Appwrite\Event\Event; use Appwrite\Event\Event;
use Appwrite\Event\Func; use Appwrite\Event\Func;
use Appwrite\Event\Realtime;
use Appwrite\Event\StatsUsage; use Appwrite\Event\StatsUsage;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Utopia\Response\Model\Execution; use Appwrite\Utopia\Response\Model\Execution;
use Exception; use Exception;
use Executor\Executor; use Executor\Executor;
@ -45,14 +45,15 @@ class Functions extends Action
->inject('message') ->inject('message')
->inject('dbForProject') ->inject('dbForProject')
->inject('queueForFunctions') ->inject('queueForFunctions')
->inject('queueForRealtime')
->inject('queueForEvents') ->inject('queueForEvents')
->inject('queueForStatsUsage') ->inject('queueForStatsUsage')
->inject('log') ->inject('log')
->inject('isResourceBlocked') ->inject('isResourceBlocked')
->callback(fn (Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForStatsUsage, $log, $isResourceBlocked)); ->callback(fn (Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $log, $isResourceBlocked));
} }
public function action(Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked): void public function action(Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked): void
{ {
$payload = $message->getPayload() ?? []; $payload = $message->getPayload() ?? [];
@ -137,6 +138,7 @@ class Functions extends Action
log: $log, log: $log,
dbForProject: $dbForProject, dbForProject: $dbForProject,
queueForFunctions: $queueForFunctions, queueForFunctions: $queueForFunctions,
queueForRealtime: $queueForRealtime,
queueForStatsUsage: $queueForStatsUsage, queueForStatsUsage: $queueForStatsUsage,
queueForEvents: $queueForEvents, queueForEvents: $queueForEvents,
project: $project, project: $project,
@ -177,6 +179,7 @@ class Functions extends Action
log: $log, log: $log,
dbForProject: $dbForProject, dbForProject: $dbForProject,
queueForFunctions: $queueForFunctions, queueForFunctions: $queueForFunctions,
queueForRealtime: $queueForRealtime,
queueForStatsUsage: $queueForStatsUsage, queueForStatsUsage: $queueForStatsUsage,
queueForEvents: $queueForEvents, queueForEvents: $queueForEvents,
project: $project, project: $project,
@ -199,6 +202,7 @@ class Functions extends Action
log: $log, log: $log,
dbForProject: $dbForProject, dbForProject: $dbForProject,
queueForFunctions: $queueForFunctions, queueForFunctions: $queueForFunctions,
queueForRealtime: $queueForRealtime,
queueForStatsUsage: $queueForStatsUsage, queueForStatsUsage: $queueForStatsUsage,
queueForEvents: $queueForEvents, queueForEvents: $queueForEvents,
project: $project, project: $project,
@ -284,6 +288,7 @@ class Functions extends Action
* @param Log $log * @param Log $log
* @param Database $dbForProject * @param Database $dbForProject
* @param Func $queueForFunctions * @param Func $queueForFunctions
* @param Realtime $queueForRealtime
* @param StatsUsage $queueForStatsUsage * @param StatsUsage $queueForStatsUsage
* @param Event $queueForEvents * @param Event $queueForEvents
* @param Document $project * @param Document $project
@ -308,6 +313,7 @@ class Functions extends Action
Log $log, Log $log,
Database $dbForProject, Database $dbForProject,
Func $queueForFunctions, Func $queueForFunctions,
Realtime $queueForRealtime,
StatsUsage $queueForStatsUsage, StatsUsage $queueForStatsUsage,
Event $queueForEvents, Event $queueForEvents,
Document $project, Document $project,
@ -564,20 +570,21 @@ class Functions extends Action
; ;
} }
$execution = $dbForProject->updateDocument('executions', $executionId, $execution); $execution = $dbForProject->updateDocument('executions', $executionId, $execution);
/** Trigger Webhook */
$executionModel = new Execution(); $executionModel = new Execution();
$queueForEvents $queueForEvents
->setQueue(Event::WEBHOOK_QUEUE_NAME)
->setClass(Event::WEBHOOK_CLASS_NAME)
->setProject($project) ->setProject($project)
->setUser($user) ->setUser($user)
->setEvent('functions.[functionId].executions.[executionId].update') ->setEvent('functions.[functionId].executions.[executionId].update')
->setParam('functionId', $function->getId()) ->setParam('functionId', $function->getId())
->setParam('executionId', $execution->getId()) ->setParam('executionId', $execution->getId())
->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules()))) ->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules())));
/** Trigger Webhook */
$queueForEvents
->setQueue(Event::WEBHOOK_QUEUE_NAME)
->setClass(Event::WEBHOOK_CLASS_NAME)
->trigger(); ->trigger();
/** Trigger Functions */ /** Trigger Functions */
@ -585,31 +592,16 @@ class Functions extends Action
->from($queueForEvents) ->from($queueForEvents)
->trigger(); ->trigger();
/** Trigger realtime event */ /** Trigger Realtime Events */
$allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [ $queueForRealtime
'functionId' => $function->getId(), ->from($queueForEvents)
'executionId' => $execution->getId() ->setProjectId('console')
]); ->trigger();
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern $queueForRealtime
event: $allEvents[0], ->from($queueForEvents)
payload: $execution, ->setProjectId($project->getId())
project: $project ->trigger();
);
Realtime::send(
projectId: 'console',
payload: $execution->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
Realtime::send(
projectId: $project->getId(),
payload: $execution->getArrayCopy(),
events: $allEvents,
channels: $target['channels'],
roles: $target['roles']
);
if (!empty($error)) { if (!empty($error)) {
throw new Exception($error, $errorCode); throw new Exception($error, $errorCode);