From 5abe9ad73cfb46a5120ea779aa73e312643202de Mon Sep 17 00:00:00 2001 From: Christy Jacob Date: Wed, 16 Nov 2022 10:10:34 +0530 Subject: [PATCH] feat: refactor execute function to a resource --- app/workers/functions.php | 503 ++++++++++++++++++++------------------ 1 file changed, 271 insertions(+), 232 deletions(-) diff --git a/app/workers/functions.php b/app/workers/functions.php index 67fb716e54..a34f0aec99 100644 --- a/app/workers/functions.php +++ b/app/workers/functions.php @@ -32,243 +32,246 @@ global $workerNumber; $adapter = new Swoole($connection, $workerNumber, Event::FUNCTIONS_QUEUE_NAME); $server = new Server($adapter); -$execute = function ( - Document $project, - Document $function, - Database $dbForProject, - Func $functions, - string $trigger, - string $executionId = null, - string $event = null, - string $eventData = null, - string $data = null, - ?Document $user = null, - string $jwt = null, - Client $statsd -) { +Server::setResource('execute', function () { + return function ( + Document $project, + Document $function, + Database $dbForProject, + Func $functions, + string $trigger, + string $executionId = null, + string $event = null, + string $eventData = null, + string $data = null, + ?Document $user = null, + string $jwt = null, + Client $statsd + ) { - $user ??= new Document(); - $functionId = $function->getId(); - $deploymentId = $function->getAttribute('deployment', ''); - var_dump("Deployment ID : ", $deploymentId); - - /** Check if deployment exists */ - $deployment = $dbForProject->getDocument('deployments', $deploymentId); - - if ($deployment->getAttribute('resourceId') !== $functionId) { - throw new Exception('Deployment not found. Create deployment before trying to execute a function'); - } - - if ($deployment->isEmpty()) { - throw new Exception('Deployment not found. Create deployment before trying to execute a function'); - } - - /** Check if build has exists */ - $build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', '')); - if ($build->isEmpty()) { - throw new Exception('Build not found'); - } - - if ($build->getAttribute('status') !== 'ready') { - throw new Exception('Build not ready'); - } - - /** Check if runtime is supported */ - $runtimes = Config::getParam('runtimes', []); - - if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) { - throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported'); - } - - $runtime = $runtimes[$function->getAttribute('runtime')]; - - /** Create execution or update execution status */ - $execution = $dbForProject->getDocument('executions', $executionId ?? ''); - if ($execution->isEmpty()) { - $executionId = ID::unique(); - $execution = $dbForProject->createDocument('executions', new Document([ - '$id' => $executionId, - '$permissions' => $user->isEmpty() ? [] : [Permission::read(Role::user($user->getId()))], - 'functionId' => $functionId, - 'deploymentId' => $deploymentId, - 'trigger' => $trigger, - 'status' => 'waiting', - 'statusCode' => 0, - 'response' => '', - 'stderr' => '', - 'duration' => 0.0, - 'search' => implode(' ', [$functionId, $executionId]), - ])); - - if ($execution->isEmpty()) { - throw new Exception('Failed to create or read execution'); + $user ??= new Document(); + $functionId = $function->getId(); + $deploymentId = $function->getAttribute('deployment', ''); + var_dump("Deployment ID : ", $deploymentId); + + /** Check if deployment exists */ + $deployment = $dbForProject->getDocument('deployments', $deploymentId); + + if ($deployment->getAttribute('resourceId') !== $functionId) { + throw new Exception('Deployment not found. Create deployment before trying to execute a function'); } - } - $execution->setAttribute('status', 'processing'); - $execution = $dbForProject->updateDocument('executions', $executionId, $execution); - - if ($build->getAttribute('status') !== 'ready') { - throw new Exception('Build not ready'); - } - - /** Check if runtime is supported */ - $runtimes = Config::getParam('runtimes', []); - - if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) { - throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported'); - } - - $runtime = $runtimes[$function->getAttribute('runtime')]; - - /** Create execution or update execution status */ - $execution = $dbForProject->getDocument('executions', $executionId ?? ''); - if ($execution->isEmpty()) { - $executionId = ID::unique(); - $execution = $dbForProject->createDocument('executions', new Document([ - '$id' => $executionId, - '$permissions' => $user->isEmpty() ? [] : [Permission::read(Role::user($user->getId()))], - 'functionId' => $functionId, - 'deploymentId' => $deploymentId, - 'trigger' => $trigger, - 'status' => 'waiting', - 'statusCode' => 0, - 'response' => '', - 'stderr' => '', - 'duration' => 0.0, - 'search' => implode(' ', [$functionId, $executionId]), - ])); - - if ($execution->isEmpty()) { - throw new Exception('Failed to create or read execution'); + + if ($deployment->isEmpty()) { + throw new Exception('Deployment not found. Create deployment before trying to execute a function'); } - } - $execution->setAttribute('status', 'processing'); - $execution = $dbForProject->updateDocument('executions', $executionId, $execution); - - $vars = array_reduce($function['vars'] ?? [], function (array $carry, Document $var) { - $carry[$var->getAttribute('key')] = $var->getAttribute('value'); - return $carry; - }, []); - - /** Collect environment variables */ - $vars = \array_merge($vars, [ - 'APPWRITE_FUNCTION_ID' => $functionId, - 'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name'), - 'APPWRITE_FUNCTION_DEPLOYMENT' => $deploymentId, - 'APPWRITE_FUNCTION_TRIGGER' => $trigger, - 'APPWRITE_FUNCTION_PROJECT_ID' => $project->getId(), - 'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'] ?? '', - 'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'] ?? '', - 'APPWRITE_FUNCTION_EVENT' => $event ?? '', - 'APPWRITE_FUNCTION_EVENT_DATA' => $eventData ?? '', - 'APPWRITE_FUNCTION_DATA' => $data ?? '', - 'APPWRITE_FUNCTION_USER_ID' => $user->getId() ?? '', - 'APPWRITE_FUNCTION_JWT' => $jwt ?? '', - ]); - - /** Execute function */ - $executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); - try { - $executionResponse = $executor->createExecution( - projectId: $project->getId(), - deploymentId: $deployment->getId(), - payload: $vars['APPWRITE_FUNCTION_DATA'] ?? '', - variables: $vars, - timeout: $function->getAttribute('timeout', 0), - image: $runtime['image'], - source: $build->getAttribute('outputPath', ''), - entrypoint: $deployment->getAttribute('entrypoint', ''), - ); - - /** Update execution status */ - $execution - ->setAttribute('status', $executionResponse['status']) - ->setAttribute('statusCode', $executionResponse['statusCode']) - ->setAttribute('response', $executionResponse['response']) - ->setAttribute('stdout', $executionResponse['stdout']) - ->setAttribute('stderr', $executionResponse['stderr']) - ->setAttribute('duration', $executionResponse['duration']); - } catch (\Throwable $th) { - $interval = (new \DateTime())->diff(new \DateTime($execution->getCreatedAt())); - $execution - ->setAttribute('duration', (float)$interval->format('%s.%f')) - ->setAttribute('status', 'failed') - ->setAttribute('statusCode', $th->getCode()) - ->setAttribute('stderr', $th->getMessage()); - Console::error($th->getMessage()); - } - - $execution = $dbForProject->updateDocument('executions', $executionId, $execution); - - /** Trigger Webhook */ - $executionModel = new Execution(); - $executionUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME); - $executionUpdate - ->setProject($project) - ->setUser($user) - ->setEvent('functions.[functionId].executions.[executionId].update') - ->setParam('functionId', $function->getId()) - ->setParam('executionId', $execution->getId()) - ->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules()))) - ->trigger(); - - /** Trigger Functions */ - $functions - ->setData($data ?? '') - ->setProject($project) - ->setUser($user) - ->setEvent('functions.[functionId].executions.[executionId].update') - ->setParam('functionId', $function->getId()) - ->setParam('executionId', $execution->getId()) - ->trigger(); - - /** Trigger realtime event */ - $allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [ - 'functionId' => $function->getId(), - 'executionId' => $execution->getId() - ]); - $target = Realtime::fromPayload( - // Pass first, most verbose event pattern - event: $allEvents[0], - payload: $execution - ); - Realtime::send( - projectId: 'console', - payload: $execution->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); - Realtime::send( - projectId: $project->getId(), - payload: $execution->getArrayCopy(), - events: $allEvents, - channels: $target['channels'], - roles: $target['roles'] - ); - - /** Update usage stats */ - if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') { - $usage = new Stats($statsd); - $usage - ->setParam('projectId', $project->getId()) - ->setParam('projectInternalId', $project->getInternalId()) + + /** Check if build has exists */ + $build = $dbForProject->getDocument('builds', $deployment->getAttribute('buildId', '')); + if ($build->isEmpty()) { + throw new Exception('Build not found'); + } + + if ($build->getAttribute('status') !== 'ready') { + throw new Exception('Build not ready'); + } + + /** Check if runtime is supported */ + $runtimes = Config::getParam('runtimes', []); + + if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) { + throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported'); + } + + $runtime = $runtimes[$function->getAttribute('runtime')]; + + /** Create execution or update execution status */ + $execution = $dbForProject->getDocument('executions', $executionId ?? ''); + if ($execution->isEmpty()) { + $executionId = ID::unique(); + $execution = $dbForProject->createDocument('executions', new Document([ + '$id' => $executionId, + '$permissions' => $user->isEmpty() ? [] : [Permission::read(Role::user($user->getId()))], + 'functionId' => $functionId, + 'deploymentId' => $deploymentId, + 'trigger' => $trigger, + 'status' => 'waiting', + 'statusCode' => 0, + 'response' => '', + 'stderr' => '', + 'duration' => 0.0, + 'search' => implode(' ', [$functionId, $executionId]), + ])); + + if ($execution->isEmpty()) { + throw new Exception('Failed to create or read execution'); + } + } + $execution->setAttribute('status', 'processing'); + $execution = $dbForProject->updateDocument('executions', $executionId, $execution); + + if ($build->getAttribute('status') !== 'ready') { + throw new Exception('Build not ready'); + } + + /** Check if runtime is supported */ + $runtimes = Config::getParam('runtimes', []); + + if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) { + throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported'); + } + + $runtime = $runtimes[$function->getAttribute('runtime')]; + + /** Create execution or update execution status */ + $execution = $dbForProject->getDocument('executions', $executionId ?? ''); + if ($execution->isEmpty()) { + $executionId = ID::unique(); + $execution = $dbForProject->createDocument('executions', new Document([ + '$id' => $executionId, + '$permissions' => $user->isEmpty() ? [] : [Permission::read(Role::user($user->getId()))], + 'functionId' => $functionId, + 'deploymentId' => $deploymentId, + 'trigger' => $trigger, + 'status' => 'waiting', + 'statusCode' => 0, + 'response' => '', + 'stderr' => '', + 'duration' => 0.0, + 'search' => implode(' ', [$functionId, $executionId]), + ])); + + if ($execution->isEmpty()) { + throw new Exception('Failed to create or read execution'); + } + } + $execution->setAttribute('status', 'processing'); + $execution = $dbForProject->updateDocument('executions', $executionId, $execution); + + $vars = array_reduce($function['vars'] ?? [], function (array $carry, Document $var) { + $carry[$var->getAttribute('key')] = $var->getAttribute('value'); + return $carry; + }, []); + + /** Collect environment variables */ + $vars = \array_merge($vars, [ + 'APPWRITE_FUNCTION_ID' => $functionId, + 'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name'), + 'APPWRITE_FUNCTION_DEPLOYMENT' => $deploymentId, + 'APPWRITE_FUNCTION_TRIGGER' => $trigger, + 'APPWRITE_FUNCTION_PROJECT_ID' => $project->getId(), + 'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'] ?? '', + 'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'] ?? '', + 'APPWRITE_FUNCTION_EVENT' => $event ?? '', + 'APPWRITE_FUNCTION_EVENT_DATA' => $eventData ?? '', + 'APPWRITE_FUNCTION_DATA' => $data ?? '', + 'APPWRITE_FUNCTION_USER_ID' => $user->getId() ?? '', + 'APPWRITE_FUNCTION_JWT' => $jwt ?? '', + ]); + + /** Execute function */ + $executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); + try { + $executionResponse = $executor->createExecution( + projectId: $project->getId(), + deploymentId: $deployment->getId(), + payload: $vars['APPWRITE_FUNCTION_DATA'] ?? '', + variables: $vars, + timeout: $function->getAttribute('timeout', 0), + image: $runtime['image'], + source: $build->getAttribute('outputPath', ''), + entrypoint: $deployment->getAttribute('entrypoint', ''), + ); + + /** Update execution status */ + $execution + ->setAttribute('status', $executionResponse['status']) + ->setAttribute('statusCode', $executionResponse['statusCode']) + ->setAttribute('response', $executionResponse['response']) + ->setAttribute('stdout', $executionResponse['stdout']) + ->setAttribute('stderr', $executionResponse['stderr']) + ->setAttribute('duration', $executionResponse['duration']); + } catch (\Throwable $th) { + $interval = (new \DateTime())->diff(new \DateTime($execution->getCreatedAt())); + $execution + ->setAttribute('duration', (float)$interval->format('%s.%f')) + ->setAttribute('status', 'failed') + ->setAttribute('statusCode', $th->getCode()) + ->setAttribute('stderr', $th->getMessage()); + Console::error($th->getMessage()); + } + + $execution = $dbForProject->updateDocument('executions', $executionId, $execution); + + /** Trigger Webhook */ + $executionModel = new Execution(); + $executionUpdate = new Event(Event::WEBHOOK_QUEUE_NAME, Event::WEBHOOK_CLASS_NAME); + $executionUpdate + ->setProject($project) + ->setUser($user) + ->setEvent('functions.[functionId].executions.[executionId].update') ->setParam('functionId', $function->getId()) - ->setParam('executions.{scope}.compute', 1) - ->setParam('executionStatus', $execution->getAttribute('status', '')) - ->setParam('executionTime', $execution->getAttribute('duration')) - ->setParam('networkRequestSize', 0) - ->setParam('networkResponseSize', 0) - ->submit(); - } -}; + ->setParam('executionId', $execution->getId()) + ->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules()))) + ->trigger(); + + /** Trigger Functions */ + $functions + ->setData($data ?? '') + ->setProject($project) + ->setUser($user) + ->setEvent('functions.[functionId].executions.[executionId].update') + ->setParam('functionId', $function->getId()) + ->setParam('executionId', $execution->getId()) + ->trigger(); + + /** Trigger realtime event */ + $allEvents = Event::generateEvents('functions.[functionId].executions.[executionId].update', [ + 'functionId' => $function->getId(), + 'executionId' => $execution->getId() + ]); + $target = Realtime::fromPayload( + // Pass first, most verbose event pattern + event: $allEvents[0], + payload: $execution + ); + Realtime::send( + projectId: 'console', + payload: $execution->getArrayCopy(), + events: $allEvents, + channels: $target['channels'], + roles: $target['roles'] + ); + Realtime::send( + projectId: $project->getId(), + payload: $execution->getArrayCopy(), + events: $allEvents, + channels: $target['channels'], + roles: $target['roles'] + ); + + /** Update usage stats */ + if (App::getEnv('_APP_USAGE_STATS', 'enabled') === 'enabled') { + $usage = new Stats($statsd); + $usage + ->setParam('projectId', $project->getId()) + ->setParam('projectInternalId', $project->getInternalId()) + ->setParam('functionId', $function->getId()) + ->setParam('executions.{scope}.compute', 1) + ->setParam('executionStatus', $execution->getAttribute('status', '')) + ->setParam('executionTime', $execution->getAttribute('duration')) + ->setParam('networkRequestSize', 0) + ->setParam('networkResponseSize', 0) + ->submit(); + } + }; +}); $server->job() ->inject('message') ->inject('dbForProject') ->inject('functions') ->inject('statsd') - ->action(function (Message $message, Database $dbForProject, Func $functions, Client $statsd) use ($execute) { + ->inject('execute') + ->action(function (Message $message, Database $dbForProject, Func $functions, Client $statsd, callable $execute) { $payload = $message->getPayload() ?? []; if (empty($payload)) { @@ -276,7 +279,6 @@ $server->job() } var_dump(json_encode($payload)); - $type = $payload['type'] ?? ''; $events = $payload['events'] ?? []; $data = $payload['data'] ?? ''; @@ -298,7 +300,6 @@ $server->job() $offset = 0; $functions = []; /** @var Document[] $functions */ - while ($sum >= $limit) { $functions = $dbForProject->find('functions', [ Query::limit($limit), @@ -316,7 +317,19 @@ $server->job() continue; } Console::success('Iterating function: ' . $function->getAttribute('name')); - call_user_func($execute, $project, $function, $dbForProject, 'event', null, $events[0], $payload, null, $user, null, $statsd); + $execute( + statsd: $statsd, + dbForProject: $dbForProject, + project: $project, + function: $function, + trigger: 'event', + event: $events[0], + eventData: $payload, + user: $user, + data: null, + executionId: null, + jwt: null + ); Console::success('Triggered function: ' . $events[0]); } } @@ -332,10 +345,36 @@ $server->job() $jwt = $payload['jwt'] ?? ''; $execution = new Document($payload['execution'] ?? []); $user = new Document($payload['user'] ?? []); - call_user_func($execute, $project, $function, $dbForProject, $functions, 'http', $execution->getId(), null, null, $data, $user, $jwt, $statsd); + $execute( + project: $project, + function: $function, + dbForProject: $dbForProject, + functions: $functions, + trigger: 'http', + executionId: $execution->getId(), + event: null, + eventData: null, + data: $data, + user: $user, + jwt: $jwt, + statsd: $statsd, + ); break; case 'schedule': - call_user_func($execute, $project, $function, $dbForProject, $functions, 'schedule', null, null, null, null, null, null, $statsd); + $execute( + project: $project, + function: $function, + dbForProject: $dbForProject, + functions: $functions, + trigger: 'http', + executionId: null, + event: null, + eventData: null, + data: null, + user: null, + jwt: null, + statsd: $statsd, + ); break; } });