diff --git a/app/worker.php b/app/worker.php index 2d59259284..4741afe7ea 100644 --- a/app/worker.php +++ b/app/worker.php @@ -58,7 +58,7 @@ Server::setResource('project', function (Message $message, Database $dbForConsol $payload = $message->getPayload() ?? []; $project = new Document($payload['project'] ?? []); - if ($project->getId() === 'console' || $project->isEmpty() || ! empty($project->getInternalId())) { + if ($project->getId() === 'console') { return $project; } @@ -272,6 +272,11 @@ Server::setResource('deviceForCache', function (Document $project) { return getDevice(APP_STORAGE_CACHE . '/app-' . $project->getId()); }, ['project']); +Server::setResource( + 'isResourceBlocked', + fn () => fn (Document $project, string $resourceType, ?string $resourceId) => false +); + $pools = $register->get('pools'); $platform = new Appwrite(); diff --git a/src/Appwrite/Platform/Workers/Functions.php b/src/Appwrite/Platform/Workers/Functions.php index 7e548f57be..3dc3e65eee 100644 --- a/src/Appwrite/Platform/Workers/Functions.php +++ b/src/Appwrite/Platform/Workers/Functions.php @@ -41,29 +41,33 @@ class Functions extends Action $this ->desc('Functions worker') ->groups(['functions']) + ->inject('project') ->inject('message') ->inject('dbForProject') ->inject('queueForFunctions') ->inject('queueForEvents') ->inject('queueForUsage') ->inject('log') - ->callback(fn (Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log) => $this->action($message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForUsage, $log)); + ->inject('isResourceBlocked') + ->callback(fn (Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForUsage, $log, $isResourceBlocked)); } /** + * @param Document $project * @param Message $message * @param Database $dbForProject * @param Func $queueForFunctions * @param Event $queueForEvents * @param Usage $queueForUsage * @param Log $log + * @param callable $isResourceBlocked * @return void * @throws Authorization * @throws Structure * @throws \Utopia\Database\Exception * @throws Conflict */ - public function action(Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log): void + public function action(Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log, callable $isResourceBlocked): void { $payload = $message->getPayload() ?? []; @@ -73,55 +77,21 @@ class Functions extends Action $type = $payload['type'] ?? ''; - // Short-term solution to offhand write operation from API contianer + // Short-term solution to offhand write operation from API container if ($type === Func::TYPE_ASYNC_WRITE) { $execution = new Document($payload['execution'] ?? []); - $execution = $dbForProject->createDocument('executions', $execution); + $dbForProject->createDocument('executions', $execution); return; } - $events = $payload['events'] ?? []; - $data = $payload['body'] ?? ''; $eventData = $payload['payload'] ?? ''; - $project = new Document($payload['project'] ?? []); - $function = new Document($payload['function'] ?? []); - $functionId = $payload['functionId'] ?? ''; $user = new Document($payload['user'] ?? []); - $userId = $payload['userId'] ?? ''; - $method = $payload['method'] ?? 'POST'; - $headers = $payload['headers'] ?? []; - $path = $payload['path'] ?? '/'; - $jwt = $payload['jwt'] ?? ''; - - if ($user->isEmpty() && !empty($userId)) { - $user = $dbForProject->getDocument('users', $userId); - } - - if (empty($jwt) && !$user->isEmpty()) { - $jwtExpiry = $function->getAttribute('timeout', 900); - $jwtObj = new JWT(System::getEnv('_APP_OPENSSL_KEY_V1'), 'HS256', $jwtExpiry, 0); - $jwt = $jwtObj->encode([ - 'userId' => $user->getId(), - ]); - } - - if ($project->getId() === 'console') { - return; - } - - if ($function->isEmpty() && !empty($functionId)) { - $function = $dbForProject->getDocument('functions', $functionId); - } - - $log->addTag('functionId', $function->getId()); - $log->addTag('projectId', $project->getId()); - $log->addTag('type', $type); + $events = $payload['events'] ?? []; if (!empty($events)) { $limit = 30; $sum = 30; $offset = 0; - /** @var Document[] $functions */ while ($sum >= $limit) { $functions = $dbForProject->find('functions', [ Query::limit($limit), @@ -138,6 +108,12 @@ class Functions extends Action if (!array_intersect($events, $function->getAttribute('events', []))) { continue; } + + if ($isResourceBlocked($project, 'functions', $function->getId())) { + Console::log('Function ' . $function->getId() . ' is blocked, skipping execution.'); + continue; + } + Console::success('Iterating function: ' . $function->getAttribute('name')); $this->execute( @@ -168,6 +144,50 @@ class Functions extends Action return; } + $data = $payload['body'] ?? ''; + $function = new Document($payload['function'] ?? []); + $functionId = $payload['functionId'] ?? ''; + $userId = $payload['userId'] ?? ''; + $method = $payload['method'] ?? 'POST'; + $headers = $payload['headers'] ?? []; + $path = $payload['path'] ?? '/'; + $jwt = $payload['jwt'] ?? ''; + + if ($user->isEmpty() && !empty($userId)) { + $user = $dbForProject->getDocument('users', $userId); + } + + if (empty($jwt) && !$user->isEmpty()) { + $jwtExpiry = $function->getAttribute('timeout', 900); + $jwtObj = new JWT(System::getEnv('_APP_OPENSSL_KEY_V1'), 'HS256', $jwtExpiry, 0); + $jwt = $jwtObj->encode([ + 'userId' => $user->getId(), + ]); + } + + if ($project->getId() === 'console') { + return; + } + + if ($function->isEmpty() && !empty($functionId)) { + $function = $dbForProject->getDocument('functions', $functionId); + } + + // $function still empty, we can't execute this + if ($function->isEmpty()) { + Console::warning('Got empty function without functionId.'); + return; + } + + if ($isResourceBlocked($project, 'functions', $function->getId())) { + Console::log('Function ' . $function->getId() . ' is blocked, skipping execution.'); + return; + } + + $log->addTag('functionId', $function->getId()); + $log->addTag('projectId', $project->getId()); + $log->addTag('type', $type); + /** * Handle Schedule and HTTP execution. */