diff --git a/src/Appwrite/Platform/Workers/Functions.php b/src/Appwrite/Platform/Workers/Functions.php index 0a7c39c02f..f485db8648 100644 --- a/src/Appwrite/Platform/Workers/Functions.php +++ b/src/Appwrite/Platform/Workers/Functions.php @@ -5,8 +5,8 @@ namespace Appwrite\Platform\Workers; use Ahc\Jwt\JWT; use Appwrite\Event\Event; use Appwrite\Event\Func; +use Appwrite\Event\Realtime; use Appwrite\Event\StatsUsage; -use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Utopia\Response\Model\Execution; use Exception; use Executor\Executor; @@ -45,14 +45,15 @@ class Functions extends Action ->inject('message') ->inject('dbForProject') ->inject('queueForFunctions') + ->inject('queueForRealtime') ->inject('queueForEvents') ->inject('queueForStatsUsage') ->inject('log') ->inject('isResourceBlocked') - ->callback(fn (Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForStatsUsage, $log, $isResourceBlocked)); + ->callback(fn (Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $log, $isResourceBlocked)); } - public function action(Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked): void + public function action(Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked): void { $payload = $message->getPayload() ?? []; @@ -137,6 +138,7 @@ class Functions extends Action log: $log, dbForProject: $dbForProject, queueForFunctions: $queueForFunctions, + queueForRealtime: $queueForRealtime, queueForStatsUsage: $queueForStatsUsage, queueForEvents: $queueForEvents, project: $project, @@ -177,6 +179,7 @@ class Functions extends Action log: $log, dbForProject: $dbForProject, queueForFunctions: $queueForFunctions, + queueForRealtime: $queueForRealtime, queueForStatsUsage: $queueForStatsUsage, queueForEvents: $queueForEvents, project: $project, @@ -199,6 +202,7 @@ class Functions extends Action log: $log, dbForProject: $dbForProject, queueForFunctions: $queueForFunctions, + queueForRealtime: $queueForRealtime, queueForStatsUsage: $queueForStatsUsage, queueForEvents: $queueForEvents, project: $project, @@ -284,6 +288,7 @@ class Functions extends Action * @param Log $log * @param Database $dbForProject * @param Func $queueForFunctions + * @param Realtime $queueForRealtime * @param StatsUsage $queueForStatsUsage * @param Event $queueForEvents * @param Document $project @@ -308,6 +313,7 @@ class Functions extends Action Log $log, Database $dbForProject, Func $queueForFunctions, + Realtime $queueForRealtime, StatsUsage $queueForStatsUsage, Event $queueForEvents, Document $project, @@ -564,20 +570,21 @@ class Functions extends Action ; } - $execution = $dbForProject->updateDocument('executions', $executionId, $execution); - /** Trigger Webhook */ $executionModel = new Execution(); $queueForEvents - ->setQueue(Event::WEBHOOK_QUEUE_NAME) - ->setClass(Event::WEBHOOK_CLASS_NAME) ->setProject($project) ->setUser($user) ->setEvent('functions.[functionId].executions.[executionId].update') ->setParam('functionId', $function->getId()) ->setParam('executionId', $execution->getId()) - ->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules()))) + ->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules()))); + + /** Trigger Webhook */ + $queueForEvents + ->setQueue(Event::WEBHOOK_QUEUE_NAME) + ->setClass(Event::WEBHOOK_CLASS_NAME) ->trigger(); /** Trigger Functions */ @@ -585,31 +592,16 @@ class Functions extends Action ->from($queueForEvents) ->trigger(); - /** Trigger realtime event */ - $allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [ - 'functionId' => $function->getId(), - 'executionId' => $execution->getId() - ]); - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $execution, - project: $project - ); - Realtime::send( - projectId: 'console', - payload: $execution->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); - Realtime::send( - projectId: $project->getId(), - payload: $execution->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); + /** Trigger Realtime Events */ + $queueForRealtime + ->from($queueForEvents) + ->setProjectId('console') + ->trigger(); + + $queueForRealtime + ->from($queueForEvents) + ->setProjectId($project->getId()) + ->trigger(); if (!empty($error)) { throw new Exception($error, $errorCode);