Merge pull request #8855 from appwrite/PLA-1776

fix(blocks): check if resource is blocked inside functions worker
This commit is contained in:
Christy Jacob 2024-10-25 15:38:41 +04:00 committed by GitHub
commit 52936d7eb5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 65 additions and 40 deletions

View file

@ -58,7 +58,7 @@ Server::setResource('project', function (Message $message, Database $dbForConsol
$payload = $message->getPayload() ?? [];
$project = new Document($payload['project'] ?? []);
if ($project->getId() === 'console' || $project->isEmpty() || ! empty($project->getInternalId())) {
if ($project->getId() === 'console') {
return $project;
}
@ -272,6 +272,11 @@ Server::setResource('deviceForCache', function (Document $project) {
return getDevice(APP_STORAGE_CACHE . '/app-' . $project->getId());
}, ['project']);
Server::setResource(
'isResourceBlocked',
fn () => fn (Document $project, string $resourceType, ?string $resourceId) => false
);
$pools = $register->get('pools');
$platform = new Appwrite();

View file

@ -41,29 +41,33 @@ class Functions extends Action
$this
->desc('Functions worker')
->groups(['functions'])
->inject('project')
->inject('message')
->inject('dbForProject')
->inject('queueForFunctions')
->inject('queueForEvents')
->inject('queueForUsage')
->inject('log')
->callback(fn (Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log) => $this->action($message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForUsage, $log));
->inject('isResourceBlocked')
->callback(fn (Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForUsage, $log, $isResourceBlocked));
}
/**
* @param Document $project
* @param Message $message
* @param Database $dbForProject
* @param Func $queueForFunctions
* @param Event $queueForEvents
* @param Usage $queueForUsage
* @param Log $log
* @param callable $isResourceBlocked
* @return void
* @throws Authorization
* @throws Structure
* @throws \Utopia\Database\Exception
* @throws Conflict
*/
public function action(Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log): void
public function action(Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log, callable $isResourceBlocked): void
{
$payload = $message->getPayload() ?? [];
@ -73,55 +77,21 @@ class Functions extends Action
$type = $payload['type'] ?? '';
// Short-term solution to offhand write operation from API contianer
// Short-term solution to offhand write operation from API container
if ($type === Func::TYPE_ASYNC_WRITE) {
$execution = new Document($payload['execution'] ?? []);
$execution = $dbForProject->createDocument('executions', $execution);
$dbForProject->createDocument('executions', $execution);
return;
}
$events = $payload['events'] ?? [];
$data = $payload['body'] ?? '';
$eventData = $payload['payload'] ?? '';
$project = new Document($payload['project'] ?? []);
$function = new Document($payload['function'] ?? []);
$functionId = $payload['functionId'] ?? '';
$user = new Document($payload['user'] ?? []);
$userId = $payload['userId'] ?? '';
$method = $payload['method'] ?? 'POST';
$headers = $payload['headers'] ?? [];
$path = $payload['path'] ?? '/';
$jwt = $payload['jwt'] ?? '';
if ($user->isEmpty() && !empty($userId)) {
$user = $dbForProject->getDocument('users', $userId);
}
if (empty($jwt) && !$user->isEmpty()) {
$jwtExpiry = $function->getAttribute('timeout', 900);
$jwtObj = new JWT(System::getEnv('_APP_OPENSSL_KEY_V1'), 'HS256', $jwtExpiry, 0);
$jwt = $jwtObj->encode([
'userId' => $user->getId(),
]);
}
if ($project->getId() === 'console') {
return;
}
if ($function->isEmpty() && !empty($functionId)) {
$function = $dbForProject->getDocument('functions', $functionId);
}
$log->addTag('functionId', $function->getId());
$log->addTag('projectId', $project->getId());
$log->addTag('type', $type);
$events = $payload['events'] ?? [];
if (!empty($events)) {
$limit = 30;
$sum = 30;
$offset = 0;
/** @var Document[] $functions */
while ($sum >= $limit) {
$functions = $dbForProject->find('functions', [
Query::limit($limit),
@ -138,6 +108,12 @@ class Functions extends Action
if (!array_intersect($events, $function->getAttribute('events', []))) {
continue;
}
if ($isResourceBlocked($project, 'functions', $function->getId())) {
Console::log('Function ' . $function->getId() . ' is blocked, skipping execution.');
continue;
}
Console::success('Iterating function: ' . $function->getAttribute('name'));
$this->execute(
@ -168,6 +144,50 @@ class Functions extends Action
return;
}
$data = $payload['body'] ?? '';
$function = new Document($payload['function'] ?? []);
$functionId = $payload['functionId'] ?? '';
$userId = $payload['userId'] ?? '';
$method = $payload['method'] ?? 'POST';
$headers = $payload['headers'] ?? [];
$path = $payload['path'] ?? '/';
$jwt = $payload['jwt'] ?? '';
if ($user->isEmpty() && !empty($userId)) {
$user = $dbForProject->getDocument('users', $userId);
}
if (empty($jwt) && !$user->isEmpty()) {
$jwtExpiry = $function->getAttribute('timeout', 900);
$jwtObj = new JWT(System::getEnv('_APP_OPENSSL_KEY_V1'), 'HS256', $jwtExpiry, 0);
$jwt = $jwtObj->encode([
'userId' => $user->getId(),
]);
}
if ($project->getId() === 'console') {
return;
}
if ($function->isEmpty() && !empty($functionId)) {
$function = $dbForProject->getDocument('functions', $functionId);
}
// $function still empty, we can't execute this
if ($function->isEmpty()) {
Console::warning('Got empty function without functionId.');
return;
}
if ($isResourceBlocked($project, 'functions', $function->getId())) {
Console::log('Function ' . $function->getId() . ' is blocked, skipping execution.');
return;
}
$log->addTag('functionId', $function->getId());
$log->addTag('projectId', $project->getId());
$log->addTag('type', $type);
/**
* Handle Schedule and HTTP execution.
*/