diff --git a/app/cli.php b/app/cli.php index f080217365..82f229673e 100644 --- a/app/cli.php +++ b/app/cli.php @@ -9,6 +9,7 @@ use Appwrite\Event\StatsResources; use Appwrite\Event\StatsUsage; use Appwrite\Platform\Appwrite; use Appwrite\Runtimes\Runtimes; +use Executor\Executor; use Utopia\Cache\Adapter\Sharding; use Utopia\Cache\Cache; use Utopia\CLI\CLI; @@ -255,6 +256,8 @@ CLI::setResource('logError', function (Registry $register) { }; }, ['register']); +CLI::setResource('executor', fn () => new Executor(fn (string $projectId, string $deploymentId) => System::getEnv('_APP_EXECUTOR_HOST'))); + $platform = new Appwrite(); $platform->init(Service::TYPE_TASK); diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index 37932e4165..c7f24f984a 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -869,7 +869,8 @@ App::put('/v1/functions/:functionId') ->inject('queueForBuilds') ->inject('dbForPlatform') ->inject('gitHub') - ->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, ?string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $specification, Request $request, Response $response, Database $dbForProject, Document $project, Event $queueForEvents, Build $queueForBuilds, Database $dbForPlatform, GitHub $github) use ($redeployVcs) { + ->inject('executor') + ->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, ?string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $specification, Request $request, Response $response, Database $dbForProject, Document $project, Event $queueForEvents, Build $queueForBuilds, Database $dbForPlatform, GitHub $github, Executor $executor) use ($redeployVcs) { // TODO: If only branch changes, re-deploy $function = $dbForProject->getDocument('functions', $functionId); @@ -972,7 +973,6 @@ App::put('/v1/functions/:functionId') // Enforce Cold Start if spec limits change. if ($function->getAttribute('specification') !== $specification && !empty($function->getAttribute('deployment'))) { - $executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); try { $executor->deleteRuntime($project->getId(), $function->getAttribute('deployment')); } catch (\Throwable $th) { @@ -1779,7 +1779,8 @@ App::patch('/v1/functions/:functionId/deployments/:deploymentId/build') ->inject('dbForProject') ->inject('project') ->inject('queueForEvents') - ->action(function (string $functionId, string $deploymentId, Response $response, Database $dbForProject, Document $project, Event $queueForEvents) { + ->inject('executor') + ->action(function (string $functionId, string $deploymentId, Response $response, Database $dbForProject, Document $project, Event $queueForEvents, Executor $executor) { $function = $dbForProject->getDocument('functions', $functionId); if ($function->isEmpty()) { @@ -1834,7 +1835,6 @@ App::patch('/v1/functions/:functionId/deployments/:deploymentId/build') $dbForProject->purgeCachedDocument('deployments', $deployment->getId()); try { - $executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); $executor->deleteRuntime($project->getId(), $deploymentId . "-build"); } catch (\Throwable $th) { // Don't throw if the deployment doesn't exist @@ -1886,8 +1886,9 @@ App::post('/v1/functions/:functionId/executions') ->inject('queueForEvents') ->inject('queueForStatsUsage') ->inject('queueForFunctions') + ->inject('executor') ->inject('geodb') - ->action(function (string $functionId, string $body, mixed $async, string $path, string $method, mixed $headers, ?string $scheduledAt, Response $response, Request $request, Document $project, Database $dbForProject, Database $dbForPlatform, Document $user, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Reader $geodb) { + ->action(function (string $functionId, string $body, mixed $async, string $path, string $method, mixed $headers, ?string $scheduledAt, Response $response, Request $request, Document $project, Database $dbForProject, Database $dbForPlatform, Document $user, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Executor $executor, Reader $geodb) { $async = \strval($async) === 'true' || \strval($async) === '1'; if (!$async && !is_null($scheduledAt)) { @@ -2160,7 +2161,6 @@ App::post('/v1/functions/:functionId/executions') ]); /** Execute function */ - $executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST')); try { $version = $function->getAttribute('version', 'v2'); $command = $runtime['startCommand']; diff --git a/app/controllers/general.php b/app/controllers/general.php index 1423763cec..6c1167cf76 100644 --- a/app/controllers/general.php +++ b/app/controllers/general.php @@ -50,7 +50,7 @@ Config::setParam('domainVerification', false); Config::setParam('cookieDomain', 'localhost'); Config::setParam('cookieSamesite', Response::COOKIE_SAMESITE_NONE); -function router(App $utopia, Database $dbForPlatform, callable $getProjectDB, SwooleRequest $swooleRequest, Request $request, Response $response, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname) +function router(App $utopia, Database $dbForPlatform, callable $getProjectDB, SwooleRequest $swooleRequest, Request $request, Response $response, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Executor $executor, Reader $geodb, callable $isResourceBlocked, string $previewHostname) { $utopia->getRoute()?->label('error', __DIR__ . '/../views/general/error.phtml'); @@ -339,7 +339,6 @@ function router(App $utopia, Database $dbForPlatform, callable $getProjectDB, Sw ]); /** Execute function */ - $executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST')); try { $version = $function->getAttribute('version', 'v2'); $command = $runtime['startCommand']; @@ -503,9 +502,10 @@ App::init() ->inject('queueForEvents') ->inject('queueForCertificates') ->inject('queueForFunctions') + ->inject('executor') ->inject('isResourceBlocked') ->inject('previewHostname') - ->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Document $console, Document $project, Database $dbForPlatform, callable $getProjectDB, Locale $locale, array $localeCodes, array $clients, Reader $geodb, StatsUsage $queueForStatsUsage, Event $queueForEvents, Certificate $queueForCertificates, Func $queueForFunctions, callable $isResourceBlocked, string $previewHostname) { + ->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Document $console, Document $project, Database $dbForPlatform, callable $getProjectDB, Locale $locale, array $localeCodes, array $clients, Reader $geodb, StatsUsage $queueForStatsUsage, Event $queueForEvents, Certificate $queueForCertificates, Func $queueForFunctions, Executor $executor, callable $isResourceBlocked, string $previewHostname) { /* * Appwrite Router */ @@ -513,7 +513,7 @@ App::init() $mainDomain = System::getEnv('_APP_DOMAIN', ''); // Only run Router when external domain if ($host !== $mainDomain || !empty($previewHostname)) { - if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname)) { + if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $executor, $geodb, $isResourceBlocked, $previewHostname)) { return; } } @@ -742,11 +742,12 @@ App::options() ->inject('queueForEvents') ->inject('queueForStatsUsage') ->inject('queueForFunctions') + ->inject('executor') ->inject('geodb') ->inject('isResourceBlocked') ->inject('previewHostname') ->inject('project') - ->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname, Document $project) { + ->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Executor $executor, Reader $geodb, callable $isResourceBlocked, string $previewHostname, Document $project) { /* * Appwrite Router */ @@ -754,7 +755,7 @@ App::options() $mainDomain = System::getEnv('_APP_DOMAIN', ''); // Only run Router when external domain if ($host !== $mainDomain || !empty($previewHostname)) { - if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname)) { + if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $executor, $geodb, $isResourceBlocked, $previewHostname)) { return; } } @@ -1062,10 +1063,11 @@ App::get('/robots.txt') ->inject('queueForEvents') ->inject('queueForStatsUsage') ->inject('queueForFunctions') + ->inject('executor') ->inject('geodb') ->inject('isResourceBlocked') ->inject('previewHostname') - ->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname) { + ->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Executor $executor, Reader $geodb, callable $isResourceBlocked, string $previewHostname) { $host = $request->getHostname() ?? ''; $mainDomain = System::getEnv('_APP_DOMAIN', ''); @@ -1073,7 +1075,7 @@ App::get('/robots.txt') $template = new View(__DIR__ . '/../views/general/robots.phtml'); $response->text($template->render(false)); } else { - router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname); + router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $executor, $geodb, $isResourceBlocked, $previewHostname); } }); @@ -1090,10 +1092,11 @@ App::get('/humans.txt') ->inject('queueForEvents') ->inject('queueForStatsUsage') ->inject('queueForFunctions') + ->inject('executor') ->inject('geodb') ->inject('isResourceBlocked') ->inject('previewHostname') - ->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname) { + ->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Executor $executor, Reader $geodb, callable $isResourceBlocked, string $previewHostname) { $host = $request->getHostname() ?? ''; $mainDomain = System::getEnv('_APP_DOMAIN', ''); @@ -1101,7 +1104,7 @@ App::get('/humans.txt') $template = new View(__DIR__ . '/../views/general/humans.phtml'); $response->text($template->render(false)); } else { - router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname); + router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $executor, $geodb, $isResourceBlocked, $previewHostname); } }); diff --git a/app/init/resources.php b/app/init/resources.php index 4e53b24c06..2360179913 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -21,6 +21,7 @@ use Appwrite\Extend\Exception; use Appwrite\GraphQL\Schema; use Appwrite\Network\Validator\Origin; use Appwrite\Utopia\Request; +use Executor\Executor; use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis; use Utopia\App; use Utopia\Cache\Adapter\Sharding; @@ -38,6 +39,7 @@ use Utopia\Logger\Log; use Utopia\Pools\Group; use Utopia\Queue\Publisher; use Utopia\Storage\Device; +use Utopia\Storage\Device\AWS; use Utopia\Storage\Device\Backblaze; use Utopia\Storage\Device\DOSpaces; use Utopia\Storage\Device\Linode; @@ -540,7 +542,12 @@ function getDevice(string $root, string $connection = ''): Device switch ($device) { case Storage::DEVICE_S3: - return new S3($root, $accessKey, $accessSecret, $bucket, $region, $acl, $url); + if (!empty($url)) { + return new S3($root, $accessKey, $accessSecret, $url, $region, $acl); + } else { + return new AWS($root, $accessKey, $accessSecret, $bucket, $region, $acl); + } + // no break case STORAGE::DEVICE_DO_SPACES: $device = new DOSpaces($root, $accessKey, $accessSecret, $bucket, $region, $acl); $device->setHttpVersion(S3::HTTP_VERSION_1_1); @@ -567,7 +574,12 @@ function getDevice(string $root, string $connection = ''): Device $s3Bucket = System::getEnv('_APP_STORAGE_S3_BUCKET', ''); $s3Acl = 'private'; $s3EndpointUrl = System::getEnv('_APP_STORAGE_S3_ENDPOINT', ''); - return new S3($root, $s3AccessKey, $s3SecretKey, $s3Bucket, $s3Region, $s3Acl, $s3EndpointUrl); + if (!empty($s3EndpointUrl)) { + return new S3($root, $s3AccessKey, $s3SecretKey, $s3EndpointUrl, $s3Region, $s3Acl); + } else { + return new AWS($root, $s3AccessKey, $s3SecretKey, $s3Bucket, $s3Region, $s3Acl); + } + // no break case Storage::DEVICE_DO_SPACES: $doSpacesAccessKey = System::getEnv('_APP_STORAGE_DO_SPACES_ACCESS_KEY', ''); $doSpacesSecretKey = System::getEnv('_APP_STORAGE_DO_SPACES_SECRET', ''); @@ -822,3 +834,5 @@ App::setResource('apiKey', function (Request $request, Document $project): ?Key return Key::decode($project, $key); }, ['request', 'project']); + +App::setResource('executor', fn () => new Executor(fn (string $projectId, string $deploymentId) => System::getEnv('_APP_EXECUTOR_HOST'))); diff --git a/app/worker.php b/app/worker.php index 90496c0430..6a51ee55be 100644 --- a/app/worker.php +++ b/app/worker.php @@ -18,6 +18,7 @@ use Appwrite\Event\StatsUsage; use Appwrite\Event\StatsUsageDump; use Appwrite\Event\Webhook; use Appwrite\Platform\Appwrite; +use Executor\Executor; use Swoole\Runtime; use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis; use Utopia\Cache\Adapter\Sharding; @@ -413,6 +414,8 @@ Server::setResource('logError', function (Registry $register, Document $project) }; }, ['register', 'project']); +Server::setResource('executor', fn () => new Executor(fn (string $projectId, string $deploymentId) => System::getEnv('_APP_EXECUTOR_HOST'))); + $pools = $register->get('pools'); $platform = new Appwrite(); $args = $platform->getEnv('argv'); diff --git a/docker-compose.yml b/docker-compose.yml index 05ddba967a..8c8a364f30 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -198,6 +198,8 @@ services: - _APP_DATABASE_SHARED_TABLES_V1 - _APP_DATABASE_SHARED_NAMESPACE - _APP_FUNCTIONS_CREATION_ABUSE_LIMIT + extra_hosts: + - "host.docker.internal:host-gateway" appwrite-console: <<: *x-logging @@ -489,6 +491,8 @@ services: - _APP_STORAGE_WASABI_REGION - _APP_STORAGE_WASABI_BUCKET - _APP_DATABASE_SHARED_TABLES + extra_hosts: + - "host.docker.internal:host-gateway" appwrite-worker-certificates: entrypoint: worker-certificates @@ -1131,4 +1135,4 @@ volumes: appwrite-certificates: appwrite-functions: appwrite-builds: - appwrite-config: \ No newline at end of file + appwrite-config: diff --git a/src/Appwrite/Platform/Workers/Builds.php b/src/Appwrite/Platform/Workers/Builds.php index 6f26e9a80c..4057d4b190 100644 --- a/src/Appwrite/Platform/Workers/Builds.php +++ b/src/Appwrite/Platform/Workers/Builds.php @@ -59,8 +59,9 @@ class Builds extends Action ->inject('deviceForFunctions') ->inject('isResourceBlocked') ->inject('log') - ->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log) => - $this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $usage, $cache, $dbForProject, $deviceForFunctions, $isResourceBlocked, $log)); + ->inject('executor') + ->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log, Executor $executor) => + $this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $usage, $cache, $dbForProject, $deviceForFunctions, $isResourceBlocked, $log, $executor)); } /** @@ -76,10 +77,11 @@ class Builds extends Action * @param Database $dbForProject * @param Device $deviceForFunctions * @param Log $log + * @param Executor $executor * @return void * @throws \Utopia\Database\Exception */ - public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log): void + public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log, Executor $executor): void { $payload = $message->getPayload() ?? []; @@ -100,7 +102,7 @@ class Builds extends Action case BUILD_TYPE_RETRY: Console::info('Creating build for deployment: ' . $deployment->getId()); $github = new GitHub($cache); - $this->buildDeployment($deviceForFunctions, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $isResourceBlocked, $log); + $this->buildDeployment($deviceForFunctions, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $isResourceBlocked, $log, $executor); break; default: @@ -123,14 +125,13 @@ class Builds extends Action * @param Document $deployment * @param Document $template * @param Log $log + * @param Executor $executor * @return void * @throws \Utopia\Database\Exception * @throws Exception */ - protected function buildDeployment(Device $deviceForFunctions, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, callable $isResourceBlocked, Log $log): void + protected function buildDeployment(Device $deviceForFunctions, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, callable $isResourceBlocked, Log $log, Executor $executor): void { - $executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST')); - $functionId = $function->getId(); $log->addTag('functionId', $function->getId()); diff --git a/src/Appwrite/Platform/Workers/Deletes.php b/src/Appwrite/Platform/Workers/Deletes.php index 09c60c7740..5092599d67 100644 --- a/src/Appwrite/Platform/Workers/Deletes.php +++ b/src/Appwrite/Platform/Workers/Deletes.php @@ -55,12 +55,13 @@ class Deletes extends Action ->inject('deviceForBuilds') ->inject('deviceForCache') ->inject('certificates') + ->inject('executor') ->inject('executionRetention') ->inject('auditRetention') ->inject('log') ->callback( - fn ($message, Document $project, Database $dbForPlatform, callable $getProjectDB, callable $getLogsDB, Device $deviceForFiles, Device $deviceForFunctions, Device $deviceForBuilds, Device $deviceForCache, CertificatesAdapter $certificates, string $executionRetention, string $auditRetention, Log $log) => - $this->action($message, $project, $dbForPlatform, $getProjectDB, $getLogsDB, $deviceForFiles, $deviceForFunctions, $deviceForBuilds, $deviceForCache, $certificates, $executionRetention, $auditRetention, $log) + fn ($message, Document $project, Database $dbForPlatform, callable $getProjectDB, callable $getLogsDB, Device $deviceForFiles, Device $deviceForFunctions, Device $deviceForBuilds, Device $deviceForCache, CertificatesAdapter $certificates, Executor $executor, string $executionRetention, string $auditRetention, Log $log) => + $this->action($message, $project, $dbForPlatform, $getProjectDB, $getLogsDB, $deviceForFiles, $deviceForFunctions, $deviceForBuilds, $deviceForCache, $certificates, $executor, $executionRetention, $auditRetention, $log) ); } @@ -68,7 +69,7 @@ class Deletes extends Action * @throws Exception * @throws Throwable */ - public function action(Message $message, Document $project, Database $dbForPlatform, callable $getProjectDB, callable $getLogsDB, Device $deviceForFiles, Device $deviceForFunctions, Device $deviceForBuilds, Device $deviceForCache, CertificatesAdapter $certificates, string $executionRetention, string $auditRetention, Log $log): void + public function action(Message $message, Document $project, Database $dbForPlatform, callable $getProjectDB, callable $getLogsDB, Device $deviceForFiles, Device $deviceForFunctions, Device $deviceForBuilds, Device $deviceForCache, CertificatesAdapter $certificates, Executor $executor, string $executionRetention, string $auditRetention, Log $log): void { $payload = $message->getPayload() ?? []; @@ -93,10 +94,10 @@ class Deletes extends Action $this->deleteProject($dbForPlatform, $getProjectDB, $deviceForFiles, $deviceForFunctions, $deviceForBuilds, $deviceForCache, $certificates, $document); break; case DELETE_TYPE_FUNCTIONS: - $this->deleteFunction($dbForPlatform, $getProjectDB, $deviceForFunctions, $deviceForBuilds, $certificates, $document, $project); + $this->deleteFunction($dbForPlatform, $getProjectDB, $deviceForFunctions, $deviceForBuilds, $certificates, $document, $project, $executor); break; case DELETE_TYPE_DEPLOYMENTS: - $this->deleteDeployment($getProjectDB, $deviceForFunctions, $deviceForBuilds, $document, $project); + $this->deleteDeployment($getProjectDB, $deviceForFunctions, $deviceForBuilds, $document, $project, $executor); break; case DELETE_TYPE_USERS: $this->deleteUser($getProjectDB, $document, $project); @@ -827,10 +828,11 @@ class Deletes extends Action * @param Device $deviceForBuilds * @param Document $document function document * @param Document $project + * @param Executor $executor * @return void * @throws Exception */ - private function deleteFunction(Database $dbForPlatform, callable $getProjectDB, Device $deviceForFunctions, Device $deviceForBuilds, CertificatesAdapter $certificates, Document $document, Document $project): void + private function deleteFunction(Database $dbForPlatform, callable $getProjectDB, Device $deviceForFunctions, Device $deviceForBuilds, CertificatesAdapter $certificates, Document $document, Document $project, Executor $executor): void { $projectId = $project->getId(); $dbForProject = $getProjectDB($project); @@ -922,7 +924,7 @@ class Deletes extends Action * Request executor to delete all deployment containers */ Console::info("Requesting executor to delete all deployment containers for function " . $functionId); - $this->deleteRuntimes($getProjectDB, $document, $project); + $this->deleteRuntimes($getProjectDB, $document, $project, $executor); } /** @@ -993,10 +995,11 @@ class Deletes extends Action * @param Device $deviceForBuilds * @param Document $document * @param Document $project + * @param Executor $executor * @return void * @throws Exception */ - private function deleteDeployment(callable $getProjectDB, Device $deviceForFunctions, Device $deviceForBuilds, Document $document, Document $project): void + private function deleteDeployment(callable $getProjectDB, Device $deviceForFunctions, Device $deviceForBuilds, Document $document, Document $project, Executor $executor): void { $projectId = $project->getId(); $dbForProject = $getProjectDB($project); @@ -1024,7 +1027,7 @@ class Deletes extends Action * Request executor to delete all deployment containers */ Console::info("Requesting executor to delete deployment container for deployment " . $deploymentId); - $this->deleteRuntimes($getProjectDB, $document, $project); + $this->deleteRuntimes($getProjectDB, $document, $project, $executor); } /** @@ -1180,13 +1183,12 @@ class Deletes extends Action * @param callable $getProjectDB * @param ?Document $function * @param Document $project + * @param Executor $executor * @return void * @throws Exception */ - private function deleteRuntimes(callable $getProjectDB, ?Document $function, Document $project): void + private function deleteRuntimes(callable $getProjectDB, ?Document $function, Document $project, Executor $executor): void { - $executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST')); - $deleteByFunction = function (Document $function) use ($getProjectDB, $project, $executor) { $this->listByGroup( 'deployments', diff --git a/src/Appwrite/Platform/Workers/Functions.php b/src/Appwrite/Platform/Workers/Functions.php index a7caa3207f..4e1794c085 100644 --- a/src/Appwrite/Platform/Workers/Functions.php +++ b/src/Appwrite/Platform/Workers/Functions.php @@ -51,11 +51,12 @@ class Functions extends Action ->inject('queueForEvents') ->inject('queueForStatsUsage') ->inject('log') + ->inject('executor') ->inject('isResourceBlocked') - ->callback(fn (Document $project, Message $message, Database $dbForProject, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $log, $isResourceBlocked)); + ->callback(fn (Document $project, Message $message, Database $dbForProject, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, Executor $executor, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $log, $executor, $isResourceBlocked)); } - public function action(Document $project, Message $message, Database $dbForProject, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked): void + public function action(Document $project, Message $message, Database $dbForProject, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, Executor $executor, callable $isResourceBlocked): void { $payload = $message->getPayload() ?? []; @@ -146,6 +147,7 @@ class Functions extends Action queueForEvents: $queueForEvents, project: $project, function: $function, + executor: $executor, trigger: 'event', path: '/', method: 'POST', @@ -188,6 +190,7 @@ class Functions extends Action queueForEvents: $queueForEvents, project: $project, function: $function, + executor: $executor, trigger: 'http', path: $path, method: $method, @@ -212,6 +215,7 @@ class Functions extends Action queueForEvents: $queueForEvents, project: $project, function: $function, + executor: $executor, trigger: 'schedule', path: $path, method: $method, @@ -298,6 +302,7 @@ class Functions extends Action * @param Event $queueForEvents * @param Document $project * @param Document $function + * @param Executor $executor * @param string $trigger * @param string $path * @param string $method @@ -324,6 +329,7 @@ class Functions extends Action Event $queueForEvents, Document $project, Document $function, + Executor $executor, string $trigger, string $path, string $method, @@ -514,7 +520,6 @@ class Functions extends Action try { $version = $function->getAttribute('version', 'v2'); $command = $runtime['startCommand']; - $executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST')); $command = $version === 'v2' ? '' : 'cp /tmp/code.tar.gz /mnt/code/code.tar.gz && nohup helpers/start.sh "' . $command . '"'; $executionResponse = $executor->createExecution( projectId: $project->getId(), diff --git a/src/Executor/Executor.php b/src/Executor/Executor.php index c230cfb664..6bc2fe7aab 100644 --- a/src/Executor/Executor.php +++ b/src/Executor/Executor.php @@ -21,17 +21,19 @@ class Executor private bool $selfSigned = false; - private string $endpoint; + /** + * @var callable(string, string): string $endpoint + */ + private $endpointSelector; protected array $headers; - public function __construct(string $endpoint) + /** + * @param callable(string, string): string $endpointSelector + */ + public function __construct(callable $endpointSelector) { - if (!filter_var($endpoint, FILTER_VALIDATE_URL)) { - throw new Exception('Unsupported endpoint'); - } - - $this->endpoint = $endpoint; + $this->endpointSelector = $endpointSelector; $this->headers = [ 'content-type' => 'application/json', 'authorization' => 'Bearer ' . System::getEnv('_APP_EXECUTOR_SECRET', ''), @@ -92,7 +94,8 @@ class Executor 'timeout' => $timeout, ]; - $response = $this->call(self::METHOD_POST, $route, [ 'x-opr-runtime-id' => $runtimeId ], $params, true, $timeout); + $endpoint = $this->selectEndpoint($projectId, $deploymentId); + $response = $this->call($endpoint, self::METHOD_POST, $route, [ 'x-opr-runtime-id' => $runtimeId ], $params, true, $timeout); $status = $response['headers']['status-code']; if ($status >= 400) { @@ -123,7 +126,8 @@ class Executor 'timeout' => $timeout ]; - $this->call(self::METHOD_GET, $route, [ 'x-opr-runtime-id' => $runtimeId ], $params, true, $timeout, $callback); + $endpoint = $this->selectEndpoint($projectId, $deploymentId); + $this->call($endpoint, self::METHOD_GET, $route, [ 'x-opr-runtime-id' => $runtimeId ], $params, true, $timeout, $callback); } /** @@ -139,7 +143,8 @@ class Executor $runtimeId = "$projectId-$deploymentId"; $route = "/runtimes/$runtimeId"; - $response = $this->call(self::METHOD_DELETE, $route, [ + $endpoint = $this->selectEndpoint($projectId, $deploymentId); + $response = $this->call($endpoint, self::METHOD_DELETE, $route, [ 'x-opr-addressing-method' => 'broadcast' ], [], true, 30); @@ -227,7 +232,8 @@ class Executor $requestTimeout = $timeout + 15; } - $response = $this->call(self::METHOD_POST, $route, [ 'x-opr-runtime-id' => $runtimeId, 'content-type' => 'multipart/form-data', 'accept' => 'multipart/form-data' ], $params, true, $requestTimeout); + $endpoint = $this->selectEndpoint($projectId, $deploymentId); + $response = $this->call($endpoint, self::METHOD_POST, $route, [ 'x-opr-runtime-id' => $runtimeId, 'content-type' => 'multipart/form-data', 'accept' => 'multipart/form-data' ], $params, true, $requestTimeout); $status = $response['headers']['status-code']; if ($status >= 400) { @@ -235,7 +241,11 @@ class Executor throw new \Exception($message, $status); } - $response['body']['headers'] = \json_decode($response['body']['headers'] ?? '{}', true); + $headers = $response['body']['headers'] ?? []; + if (is_string($headers)) { + $headers = \json_decode($headers, true); + } + $response['body']['headers'] = $headers; $response['body']['statusCode'] = \intval($response['body']['statusCode'] ?? 500); $response['body']['duration'] = \floatval($response['body']['duration'] ?? 0); $response['body']['startTime'] = \floatval($response['body']['startTime'] ?? \microtime(true)); @@ -256,10 +266,10 @@ class Executor * @return array|string * @throws Exception */ - public function call(string $method, string $path = '', array $headers = [], array $params = [], bool $decode = true, int $timeout = 15, callable $callback = null) + private function call(string $endpoint, string $method, string $path = '', array $headers = [], array $params = [], bool $decode = true, int $timeout = 15, callable $callback = null) { $headers = array_merge($this->headers, $headers); - $ch = curl_init($this->endpoint . $path . (($method == self::METHOD_GET && !empty($params)) ? '?' . http_build_query($params) : '')); + $ch = curl_init($endpoint . $path . (($method == self::METHOD_GET && !empty($params)) ? '?' . http_build_query($params) : '')); $responseHeaders = []; $responseStatus = -1; $responseType = ''; @@ -422,4 +432,9 @@ class Executor return $output; } + + private function selectEndpoint(string $projectId, string $deploymentId): string + { + return call_user_func($this->endpointSelector, $projectId, $deploymentId); + } }