From 16a2d618745e5d0487750a7262d6d04270f1f127 Mon Sep 17 00:00:00 2001 From: shimon Date: Thu, 31 Oct 2024 17:43:49 +0200 Subject: [PATCH] Rollback cross regions connections --- app/cli.php | 8 --- app/controllers/shared/api.php | 17 +++--- app/init.php | 39 +++++--------- app/realtime.php | 13 ++--- app/worker.php | 25 ++++----- src/Appwrite/Messaging/Adapter/Realtime.php | 11 ++-- src/Appwrite/Platform/Tasks/ScheduleBase.php | 21 ++++---- .../Platform/Tasks/ScheduleExecutions.php | 5 +- .../Platform/Tasks/ScheduleFunctions.php | 7 ++- .../Platform/Tasks/ScheduleMessages.php | 7 ++- src/Appwrite/Platform/Workers/Builds.php | 24 +++------ .../Platform/Workers/Certificates.php | 53 +++++++------------ src/Appwrite/Platform/Workers/Databases.php | 49 ++++++----------- src/Appwrite/Platform/Workers/Deletes.php | 1 - src/Appwrite/Platform/Workers/Functions.php | 19 +------ src/Appwrite/Platform/Workers/Messaging.php | 2 +- src/Appwrite/Platform/Workers/Migrations.php | 50 +++++++---------- 17 files changed, 119 insertions(+), 232 deletions(-) diff --git a/app/cli.php b/app/cli.php index 0b76afc2e1..ca635d9543 100644 --- a/app/cli.php +++ b/app/cli.php @@ -162,23 +162,15 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole, CLI::setResource('queue', function (Group $pools) { return $pools->get('queue')->pop()->getResource(); }, ['pools']); - -CLI::setResource('poolForQueue', function (Group $pools) { - return $pools->get('queue'); -}, ['pools']); - CLI::setResource('queueForFunctions', function (Connection $queue) { return new Func($queue); }, ['queue']); - CLI::setResource('queueForDeletes', function (Connection $queue) { return new Delete($queue); }, ['queue']); - CLI::setResource('queueForCertificates', function (Connection $queue) { return new Certificate($queue); }, ['queue']); - CLI::setResource('logError', function (Registry $register) { return function (Throwable $error, string $namespace, string $action) use ($register) { $logger = $register->get('logger'); diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index f4be41544d..81af833c9f 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -162,7 +162,6 @@ App::init() ->inject('mode') ->inject('team') ->action(function (App $utopia, Request $request, Database $dbForConsole, Document $project, Document $user, ?Document $session, array $servers, string $mode, Document $team) { - $route = $utopia->getRoute(); if ($project->isEmpty()) { @@ -364,6 +363,7 @@ App::init() ->inject('dbForProject') ->inject('mode') ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Database $dbForProject, string $mode) use ($databaseListener) { + $route = $utopia->getRoute(); if ( @@ -461,7 +461,6 @@ App::init() ->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $databaseListener($event, $document, $project, $queueForUsage, $dbForProject)); $useCache = $route->getLabel('cache', false); - if ($useCache) { $key = md5($request->getURI() . '*' . implode('*', $request->getParams()) . '*' . APP_CACHE_BUSTER); $cacheLog = Authorization::skip(fn () => $dbForProject->getDocument('cache', $key)); @@ -470,8 +469,8 @@ App::init() ); $timestamp = 60 * 60 * 24 * 30; $data = $cache->load($key, $timestamp); + if (!empty($data) && !$cacheLog->isEmpty()) { - $timerStart = \microtime(true); $parts = explode('/', $cacheLog->getAttribute('resourceType')); $type = $parts[0] ?? null; @@ -552,7 +551,6 @@ App::shutdown() ->inject('project') ->inject('dbForProject') ->action(function (App $utopia, Request $request, Response $response, Document $project, Database $dbForProject) { - $route = $utopia->getRoute(); $sessionLimit = $project->getAttribute('auths', [])['maxSessions'] ?? APP_LIMIT_USER_SESSIONS_DEFAULT; $session = $response->getPayload(); $userId = $session['userId'] ?? ''; @@ -597,9 +595,8 @@ App::shutdown() ->inject('queueForFunctions') ->inject('mode') ->inject('dbForConsole') - ->inject('realtimeConnection') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Database $dbForProject, Func $queueForFunctions, string $mode, Database $dbForConsole, callable $realtimeConnection) use ($parseLabel) { - $route = $utopia->getRoute(); + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Database $dbForProject, Func $queueForFunctions, string $mode, Database $dbForConsole) use ($parseLabel) { + $responsePayload = $response->getPayload(); if (!empty($queueForEvents->getEvent())) { @@ -645,7 +642,6 @@ App::shutdown() ); Realtime::send( - redis: $realtimeConnection($queueForEvents->getSourceRegion()), projectId: $target['projectId'] ?? $project->getId(), payload: $queueForEvents->getRealtimePayload(), events: $allEvents, @@ -713,11 +709,9 @@ App::shutdown() * Cache label */ $useCache = $route->getLabel('cache', false); - if ($useCache) { $resource = $resourceType = null; $data = $response->getPayload(); - if (!empty($data['payload'])) { $pattern = $route->getLabel('cache.resource', null); if (!empty($pattern)) { @@ -734,7 +728,6 @@ App::shutdown() $cacheLog = Authorization::skip(fn () => $dbForProject->getDocument('cache', $key)); $accessedAt = $cacheLog->getAttribute('accessedAt', ''); $now = DateTime::now(); - if ($cacheLog->isEmpty()) { Authorization::skip(fn () => $dbForProject->createDocument('cache', new Document([ '$id' => $key, @@ -761,6 +754,8 @@ App::shutdown() } } + + if ($project->getId() !== 'console') { if (!Auth::isPrivilegedUser(Authorization::getRoles())) { $fileSize = 0; diff --git a/app/init.php b/app/init.php index 09c919dade..d6f7d8c984 100644 --- a/app/init.php +++ b/app/init.php @@ -1530,33 +1530,23 @@ App::setResource('deviceForLocal', function () { return new Local(); }); -App::setResource('deviceForFiles', function ($project, $connectionString) { - return getDevice(APP_STORAGE_UPLOADS.'/app-'.$project->getId(), $connectionString); -}, ['project', 'connectionString']); +App::setResource('deviceForFiles', function ($project) { + return getDevice(APP_STORAGE_UPLOADS . '/app-' . $project->getId()); +}, ['project']); -App::setResource('deviceForFunctions', function ($project, $connectionString) { - return getDevice(APP_STORAGE_FUNCTIONS.'/app-'.$project->getId(), $connectionString); -}, ['project', 'connectionString']); +App::setResource('deviceForFunctions', function ($project) { + return getDevice(APP_STORAGE_FUNCTIONS . '/app-' . $project->getId()); +}, ['project']); -App::setResource('deviceForBuilds', function ($project, $connectionString) { - return getDevice(APP_STORAGE_BUILDS.'/app-'.$project->getId(), $connectionString); -}, ['project', 'connectionString']); +App::setResource('deviceForBuilds', function ($project) { + return getDevice(APP_STORAGE_BUILDS . '/app-' . $project->getId()); +}, ['project']); -App::setResource('connectionString', function () { - return System::getEnv('_APP_CONNECTIONS_STORAGE', ''); -}); - -App::setResource('realtimeConnection',function ($pools) { - return function () use ($pools) { - return $pools->get('pubsub')->pop()->getResource(); - }; -}, ['pools']); - - -function getDevice(string $root, string $connectionString = ''): Device +function getDevice(string $root, string $connection = ''): Device { + $connection = !empty($connection) ? $connection : System::getEnv('_APP_CONNECTIONS_STORAGE', ''); - if (! empty($connectionString)) { + if (!empty($connection)) { $acl = 'private'; $device = Storage::DEVICE_LOCAL; $accessKey = ''; @@ -1565,7 +1555,7 @@ function getDevice(string $root, string $connectionString = ''): Device $region = ''; try { - $dsn = new DSN($connectionString); + $dsn = new DSN($connection); $device = $dsn->getScheme(); $accessKey = $dsn->getUser() ?? ''; $accessSecret = $dsn->getPassword() ?? ''; @@ -1826,9 +1816,6 @@ App::setResource('team', function (Document $project, Database $dbForConsole, Ap ]); }); - if (!$team) { - $team = new Document([]); - } return $team; }, ['project', 'dbForConsole', 'utopia', 'request']); diff --git a/app/realtime.php b/app/realtime.php index 6ca4639f85..2e95756e76 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -137,17 +137,14 @@ if (!function_exists('getCache')) { } } -if (!function_exists("getPubSub")) { - function getPubSub(): \Redis +if (!function_exists('getRealtime')) { + function getRealtime(): Realtime { - global $register; - - $pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */ - return $pools->get('pubsub')->pop()->getResource(); + return new Realtime(); } } -$realtime = new Realtime(); +$realtime = getRealtime(); /** * Table for statistics across all workers. @@ -370,7 +367,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, } $start = time(); - $redis = getPubSub(); /** @var \Redis $redis */ + $redis = $register->get('pools')->get('pubsub')->pop()->getResource(); /** @var Redis $redis */ $redis->setOption(Redis::OPT_READ_TIMEOUT, -1); if ($redis->ping(true)) { diff --git a/app/worker.php b/app/worker.php index e3fd7d1f7b..25defba3f8 100644 --- a/app/worker.php +++ b/app/worker.php @@ -262,20 +262,20 @@ Server::setResource('pools', function (Registry $register) { return $register->get('pools'); }, ['register']); -Server::setResource('deviceForFunctions', function (Document $project, $connectionString) { - return getDevice(APP_STORAGE_FUNCTIONS.'/app-'.$project->getId(), $connectionString); -}, ['project', 'connectionString']); +Server::setResource('deviceForFunctions', function (Document $project) { + return getDevice(APP_STORAGE_FUNCTIONS . '/app-' . $project->getId()); +}, ['project']); -Server::setResource('deviceForFiles', function (Document $project, $connectionString) { - return getDevice(APP_STORAGE_UPLOADS.'/app-'.$project->getId(), $connectionString); -}, ['project', 'connectionString']); +Server::setResource('deviceForFiles', function (Document $project) { + return getDevice(APP_STORAGE_UPLOADS . '/app-' . $project->getId()); +}, ['project']); -Server::setResource('deviceForBuilds', function (Document $project, $connectionString) { - return getDevice(APP_STORAGE_BUILDS.'/app-'.$project->getId(), $connectionString); -}, ['project', 'connectionString']); +Server::setResource('deviceForBuilds', function (Document $project) { + return getDevice(APP_STORAGE_BUILDS . '/app-' . $project->getId()); +}, ['project']); Server::setResource('deviceForCache', function (Document $project) { - return getDevice(APP_STORAGE_CACHE.'/app-'.$project->getId()); + return getDevice(APP_STORAGE_CACHE . '/app-' . $project->getId()); }, ['project']); Server::setResource( @@ -283,11 +283,6 @@ Server::setResource( fn () => fn (Document $project, string $resourceType, ?string $resourceId) => false ); -Server::setResource('realtimeConnection',function ($pools) { - return function () use ($pools) { - return $pools->get('pubsub')->pop()->getResource(); - }; -}, ['pools']); $pools = $register->get('pools'); $platform = new Appwrite(); diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index 079a02428d..c437d4d487 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -122,17 +122,15 @@ class Realtime extends Adapter /** * Sends an event to the Realtime Server - * @param \Redis $redis * @param string $projectId * @param array $payload - * @param array $events + * @param string $event * @param array $channels * @param array $roles * @param array $options * @return void - * @throws \RedisException */ - public static function send(\Redis $redis, string $projectId, array $payload, array $events, array $channels, array $roles, array $options = []): void + public static function send(string $projectId, array $payload, array $events, array $channels, array $roles, array $options = []): void { if (empty($channels) || empty($roles) || empty($projectId)) { return; @@ -140,8 +138,9 @@ class Realtime extends Adapter $permissionsChanged = array_key_exists('permissionsChanged', $options) && $options['permissionsChanged']; $userId = array_key_exists('userId', $options) ? $options['userId'] : null; - var_dump($redis->getHost()); - var_dump($channels); + + $redis = new \Redis(); //TODO: make this part of the constructor + $redis->connect(System::getEnv('_APP_REDIS_HOST', ''), System::getEnv('_APP_REDIS_PORT', '')); $redis->publish('realtime', json_encode([ 'project' => $projectId, 'roles' => $roles, diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php index abe614829f..a1b85c341f 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleBase.php +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -11,7 +11,6 @@ use Utopia\Database\Exception; use Utopia\Database\Query; use Utopia\Platform\Action; use Utopia\Pools\Group; -use Utopia\Pools\Pool; use Utopia\System\System; use function Swoole\Coroutine\run; @@ -26,7 +25,7 @@ abstract class ScheduleBase extends Action abstract public static function getName(): string; abstract public static function getSupportedResource(): string; abstract public static function getCollectionId(): string; - abstract protected function enqueueResources(\Utopia\Pools\Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB): void; + abstract protected function enqueueResources(Group $pools, Database $dbForConsole, callable $getProjectDB): void; public function __construct() { @@ -34,10 +33,10 @@ abstract class ScheduleBase extends Action $this ->desc("Execute {$type}s scheduled in Appwrite") - ->inject('poolForQueue') + ->inject('pools') ->inject('dbForConsole') ->inject('getProjectDB') - ->callback(fn (Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB) => $this->action($poolForQueue, $dbForConsole, $getProjectDB)); + ->callback(fn (Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB)); } /** @@ -45,7 +44,7 @@ abstract class ScheduleBase extends Action * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker. */ - public function action(Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB): void + public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void { Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1'); Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started'); @@ -114,17 +113,17 @@ abstract class ScheduleBase extends Action $latestDocument = \end($results); } - $poolForQueue->reclaim(); + $pools->reclaim(); Console::success("{$total} resources were loaded in " . (\microtime(true) - $loadStart) . " seconds"); Console::success("Starting timers at " . DateTime::now()); - run(function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $poolForQueue, $getProjectDB) { + run(function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools, $getProjectDB) { /** * The timer synchronize $schedules copy with database collection. */ - Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $poolForQueue) { + Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools) { $time = DateTime::now(); $timerStart = \microtime(true); @@ -173,17 +172,17 @@ abstract class ScheduleBase extends Action $lastSyncUpdate = $time; $timerEnd = \microtime(true); - $poolForQueue->reclaim(); + $pools->reclaim(); Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds"); }); Timer::tick( static::ENQUEUE_TIMER * 1000, - fn () => $this->enqueueResources($poolForQueue, $dbForConsole, $getProjectDB) + fn () => $this->enqueueResources($pools, $dbForConsole, $getProjectDB) ); - $this->enqueueResources($poolForQueue, $dbForConsole, $getProjectDB); + $this->enqueueResources($pools, $dbForConsole, $getProjectDB); }); } } diff --git a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php index 58fc07b019..73a2814397 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php @@ -6,7 +6,6 @@ use Appwrite\Event\Func; use Swoole\Coroutine as Co; use Utopia\Database\Database; use Utopia\Pools\Group; -use Utopia\Pools\Pool; class ScheduleExecutions extends ScheduleBase { @@ -28,9 +27,9 @@ class ScheduleExecutions extends ScheduleBase return 'executions'; } - protected function enqueueResources(\Utopia\Pools\Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB): void + protected function enqueueResources(Group $pools, Database $dbForConsole, callable $getProjectDB): void { - $queue = $poolForQueue->pop(); + $queue = $pools->get('queue')->pop(); $connection = $queue->getResource(); $queueForFunctions = new Func($connection); $intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds'); diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php index 8095e812a9..4d57902330 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -8,7 +8,6 @@ use Utopia\CLI\Console; use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Pools\Group; -use Utopia\Pools\Pool; class ScheduleFunctions extends ScheduleBase { @@ -32,7 +31,7 @@ class ScheduleFunctions extends ScheduleBase return 'functions'; } - protected function enqueueResources(\Utopia\Pools\Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB): void + protected function enqueueResources(Group $pools, Database $dbForConsole, callable $getProjectDB): void { $timerStart = \microtime(true); $time = DateTime::now(); @@ -71,10 +70,10 @@ class ScheduleFunctions extends ScheduleBase } foreach ($delayedExecutions as $delay => $scheduleKeys) { - \go(function () use ($delay, $scheduleKeys, $poolForQueue) { + \go(function () use ($delay, $scheduleKeys, $pools) { \sleep($delay); // in seconds - $queue = $poolForQueue->pop(); + $queue = $pools->get('queue')->pop(); $connection = $queue->getResource(); foreach ($scheduleKeys as $scheduleKey) { diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php index 3167e29f7a..b9d8e2a282 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleMessages.php +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -5,7 +5,6 @@ namespace Appwrite\Platform\Tasks; use Appwrite\Event\Messaging; use Utopia\Database\Database; use Utopia\Pools\Group; -use Utopia\Pools\Pool; class ScheduleMessages extends ScheduleBase { @@ -27,7 +26,7 @@ class ScheduleMessages extends ScheduleBase return 'messages'; } - protected function enqueueResources(\Utopia\Pools\Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB): void + protected function enqueueResources(Group $pools, Database $dbForConsole, callable $getProjectDB): void { foreach ($this->schedules as $schedule) { if (!$schedule['active']) { @@ -41,8 +40,8 @@ class ScheduleMessages extends ScheduleBase continue; } - \go(function () use ($schedule, $poolForQueue, $dbForConsole) { - $queue = $poolForQueue->pop(); + \go(function () use ($schedule, $pools, $dbForConsole) { + $queue = $pools->get('queue')->pop(); $connection = $queue->getResource(); $queueForMessaging = new Messaging($connection); diff --git a/src/Appwrite/Platform/Workers/Builds.php b/src/Appwrite/Platform/Workers/Builds.php index 36fe1a6d1a..5dd2f7f886 100644 --- a/src/Appwrite/Platform/Workers/Builds.php +++ b/src/Appwrite/Platform/Workers/Builds.php @@ -33,11 +33,6 @@ use Utopia\VCS\Adapter\Git\GitHub; class Builds extends Action { - /** - * @var mixed|string - */ - protected string $sourceRegion; - public static function getName(): string { return 'builds'; @@ -59,8 +54,7 @@ class Builds extends Action ->inject('dbForProject') ->inject('deviceForFunctions') ->inject('log') - ->inject('realtimeConnection') - ->callback(fn ($message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Usage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log, Callable $realtimeConnection) => $this->action($message, $dbForConsole, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $deviceForFunctions, $log, $realtimeConnection)); + ->callback(fn ($message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Usage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log) => $this->action($message, $dbForConsole, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $deviceForFunctions, $log)); } /** @@ -73,11 +67,10 @@ class Builds extends Action * @param Database $dbForProject * @param Device $deviceForFunctions * @param Log $log - * @param callable $realtimeConnection * @return void * @throws \Utopia\Database\Exception */ - public function action(Message $message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log, Callable $realtimeConnection): void + public function action(Message $message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log): void { $payload = $message->getPayload() ?? []; @@ -90,7 +83,6 @@ class Builds extends Action $resource = new Document($payload['resource'] ?? []); $deployment = new Document($payload['deployment'] ?? []); $template = new Document($payload['template'] ?? []); - $this->sourceRegion = $payload['sourceRegion'] ?? 'default'; $log->addTag('projectId', $project->getId()); $log->addTag('type', $type); @@ -100,7 +92,7 @@ class Builds extends Action case BUILD_TYPE_RETRY: Console::info('Creating build for deployment: ' . $deployment->getId()); $github = new GitHub($cache); - $this->buildDeployment($deviceForFunctions, $queueForFunctions, $queueForEvents, $queueForUsage, $dbForConsole, $dbForProject, $github, $project, $resource, $deployment, $template, $log, $realtimeConnection); + $this->buildDeployment($deviceForFunctions, $queueForFunctions, $queueForEvents, $queueForUsage, $dbForConsole, $dbForProject, $github, $project, $resource, $deployment, $template, $log); break; default: @@ -125,7 +117,7 @@ class Builds extends Action * @throws \Utopia\Database\Exception * @throws Exception */ - protected function buildDeployment(Device $deviceForFunctions, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Database $dbForConsole, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, Log $log, Callable $realtimeConnection): void + protected function buildDeployment(Device $deviceForFunctions, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Database $dbForConsole, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, Log $log): void { $executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST')); @@ -384,7 +376,6 @@ class Builds extends Action project: $project ); Realtime::send( - redis: $realtimeConnection($this->sourceRegion), projectId: 'console', payload: $build->getArrayCopy(), events: $allEvents, @@ -463,7 +454,6 @@ class Builds extends Action ); Realtime::send( - redis: $realtimeConnection($this->sourceRegion), projectId: 'console', payload: $build->getArrayCopy(), events: $allEvents, @@ -562,12 +552,12 @@ class Builds extends Action $err = $error; } }), - Co\go(function () use ($realtimeConnection, $executor, $project, $deployment, &$response, &$build, $dbForProject, $allEvents, &$err, &$isCanceled) { + Co\go(function () use ($executor, $project, $deployment, &$response, &$build, $dbForProject, $allEvents, &$err, &$isCanceled) { try { $executor->getLogs( deploymentId: $deployment->getId(), projectId: $project->getId(), - callback: function ($logs) use ($realtimeConnection, &$response, &$err, &$build, $dbForProject, $allEvents, $project, &$isCanceled) { + callback: function ($logs) use (&$response, &$err, &$build, $dbForProject, $allEvents, $project, &$isCanceled) { if ($isCanceled) { return; } @@ -601,7 +591,6 @@ class Builds extends Action project: $project ); Realtime::send( - redis: $realtimeConnection($this->sourceRegion), projectId: 'console', payload: $build->getArrayCopy(), events: $allEvents, @@ -704,7 +693,6 @@ class Builds extends Action project: $project ); Realtime::send( - redis: $realtimeConnection($this->sourceRegion), projectId: 'console', payload: $build->getArrayCopy(), events: $allEvents, diff --git a/src/Appwrite/Platform/Workers/Certificates.php b/src/Appwrite/Platform/Workers/Certificates.php index dcc42a87fc..a14f164295 100644 --- a/src/Appwrite/Platform/Workers/Certificates.php +++ b/src/Appwrite/Platform/Workers/Certificates.php @@ -30,11 +30,6 @@ use Utopia\System\System; class Certificates extends Action { - /** - * @var mixed|string - */ - protected string $sourceRegion; - public static function getName(): string { return 'certificates'; @@ -53,8 +48,7 @@ class Certificates extends Action ->inject('queueForEvents') ->inject('queueForFunctions') ->inject('log') - ->inject('realtimeConnection') - ->callback(fn (Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, Callable $realtimeConnection) => $this->action($message, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions, $log, $realtimeConnection)); + ->callback(fn (Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log) => $this->action($message, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions, $log)); } /** @@ -64,12 +58,11 @@ class Certificates extends Action * @param Event $queueForEvents * @param Func $queueForFunctions * @param Log $log - * @param callable $realtimeConnection * @return void * @throws Throwable * @throws \Utopia\Database\Exception */ - public function action(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, Callable $realtimeConnection): void + public function action(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log): void { $payload = $message->getPayload() ?? []; @@ -80,10 +73,10 @@ class Certificates extends Action $document = new Document($payload['domain'] ?? []); $domain = new Domain($document->getAttribute('domain', '')); $skipRenewCheck = $payload['skipRenewCheck'] ?? false; - $this->sourceRegion = $payload['sourceRegion'] ?? 'default'; $log->addTag('domain', $domain->get()); - $this->execute($domain, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions, $log,$realtimeConnection, $skipRenewCheck); + + $this->execute($domain, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions, $log, $skipRenewCheck); } /** @@ -92,17 +85,12 @@ class Certificates extends Action * @param Mail $queueForMails * @param Event $queueForEvents * @param Func $queueForFunctions - * @param Log $log - * @param callable $realtimeConnection * @param bool $skipRenewCheck * @return void - * @throws Authorization - * @throws Conflict - * @throws Structure * @throws Throwable * @throws \Utopia\Database\Exception */ - protected function execute(Domain $domain, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, Callable $realtimeConnection, bool $skipRenewCheck = false): void + private function execute(Domain $domain, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, bool $skipRenewCheck = false): void { /** * 1. Read arguments and validate domain @@ -138,7 +126,7 @@ class Certificates extends Action $certificate = $dbForConsole->findOne('certificates', [Query::equal('domain', [$domain->get()])]); // If we don't have certificate for domain yet, let's create new document. At the end we save it - if (!$certificate) { + if ($certificate->isEmpty()) { $certificate = new Document(); $certificate->setAttribute('domain', $domain->get()); } @@ -174,6 +162,7 @@ class Certificates extends Action $logs = 'Certificate successfully generated.'; $certificate->setAttribute('logs', \mb_strcut($logs, 0, 1000000));// Limit to 1MB + // Give certificates to Traefik $this->applyCertificateFiles($folder, $domain->get(), $letsEncryptData); @@ -204,7 +193,7 @@ class Certificates extends Action $certificate->setAttribute('updated', DateTime::now()); // Save all changes we made to certificate document into database - $this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForConsole, $queueForEvents, $queueForFunctions, $realtimeConnection); + $this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForConsole, $queueForEvents, $queueForFunctions); } } @@ -223,11 +212,11 @@ class Certificates extends Action * @throws Conflict * @throws Structure */ - protected function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Callable $realtimeConnection): void + private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions): void { // Check if update or insert required $certificateDocument = $dbForConsole->findOne('certificates', [Query::equal('domain', [$domain])]); - if (!empty($certificateDocument) && !$certificateDocument->isEmpty()) { + if (!$certificateDocument->isEmpty()) { // Merge new data with current data $certificate = new Document(\array_merge($certificateDocument->getArrayCopy(), $certificate->getArrayCopy())); $certificate = $dbForConsole->updateDocument('certificates', $certificate->getId(), $certificate); @@ -237,7 +226,7 @@ class Certificates extends Action } $certificateId = $certificate->getId(); - $this->updateDomainDocuments($certificateId, $domain, $success, $dbForConsole, $queueForEvents, $queueForFunctions, $realtimeConnection); + $this->updateDomainDocuments($certificateId, $domain, $success, $dbForConsole, $queueForEvents, $queueForFunctions); } /** @@ -245,7 +234,7 @@ class Certificates extends Action * * @return null|string Returns main domain. If null, there is no main domain yet. */ - protected function getMainDomain(): ?string + private function getMainDomain(): ?string { $envDomain = System::getEnv('_APP_DOMAIN', ''); if (!empty($envDomain) && $envDomain !== 'localhost') { @@ -266,7 +255,7 @@ class Certificates extends Action * @return void * @throws Exception */ - protected function validateDomain(Domain $domain, bool $isMainDomain, Log $log): void + private function validateDomain(Domain $domain, bool $isMainDomain, Log $log): void { if (empty($domain->get())) { throw new Exception('Missing certificate domain.'); @@ -310,7 +299,7 @@ class Certificates extends Action * @return bool True, if certificate needs to be renewed * @throws Exception */ - protected function isRenewRequired(string $domain, Log $log): bool + private function isRenewRequired(string $domain, Log $log): bool { $certPath = APP_STORAGE_CERTIFICATES . '/' . $domain . '/cert.pem'; if (\file_exists($certPath)) { @@ -344,7 +333,7 @@ class Certificates extends Action * @return array Named array with keys 'stdout' and 'stderr', both string * @throws Exception */ - protected function issueCertificate(string $folder, string $domain, string $email): array + private function issueCertificate(string $folder, string $domain, string $email): array { $stdout = ''; $stderr = ''; @@ -374,7 +363,7 @@ class Certificates extends Action * @return string * @throws \Utopia\Database\Exception */ - protected function getRenewDate(string $domain): string + private function getRenewDate(string $domain): string { $certPath = APP_STORAGE_CERTIFICATES . '/' . $domain . '/cert.pem'; $certData = openssl_x509_parse(file_get_contents($certPath)); @@ -392,7 +381,7 @@ class Certificates extends Action * @return void * @throws Exception */ - protected function applyCertificateFiles(string $folder, string $domain, array $letsEncryptData): void + private function applyCertificateFiles(string $folder, string $domain, array $letsEncryptData): void { // Prepare folder in storage for domain @@ -443,7 +432,7 @@ class Certificates extends Action * @return void * @throws Exception */ - protected function notifyError(string $domain, string $errorMessage, int $attempt, Mail $queueForMails): void + private function notifyError(string $domain, string $errorMessage, int $attempt, Mail $queueForMails): void { // Log error into console Console::warning('Cannot renew domain (' . $domain . ') on attempt no. ' . $attempt . ' certificate: ' . $errorMessage); @@ -486,14 +475,14 @@ class Certificates extends Action * * @return void */ - protected function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Callable $realtimeConnection): void + private function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions): void { $rule = $dbForConsole->findOne('rules', [ Query::equal('domain', [$domain]), ]); - if ($rule !== false && !$rule->isEmpty()) { + if (!$rule->isEmpty()) { $rule->setAttribute('certificateId', $certificateId); $rule->setAttribute('status', $success ? 'verified' : 'unverified'); $dbForConsole->updateDocument('rules', $rule->getId(), $rule); @@ -536,7 +525,6 @@ class Certificates extends Action project: $project ); Realtime::send( - redis: $realtimeConnection($this->sourceRegion), projectId: 'console', payload: $rule->getArrayCopy(), events: $allEvents, @@ -544,7 +532,6 @@ class Certificates extends Action roles: $target['roles'] ); Realtime::send( - redis: $realtimeConnection($this->sourceRegion), projectId: $project->getId(), payload: $rule->getArrayCopy(), events: $allEvents, diff --git a/src/Appwrite/Platform/Workers/Databases.php b/src/Appwrite/Platform/Workers/Databases.php index 0bb448e4a3..f697e7be13 100644 --- a/src/Appwrite/Platform/Workers/Databases.php +++ b/src/Appwrite/Platform/Workers/Databases.php @@ -17,15 +17,9 @@ use Utopia\Database\Query; use Utopia\Logger\Log; use Utopia\Platform\Action; use Utopia\Queue\Message; -use Utopia\System\System; class Databases extends Action { - /** - * @var array|mixed - */ - protected string $sourceRegion; - public static function getName(): string { return 'databases'; @@ -42,8 +36,7 @@ class Databases extends Action ->inject('dbForConsole') ->inject('dbForProject') ->inject('log') - ->inject('realtimeConnection') - ->callback(fn (Message $message, Database $dbForConsole, Database $dbForProject, Log $log, callable $realtimeConnection) => $this->action($message, $dbForConsole, $dbForProject, $log, $realtimeConnection)); + ->callback(fn (Message $message, Database $dbForConsole, Database $dbForProject, Log $log) => $this->action($message, $dbForConsole, $dbForProject, $log)); } /** @@ -51,11 +44,10 @@ class Databases extends Action * @param Database $dbForConsole * @param Database $dbForProject * @param Log $log - * @param callable $realtimeConnection * @return void * @throws \Exception */ - public function action(Message $message, Database $dbForConsole, Database $dbForProject, Log $log, callable $realtimeConnection): void + public function action(Message $message, Database $dbForConsole, Database $dbForProject, Log $log): void { $payload = $message->getPayload() ?? []; @@ -68,7 +60,6 @@ class Databases extends Action $collection = new Document($payload['collection'] ?? []); $document = new Document($payload['document'] ?? []); $database = new Document($payload['database'] ?? []); - $this->sourceRegion = $payload['sourceRegion'] ?? null; $log->addTag('projectId', $project->getId()); $log->addTag('type', $type); @@ -82,10 +73,10 @@ class Databases extends Action match (\strval($type)) { DATABASE_TYPE_DELETE_DATABASE => $this->deleteDatabase($database, $project, $dbForProject), DATABASE_TYPE_DELETE_COLLECTION => $this->deleteCollection($database, $collection, $project, $dbForProject), - DATABASE_TYPE_CREATE_ATTRIBUTE => $this->createAttribute($database, $collection, $document, $project, $dbForConsole, $dbForProject, $realtimeConnection), - DATABASE_TYPE_DELETE_ATTRIBUTE => $this->deleteAttribute($database, $collection, $document, $project, $dbForConsole, $dbForProject, $realtimeConnection), - DATABASE_TYPE_CREATE_INDEX => $this->createIndex($database, $collection, $document, $project, $dbForConsole, $dbForProject, $realtimeConnection), - DATABASE_TYPE_DELETE_INDEX => $this->deleteIndex($database, $collection, $document, $project, $dbForConsole, $dbForProject, $realtimeConnection), + DATABASE_TYPE_CREATE_ATTRIBUTE => $this->createAttribute($database, $collection, $document, $project, $dbForConsole, $dbForProject), + DATABASE_TYPE_DELETE_ATTRIBUTE => $this->deleteAttribute($database, $collection, $document, $project, $dbForConsole, $dbForProject), + DATABASE_TYPE_CREATE_INDEX => $this->createIndex($database, $collection, $document, $project, $dbForConsole, $dbForProject), + DATABASE_TYPE_DELETE_INDEX => $this->deleteIndex($database, $collection, $document, $project, $dbForConsole, $dbForProject), default => throw new \Exception('No database operation for type: ' . \strval($type)), }; } @@ -97,13 +88,12 @@ class Databases extends Action * @param Document $project * @param Database $dbForConsole * @param Database $dbForProject - * @param callable $realtimeConnection * @return void * @throws Authorization * @throws Conflict * @throws \Exception */ - private function createAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForConsole, Database $dbForProject, callable $realtimeConnection): void + private function createAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForConsole, Database $dbForProject): void { if ($collection->isEmpty()) { throw new Exception('Missing collection'); @@ -144,6 +134,7 @@ class Databases extends Action $options = $attribute->getAttribute('options', []); $project = $dbForConsole->getDocument('projects', $projectId); + try { switch ($type) { case Database::VAR_RELATIONSHIP: @@ -203,7 +194,7 @@ class Databases extends Action ); } } finally { - $this->trigger($database, $collection, $attribute, $project, $projectId, $events, $realtimeConnection); + $this->trigger($database, $collection, $attribute, $project, $projectId, $events); } if ($type === Database::VAR_RELATIONSHIP && $options['twoWay']) { @@ -220,7 +211,6 @@ class Databases extends Action * @param Document $project * @param Database $dbForConsole * @param Database $dbForProject - * @param callable $realtimeConnection * @return void * @throws Authorization * @throws Conflict @@ -305,7 +295,7 @@ class Databases extends Action ); } } finally { - $this->trigger($database, $collection, $attribute, $project, $projectId, $events, $realtimeConnection); + $this->trigger($database, $collection, $attribute, $project, $projectId, $events); } // The underlying database removes/rebuilds indexes when attribute is removed @@ -375,14 +365,13 @@ class Databases extends Action * @param Document $project * @param Database $dbForConsole * @param Database $dbForProject - * @param callable $realtimeConnection * @return void * @throws Authorization * @throws Conflict * @throws Structure * @throws DatabaseException */ - private function createIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForConsole, Database $dbForProject, callable $realtimeConnection): void + private function createIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForConsole, Database $dbForProject): void { if ($collection->isEmpty()) { throw new Exception('Missing collection'); @@ -424,7 +413,7 @@ class Databases extends Action $index->setAttribute('status', 'failed') ); } finally { - $this->trigger($database, $collection, $index, $project, $projectId, $events, $realtimeConnection); + $this->trigger($database, $collection, $index, $project, $projectId, $events); } $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collectionId); @@ -437,14 +426,13 @@ class Databases extends Action * @param Document $project * @param Database $dbForConsole * @param Database $dbForProject - * @param callable $realtimeConnection * @return void * @throws Authorization * @throws Conflict * @throws Structure * @throws DatabaseException */ - private function deleteIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForConsole, Database $dbForProject, callable $realtimeConnection): void + private function deleteIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForConsole, Database $dbForProject): void { if ($collection->isEmpty()) { throw new Exception('Missing collection'); @@ -483,7 +471,7 @@ class Databases extends Action $index->setAttribute('status', 'stuck') ); } finally { - $this->trigger($database, $collection, $index, $project, $projectId, $events, $realtimeConnection); + $this->trigger($database, $collection, $index, $project, $projectId, $events); } $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collection->getId()); @@ -606,20 +594,14 @@ class Databases extends Action Console::info("Deleted {$count} document by group in " . ($executionEnd - $executionStart) . " seconds"); } - /** - * @throws \RedisException - * @throws Exception - */ protected function trigger( Document $database, Document $collection, Document $attribute, Document $project, string $projectId, - array $events, - callable $realtimeConnection + array $events ): void { - var_dump('Send message to realtime region='.$this->sourceRegion); $target = Realtime::fromPayload( // Pass first, most verbose event pattern event: $events[0], @@ -627,7 +609,6 @@ class Databases extends Action project: $project, ); Realtime::send( - redis: $realtimeConnection($this->sourceRegion), projectId: 'console', payload: $attribute->getArrayCopy(), events: $events, diff --git a/src/Appwrite/Platform/Workers/Deletes.php b/src/Appwrite/Platform/Workers/Deletes.php index 700a480065..b9ff2e9609 100644 --- a/src/Appwrite/Platform/Workers/Deletes.php +++ b/src/Appwrite/Platform/Workers/Deletes.php @@ -494,7 +494,6 @@ class Deletes extends Action ]; $limit = \count($projectCollectionIds) + 25; - $sharedTables = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', '')); while (true) { $collections = $dbForProject->listCollections($limit); diff --git a/src/Appwrite/Platform/Workers/Functions.php b/src/Appwrite/Platform/Workers/Functions.php index a315239600..72a3334f2f 100644 --- a/src/Appwrite/Platform/Workers/Functions.php +++ b/src/Appwrite/Platform/Workers/Functions.php @@ -28,11 +28,6 @@ use Utopia\System\System; class Functions extends Action { - /** - * @var mixed|string - */ - protected string $sourceRegion; - public static function getName(): string { return 'functions'; @@ -54,12 +49,10 @@ class Functions extends Action ->inject('queueForUsage') ->inject('log') ->inject('isResourceBlocked') - ->inject('realtimeConnection') - ->callback(fn (Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log, callable $isResourceBlocked, Callable $realtimeConnection) => $this->action($project, $message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForUsage, $log, $isResourceBlocked, $realtimeConnection)); - + ->callback(fn (Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForUsage, $log, $isResourceBlocked)); } - public function action(Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log, callable $isResourceBlocked, Callable $realtimeConnection): void + public function action(Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log, callable $isResourceBlocked): void { $payload = $message->getPayload() ?? []; @@ -104,8 +97,6 @@ class Functions extends Action return; } - $this->sourceRegion = $payload['sourceRegion'] ?? 'default'; - if ($function->isEmpty() && !empty($functionId)) { $function = $dbForProject->getDocument('functions', $functionId); } @@ -143,7 +134,6 @@ class Functions extends Action Console::success('Iterating function: ' . $function->getAttribute('name')); $this->execute( - realtimeConnection: $realtimeConnection, log: $log, dbForProject: $dbForProject, queueForFunctions: $queueForFunctions, @@ -184,7 +174,6 @@ class Functions extends Action $execution = new Document($payload['execution'] ?? []); $user = new Document($payload['user'] ?? []); $this->execute( - realtimeConnection: $realtimeConnection, log: $log, dbForProject: $dbForProject, queueForFunctions: $queueForFunctions, @@ -207,7 +196,6 @@ class Functions extends Action case 'schedule': $execution = new Document($payload['execution'] ?? []); $this->execute( - realtimeConnection: $realtimeConnection, log: $log, dbForProject: $dbForProject, queueForFunctions: $queueForFunctions, @@ -317,7 +305,6 @@ class Functions extends Action * @throws Conflict */ private function execute( - Callable $realtimeConnection, Log $log, Database $dbForProject, Func $queueForFunctions, @@ -610,7 +597,6 @@ class Functions extends Action project: $project ); Realtime::send( - redis: $realtimeConnection($this->sourceRegion), projectId: 'console', payload: $execution->getArrayCopy(), events: $allEvents, @@ -618,7 +604,6 @@ class Functions extends Action roles: $target['roles'] ); Realtime::send( - redis: $realtimeConnection($this->sourceRegion), projectId: $project->getId(), payload: $execution->getArrayCopy(), events: $allEvents, diff --git a/src/Appwrite/Platform/Workers/Messaging.php b/src/Appwrite/Platform/Workers/Messaging.php index 510fec0431..58f6265ff4 100644 --- a/src/Appwrite/Platform/Workers/Messaging.php +++ b/src/Appwrite/Platform/Workers/Messaging.php @@ -178,7 +178,7 @@ class Messaging extends Action Query::equal('type', [$providerType]), ]); - if ($default === false || $default->isEmpty()) { + if ($default->isEmpty()) { $dbForProject->updateDocument('messages', $message->getId(), $message->setAttributes([ 'status' => MessageStatus::FAILED, 'deliveryErrors' => ['No enabled provider found.'] diff --git a/src/Appwrite/Platform/Workers/Migrations.php b/src/Appwrite/Platform/Workers/Migrations.php index 283c716379..d430d0eb67 100644 --- a/src/Appwrite/Platform/Workers/Migrations.php +++ b/src/Appwrite/Platform/Workers/Migrations.php @@ -36,10 +36,6 @@ class Migrations extends Action protected Database $dbForConsole; protected Document $project; - /** - * @var string - */ - protected string $sourceRegion; public static function getName(): string { @@ -57,14 +53,13 @@ class Migrations extends Action ->inject('dbForProject') ->inject('dbForConsole') ->inject('log') - ->inject('realtimeConnection') - ->callback(fn (Message $message, Database $dbForProject, Database $dbForConsole, Log $log, Callable $realtimeConnection) => $this->action($message, $dbForProject, $dbForConsole, $log, $realtimeConnection)); + ->callback(fn (Message $message, Database $dbForProject, Database $dbForConsole, Log $log) => $this->action($message, $dbForProject, $dbForConsole, $log)); } /** * @throws Exception */ - public function action(Message $message, Database $dbForProject, Database $dbForConsole, Log $log, Callable $realtimeConnection): void + public function action(Message $message, Database $dbForProject, Database $dbForConsole, Log $log): void { $payload = $message->getPayload() ?? []; @@ -80,7 +75,6 @@ class Migrations extends Action return; } - $this->sourceRegion = $payload['sourceRegion'] ?? 'default'; $this->dbForProject = $dbForProject; $this->dbForConsole = $dbForConsole; $this->project = $project; @@ -95,7 +89,7 @@ class Migrations extends Action $log->addTag('migrationId', $migration->getId()); $log->addTag('projectId', $project->getId()); - $this->processMigration($migration, $log, $realtimeConnection); + $this->processMigration($migration, $log); } /** @@ -130,7 +124,7 @@ class Migrations extends Action ), SourceAppwrite::getName() => new SourceAppwrite( $credentials['projectId'], - $credentials['endpoint'], + $credentials['endpoint'] === 'http://localhost/v1' ? 'http://appwrite/v1' : $credentials['endpoint'], $credentials['apiKey'], ), default => throw new \Exception('Invalid source type'), @@ -140,16 +134,15 @@ class Migrations extends Action /** * @throws Exception */ - protected function processDestination(Document $migration): Destination + protected function processDestination(Document $migration, string $apiKey): Destination { $destination = $migration->getAttribute('destination'); - $credentials = $migration->getAttribute('credentials'); return match ($destination) { DestinationAppwrite::getName() => new DestinationAppwrite( - $credentials['projectId'], - $credentials['endpoint'], - $credentials['apiKey'], + $this->project->getId(), + 'http://appwrite/v1', + $apiKey, $this->dbForProject, Config::getParam('collections', [])['databases']['collections'], ), @@ -164,7 +157,7 @@ class Migrations extends Action * @throws \Utopia\Database\Exception * @throws Exception */ - protected function updateMigrationDocument(Document $migration, Document $project, Callable $realtimeConnection): Document + protected function updateMigrationDocument(Document $migration, Document $project): Document { /** Trigger Realtime */ $allEvents = Event::generateEvents('migrations.[migrationId].update', [ @@ -178,7 +171,6 @@ class Migrations extends Action ); Realtime::send( - redis: $realtimeConnection($this->sourceRegion), projectId: 'console', payload: $migration->getArrayCopy(), events: $allEvents, @@ -187,7 +179,6 @@ class Migrations extends Action ); Realtime::send( - redis: $realtimeConnection($this->sourceRegion), projectId: $project->getId(), payload: $migration->getArrayCopy(), events: $allEvents, @@ -261,11 +252,6 @@ class Migrations extends Action } /** - * @param Document $project - * @param Document $migration - * @param Log $log - * @param callable $realtimeConnection - * @return void * @throws Authorization * @throws Conflict * @throws Restricted @@ -273,7 +259,7 @@ class Migrations extends Action * @throws \Utopia\Database\Exception * @throws Exception */ - protected function processMigration(Document $migration, Log $log, Callable $realtimeConnection): void + protected function processMigration(Document $migration, Log $log): void { $project = $this->project; $projectDocument = $this->dbForConsole->getDocument('projects', $project->getId()); @@ -285,8 +271,8 @@ class Migrations extends Action $migration = $this->dbForProject->getDocument('migrations', $migration->getId()); if ( - $migration->getAttribute('source') === SourceAppwrite::getName() || - $migration->getAttribute('destination') === DestinationAppwrite::getName() + $migration->getAttribute('source') === SourceAppwrite::getName() && + empty($migration->getAttribute('credentials', [])) ) { $credentials = $migration->getAttribute('credentials', []); @@ -299,12 +285,12 @@ class Migrations extends Action $migration->setAttribute('stage', 'processing'); $migration->setAttribute('status', 'processing'); - $this->updateMigrationDocument($migration, $projectDocument, $realtimeConnection); + $this->updateMigrationDocument($migration, $projectDocument); $log->addTag('type', $migration->getAttribute('source')); $source = $this->processSource($migration); - $destination = $this->processDestination($migration); + $destination = $this->processDestination($migration, $tempAPIKey->getAttribute('secret')); $source->report(); @@ -315,14 +301,14 @@ class Migrations extends Action /** Start Transfer */ $migration->setAttribute('stage', 'migrating'); - $this->updateMigrationDocument($migration, $projectDocument, $realtimeConnection); + $this->updateMigrationDocument($migration, $projectDocument); $transfer->run( $migration->getAttribute('resources'), function () use ($migration, $transfer, $projectDocument) { $migration->setAttribute('resourceData', json_encode($transfer->getCache())); $migration->setAttribute('statusCounters', json_encode($transfer->getStatusCounters())); - $this->updateMigrationDocument($migration, $projectDocument, $realtimeConnection); + $this->updateMigrationDocument($migration, $projectDocument); }, $migration->getAttribute('resourceId'), $migration->getAttribute('resourceType') @@ -361,7 +347,7 @@ class Migrations extends Action $migration->setAttribute('errors', $errorMessages); $log->addExtra('migrationErrors', json_encode($errorMessages)); - $this->updateMigrationDocument($migration, $projectDocument, $realtimeConnection); + $this->updateMigrationDocument($migration, $projectDocument); return; } @@ -402,7 +388,7 @@ class Migrations extends Action $this->removeAPIKey($tempAPIKey); } - $this->updateMigrationDocument($migration, $projectDocument, $realtimeConnection); + $this->updateMigrationDocument($migration, $projectDocument); if ($migration->getAttribute('status', '') === 'failed') { Console::error('Migration('.$migration->getInternalId().':'.$migration->getId().') failed, Project('.$this->project->getInternalId().':'.$this->project->getId().')');