desc('Functions worker') ->groups(['functions']) ->inject('project') ->inject('message') ->inject('dbForProject') ->inject('queueForWebhooks') ->inject('queueForFunctions') ->inject('queueForRealtime') ->inject('queueForEvents') ->inject('queueForStatsUsage') ->inject('log') ->inject('isResourceBlocked') ->callback(fn (Document $project, Message $message, Database $dbForProject, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $log, $isResourceBlocked)); } public function action(Document $project, Message $message, Database $dbForProject, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked): void { $payload = $message->getPayload() ?? []; if (empty($payload)) { throw new Exception('Missing payload'); } $type = $payload['type'] ?? ''; // Short-term solution to offhand write operation from API container if ($type === Func::TYPE_ASYNC_WRITE) { $execution = new Document($payload['execution'] ?? []); $dbForProject->createDocument('executions', $execution); return; } $events = $payload['events'] ?? []; $data = $payload['body'] ?? ''; $eventData = $payload['payload'] ?? ''; $function = new Document($payload['function'] ?? []); $functionId = $payload['functionId'] ?? ''; $user = new Document($payload['user'] ?? []); $userId = $payload['userId'] ?? ''; $method = $payload['method'] ?? 'POST'; $headers = $payload['headers'] ?? []; $path = $payload['path'] ?? '/'; $jwt = $payload['jwt'] ?? ''; if ($user->isEmpty() && !empty($userId)) { $user = $dbForProject->getDocument('users', $userId); } if (empty($jwt) && !$user->isEmpty()) { $jwtExpiry = $function->getAttribute('timeout', 900); $jwtObj = new JWT(System::getEnv('_APP_OPENSSL_KEY_V1'), 'HS256', $jwtExpiry, 0); $jwt = $jwtObj->encode([ 'userId' => $user->getId(), ]); } if ($project->getId() === 'console') { return; } if ($function->isEmpty() && !empty($functionId)) { $function = $dbForProject->getDocument('functions', $functionId); } $log->addTag('functionId', $function->getId()); $log->addTag('projectId', $project->getId()); $log->addTag('type', $type); if (!empty($events)) { $limit = 30; $sum = 30; $offset = 0; while ($sum >= $limit) { $functions = $dbForProject->find('functions', [ Query::limit($limit), Query::offset($offset), Query::orderAsc('name'), ]); $sum = \count($functions); $offset = $offset + $limit; Console::log('Fetched ' . $sum . ' functions...'); foreach ($functions as $function) { if (!array_intersect($events, $function->getAttribute('events', []))) { continue; } if ($isResourceBlocked($project, RESOURCE_TYPE_FUNCTIONS, $function->getId())) { Console::log('Function ' . $function->getId() . ' is blocked, skipping execution.'); continue; } Console::success('Iterating function: ' . $function->getAttribute('name')); $this->execute( log: $log, dbForProject: $dbForProject, queueForWebhooks: $queueForWebhooks, queueForFunctions: $queueForFunctions, queueForRealtime: $queueForRealtime, queueForStatsUsage: $queueForStatsUsage, queueForEvents: $queueForEvents, project: $project, function: $function, trigger: 'event', path: '/', method: 'POST', headers: [ 'user-agent' => 'Appwrite/' . APP_VERSION_STABLE, 'content-type' => 'application/json' ], data: null, user: $user, jwt: null, event: $events[0], eventData: \is_string($eventData) ? $eventData : \json_encode($eventData), executionId: null, ); Console::success('Triggered function: ' . $events[0]); } } return; } if ($isResourceBlocked($project, RESOURCE_TYPE_FUNCTIONS, $function->getId())) { Console::log('Function ' . $function->getId() . ' is blocked, skipping execution.'); return; } /** * Handle Schedule and HTTP execution. */ switch ($type) { case 'http': $execution = new Document($payload['execution'] ?? []); $user = new Document($payload['user'] ?? []); $this->execute( log: $log, dbForProject: $dbForProject, queueForWebhooks: $queueForWebhooks, queueForFunctions: $queueForFunctions, queueForRealtime: $queueForRealtime, queueForStatsUsage: $queueForStatsUsage, queueForEvents: $queueForEvents, project: $project, function: $function, trigger: 'http', path: $path, method: $method, headers: $headers, data: $data, user: $user, jwt: $jwt, event: null, eventData: null, executionId: $execution->getId() ); break; case 'schedule': $execution = new Document($payload['execution'] ?? []); $this->execute( log: $log, dbForProject: $dbForProject, queueForWebhooks: $queueForWebhooks, queueForFunctions: $queueForFunctions, queueForRealtime: $queueForRealtime, queueForStatsUsage: $queueForStatsUsage, queueForEvents: $queueForEvents, project: $project, function: $function, trigger: 'schedule', path: $path, method: $method, headers: $headers, data: $data, user: $user, jwt: $jwt, event: null, eventData: null, executionId: $execution->getId() ?? null ); break; } } /** * @param string $message * @param Document $function * @param string $trigger * @param string $path * @param string $method * @param Document $user * @param string|null $jwt * @param string|null $event * @throws Exception */ private function fail( string $message, Database $dbForProject, Document $function, string $trigger, string $path, string $method, Document $user, string $jwt = null, string $event = null, ): void { $headers['x-appwrite-trigger'] = $trigger; $headers['x-appwrite-event'] = $event ?? ''; $headers['x-appwrite-user-id'] = $user->getId() ?? ''; $headers['x-appwrite-user-jwt'] = $jwt ?? ''; $headersFiltered = []; foreach ($headers as $key => $value) { if (\in_array(\strtolower($key), FUNCTION_ALLOWLIST_HEADERS_REQUEST)) { $headersFiltered[] = ['name' => $key, 'value' => $value]; } } $executionId = ID::unique(); $execution = new Document([ '$id' => $executionId, '$permissions' => $user->isEmpty() ? [] : [Permission::read(Role::user($user->getId()))], 'functionInternalId' => $function->getInternalId(), 'functionId' => $function->getId(), 'deploymentInternalId' => '', 'deploymentId' => '', 'trigger' => $trigger, 'status' => 'failed', 'responseStatusCode' => 0, 'responseHeaders' => [], 'requestPath' => $path, 'requestMethod' => $method, 'requestHeaders' => $headersFiltered, 'errors' => $message, 'logs' => '', 'duration' => 0.0, 'search' => implode(' ', [$function->getId(), $executionId]), ]); $execution = $dbForProject->createDocument('executions', $execution); if ($execution->isEmpty()) { throw new Exception('Failed to create execution'); } } /** * @param Log $log * @param Database $dbForProject * @param Func $queueForFunctions * @param Realtime $queueForRealtime * @param StatsUsage $queueForStatsUsage * @param Event $queueForEvents * @param Document $project * @param Document $function * @param string $trigger * @param string $path * @param string $method * @param array $headers * @param string|null $data * @param Document|null $user * @param string|null $jwt * @param string|null $event * @param string|null $eventData * @param string|null $executionId * @return void * @throws Authorization * @throws Structure * @throws \Utopia\Database\Exception * @throws Conflict */ private function execute( Log $log, Database $dbForProject, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage, Event $queueForEvents, Document $project, Document $function, string $trigger, string $path, string $method, array $headers, string $data = null, ?Document $user = null, string $jwt = null, string $event = null, string $eventData = null, string $executionId = null, ): void { $user ??= new Document(); $functionId = $function->getId(); $deploymentId = $function->getAttribute('deployment', ''); $spec = Config::getParam('runtime-specifications')[$function->getAttribute('specification', APP_FUNCTION_SPECIFICATION_DEFAULT)]; $log->addTag('deploymentId', $deploymentId); /** Check if deployment exists */ $deployment = $dbForProject->getDocument('deployments', $deploymentId); if ($deployment->getAttribute('resourceId') !== $functionId) { $errorMessage = 'The execution could not be completed because a corresponding deployment was not found. A function deployment needs to be created before it can be executed. Please create a deployment for your function and try again.'; $this->fail($errorMessage, $dbForProject, $function, $trigger, $path, $method, $user, $jwt, $event); return; } if ($deployment->isEmpty()) { $errorMessage = 'The execution could not be completed because a corresponding deployment was not found. A function deployment needs to be created before it can be executed. Please create a deployment for your function and try again.'; $this->fail($errorMessage, $dbForProject, $function, $trigger, $path, $method, $user, $jwt, $event); return; } $buildId = $deployment->getAttribute('buildId', ''); $log->addTag('buildId', $buildId); /** Check if build has exists */ $build = $dbForProject->getDocument('builds', $buildId); if ($build->isEmpty()) { $errorMessage = 'The execution could not be completed because a corresponding deployment was not found. A function deployment needs to be created before it can be executed. Please create a deployment for your function and try again.'; $this->fail($errorMessage, $dbForProject, $function, $trigger, $path, $method, $user, $jwt, $event); return; } if ($build->getAttribute('status') !== 'ready') { $errorMessage = 'The execution could not be completed because the build is not ready. Please wait for the build to complete and try again.'; $this->fail($errorMessage, $dbForProject, $function, $trigger, $path, $method, $user, $jwt, $event); return; } /** Check if runtime is supported */ $version = $function->getAttribute('version', 'v2'); $runtimes = Config::getParam($version === 'v2' ? 'runtimes-v2' : 'runtimes', []); if (!\array_key_exists($function->getAttribute('runtime'), $runtimes)) { throw new Exception('Runtime "' . $function->getAttribute('runtime', '') . '" is not supported'); } $runtime = $runtimes[$function->getAttribute('runtime')]; $jwtExpiry = $function->getAttribute('timeout', 900); $jwtObj = new JWT(System::getEnv('_APP_OPENSSL_KEY_V1'), 'HS256', $jwtExpiry, 0); $apiKey = $jwtObj->encode([ 'projectId' => $project->getId(), 'scopes' => $function->getAttribute('scopes', []) ]); $headers['x-appwrite-key'] = API_KEY_DYNAMIC . '_' . $apiKey; $headers['x-appwrite-trigger'] = $trigger; $headers['x-appwrite-event'] = $event ?? ''; $headers['x-appwrite-user-id'] = $user->getId() ?? ''; $headers['x-appwrite-user-jwt'] = $jwt ?? ''; $headers['x-appwrite-country-code'] = ''; $headers['x-appwrite-continent-code'] = ''; $headers['x-appwrite-continent-eu'] = 'false'; /** Create execution or update execution status */ $execution = $dbForProject->getDocument('executions', $executionId ?? ''); if ($execution->isEmpty()) { $headersFiltered = []; foreach ($headers as $key => $value) { if (\in_array(\strtolower($key), FUNCTION_ALLOWLIST_HEADERS_REQUEST)) { $headersFiltered[] = [ 'name' => $key, 'value' => $value ]; } } $executionId = ID::unique(); $execution = new Document([ '$id' => $executionId, '$permissions' => $user->isEmpty() ? [] : [Permission::read(Role::user($user->getId()))], 'functionInternalId' => $function->getInternalId(), 'functionId' => $function->getId(), 'deploymentInternalId' => $deployment->getInternalId(), 'deploymentId' => $deployment->getId(), 'trigger' => $trigger, 'status' => 'processing', 'responseStatusCode' => 0, 'responseHeaders' => [], 'requestPath' => $path, 'requestMethod' => $method, 'requestHeaders' => $headersFiltered, 'errors' => '', 'logs' => '', 'duration' => 0.0, 'search' => implode(' ', [$functionId, $executionId]), ]); $execution = $dbForProject->createDocument('executions', $execution); // TODO: @Meldiron Trigger executions.create event here if ($execution->isEmpty()) { throw new Exception('Failed to create or read execution'); } } if ($execution->getAttribute('status') !== 'processing') { $execution->setAttribute('status', 'processing'); $execution = $dbForProject->updateDocument('executions', $executionId, $execution); } $durationStart = \microtime(true); $body = $eventData ?? ''; if (empty($body)) { $body = $data ?? ''; } $vars = []; // V2 vars if ($version === 'v2') { $vars = \array_merge($vars, [ 'APPWRITE_FUNCTION_TRIGGER' => $headers['x-appwrite-trigger'] ?? '', 'APPWRITE_FUNCTION_DATA' => $body ?? '', 'APPWRITE_FUNCTION_EVENT_DATA' => $body ?? '', 'APPWRITE_FUNCTION_EVENT' => $headers['x-appwrite-event'] ?? '', 'APPWRITE_FUNCTION_USER_ID' => $headers['x-appwrite-user-id'] ?? '', 'APPWRITE_FUNCTION_JWT' => $headers['x-appwrite-user-jwt'] ?? '' ]); } // Shared vars foreach ($function->getAttribute('varsProject', []) as $var) { $vars[$var->getAttribute('key')] = $var->getAttribute('value', ''); } // Function vars foreach ($function->getAttribute('vars', []) as $var) { $vars[$var->getAttribute('key')] = $var->getAttribute('value', ''); } $protocol = System::getEnv('_APP_OPTIONS_FORCE_HTTPS') == 'disabled' ? 'http' : 'https'; $hostname = System::getEnv('_APP_DOMAIN'); $endpoint = $protocol . '://' . $hostname . "/v1"; // Appwrite vars $vars = \array_merge($vars, [ 'APPWRITE_FUNCTION_API_ENDPOINT' => $endpoint, 'APPWRITE_FUNCTION_ID' => $functionId, 'APPWRITE_FUNCTION_NAME' => $function->getAttribute('name'), 'APPWRITE_FUNCTION_DEPLOYMENT' => $deploymentId, 'APPWRITE_FUNCTION_PROJECT_ID' => $project->getId(), 'APPWRITE_FUNCTION_RUNTIME_NAME' => $runtime['name'] ?? '', 'APPWRITE_FUNCTION_RUNTIME_VERSION' => $runtime['version'] ?? '', 'APPWRITE_FUNCTION_CPUS' => ($spec['cpus'] ?? APP_FUNCTION_CPUS_DEFAULT), 'APPWRITE_FUNCTION_MEMORY' => ($spec['memory'] ?? APP_FUNCTION_MEMORY_DEFAULT), 'APPWRITE_VERSION' => APP_VERSION_STABLE, 'APPWRITE_REGION' => $project->getAttribute('region'), 'APPWRITE_DEPLOYMENT_TYPE' => $deployment->getAttribute('type', ''), 'APPWRITE_VCS_REPOSITORY_ID' => $deployment->getAttribute('providerRepositoryId', ''), 'APPWRITE_VCS_REPOSITORY_NAME' => $deployment->getAttribute('providerRepositoryName', ''), 'APPWRITE_VCS_REPOSITORY_OWNER' => $deployment->getAttribute('providerRepositoryOwner', ''), 'APPWRITE_VCS_REPOSITORY_URL' => $deployment->getAttribute('providerRepositoryUrl', ''), 'APPWRITE_VCS_REPOSITORY_BRANCH' => $deployment->getAttribute('providerBranch', ''), 'APPWRITE_VCS_REPOSITORY_BRANCH_URL' => $deployment->getAttribute('providerBranchUrl', ''), 'APPWRITE_VCS_COMMIT_HASH' => $deployment->getAttribute('providerCommitHash', ''), 'APPWRITE_VCS_COMMIT_MESSAGE' => $deployment->getAttribute('providerCommitMessage', ''), 'APPWRITE_VCS_COMMIT_URL' => $deployment->getAttribute('providerCommitUrl', ''), 'APPWRITE_VCS_COMMIT_AUTHOR_NAME' => $deployment->getAttribute('providerCommitAuthor', ''), 'APPWRITE_VCS_COMMIT_AUTHOR_URL' => $deployment->getAttribute('providerCommitAuthorUrl', ''), 'APPWRITE_VCS_ROOT_DIRECTORY' => $deployment->getAttribute('providerRootDirectory', ''), ]); /** Execute function */ try { $version = $function->getAttribute('version', 'v2'); $command = $runtime['startCommand']; $executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST')); $command = $version === 'v2' ? '' : 'cp /tmp/code.tar.gz /mnt/code/code.tar.gz && nohup helpers/start.sh "' . $command . '"'; $executionResponse = $executor->createExecution( projectId: $project->getId(), deploymentId: $deploymentId, body: \strlen($body) > 0 ? $body : null, variables: $vars, timeout: $function->getAttribute('timeout', 0), image: $runtime['image'], source: $build->getAttribute('path', ''), entrypoint: $deployment->getAttribute('entrypoint', ''), version: $version, path: $path, method: $method, headers: $headers, runtimeEntrypoint: $command, cpus: $spec['cpus'] ?? APP_FUNCTION_CPUS_DEFAULT, memory: $spec['memory'] ?? APP_FUNCTION_MEMORY_DEFAULT, logging: $function->getAttribute('logging', true), ); $status = $executionResponse['statusCode'] >= 500 ? 'failed' : 'completed'; $headersFiltered = []; foreach ($executionResponse['headers'] as $key => $value) { if (\in_array(\strtolower($key), FUNCTION_ALLOWLIST_HEADERS_RESPONSE)) { $headersFiltered[] = [ 'name' => $key, 'value' => $value ]; } } /** Update execution status */ $execution ->setAttribute('status', $status) ->setAttribute('responseStatusCode', $executionResponse['statusCode']) ->setAttribute('responseHeaders', $headersFiltered) ->setAttribute('logs', $executionResponse['logs']) ->setAttribute('errors', $executionResponse['errors']) ->setAttribute('duration', $executionResponse['duration']); } catch (\Throwable $th) { $durationEnd = \microtime(true); $execution ->setAttribute('duration', $durationEnd - $durationStart) ->setAttribute('status', 'failed') ->setAttribute('responseStatusCode', 500) ->setAttribute('errors', $th->getMessage() . '\nError Code: ' . $th->getCode()); $error = $th->getMessage(); $errorCode = $th->getCode(); } finally { /** Trigger usage queue */ $queueForStatsUsage ->setProject($project) ->addMetric(METRIC_EXECUTIONS, 1) ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS), 1) ->addMetric(METRIC_EXECUTIONS_COMPUTE, (int)($execution->getAttribute('duration') * 1000))// per project ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS_COMPUTE), (int)($execution->getAttribute('duration') * 1000)) ->addMetric(METRIC_EXECUTIONS_MB_SECONDS, (int)(($spec['memory'] ?? APP_FUNCTION_MEMORY_DEFAULT) * $execution->getAttribute('duration', 0) * ($spec['cpus'] ?? APP_FUNCTION_CPUS_DEFAULT))) ->addMetric(str_replace('{functionInternalId}', $function->getInternalId(), METRIC_FUNCTION_ID_EXECUTIONS_MB_SECONDS), (int)(($spec['memory'] ?? APP_FUNCTION_MEMORY_DEFAULT) * $execution->getAttribute('duration', 0) * ($spec['cpus'] ?? APP_FUNCTION_CPUS_DEFAULT))) ->trigger() ; } $execution = $dbForProject->updateDocument('executions', $executionId, $execution); $executionModel = new Execution(); $queueForEvents ->setProject($project) ->setUser($user) ->setEvent('functions.[functionId].executions.[executionId].update') ->setParam('functionId', $function->getId()) ->setParam('executionId', $execution->getId()) ->setPayload($execution->getArrayCopy(array_keys($executionModel->getRules()))); /** Trigger Webhook */ $queueForWebhooks ->from($queueForEvents) ->trigger(); /** Trigger Functions */ $queueForFunctions ->from($queueForEvents) ->trigger(); /** Trigger Realtime Events */ $queueForRealtime ->from($queueForEvents) ->setSubscribers(['console', $project->getId()]) ->trigger(); if (!empty($error)) { throw new Exception($error, $errorCode); } } }