diff --git a/app/cli.php b/app/cli.php index c829546011..86ec241c93 100644 --- a/app/cli.php +++ b/app/cli.php @@ -188,16 +188,18 @@ CLI::setResource('getLogsDB', function (Group $pools, Cache $cache) { return $database; }; }, ['pools', 'cache']); - +CLI::setResource('publisher', function (Group $pools) { + return new BrokerPool(publisher: $pools->get('publisher')); +}, ['pools']); +CLI::setResource('publisherRedis', function () { + // Stub +}); CLI::setResource('queueForStatsUsage', function (Publisher $publisher) { return new StatsUsage($publisher); }, ['publisher']); CLI::setResource('queueForStatsResources', function (Publisher $publisher) { return new StatsResources($publisher); }, ['publisher']); -CLI::setResource('publisher', function (Group $pools) { - return new BrokerPool(publisher: $pools->get('publisher')); -}, ['pools']); CLI::setResource('queueForFunctions', function (Publisher $publisher) { return new Func($publisher); }, ['publisher']); diff --git a/app/controllers/api/health.php b/app/controllers/api/health.php index 2bdaea3c2c..43368b8d3f 100644 --- a/app/controllers/api/health.php +++ b/app/controllers/api/health.php @@ -523,11 +523,16 @@ App::get('/v1/health/queue/databases') ->param('name', 'database_db_main', new Text(256), 'Queue name for which to check the queue size', true) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) ->inject('publisher') + ->inject('publisherRedis') ->inject('response') - ->action(function (string $name, int|string $threshold, Publisher $publisher, Response $response) { + ->action(function (string $name, int|string $threshold, Publisher $publisher, ?Publisher $publisherRedis, Response $response) { $threshold = \intval($threshold); - $size = $publisher->getQueueSize(new Queue($name)); + $isRedisFallback = \str_contains(System::getEnv('_APP_WORKER_REDIS_FALLBACK', ''), 'databases'); + + $size = $isRedisFallback + ? $publisherRedis->getQueueSize(new Queue($name)) + : $publisher->getQueueSize(new Queue($name)); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -655,11 +660,16 @@ App::get('/v1/health/queue/migrations') )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) ->inject('publisher') + ->inject('publisherRedis') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisher, Response $response) { + ->action(function (int|string $threshold, Publisher $publisher, ?Publisher $publisherRedis, Response $response) { $threshold = \intval($threshold); - $size = $publisher->getQueueSize(new Queue(Event::MIGRATIONS_QUEUE_NAME)); + $isRedisFallback = \str_contains(System::getEnv('_APP_WORKER_REDIS_FALLBACK', ''), 'migrations'); + + $size = $isRedisFallback + ? $publisherRedis->getQueueSize(new Queue(Event::MIGRATIONS_QUEUE_NAME)) + : $publisher->getQueueSize(new Queue(Event::MIGRATIONS_QUEUE_NAME)); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); diff --git a/app/init/resources.php b/app/init/resources.php index aa04b46e1f..339106291b 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -79,9 +79,15 @@ App::setResource('localeCodes', function () { App::setResource('publisher', function (Group $pools) { return new BrokerPool(publisher: $pools->get('publisher')); }, ['pools']); +App::setResource('publisherRedis', function () { + // Stub +}); App::setResource('consumer', function (Group $pools) { return new BrokerPool(consumer: $pools->get('consumer')); }, ['pools']); +App::setResource('consumerRedis', function () { + // Stub +}); App::setResource('queueForMessaging', function (Publisher $publisher) { return new Messaging($publisher); }, ['publisher']); @@ -222,7 +228,9 @@ App::setResource('user', function ($mode, $project, $console, $request, $respons Auth::$unique = $session['id'] ?? ''; Auth::$secret = $session['secret'] ?? ''; - if (APP_MODE_ADMIN !== $mode) { + if ($mode === APP_MODE_ADMIN) { + $user = $dbForPlatform->getDocument('users', Auth::$unique); + } else { if ($project->isEmpty()) { $user = new Document([]); } else { @@ -232,8 +240,6 @@ App::setResource('user', function ($mode, $project, $console, $request, $respons $user = $dbForProject->getDocument('users', Auth::$unique); } } - } else { - $user = $dbForPlatform->getDocument('users', Auth::$unique); } if ( @@ -264,7 +270,11 @@ App::setResource('user', function ($mode, $project, $console, $request, $respons $jwtUserId = $payload['userId'] ?? ''; if (!empty($jwtUserId)) { - $user = $dbForProject->getDocument('users', $jwtUserId); + if ($mode === APP_MODE_ADMIN) { + $user = $dbForPlatform->getDocument('users', $jwtUserId); + } else { + $user = $dbForProject->getDocument('users', $jwtUserId); + } } $jwtSessionId = $payload['sessionId'] ?? ''; diff --git a/app/worker.php b/app/worker.php index 845914c923..4f0f569a9e 100644 --- a/app/worker.php +++ b/app/worker.php @@ -247,10 +247,18 @@ Server::setResource('publisher', function (Group $pools) { return new BrokerPool(publisher: $pools->get('publisher')); }, ['pools']); +Server::setResource('publisherRedis', function () { + // Stub +}); + Server::setResource('consumer', function (Group $pools) { return new BrokerPool(consumer: $pools->get('consumer')); }, ['pools']); +Server::setResource('consumerRedis', function () { + // Stub +}); + Server::setResource('queueForStatsUsage', function (Publisher $publisher) { return new StatsUsage($publisher); }, ['publisher']); diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php index b2134707e7..7686815868 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleBase.php +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -26,6 +26,7 @@ abstract class ScheduleBase extends Action protected array $schedules = []; protected BrokerPool $publisher; + protected ?BrokerPool $publisherRedis = null; private ?Histogram $collectSchedulesTelemetryDuration = null; private ?Gauge $collectSchedulesTelemetryCount = null; @@ -72,6 +73,13 @@ abstract class ScheduleBase extends Action Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started'); $this->publisher = new BrokerPool($pools->get('publisher')); + + try { + $this->publisherRedis = new BrokerPool($pools->get('publisherRedis')); + } catch (\Throwable) { + $this->publisherRedis = null; + } + $this->scheduleTelemetryCount = $telemetry->createGauge('task.schedule.count'); $this->collectSchedulesTelemetryDuration = $telemetry->createHistogram('task.schedule.collect_schedules.duration', 's'); $this->collectSchedulesTelemetryCount = $telemetry->createGauge('task.schedule.collect_schedules.count'); diff --git a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php index 27a7c1dbf1..acb2dd3070 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php @@ -6,6 +6,7 @@ use Appwrite\Event\Func; use Swoole\Coroutine as Co; use Utopia\Database\Database; use Utopia\Pools\Group; +use Utopia\System\System; class ScheduleExecutions extends ScheduleBase { @@ -29,9 +30,16 @@ class ScheduleExecutions extends ScheduleBase protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void { - $queueForFunctions = new Func($this->publisher); $intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds'); + $isRedisFallback = \str_contains(System::getEnv('_APP_WORKER_REDIS_FALLBACK', ''), 'functions'); + + $queueForFunctions = new Func( + $isRedisFallback + ? $this->publisherRedis + : $this->publisher + ); + foreach ($this->schedules as $schedule) { if (!$schedule['active']) { $dbForPlatform->deleteDocument( diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php index 7812b27832..7a3363d74d 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -8,6 +8,7 @@ use Utopia\CLI\Console; use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Pools\Group; +use Utopia\System\System; class ScheduleFunctions extends ScheduleBase { @@ -90,7 +91,13 @@ class ScheduleFunctions extends ScheduleBase $this->updateProjectAccess($schedule['project'], $dbForPlatform); - $queueForFunctions = new Func($this->publisher); + $isRedisFallback = \str_contains(System::getEnv('_APP_WORKER_REDIS_FALLBACK', ''), 'functions'); + + $queueForFunctions = new Func( + $isRedisFallback + ? $this->publisherRedis + : $this->publisher + ); $queueForFunctions ->setType('schedule')