Merge pull request #9420 from appwrite/PLA-1968

feat: refactor executor setup
This commit is contained in:
Christy Jacob 2025-04-07 12:49:25 +04:00 committed by GitHub
commit 31a5a175b3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 105 additions and 55 deletions

View file

@ -9,6 +9,7 @@ use Appwrite\Event\StatsResources;
use Appwrite\Event\StatsUsage;
use Appwrite\Platform\Appwrite;
use Appwrite\Runtimes\Runtimes;
use Executor\Executor;
use Utopia\Cache\Adapter\Sharding;
use Utopia\Cache\Cache;
use Utopia\CLI\CLI;
@ -255,6 +256,8 @@ CLI::setResource('logError', function (Registry $register) {
};
}, ['register']);
CLI::setResource('executor', fn () => new Executor(fn (string $projectId, string $deploymentId) => System::getEnv('_APP_EXECUTOR_HOST')));
$platform = new Appwrite();
$platform->init(Service::TYPE_TASK);

View file

@ -869,7 +869,8 @@ App::put('/v1/functions/:functionId')
->inject('queueForBuilds')
->inject('dbForPlatform')
->inject('gitHub')
->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, ?string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $specification, Request $request, Response $response, Database $dbForProject, Document $project, Event $queueForEvents, Build $queueForBuilds, Database $dbForPlatform, GitHub $github) use ($redeployVcs) {
->inject('executor')
->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, ?string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $specification, Request $request, Response $response, Database $dbForProject, Document $project, Event $queueForEvents, Build $queueForBuilds, Database $dbForPlatform, GitHub $github, Executor $executor) use ($redeployVcs) {
// TODO: If only branch changes, re-deploy
$function = $dbForProject->getDocument('functions', $functionId);
@ -972,7 +973,6 @@ App::put('/v1/functions/:functionId')
// Enforce Cold Start if spec limits change.
if ($function->getAttribute('specification') !== $specification && !empty($function->getAttribute('deployment'))) {
$executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
try {
$executor->deleteRuntime($project->getId(), $function->getAttribute('deployment'));
} catch (\Throwable $th) {
@ -1779,7 +1779,8 @@ App::patch('/v1/functions/:functionId/deployments/:deploymentId/build')
->inject('dbForProject')
->inject('project')
->inject('queueForEvents')
->action(function (string $functionId, string $deploymentId, Response $response, Database $dbForProject, Document $project, Event $queueForEvents) {
->inject('executor')
->action(function (string $functionId, string $deploymentId, Response $response, Database $dbForProject, Document $project, Event $queueForEvents, Executor $executor) {
$function = $dbForProject->getDocument('functions', $functionId);
if ($function->isEmpty()) {
@ -1834,7 +1835,6 @@ App::patch('/v1/functions/:functionId/deployments/:deploymentId/build')
$dbForProject->purgeCachedDocument('deployments', $deployment->getId());
try {
$executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST'));
$executor->deleteRuntime($project->getId(), $deploymentId . "-build");
} catch (\Throwable $th) {
// Don't throw if the deployment doesn't exist
@ -1886,8 +1886,9 @@ App::post('/v1/functions/:functionId/executions')
->inject('queueForEvents')
->inject('queueForStatsUsage')
->inject('queueForFunctions')
->inject('executor')
->inject('geodb')
->action(function (string $functionId, string $body, mixed $async, string $path, string $method, mixed $headers, ?string $scheduledAt, Response $response, Request $request, Document $project, Database $dbForProject, Database $dbForPlatform, Document $user, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Reader $geodb) {
->action(function (string $functionId, string $body, mixed $async, string $path, string $method, mixed $headers, ?string $scheduledAt, Response $response, Request $request, Document $project, Database $dbForProject, Database $dbForPlatform, Document $user, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Executor $executor, Reader $geodb) {
$async = \strval($async) === 'true' || \strval($async) === '1';
if (!$async && !is_null($scheduledAt)) {
@ -2160,7 +2161,6 @@ App::post('/v1/functions/:functionId/executions')
]);
/** Execute function */
$executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST'));
try {
$version = $function->getAttribute('version', 'v2');
$command = $runtime['startCommand'];

View file

@ -50,7 +50,7 @@ Config::setParam('domainVerification', false);
Config::setParam('cookieDomain', 'localhost');
Config::setParam('cookieSamesite', Response::COOKIE_SAMESITE_NONE);
function router(App $utopia, Database $dbForPlatform, callable $getProjectDB, SwooleRequest $swooleRequest, Request $request, Response $response, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname)
function router(App $utopia, Database $dbForPlatform, callable $getProjectDB, SwooleRequest $swooleRequest, Request $request, Response $response, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Executor $executor, Reader $geodb, callable $isResourceBlocked, string $previewHostname)
{
$utopia->getRoute()?->label('error', __DIR__ . '/../views/general/error.phtml');
@ -339,7 +339,6 @@ function router(App $utopia, Database $dbForPlatform, callable $getProjectDB, Sw
]);
/** Execute function */
$executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST'));
try {
$version = $function->getAttribute('version', 'v2');
$command = $runtime['startCommand'];
@ -503,9 +502,10 @@ App::init()
->inject('queueForEvents')
->inject('queueForCertificates')
->inject('queueForFunctions')
->inject('executor')
->inject('isResourceBlocked')
->inject('previewHostname')
->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Document $console, Document $project, Database $dbForPlatform, callable $getProjectDB, Locale $locale, array $localeCodes, array $clients, Reader $geodb, StatsUsage $queueForStatsUsage, Event $queueForEvents, Certificate $queueForCertificates, Func $queueForFunctions, callable $isResourceBlocked, string $previewHostname) {
->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Document $console, Document $project, Database $dbForPlatform, callable $getProjectDB, Locale $locale, array $localeCodes, array $clients, Reader $geodb, StatsUsage $queueForStatsUsage, Event $queueForEvents, Certificate $queueForCertificates, Func $queueForFunctions, Executor $executor, callable $isResourceBlocked, string $previewHostname) {
/*
* Appwrite Router
*/
@ -513,7 +513,7 @@ App::init()
$mainDomain = System::getEnv('_APP_DOMAIN', '');
// Only run Router when external domain
if ($host !== $mainDomain || !empty($previewHostname)) {
if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname)) {
if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $executor, $geodb, $isResourceBlocked, $previewHostname)) {
return;
}
}
@ -742,11 +742,12 @@ App::options()
->inject('queueForEvents')
->inject('queueForStatsUsage')
->inject('queueForFunctions')
->inject('executor')
->inject('geodb')
->inject('isResourceBlocked')
->inject('previewHostname')
->inject('project')
->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname, Document $project) {
->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Executor $executor, Reader $geodb, callable $isResourceBlocked, string $previewHostname, Document $project) {
/*
* Appwrite Router
*/
@ -754,7 +755,7 @@ App::options()
$mainDomain = System::getEnv('_APP_DOMAIN', '');
// Only run Router when external domain
if ($host !== $mainDomain || !empty($previewHostname)) {
if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname)) {
if (router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $executor, $geodb, $isResourceBlocked, $previewHostname)) {
return;
}
}
@ -1062,10 +1063,11 @@ App::get('/robots.txt')
->inject('queueForEvents')
->inject('queueForStatsUsage')
->inject('queueForFunctions')
->inject('executor')
->inject('geodb')
->inject('isResourceBlocked')
->inject('previewHostname')
->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname) {
->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Executor $executor, Reader $geodb, callable $isResourceBlocked, string $previewHostname) {
$host = $request->getHostname() ?? '';
$mainDomain = System::getEnv('_APP_DOMAIN', '');
@ -1073,7 +1075,7 @@ App::get('/robots.txt')
$template = new View(__DIR__ . '/../views/general/robots.phtml');
$response->text($template->render(false));
} else {
router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname);
router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $executor, $geodb, $isResourceBlocked, $previewHostname);
}
});
@ -1090,10 +1092,11 @@ App::get('/humans.txt')
->inject('queueForEvents')
->inject('queueForStatsUsage')
->inject('queueForFunctions')
->inject('executor')
->inject('geodb')
->inject('isResourceBlocked')
->inject('previewHostname')
->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Reader $geodb, callable $isResourceBlocked, string $previewHostname) {
->action(function (App $utopia, SwooleRequest $swooleRequest, Request $request, Response $response, Database $dbForPlatform, callable $getProjectDB, Event $queueForEvents, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Executor $executor, Reader $geodb, callable $isResourceBlocked, string $previewHostname) {
$host = $request->getHostname() ?? '';
$mainDomain = System::getEnv('_APP_DOMAIN', '');
@ -1101,7 +1104,7 @@ App::get('/humans.txt')
$template = new View(__DIR__ . '/../views/general/humans.phtml');
$response->text($template->render(false));
} else {
router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $geodb, $isResourceBlocked, $previewHostname);
router($utopia, $dbForPlatform, $getProjectDB, $swooleRequest, $request, $response, $queueForEvents, $queueForStatsUsage, $queueForFunctions, $executor, $geodb, $isResourceBlocked, $previewHostname);
}
});

View file

@ -21,6 +21,7 @@ use Appwrite\Extend\Exception;
use Appwrite\GraphQL\Schema;
use Appwrite\Network\Validator\Origin;
use Appwrite\Utopia\Request;
use Executor\Executor;
use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis;
use Utopia\App;
use Utopia\Cache\Adapter\Sharding;
@ -38,6 +39,7 @@ use Utopia\Logger\Log;
use Utopia\Pools\Group;
use Utopia\Queue\Publisher;
use Utopia\Storage\Device;
use Utopia\Storage\Device\AWS;
use Utopia\Storage\Device\Backblaze;
use Utopia\Storage\Device\DOSpaces;
use Utopia\Storage\Device\Linode;
@ -540,7 +542,12 @@ function getDevice(string $root, string $connection = ''): Device
switch ($device) {
case Storage::DEVICE_S3:
return new S3($root, $accessKey, $accessSecret, $bucket, $region, $acl, $url);
if (!empty($url)) {
return new S3($root, $accessKey, $accessSecret, $url, $region, $acl);
} else {
return new AWS($root, $accessKey, $accessSecret, $bucket, $region, $acl);
}
// no break
case STORAGE::DEVICE_DO_SPACES:
$device = new DOSpaces($root, $accessKey, $accessSecret, $bucket, $region, $acl);
$device->setHttpVersion(S3::HTTP_VERSION_1_1);
@ -567,7 +574,12 @@ function getDevice(string $root, string $connection = ''): Device
$s3Bucket = System::getEnv('_APP_STORAGE_S3_BUCKET', '');
$s3Acl = 'private';
$s3EndpointUrl = System::getEnv('_APP_STORAGE_S3_ENDPOINT', '');
return new S3($root, $s3AccessKey, $s3SecretKey, $s3Bucket, $s3Region, $s3Acl, $s3EndpointUrl);
if (!empty($s3EndpointUrl)) {
return new S3($root, $s3AccessKey, $s3SecretKey, $s3EndpointUrl, $s3Region, $s3Acl);
} else {
return new AWS($root, $s3AccessKey, $s3SecretKey, $s3Bucket, $s3Region, $s3Acl);
}
// no break
case Storage::DEVICE_DO_SPACES:
$doSpacesAccessKey = System::getEnv('_APP_STORAGE_DO_SPACES_ACCESS_KEY', '');
$doSpacesSecretKey = System::getEnv('_APP_STORAGE_DO_SPACES_SECRET', '');
@ -822,3 +834,5 @@ App::setResource('apiKey', function (Request $request, Document $project): ?Key
return Key::decode($project, $key);
}, ['request', 'project']);
App::setResource('executor', fn () => new Executor(fn (string $projectId, string $deploymentId) => System::getEnv('_APP_EXECUTOR_HOST')));

View file

@ -18,6 +18,7 @@ use Appwrite\Event\StatsUsage;
use Appwrite\Event\StatsUsageDump;
use Appwrite\Event\Webhook;
use Appwrite\Platform\Appwrite;
use Executor\Executor;
use Swoole\Runtime;
use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis;
use Utopia\Cache\Adapter\Sharding;
@ -413,6 +414,8 @@ Server::setResource('logError', function (Registry $register, Document $project)
};
}, ['register', 'project']);
Server::setResource('executor', fn () => new Executor(fn (string $projectId, string $deploymentId) => System::getEnv('_APP_EXECUTOR_HOST')));
$pools = $register->get('pools');
$platform = new Appwrite();
$args = $platform->getEnv('argv');

View file

@ -198,6 +198,8 @@ services:
- _APP_DATABASE_SHARED_TABLES_V1
- _APP_DATABASE_SHARED_NAMESPACE
- _APP_FUNCTIONS_CREATION_ABUSE_LIMIT
extra_hosts:
- "host.docker.internal:host-gateway"
appwrite-console:
<<: *x-logging
@ -489,6 +491,8 @@ services:
- _APP_STORAGE_WASABI_REGION
- _APP_STORAGE_WASABI_BUCKET
- _APP_DATABASE_SHARED_TABLES
extra_hosts:
- "host.docker.internal:host-gateway"
appwrite-worker-certificates:
entrypoint: worker-certificates
@ -1131,4 +1135,4 @@ volumes:
appwrite-certificates:
appwrite-functions:
appwrite-builds:
appwrite-config:
appwrite-config:

View file

@ -59,8 +59,9 @@ class Builds extends Action
->inject('deviceForFunctions')
->inject('isResourceBlocked')
->inject('log')
->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log) =>
$this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $usage, $cache, $dbForProject, $deviceForFunctions, $isResourceBlocked, $log));
->inject('executor')
->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log, Executor $executor) =>
$this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $usage, $cache, $dbForProject, $deviceForFunctions, $isResourceBlocked, $log, $executor));
}
/**
@ -76,10 +77,11 @@ class Builds extends Action
* @param Database $dbForProject
* @param Device $deviceForFunctions
* @param Log $log
* @param Executor $executor
* @return void
* @throws \Utopia\Database\Exception
*/
public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log): void
public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, StatsUsage $queueForStatsUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, callable $isResourceBlocked, Log $log, Executor $executor): void
{
$payload = $message->getPayload() ?? [];
@ -100,7 +102,7 @@ class Builds extends Action
case BUILD_TYPE_RETRY:
Console::info('Creating build for deployment: ' . $deployment->getId());
$github = new GitHub($cache);
$this->buildDeployment($deviceForFunctions, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $isResourceBlocked, $log);
$this->buildDeployment($deviceForFunctions, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $dbForPlatform, $dbForProject, $github, $project, $resource, $deployment, $template, $isResourceBlocked, $log, $executor);
break;
default:
@ -123,14 +125,13 @@ class Builds extends Action
* @param Document $deployment
* @param Document $template
* @param Log $log
* @param Executor $executor
* @return void
* @throws \Utopia\Database\Exception
* @throws Exception
*/
protected function buildDeployment(Device $deviceForFunctions, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, callable $isResourceBlocked, Log $log): void
protected function buildDeployment(Device $deviceForFunctions, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Database $dbForPlatform, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, callable $isResourceBlocked, Log $log, Executor $executor): void
{
$executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST'));
$functionId = $function->getId();
$log->addTag('functionId', $function->getId());

View file

@ -55,12 +55,13 @@ class Deletes extends Action
->inject('deviceForBuilds')
->inject('deviceForCache')
->inject('certificates')
->inject('executor')
->inject('executionRetention')
->inject('auditRetention')
->inject('log')
->callback(
fn ($message, Document $project, Database $dbForPlatform, callable $getProjectDB, callable $getLogsDB, Device $deviceForFiles, Device $deviceForFunctions, Device $deviceForBuilds, Device $deviceForCache, CertificatesAdapter $certificates, string $executionRetention, string $auditRetention, Log $log) =>
$this->action($message, $project, $dbForPlatform, $getProjectDB, $getLogsDB, $deviceForFiles, $deviceForFunctions, $deviceForBuilds, $deviceForCache, $certificates, $executionRetention, $auditRetention, $log)
fn ($message, Document $project, Database $dbForPlatform, callable $getProjectDB, callable $getLogsDB, Device $deviceForFiles, Device $deviceForFunctions, Device $deviceForBuilds, Device $deviceForCache, CertificatesAdapter $certificates, Executor $executor, string $executionRetention, string $auditRetention, Log $log) =>
$this->action($message, $project, $dbForPlatform, $getProjectDB, $getLogsDB, $deviceForFiles, $deviceForFunctions, $deviceForBuilds, $deviceForCache, $certificates, $executor, $executionRetention, $auditRetention, $log)
);
}
@ -68,7 +69,7 @@ class Deletes extends Action
* @throws Exception
* @throws Throwable
*/
public function action(Message $message, Document $project, Database $dbForPlatform, callable $getProjectDB, callable $getLogsDB, Device $deviceForFiles, Device $deviceForFunctions, Device $deviceForBuilds, Device $deviceForCache, CertificatesAdapter $certificates, string $executionRetention, string $auditRetention, Log $log): void
public function action(Message $message, Document $project, Database $dbForPlatform, callable $getProjectDB, callable $getLogsDB, Device $deviceForFiles, Device $deviceForFunctions, Device $deviceForBuilds, Device $deviceForCache, CertificatesAdapter $certificates, Executor $executor, string $executionRetention, string $auditRetention, Log $log): void
{
$payload = $message->getPayload() ?? [];
@ -93,10 +94,10 @@ class Deletes extends Action
$this->deleteProject($dbForPlatform, $getProjectDB, $deviceForFiles, $deviceForFunctions, $deviceForBuilds, $deviceForCache, $certificates, $document);
break;
case DELETE_TYPE_FUNCTIONS:
$this->deleteFunction($dbForPlatform, $getProjectDB, $deviceForFunctions, $deviceForBuilds, $certificates, $document, $project);
$this->deleteFunction($dbForPlatform, $getProjectDB, $deviceForFunctions, $deviceForBuilds, $certificates, $document, $project, $executor);
break;
case DELETE_TYPE_DEPLOYMENTS:
$this->deleteDeployment($getProjectDB, $deviceForFunctions, $deviceForBuilds, $document, $project);
$this->deleteDeployment($getProjectDB, $deviceForFunctions, $deviceForBuilds, $document, $project, $executor);
break;
case DELETE_TYPE_USERS:
$this->deleteUser($getProjectDB, $document, $project);
@ -827,10 +828,11 @@ class Deletes extends Action
* @param Device $deviceForBuilds
* @param Document $document function document
* @param Document $project
* @param Executor $executor
* @return void
* @throws Exception
*/
private function deleteFunction(Database $dbForPlatform, callable $getProjectDB, Device $deviceForFunctions, Device $deviceForBuilds, CertificatesAdapter $certificates, Document $document, Document $project): void
private function deleteFunction(Database $dbForPlatform, callable $getProjectDB, Device $deviceForFunctions, Device $deviceForBuilds, CertificatesAdapter $certificates, Document $document, Document $project, Executor $executor): void
{
$projectId = $project->getId();
$dbForProject = $getProjectDB($project);
@ -922,7 +924,7 @@ class Deletes extends Action
* Request executor to delete all deployment containers
*/
Console::info("Requesting executor to delete all deployment containers for function " . $functionId);
$this->deleteRuntimes($getProjectDB, $document, $project);
$this->deleteRuntimes($getProjectDB, $document, $project, $executor);
}
/**
@ -993,10 +995,11 @@ class Deletes extends Action
* @param Device $deviceForBuilds
* @param Document $document
* @param Document $project
* @param Executor $executor
* @return void
* @throws Exception
*/
private function deleteDeployment(callable $getProjectDB, Device $deviceForFunctions, Device $deviceForBuilds, Document $document, Document $project): void
private function deleteDeployment(callable $getProjectDB, Device $deviceForFunctions, Device $deviceForBuilds, Document $document, Document $project, Executor $executor): void
{
$projectId = $project->getId();
$dbForProject = $getProjectDB($project);
@ -1024,7 +1027,7 @@ class Deletes extends Action
* Request executor to delete all deployment containers
*/
Console::info("Requesting executor to delete deployment container for deployment " . $deploymentId);
$this->deleteRuntimes($getProjectDB, $document, $project);
$this->deleteRuntimes($getProjectDB, $document, $project, $executor);
}
/**
@ -1180,13 +1183,12 @@ class Deletes extends Action
* @param callable $getProjectDB
* @param ?Document $function
* @param Document $project
* @param Executor $executor
* @return void
* @throws Exception
*/
private function deleteRuntimes(callable $getProjectDB, ?Document $function, Document $project): void
private function deleteRuntimes(callable $getProjectDB, ?Document $function, Document $project, Executor $executor): void
{
$executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST'));
$deleteByFunction = function (Document $function) use ($getProjectDB, $project, $executor) {
$this->listByGroup(
'deployments',

View file

@ -51,11 +51,12 @@ class Functions extends Action
->inject('queueForEvents')
->inject('queueForStatsUsage')
->inject('log')
->inject('executor')
->inject('isResourceBlocked')
->callback(fn (Document $project, Message $message, Database $dbForProject, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $log, $isResourceBlocked));
->callback(fn (Document $project, Message $message, Database $dbForProject, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, Executor $executor, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $queueForEvents, $queueForStatsUsage, $log, $executor, $isResourceBlocked));
}
public function action(Document $project, Message $message, Database $dbForProject, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, callable $isResourceBlocked): void
public function action(Document $project, Message $message, Database $dbForProject, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, Event $queueForEvents, StatsUsage $queueForStatsUsage, Log $log, Executor $executor, callable $isResourceBlocked): void
{
$payload = $message->getPayload() ?? [];
@ -146,6 +147,7 @@ class Functions extends Action
queueForEvents: $queueForEvents,
project: $project,
function: $function,
executor: $executor,
trigger: 'event',
path: '/',
method: 'POST',
@ -188,6 +190,7 @@ class Functions extends Action
queueForEvents: $queueForEvents,
project: $project,
function: $function,
executor: $executor,
trigger: 'http',
path: $path,
method: $method,
@ -212,6 +215,7 @@ class Functions extends Action
queueForEvents: $queueForEvents,
project: $project,
function: $function,
executor: $executor,
trigger: 'schedule',
path: $path,
method: $method,
@ -298,6 +302,7 @@ class Functions extends Action
* @param Event $queueForEvents
* @param Document $project
* @param Document $function
* @param Executor $executor
* @param string $trigger
* @param string $path
* @param string $method
@ -324,6 +329,7 @@ class Functions extends Action
Event $queueForEvents,
Document $project,
Document $function,
Executor $executor,
string $trigger,
string $path,
string $method,
@ -514,7 +520,6 @@ class Functions extends Action
try {
$version = $function->getAttribute('version', 'v2');
$command = $runtime['startCommand'];
$executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST'));
$command = $version === 'v2' ? '' : 'cp /tmp/code.tar.gz /mnt/code/code.tar.gz && nohup helpers/start.sh "' . $command . '"';
$executionResponse = $executor->createExecution(
projectId: $project->getId(),

View file

@ -21,17 +21,19 @@ class Executor
private bool $selfSigned = false;
private string $endpoint;
/**
* @var callable(string, string): string $endpoint
*/
private $endpointSelector;
protected array $headers;
public function __construct(string $endpoint)
/**
* @param callable(string, string): string $endpointSelector
*/
public function __construct(callable $endpointSelector)
{
if (!filter_var($endpoint, FILTER_VALIDATE_URL)) {
throw new Exception('Unsupported endpoint');
}
$this->endpoint = $endpoint;
$this->endpointSelector = $endpointSelector;
$this->headers = [
'content-type' => 'application/json',
'authorization' => 'Bearer ' . System::getEnv('_APP_EXECUTOR_SECRET', ''),
@ -92,7 +94,8 @@ class Executor
'timeout' => $timeout,
];
$response = $this->call(self::METHOD_POST, $route, [ 'x-opr-runtime-id' => $runtimeId ], $params, true, $timeout);
$endpoint = $this->selectEndpoint($projectId, $deploymentId);
$response = $this->call($endpoint, self::METHOD_POST, $route, [ 'x-opr-runtime-id' => $runtimeId ], $params, true, $timeout);
$status = $response['headers']['status-code'];
if ($status >= 400) {
@ -123,7 +126,8 @@ class Executor
'timeout' => $timeout
];
$this->call(self::METHOD_GET, $route, [ 'x-opr-runtime-id' => $runtimeId ], $params, true, $timeout, $callback);
$endpoint = $this->selectEndpoint($projectId, $deploymentId);
$this->call($endpoint, self::METHOD_GET, $route, [ 'x-opr-runtime-id' => $runtimeId ], $params, true, $timeout, $callback);
}
/**
@ -139,7 +143,8 @@ class Executor
$runtimeId = "$projectId-$deploymentId";
$route = "/runtimes/$runtimeId";
$response = $this->call(self::METHOD_DELETE, $route, [
$endpoint = $this->selectEndpoint($projectId, $deploymentId);
$response = $this->call($endpoint, self::METHOD_DELETE, $route, [
'x-opr-addressing-method' => 'broadcast'
], [], true, 30);
@ -227,7 +232,8 @@ class Executor
$requestTimeout = $timeout + 15;
}
$response = $this->call(self::METHOD_POST, $route, [ 'x-opr-runtime-id' => $runtimeId, 'content-type' => 'multipart/form-data', 'accept' => 'multipart/form-data' ], $params, true, $requestTimeout);
$endpoint = $this->selectEndpoint($projectId, $deploymentId);
$response = $this->call($endpoint, self::METHOD_POST, $route, [ 'x-opr-runtime-id' => $runtimeId, 'content-type' => 'multipart/form-data', 'accept' => 'multipart/form-data' ], $params, true, $requestTimeout);
$status = $response['headers']['status-code'];
if ($status >= 400) {
@ -235,7 +241,11 @@ class Executor
throw new \Exception($message, $status);
}
$response['body']['headers'] = \json_decode($response['body']['headers'] ?? '{}', true);
$headers = $response['body']['headers'] ?? [];
if (is_string($headers)) {
$headers = \json_decode($headers, true);
}
$response['body']['headers'] = $headers;
$response['body']['statusCode'] = \intval($response['body']['statusCode'] ?? 500);
$response['body']['duration'] = \floatval($response['body']['duration'] ?? 0);
$response['body']['startTime'] = \floatval($response['body']['startTime'] ?? \microtime(true));
@ -256,10 +266,10 @@ class Executor
* @return array|string
* @throws Exception
*/
public function call(string $method, string $path = '', array $headers = [], array $params = [], bool $decode = true, int $timeout = 15, callable $callback = null)
private function call(string $endpoint, string $method, string $path = '', array $headers = [], array $params = [], bool $decode = true, int $timeout = 15, callable $callback = null)
{
$headers = array_merge($this->headers, $headers);
$ch = curl_init($this->endpoint . $path . (($method == self::METHOD_GET && !empty($params)) ? '?' . http_build_query($params) : ''));
$ch = curl_init($endpoint . $path . (($method == self::METHOD_GET && !empty($params)) ? '?' . http_build_query($params) : ''));
$responseHeaders = [];
$responseStatus = -1;
$responseType = '';
@ -422,4 +432,9 @@ class Executor
return $output;
}
private function selectEndpoint(string $projectId, string $deploymentId): string
{
return call_user_func($this->endpointSelector, $projectId, $deploymentId);
}
}