From 2f711c84ed177090db463f69bd63a9a451454162 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 21 Feb 2024 00:40:55 +1300 Subject: [PATCH 1/4] Remove callback resources from workers --- app/controllers/api/functions.php | 12 +-- app/controllers/api/vcs.php | 4 +- app/controllers/shared/api.php | 12 ++- app/worker.php | 93 ++++++++++++----------- src/Appwrite/Platform/Workers/Builds.php | 26 +++---- src/Appwrite/Platform/Workers/Deletes.php | 91 ++++++++++------------ 6 files changed, 117 insertions(+), 121 deletions(-) diff --git a/app/controllers/api/functions.php b/app/controllers/api/functions.php index 21d5928267..19408e31cc 100644 --- a/app/controllers/api/functions.php +++ b/app/controllers/api/functions.php @@ -121,9 +121,7 @@ $redeployVcs = function (Request $request, Document $function, Document $project ->setType(BUILD_TYPE_DEPLOYMENT) ->setResource($function) ->setDeployment($deployment) - ->setTemplate($template) - ->setProject($project) - ->trigger(); + ->setTemplate($template); }; App::post('/v1/functions') @@ -1196,9 +1194,7 @@ App::post('/v1/functions/:functionId/deployments') $queueForBuilds ->setType(BUILD_TYPE_DEPLOYMENT) ->setResource($function) - ->setDeployment($deployment) - ->setProject($project) - ->trigger(); + ->setDeployment($deployment); } else { if ($deployment->isEmpty()) { $deployment = $dbForProject->createDocument('deployments', new Document([ @@ -1478,9 +1474,7 @@ App::post('/v1/functions/:functionId/deployments/:deploymentId/builds/:buildId') $queueForBuilds ->setType(BUILD_TYPE_DEPLOYMENT) ->setResource($function) - ->setDeployment($deployment) - ->setProject($project) - ->trigger(); + ->setDeployment($deployment); $queueForEvents ->setParam('functionId', $function->getId()) diff --git a/app/controllers/api/vcs.php b/app/controllers/api/vcs.php index 2bc94885b1..09ec5d7690 100644 --- a/app/controllers/api/vcs.php +++ b/app/controllers/api/vcs.php @@ -238,9 +238,7 @@ $createGitDeployments = function (GitHub $github, string $providerInstallationId $queueForBuilds ->setType(BUILD_TYPE_DEPLOYMENT) ->setResource($function) - ->setDeployment($deployment) - ->setProject($project) - ->trigger(); + ->setDeployment($deployment); //TODO: Add event? } diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index 810d778a21..7becb3c522 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -2,6 +2,7 @@ use Appwrite\Auth\Auth; use Appwrite\Event\Audit; +use Appwrite\Event\Build; use Appwrite\Event\Database as EventDatabase; use Appwrite\Event\Delete; use Appwrite\Event\Event; @@ -305,10 +306,11 @@ App::init() ->inject('queueForAudits') ->inject('queueForDeletes') ->inject('queueForDatabase') + ->inject('queueForBuilds') ->inject('queueForUsage') ->inject('dbForProject') ->inject('mode') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Usage $queueForUsage, Database $dbForProject, string $mode) use ($databaseListener) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Database $dbForProject, string $mode) use ($databaseListener) { $route = $utopia->getRoute(); @@ -396,6 +398,7 @@ App::init() $queueForDeletes->setProject($project); $queueForDatabase->setProject($project); + $queueForBuilds->setProject($project); $dbForProject ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $databaseListener($event, $document, $project, $queueForUsage, $dbForProject)) @@ -513,11 +516,12 @@ App::shutdown() ->inject('queueForUsage') ->inject('queueForDeletes') ->inject('queueForDatabase') + ->inject('queueForBuilds') ->inject('dbForProject') ->inject('queueForFunctions') ->inject('mode') ->inject('dbForConsole') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Database $dbForProject, Func $queueForFunctions, string $mode, Database $dbForConsole) use ($parseLabel) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Database $dbForProject, Func $queueForFunctions, string $mode, Database $dbForConsole) use ($parseLabel) { $responsePayload = $response->getPayload(); @@ -618,6 +622,10 @@ App::shutdown() $queueForDatabase->trigger(); } + if (!empty($queueForBuilds->getType())) { + $queueForBuilds->trigger(); + } + /** * Cache label */ diff --git a/app/worker.php b/app/worker.php index 2080970acb..701f7b7036 100644 --- a/app/worker.php +++ b/app/worker.php @@ -45,8 +45,7 @@ Server::setResource('dbForConsole', function (Cache $cache, Registry $register) $database = $pools ->get('console') ->pop() - ->getResource() - ; + ->getResource(); $adapter = new Database($database, $cache); $adapter->setNamespace('_console'); @@ -54,26 +53,6 @@ Server::setResource('dbForConsole', function (Cache $cache, Registry $register) return $adapter; }, ['cache', 'register']); -Server::setResource('dbForProject', function (Cache $cache, Registry $register, Message $message, Database $dbForConsole) { - $payload = $message->getPayload() ?? []; - $project = new Document($payload['project'] ?? []); - - if ($project->isEmpty() || $project->getId() === 'console') { - return $dbForConsole; - } - - $pools = $register->get('pools'); - $database = $pools - ->get($project->getAttribute('database')) - ->pop() - ->getResource() - ; - - $adapter = new Database($database, $cache); - $adapter->setNamespace('_' . $project->getInternalId()); - return $adapter; -}, ['cache', 'register', 'message', 'dbForConsole']); - Server::setResource('project', function (Message $message, Database $dbForConsole) { $payload = $message->getPayload() ?? []; $project = new Document($payload['project'] ?? []); @@ -81,10 +60,26 @@ Server::setResource('project', function (Message $message, Database $dbForConsol if ($project->getId() === 'console') { return $project; } + return $dbForConsole->getDocument('projects', $project->getId()); - ; }, ['message', 'dbForConsole']); +Server::setResource('dbForProject', function (Cache $cache, Registry $register, Message $message, Document $project, Database $dbForConsole) { + if ($project->isEmpty() || $project->getId() === 'console') { + return $dbForConsole; + } + + $pools = $register->get('pools'); + $database = $pools + ->get($project->getAttribute('database')) + ->pop() + ->getResource(); + + $adapter = new Database($database, $cache); + $adapter->setNamespace('_' . $project->getInternalId()); + return $adapter; +}, ['cache', 'register', 'message', 'project', 'dbForConsole']); + Server::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, $cache) { $databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools @@ -143,72 +138,84 @@ Server::setResource('cache', function (Registry $register) { return new Cache(new Sharding($adapters)); }, ['register']); + Server::setResource('log', fn() => new Log()); + Server::setResource('queueForUsage', function (Connection $queue) { return new Usage($queue); }, ['queue']); + Server::setResource('queue', function (Group $pools) { return $pools->get('queue')->pop()->getResource(); }, ['pools']); + Server::setResource('queueForDatabase', function (Connection $queue) { return new EventDatabase($queue); }, ['queue']); + Server::setResource('queueForMessaging', function (Connection $queue) { return new Messaging($queue); }, ['queue']); + Server::setResource('queueForMails', function (Connection $queue) { return new Mail($queue); }, ['queue']); + Server::setResource('queueForBuilds', function (Connection $queue) { return new Build($queue); }, ['queue']); + Server::setResource('queueForDeletes', function (Connection $queue) { return new Delete($queue); }, ['queue']); + Server::setResource('queueForEvents', function (Connection $queue) { return new Event($queue); }, ['queue']); + Server::setResource('queueForAudits', function (Connection $queue) { return new Audit($queue); }, ['queue']); + Server::setResource('queueForFunctions', function (Connection $queue) { return new Func($queue); }, ['queue']); + Server::setResource('queueForCertificates', function (Connection $queue) { return new Certificate($queue); }, ['queue']); + Server::setResource('queueForMigrations', function (Connection $queue) { return new Migration($queue); }, ['queue']); + Server::setResource('queueForHamster', function (Connection $queue) { return new Hamster($queue); }, ['queue']); + Server::setResource('logger', function (Registry $register) { return $register->get('logger'); }, ['register']); + Server::setResource('pools', function (Registry $register) { return $register->get('pools'); }, ['register']); -Server::setResource('getFunctionsDevice', function () { - return function (string $projectId) { - return getDevice(APP_STORAGE_FUNCTIONS . '/app-' . $projectId); - }; -}); -Server::setResource('getFilesDevice', function () { - return function (string $projectId) { - return getDevice(APP_STORAGE_UPLOADS . '/app-' . $projectId); - }; -}); -Server::setResource('getBuildsDevice', function () { - return function (string $projectId) { - return getDevice(APP_STORAGE_BUILDS . '/app-' . $projectId); - }; -}); -Server::setResource('getCacheDevice', function () { - return function (string $projectId) { - return getDevice(APP_STORAGE_CACHE . '/app-' . $projectId); - }; -}); + +Server::setResource('functionsDevice', function (Document $project) { + return getDevice(APP_STORAGE_FUNCTIONS . '/app-' . $project->getId()); +}, ['project']); + +Server::setResource('filesDevice', function (Document $project) { + return getDevice(APP_STORAGE_UPLOADS . '/app-' . $project->getId()); +}, ['project']); + +Server::setResource('buildsDevice', function (Document $project) { + return getDevice(APP_STORAGE_BUILDS . '/app-' . $project->getId()); +}, ['project']); + +Server::setResource('cacheDevice', function (Document $project) { + return getDevice(APP_STORAGE_CACHE . '/app-' . $project->getId()); +}, ['project']); $pools = $register->get('pools'); $platform = new Appwrite(); diff --git a/src/Appwrite/Platform/Workers/Builds.php b/src/Appwrite/Platform/Workers/Builds.php index d23cfbff94..3d2ef365be 100644 --- a/src/Appwrite/Platform/Workers/Builds.php +++ b/src/Appwrite/Platform/Workers/Builds.php @@ -25,6 +25,7 @@ use Utopia\Database\Helpers\ID; use Utopia\Logger\Log; use Utopia\Platform\Action; use Utopia\Queue\Message; +use Utopia\Storage\Device; use Utopia\Storage\Device\Local; use Utopia\VCS\Adapter\Git\GitHub; @@ -49,9 +50,9 @@ class Builds extends Action ->inject('queueForUsage') ->inject('cache') ->inject('dbForProject') - ->inject('getFunctionsDevice') + ->inject('functionsDevice') ->inject('log') - ->callback(fn($message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Usage $usage, Cache $cache, Database $dbForProject, callable $getFunctionsDevice, Log $log) => $this->action($message, $dbForConsole, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $getFunctionsDevice, $log)); + ->callback(fn($message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Usage $usage, Cache $cache, Database $dbForProject, Device $functionsDevice, Log $log) => $this->action($message, $dbForConsole, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $functionsDevice, $log)); } /** @@ -62,12 +63,12 @@ class Builds extends Action * @param Usage $queueForUsage * @param Cache $cache * @param Database $dbForProject - * @param callable $getFunctionsDevice + * @param Device $functionsDevice * @param Log $log * @return void * @throws \Utopia\Database\Exception */ - public function action(Message $message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Cache $cache, Database $dbForProject, callable $getFunctionsDevice, Log $log): void + public function action(Message $message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Cache $cache, Database $dbForProject, Device $functionsDevice, Log $log): void { $payload = $message->getPayload() ?? []; @@ -89,7 +90,7 @@ class Builds extends Action case BUILD_TYPE_RETRY: Console::info('Creating build for deployment: ' . $deployment->getId()); $github = new GitHub($cache); - $this->buildDeployment($getFunctionsDevice, $queueForFunctions, $queueForEvents, $queueForUsage, $dbForConsole, $dbForProject, $github, $project, $resource, $deployment, $template, $log); + $this->buildDeployment($functionsDevice, $queueForFunctions, $queueForEvents, $queueForUsage, $dbForConsole, $dbForProject, $github, $project, $resource, $deployment, $template, $log); break; default: @@ -98,7 +99,7 @@ class Builds extends Action } /** - * @param callable $getFunctionsDevice + * @param Device $functionsDevice * @param Func $queueForFunctions * @param Event $queueForEvents * @param Usage $queueForUsage @@ -114,7 +115,7 @@ class Builds extends Action * @throws \Utopia\Database\Exception * @throws Exception */ - protected function buildDeployment(callable $getFunctionsDevice, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Database $dbForConsole, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, Log $log): void + protected function buildDeployment(Device $functionsDevice, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Database $dbForConsole, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, Log $log): void { $executor = new Executor(App::getEnv('_APP_EXECUTOR_HOST')); @@ -156,7 +157,6 @@ class Builds extends Action $durationStart = \microtime(true); $buildId = $deployment->getAttribute('buildId', ''); $isNewBuild = empty($buildId); - $deviceFunctions = $getFunctionsDevice($project->getId()); if ($isNewBuild) { $buildId = ID::unique(); @@ -170,7 +170,7 @@ class Builds extends Action 'path' => '', 'runtime' => $function->getAttribute('runtime'), 'source' => $deployment->getAttribute('path', ''), - 'sourceType' => strtolower($deviceFunctions->getType()), + 'sourceType' => strtolower($functionsDevice->getType()), 'logs' => '', 'endTime' => null, 'duration' => 0, @@ -188,7 +188,7 @@ class Builds extends Action $installationId = $deployment->getAttribute('installationId', ''); $providerRepositoryId = $deployment->getAttribute('providerRepositoryId', ''); $providerCommitHash = $deployment->getAttribute('providerCommitHash', ''); - $isVcsEnabled = $providerRepositoryId ? true : false; + $isVcsEnabled = !empty($providerRepositoryId); $owner = ''; $repositoryName = ''; @@ -311,10 +311,8 @@ class Builds extends Action Console::execute('tar --exclude code.tar.gz -czf ' . $tmpPathFile . ' -C /tmp/builds/' . \escapeshellcmd($buildId) . '/code' . (empty($rootDirectory) ? '' : '/' . $rootDirectory) . ' .', '', $stdout, $stderr); - $deviceFunctions = $getFunctionsDevice($project->getId()); - - $path = $deviceFunctions->getPath($deployment->getId() . '.' . \pathinfo('code.tar.gz', PATHINFO_EXTENSION)); - $result = $localDevice->transfer($tmpPathFile, $path, $deviceFunctions); + $path = $functionsDevice->getPath($deployment->getId() . '.' . \pathinfo('code.tar.gz', PATHINFO_EXTENSION)); + $result = $localDevice->transfer($tmpPathFile, $path, $functionsDevice); if (!$result) { throw new \Exception("Unable to move file"); diff --git a/src/Appwrite/Platform/Workers/Deletes.php b/src/Appwrite/Platform/Workers/Deletes.php index 542f7f41db..473bdc6de9 100644 --- a/src/Appwrite/Platform/Workers/Deletes.php +++ b/src/Appwrite/Platform/Workers/Deletes.php @@ -44,22 +44,22 @@ class Deletes extends Action ->inject('message') ->inject('dbForConsole') ->inject('getProjectDB') - ->inject('getFilesDevice') - ->inject('getFunctionsDevice') - ->inject('getBuildsDevice') - ->inject('getCacheDevice') + ->inject('filesDevice') + ->inject('functionsDevice') + ->inject('buildsDevice') + ->inject('cacheDevice') ->inject('abuseRetention') ->inject('executionRetention') ->inject('auditRetention') ->inject('log') - ->callback(fn ($message, $dbForConsole, callable $getProjectDB, callable $getFilesDevice, callable $getFunctionsDevice, callable $getBuildsDevice, callable $getCacheDevice, string $abuseRetention, string $executionRetention, string $auditRetention, Log $log) => $this->action($message, $dbForConsole, $getProjectDB, $getFilesDevice, $getFunctionsDevice, $getBuildsDevice, $getCacheDevice, $abuseRetention, $executionRetention, $auditRetention, $log)); + ->callback(fn ($message, $dbForConsole, callable $getProjectDB, Device $filesDevice, Device $functionsDevice, Device $buildsDevice, Device $cacheDevice, string $abuseRetention, string $executionRetention, string $auditRetention, Log $log) => $this->action($message, $dbForConsole, $getProjectDB, $filesDevice, $functionsDevice, $buildsDevice, $cacheDevice, $abuseRetention, $executionRetention, $auditRetention, $log)); } /** * @throws Exception * @throws Throwable */ - public function action(Message $message, Database $dbForConsole, callable $getProjectDB, callable $getFilesDevice, callable $getFunctionsDevice, callable $getBuildsDevice, callable $getCacheDevice, string $abuseRetention, string $executionRetention, string $auditRetention, Log $log): void + public function action(Message $message, Database $dbForConsole, callable $getProjectDB, Device $filesDevice, Device $functionsDevice, Device $buildsDevice, Device $cacheDevice, string $abuseRetention, string $executionRetention, string $auditRetention, Log $log): void { $payload = $message->getPayload() ?? []; @@ -87,13 +87,13 @@ class Deletes extends Action $this->deleteCollection($getProjectDB, $document, $project); break; case DELETE_TYPE_PROJECTS: - $this->deleteProject($dbForConsole, $getProjectDB, $getFilesDevice, $getFunctionsDevice, $getBuildsDevice, $getCacheDevice, $document); + $this->deleteProject($dbForConsole, $getProjectDB, $filesDevice, $functionsDevice, $buildsDevice, $cacheDevice, $document); break; case DELETE_TYPE_FUNCTIONS: - $this->deleteFunction($dbForConsole, $getProjectDB, $getFunctionsDevice, $getBuildsDevice, $document, $project); + $this->deleteFunction($dbForConsole, $getProjectDB, $functionsDevice, $buildsDevice, $document, $project); break; case DELETE_TYPE_DEPLOYMENTS: - $this->deleteDeployment($getProjectDB, $getFunctionsDevice, $getBuildsDevice, $document, $project); + $this->deleteDeployment($getProjectDB, $functionsDevice, $buildsDevice, $document, $project); break; case DELETE_TYPE_USERS: $this->deleteUser($getProjectDB, $document, $project); @@ -101,11 +101,11 @@ class Deletes extends Action case DELETE_TYPE_TEAMS: $this->deleteMemberships($getProjectDB, $document, $project); if ($project->getId() === 'console') { - $this->deleteProjectsByTeam($dbForConsole, $getProjectDB, $getFilesDevice, $getFunctionsDevice, $getBuildsDevice, $getCacheDevice, $document); + $this->deleteProjectsByTeam($dbForConsole, $getProjectDB, $filesDevice, $functionsDevice, $buildsDevice, $cacheDevice, $document); } break; case DELETE_TYPE_BUCKETS: - $this->deleteBucket($getProjectDB, $getFilesDevice, $document, $project); + $this->deleteBucket($getProjectDB, $filesDevice, $document, $project); break; case DELETE_TYPE_INSTALLATIONS: $this->deleteInstallation($dbForConsole, $getProjectDB, $document, $project); @@ -511,14 +511,14 @@ class Deletes extends Action * @throws Restricted * @throws Structure */ - private function deleteProjectsByTeam(Database $dbForConsole, callable $getProjectDB, callable $getFilesDevice, callable $getFunctionsDevice, callable $getBuildsDevice, callable $getCacheDevice, Document $document): void + private function deleteProjectsByTeam(Database $dbForConsole, callable $getProjectDB, Device $filesDevice, Device $functionsDevice, Device $buildsDevice, Device $cacheDevice, Document $document): void { $projects = $dbForConsole->find('projects', [ Query::equal('teamInternalId', [$document->getInternalId()]) ]); foreach ($projects as $project) { - $this->deleteProject($dbForConsole, $getProjectDB, $getFilesDevice, $getFunctionsDevice, $getBuildsDevice, $getCacheDevice, $project); + $this->deleteProject($dbForConsole, $getProjectDB, $filesDevice, $functionsDevice, $buildsDevice, $cacheDevice, $project); $dbForConsole->deleteDocument('projects', $project->getId()); } } @@ -526,17 +526,17 @@ class Deletes extends Action /** * @param Database $dbForConsole * @param callable $getProjectDB - * @param callable $getFilesDevice - * @param callable $getFunctionsDevice - * @param callable $getBuildsDevice - * @param callable $getCacheDevice + * @param Device $filesDevice + * @param Device $functionsDevice + * @param Device $buildsDevice + * @param Device $cacheDevice * @param Document $document * @return void * @throws Exception * @throws Authorization * @throws \Utopia\Database\Exception */ - private function deleteProject(Database $dbForConsole, callable $getProjectDB, callable $getFilesDevice, callable $getFunctionsDevice, callable $getBuildsDevice, callable $getCacheDevice, Document $document): void + private function deleteProject(Database $dbForConsole, callable $getProjectDB, Device $filesDevice, Device $functionsDevice, Device $buildsDevice, Device $cacheDevice, Document $document): void { $projectId = $document->getId(); $projectInternalId = $document->getInternalId(); @@ -602,15 +602,10 @@ class Deletes extends Action } // Delete all storage directories - $uploads = $getFilesDevice($projectId); - $functions = $getFunctionsDevice($projectId); - $builds = $getBuildsDevice($projectId); - $cache = $getCacheDevice($projectId); - - $uploads->delete($uploads->getRoot(), true); - $functions->delete($functions->getRoot(), true); - $builds->delete($builds->getRoot(), true); - $cache->delete($cache->getRoot(), true); + $filesDevice->delete($filesDevice->getRoot(), true); + $functionsDevice->delete($functionsDevice->getRoot(), true); + $buildsDevice->delete($buildsDevice->getRoot(), true); + $cacheDevice->delete($cacheDevice->getRoot(), true); } /** @@ -772,14 +767,14 @@ class Deletes extends Action /** * @param callable $getProjectDB - * @param callable $getFunctionsDevice - * @param callable $getBuildsDevice + * @param Device $functionsDevice + * @param Device $buildsDevice * @param Document $document function document * @param Document $project * @return void * @throws Exception */ - private function deleteFunction(Database $dbForConsole, callable $getProjectDB, callable $getFunctionsDevice, callable $getBuildsDevice, Document $document, Document $project): void + private function deleteFunction(Database $dbForConsole, callable $getProjectDB, Device $functionsDevice, Device $buildsDevice, Document $document, Document $project): void { $projectId = $project->getId(); $dbForProject = $getProjectDB($project); @@ -811,25 +806,25 @@ class Deletes extends Action * Delete Deployments */ Console::info("Deleting deployments for function " . $functionId); - $functionsStorage = $getFunctionsDevice($projectId); + $deploymentInternalIds = []; $this->deleteByGroup('deployments', [ Query::equal('resourceInternalId', [$functionInternalId]) - ], $dbForProject, function (Document $document) use ($functionsStorage, &$deploymentInternalIds) { + ], $dbForProject, function (Document $document) use ($functionsDevice, &$deploymentInternalIds) { $deploymentInternalIds[] = $document->getInternalId(); - $this->deleteDeploymentFiles($functionsStorage, $document); + $this->deleteDeploymentFiles($functionsDevice, $document); }); /** * Delete builds */ Console::info("Deleting builds for function " . $functionId); - $buildsStorage = $getBuildsDevice($projectId); + foreach ($deploymentInternalIds as $deploymentInternalId) { $this->deleteByGroup('builds', [ Query::equal('deploymentInternalId', [$deploymentInternalId]) - ], $dbForProject, function (Document $document) use ($buildsStorage) { - $this->deleteBuildFiles($buildsStorage, $document); + ], $dbForProject, function (Document $document) use ($buildsDevice) { + $this->deleteBuildFiles($buildsDevice, $document); }); } @@ -929,14 +924,14 @@ class Deletes extends Action /** * @param callable $getProjectDB - * @param callable $getFunctionsDevice - * @param callable $getBuildsDevice + * @param Device $functionsDevice + * @param Device $buildsDevice * @param Document $document * @param Document $project * @return void * @throws Exception */ - private function deleteDeployment(callable $getProjectDB, callable $getFunctionsDevice, callable $getBuildsDevice, Document $document, Document $project): void + private function deleteDeployment(callable $getProjectDB, Device $functionsDevice, Device $buildsDevice, Document $document, Document $project): void { $projectId = $project->getId(); $dbForProject = $getProjectDB($project); @@ -946,18 +941,17 @@ class Deletes extends Action /** * Delete deployment files */ - $functionsStorage = $getFunctionsDevice($projectId); - $this->deleteDeploymentFiles($functionsStorage, $document); + $this->deleteDeploymentFiles($functionsDevice, $document); /** * Delete builds */ Console::info("Deleting builds for deployment " . $deploymentId); - $buildsStorage = $getBuildsDevice($projectId); + $this->deleteByGroup('builds', [ Query::equal('deploymentInternalId', [$deploymentInternalId]) - ], $dbForProject, function (Document $document) use ($buildsStorage) { - $this->deleteBuildFiles($buildsStorage, $document); + ], $dbForProject, function (Document $document) use ($buildsDevice) { + $this->deleteBuildFiles($buildsDevice, $document); }); /** @@ -1101,21 +1095,18 @@ class Deletes extends Action /** * @param callable $getProjectDB - * @param callable $getFilesDevice + * @param Device $filesDevice * @param Document $document * @param Document $project * @return void */ - private function deleteBucket(callable $getProjectDB, callable $getFilesDevice, Document $document, Document $project): void + private function deleteBucket(callable $getProjectDB, Device $filesDevice, Document $document, Document $project): void { - $projectId = $project->getId(); $dbForProject = $getProjectDB($project); $dbForProject->deleteCollection('bucket_' . $document->getInternalId()); - $device = $getFilesDevice($projectId); - - $device->deletePath($document->getId()); + $filesDevice->deletePath($document->getId()); } /** From bbce53cda5d93d464d7b01d5071bacdc8edd0726 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 21 Feb 2024 01:06:35 +1300 Subject: [PATCH 2/4] Auto trigger messaging events --- app/controllers/api/account.php | 12 ++-- app/controllers/api/messaging.php | 24 +++---- app/controllers/api/teams.php | 4 +- app/controllers/shared/api.php | 12 ++-- app/init.php | 8 +++ src/Appwrite/Event/Messaging.php | 25 +++++++- .../Platform/Tasks/ScheduleMessages.php | 1 + src/Appwrite/Platform/Workers/Messaging.php | 63 +++++++++---------- 8 files changed, 89 insertions(+), 60 deletions(-) diff --git a/app/controllers/api/account.php b/app/controllers/api/account.php index dd009864a7..11facf04a7 100644 --- a/app/controllers/api/account.php +++ b/app/controllers/api/account.php @@ -1773,10 +1773,10 @@ App::post('/v1/account/tokens/phone') ]); $queueForMessaging + ->setType(MESSAGE_SEND_TYPE_INTERNAL) ->setMessage($messageDoc) ->setRecipients([$phone]) - ->setProviderType(MESSAGE_TYPE_SMS) - ->trigger(); + ->setProviderType(MESSAGE_TYPE_SMS); $queueForEvents->setPayload( $response->output( @@ -3314,10 +3314,10 @@ App::post('/v1/account/verification/phone') ]); $queueForMessaging + ->setType(MESSAGE_SEND_TYPE_INTERNAL) ->setMessage($messageDoc) ->setRecipients([$user->getAttribute('phone')]) - ->setProviderType(MESSAGE_TYPE_SMS) - ->trigger(); + ->setProviderType(MESSAGE_TYPE_SMS); $queueForEvents ->setParam('userId', $user->getId()) @@ -3677,14 +3677,14 @@ App::post('/v1/account/mfa/challenge') } $queueForMessaging + ->setType(MESSAGE_SEND_TYPE_INTERNAL) ->setMessage(new Document([ '$id' => $challenge->getId(), 'data' => [ 'content' => $code, ], ])) - ->setRecipients([$user->getAttribute('phone')]) - ->trigger(); + ->setRecipients([$user->getAttribute('phone')]); break; case 'email': if (empty(App::getEnv('_APP_SMTP_HOST'))) { diff --git a/app/controllers/api/messaging.php b/app/controllers/api/messaging.php index 8e6c73f3bc..d8af44f7fd 100644 --- a/app/controllers/api/messaging.php +++ b/app/controllers/api/messaging.php @@ -2635,8 +2635,8 @@ App::post('/v1/messaging/messages/email') switch ($status) { case MessageStatus::PROCESSING: $queueForMessaging - ->setMessageId($message->getId()) - ->trigger(); + ->setType(MESSAGE_SEND_TYPE_EXTERNAL) + ->setMessageId($message->getId()); break; case MessageStatus::SCHEDULED: $schedule = $dbForConsole->createDocument('schedules', new Document([ @@ -2744,8 +2744,8 @@ App::post('/v1/messaging/messages/sms') switch ($status) { case MessageStatus::PROCESSING: $queueForMessaging - ->setMessageId($message->getId()) - ->trigger(); + ->setType(MESSAGE_SEND_TYPE_EXTERNAL) + ->setMessageId($message->getId()); break; case MessageStatus::SCHEDULED: $schedule = $dbForConsole->createDocument('schedules', new Document([ @@ -2870,8 +2870,8 @@ App::post('/v1/messaging/messages/push') switch ($status) { case MessageStatus::PROCESSING: $queueForMessaging - ->setMessageId($message->getId()) - ->trigger(); + ->setType(MESSAGE_SEND_TYPE_EXTERNAL) + ->setMessageId($message->getId()); break; case MessageStatus::SCHEDULED: $schedule = $dbForConsole->createDocument('schedules', new Document([ @@ -3263,8 +3263,8 @@ App::patch('/v1/messaging/messages/email/:messageId') if ($status === MessageStatus::PROCESSING) { $queueForMessaging - ->setMessageId($message->getId()) - ->trigger(); + ->setType(MESSAGE_SEND_TYPE_EXTERNAL) + ->setMessageId($message->getId()); } $queueForEvents @@ -3382,8 +3382,8 @@ App::patch('/v1/messaging/messages/sms/:messageId') if ($status === MessageStatus::PROCESSING) { $queueForMessaging - ->setMessageId($message->getId()) - ->trigger(); + ->setType(MESSAGE_SEND_TYPE_EXTERNAL) + ->setMessageId($message->getId()); } $queueForEvents @@ -3541,8 +3541,8 @@ App::patch('/v1/messaging/messages/push/:messageId') if ($status === MessageStatus::PROCESSING) { $queueForMessaging - ->setMessageId($message->getId()) - ->trigger(); + ->setType(MESSAGE_SEND_TYPE_EXTERNAL) + ->setMessageId($message->getId()); } $queueForEvents diff --git a/app/controllers/api/teams.php b/app/controllers/api/teams.php index 775fc27bb7..d5db918cd7 100644 --- a/app/controllers/api/teams.php +++ b/app/controllers/api/teams.php @@ -658,10 +658,10 @@ App::post('/v1/teams/:teamId/memberships') ]); $queueForMessaging + ->setType(MESSAGE_SEND_TYPE_INTERNAL) ->setMessage($messageDoc) ->setRecipients([$phone]) - ->setProviderType('SMS') - ->trigger(); + ->setProviderType('SMS'); } } diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index 7becb3c522..7f8041396f 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -384,9 +384,6 @@ App::init() ->setProject($project) ->setUser($user); - $queueForMessaging - ->setProject($project); - $queueForAudits ->setMode($mode) ->setUserAgent($request->getUserAgent('')) @@ -395,10 +392,10 @@ App::init() ->setProject($project) ->setUser($user); - $queueForDeletes->setProject($project); $queueForDatabase->setProject($project); $queueForBuilds->setProject($project); + $queueForMessaging->setProject($project); $dbForProject ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $databaseListener($event, $document, $project, $queueForUsage, $dbForProject)) @@ -517,11 +514,12 @@ App::shutdown() ->inject('queueForDeletes') ->inject('queueForDatabase') ->inject('queueForBuilds') + ->inject('queueForMessaging') ->inject('dbForProject') ->inject('queueForFunctions') ->inject('mode') ->inject('dbForConsole') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Database $dbForProject, Func $queueForFunctions, string $mode, Database $dbForConsole) use ($parseLabel) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Database $dbForProject, Func $queueForFunctions, string $mode, Database $dbForConsole) use ($parseLabel) { $responsePayload = $response->getPayload(); @@ -626,6 +624,10 @@ App::shutdown() $queueForBuilds->trigger(); } + if (!empty($queueForMessaging->getType())) { + $queueForBuilds->trigger(); + } + /** * Cache label */ diff --git a/app/init.php b/app/init.php index 4e75b5198b..8935fc7265 100644 --- a/app/init.php +++ b/app/init.php @@ -142,9 +142,11 @@ const APP_SOCIAL_DEV = 'https://dev.to/appwrite'; const APP_SOCIAL_STACKSHARE = 'https://stackshare.io/appwrite'; const APP_SOCIAL_YOUTUBE = 'https://www.youtube.com/c/appwrite?sub_confirmation=1'; const APP_HOSTNAME_INTERNAL = 'appwrite'; + // Database Reconnect const DATABASE_RECONNECT_SLEEP = 2; const DATABASE_RECONNECT_MAX_ATTEMPTS = 10; + // Database Worker Types const DATABASE_TYPE_CREATE_ATTRIBUTE = 'createAttribute'; const DATABASE_TYPE_CREATE_INDEX = 'createIndex'; @@ -152,9 +154,11 @@ const DATABASE_TYPE_DELETE_ATTRIBUTE = 'deleteAttribute'; const DATABASE_TYPE_DELETE_INDEX = 'deleteIndex'; const DATABASE_TYPE_DELETE_COLLECTION = 'deleteCollection'; const DATABASE_TYPE_DELETE_DATABASE = 'deleteDatabase'; + // Build Worker Types const BUILD_TYPE_DEPLOYMENT = 'deployment'; const BUILD_TYPE_RETRY = 'retry'; + // Deletion Types const DELETE_TYPE_DATABASES = 'databases'; const DELETE_TYPE_DOCUMENT = 'document'; @@ -180,6 +184,10 @@ const DELETE_TYPE_TOPIC = 'topic'; const DELETE_TYPE_TARGET = 'target'; const DELETE_TYPE_EXPIRED_TARGETS = 'invalid_targets'; const DELETE_TYPE_SESSION_TARGETS = 'session_targets'; + +// Message types +const MESSAGE_SEND_TYPE_INTERNAL = 'internal'; +const MESSAGE_SEND_TYPE_EXTERNAL = 'external'; // Mail Types const MAIL_TYPE_VERIFICATION = 'verification'; const MAIL_TYPE_MAGIC_SESSION = 'magicSession'; diff --git a/src/Appwrite/Event/Messaging.php b/src/Appwrite/Event/Messaging.php index 9201799355..b39af7e4fe 100644 --- a/src/Appwrite/Event/Messaging.php +++ b/src/Appwrite/Event/Messaging.php @@ -8,13 +8,13 @@ use Utopia\Queue\Client; class Messaging extends Event { + protected string $type = ''; protected ?string $messageId = null; protected ?Document $message = null; protected ?array $recipients = null; protected ?string $scheduledAt = null; protected ?string $providerType = null; - public function __construct(protected Connection $connection) { parent::__construct($connection); @@ -24,6 +24,29 @@ class Messaging extends Event ->setClass(Event::MESSAGING_CLASS_NAME); } + /** + * Sets type for the build event. + * + * @param string $type Can be `MESSAGE_TYPE_INTERNAL` or `MESSAGE_TYPE_EXTERNAL`. + * @return self + */ + public function setType(string $type): self + { + $this->type = $type; + + return $this; + } + + /** + * Returns set type for the function event. + * + * @return string + */ + public function getType(): string + { + return $this->type; + } + /** * Sets recipient for the messaging event. * diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php index cc641b434a..bc9b6d37d2 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleMessages.php +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -50,6 +50,7 @@ class ScheduleMessages extends ScheduleBase $queueForMessaging = new Messaging($connection); $queueForMessaging + ->setType(MESSAGE_SEND_TYPE_EXTERNAL) ->setMessageId($schedule['resourceId']) ->setProject($schedule['project']) ->trigger(); diff --git a/src/Appwrite/Platform/Workers/Messaging.php b/src/Appwrite/Platform/Workers/Messaging.php index 083eae4e0a..2b954e4a9e 100644 --- a/src/Appwrite/Platform/Workers/Messaging.php +++ b/src/Appwrite/Platform/Workers/Messaging.php @@ -44,7 +44,7 @@ class Messaging extends Action } /** - * @throws Exception + * @throws \Exception */ public function __construct() { @@ -63,7 +63,7 @@ class Messaging extends Action * @param Database $dbForProject * @param Usage $queueForUsage * @return void - * @throws Exception + * @throws \Exception */ public function action(Message $message, Log $log, Database $dbForProject, Usage $queueForUsage): void { @@ -73,28 +73,27 @@ class Messaging extends Action throw new Exception('Missing payload'); } + $type = $payload['type'] ?? ''; + $project = new Document($payload['project'] ?? []); - if ( - !\is_null($payload['message']) - && !\is_null($payload['recipients']) - && $payload['providerType'] === MESSAGE_TYPE_SMS - ) { - // Message was triggered internally - $this->processInternalSMSMessage( - new Document($payload['message']), - new Document($payload['project'] ?? []), - $payload['recipients'], - $queueForUsage, - $log, - ); - } else { - $message = $dbForProject->getDocument('messages', $payload['messageId']); + switch ($type) { + case MESSAGE_SEND_TYPE_INTERNAL: + $message = new Document($payload['message'] ?? []); + $recipients = $payload['recipients'] ?? []; - $this->processMessage($dbForProject, $message); + $this->sendInternalSMSMessage($message, $project, $recipients, $queueForUsage, $log); + break; + case MESSAGE_SEND_TYPE_EXTERNAL: + $message = $dbForProject->getDocument('messages', $payload['messageId']); + + $this->sendExternalMessage($dbForProject, $message); + break; + default: + throw new Exception('Unknown message type: ' . $type); } } - private function processMessage(Database $dbForProject, Document $message): void + private function sendExternalMessage(Database $dbForProject, Document $message): void { $topicIds = $message->getAttribute('topics', []); $targetIds = $message->getAttribute('targets', []); @@ -216,9 +215,9 @@ class Messaging extends Action $identifiers = $identifiers[$providerId]; $adapter = match ($provider->getAttribute('type')) { - MESSAGE_TYPE_SMS => $this->sms($provider), - MESSAGE_TYPE_PUSH => $this->push($provider), - MESSAGE_TYPE_EMAIL => $this->email($provider), + MESSAGE_TYPE_SMS => $this->getSmsAdapter($provider), + MESSAGE_TYPE_PUSH => $this->getPushAdapter($provider), + MESSAGE_TYPE_EMAIL => $this->getEmailAdapter($provider), default => throw new Exception(Exception::PROVIDER_INCORRECT_TYPE) }; @@ -234,7 +233,7 @@ class Messaging extends Action $messageData->setAttribute('to', $batch); $data = match ($provider->getAttribute('type')) { - MESSAGE_TYPE_SMS => $this->buildSMSMessage($messageData, $provider), + MESSAGE_TYPE_SMS => $this->buildSmsMessage($messageData, $provider), MESSAGE_TYPE_PUSH => $this->buildPushMessage($messageData), MESSAGE_TYPE_EMAIL => $this->buildEmailMessage($dbForProject, $messageData, $provider), default => throw new Exception(Exception::PROVIDER_INCORRECT_TYPE) @@ -312,7 +311,7 @@ class Messaging extends Action $dbForProject->updateDocument('messages', $message->getId(), $message); } - private function processInternalSMSMessage(Document $message, Document $project, array $recipients, Usage $queueForUsage, Log $log): void + private function sendInternalSMSMessage(Document $message, Document $project, array $recipients, Usage $queueForUsage, Log $log): void { if (empty(App::getEnv('_APP_SMS_PROVIDER')) || empty(App::getEnv('_APP_SMS_FROM'))) { throw new \Exception('Skipped SMS processing. Missing "_APP_SMS_PROVIDER" or "_APP_SMS_FROM" environment variables.'); @@ -375,7 +374,7 @@ class Messaging extends Action ] ]); - $adapter = $this->sms($provider); + $adapter = $this->getSmsAdapter($provider); $maxBatchSize = $adapter->getMaxMessagesPerRequest(); $batches = \array_chunk($recipients, $maxBatchSize); @@ -385,7 +384,7 @@ class Messaging extends Action return function () use ($batch, $message, $provider, $adapter, $batchIndex, $project, $queueForUsage) { $message->setAttribute('to', $batch); - $data = $this->buildSMSMessage($message, $provider); + $data = $this->buildSmsMessage($message, $provider); try { $adapter->send($data); @@ -401,11 +400,7 @@ class Messaging extends Action }, $batches)); } - public function shutdown(): void - { - } - - private function sms(Document $provider): ?SMSAdapter + private function getSmsAdapter(Document $provider): ?SMSAdapter { $credentials = $provider->getAttribute('credentials'); @@ -420,7 +415,7 @@ class Messaging extends Action }; } - private function push(Document $provider): ?PushAdapter + private function getPushAdapter(Document $provider): ?PushAdapter { $credentials = $provider->getAttribute('credentials'); @@ -437,7 +432,7 @@ class Messaging extends Action }; } - private function email(Document $provider): ?EmailAdapter + private function getEmailAdapter(Document $provider): ?EmailAdapter { $credentials = $provider->getAttribute('credentials', []); $options = $provider->getAttribute('options', []); @@ -503,7 +498,7 @@ class Messaging extends Action return new Email($to, $subject, $content, $fromName, $fromEmail, $replyToName, $replyToEmail, $cc, $bcc, null, $html); } - private function buildSMSMessage(Document $message, Document $provider): SMS + private function buildSmsMessage(Document $message, Document $provider): SMS { $to = $message['to']; $content = $message['data']['content']; From 7ad9b1cd5f61ba54540ede0ea8b74501b9a5ff04 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 21 Feb 2024 01:54:17 +1300 Subject: [PATCH 3/4] Try fix phone test --- .../Account/AccountCustomClientTest.php | 31 +++++++++---------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/tests/e2e/Services/Account/AccountCustomClientTest.php b/tests/e2e/Services/Account/AccountCustomClientTest.php index 1169686ada..ba525f556e 100644 --- a/tests/e2e/Services/Account/AccountCustomClientTest.php +++ b/tests/e2e/Services/Account/AccountCustomClientTest.php @@ -1918,7 +1918,7 @@ class AccountCustomClientTest extends Scope $this->assertEquals($response['body']['users'][0]['email'], $email); } - + #[Retry(count: 2)] public function testCreatePhone(): array { $number = '+123456789'; @@ -1941,22 +1941,8 @@ class AccountCustomClientTest extends Scope $this->assertEquals(true, (new DatetimeValidator())->isValid($response['body']['expire'])); $userId = $response['body']['userId']; - $messageId = $response['body']['$id']; - /** - * Test for FAILURE - */ - $response = $this->client->call(Client::METHOD_POST, '/account/tokens/phone', array_merge([ - 'origin' => 'http://localhost', - 'content-type' => 'application/json', - 'x-appwrite-project' => $this->getProject()['$id'], - ]), [ - 'userId' => ID::unique() - ]); - - $this->assertEquals(400, $response['headers']['status-code']); - - \sleep(5); + \sleep(7); $smsRequest = $this->getLastRequest(); @@ -1972,6 +1958,19 @@ class AccountCustomClientTest extends Scope $data['id'] = $userId; $data['number'] = $number; + /** + * Test for FAILURE + */ + $response = $this->client->call(Client::METHOD_POST, '/account/tokens/phone', array_merge([ + 'origin' => 'http://localhost', + 'content-type' => 'application/json', + 'x-appwrite-project' => $this->getProject()['$id'], + ]), [ + 'userId' => ID::unique() + ]); + + $this->assertEquals(400, $response['headers']['status-code']); + return $data; } From b265106af20d15e82a8f54a7b673dd45689ceec8 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 21 Feb 2024 02:20:09 +1300 Subject: [PATCH 4/4] Fix tests --- app/controllers/shared/api.php | 2 +- src/Appwrite/Event/Messaging.php | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index 7f8041396f..141553e12d 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -625,7 +625,7 @@ App::shutdown() } if (!empty($queueForMessaging->getType())) { - $queueForBuilds->trigger(); + $queueForMessaging->trigger(); } /** diff --git a/src/Appwrite/Event/Messaging.php b/src/Appwrite/Event/Messaging.php index b39af7e4fe..f8ea9e472b 100644 --- a/src/Appwrite/Event/Messaging.php +++ b/src/Appwrite/Event/Messaging.php @@ -185,6 +185,7 @@ class Messaging extends Event $client = new Client($this->queue, $this->connection); return $client->enqueue([ + 'type' => $this->type, 'project' => $this->project, 'user' => $this->user, 'messageId' => $this->messageId,