Merge branch '1.7.x' into chore-update-env-var-used-for-domain

This commit is contained in:
Khushboo Verma 2025-07-04 10:40:38 +05:30 committed by GitHub
commit 131cdefac5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 67 additions and 14 deletions

View file

@ -188,16 +188,18 @@ CLI::setResource('getLogsDB', function (Group $pools, Cache $cache) {
return $database;
};
}, ['pools', 'cache']);
CLI::setResource('publisher', function (Group $pools) {
return new BrokerPool(publisher: $pools->get('publisher'));
}, ['pools']);
CLI::setResource('publisherRedis', function () {
// Stub
});
CLI::setResource('queueForStatsUsage', function (Publisher $publisher) {
return new StatsUsage($publisher);
}, ['publisher']);
CLI::setResource('queueForStatsResources', function (Publisher $publisher) {
return new StatsResources($publisher);
}, ['publisher']);
CLI::setResource('publisher', function (Group $pools) {
return new BrokerPool(publisher: $pools->get('publisher'));
}, ['pools']);
CLI::setResource('queueForFunctions', function (Publisher $publisher) {
return new Func($publisher);
}, ['publisher']);

View file

@ -523,11 +523,16 @@ App::get('/v1/health/queue/databases')
->param('name', 'database_db_main', new Text(256), 'Queue name for which to check the queue size', true)
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('publisher')
->inject('publisherRedis')
->inject('response')
->action(function (string $name, int|string $threshold, Publisher $publisher, Response $response) {
->action(function (string $name, int|string $threshold, Publisher $publisher, ?Publisher $publisherRedis, Response $response) {
$threshold = \intval($threshold);
$size = $publisher->getQueueSize(new Queue($name));
$isRedisFallback = \str_contains(System::getEnv('_APP_WORKER_REDIS_FALLBACK', ''), 'databases');
$size = $isRedisFallback
? $publisherRedis->getQueueSize(new Queue($name))
: $publisher->getQueueSize(new Queue($name));
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -655,11 +660,16 @@ App::get('/v1/health/queue/migrations')
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('publisher')
->inject('publisherRedis')
->inject('response')
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
->action(function (int|string $threshold, Publisher $publisher, ?Publisher $publisherRedis, Response $response) {
$threshold = \intval($threshold);
$size = $publisher->getQueueSize(new Queue(Event::MIGRATIONS_QUEUE_NAME));
$isRedisFallback = \str_contains(System::getEnv('_APP_WORKER_REDIS_FALLBACK', ''), 'migrations');
$size = $isRedisFallback
? $publisherRedis->getQueueSize(new Queue(Event::MIGRATIONS_QUEUE_NAME))
: $publisher->getQueueSize(new Queue(Event::MIGRATIONS_QUEUE_NAME));
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");

View file

@ -79,9 +79,15 @@ App::setResource('localeCodes', function () {
App::setResource('publisher', function (Group $pools) {
return new BrokerPool(publisher: $pools->get('publisher'));
}, ['pools']);
App::setResource('publisherRedis', function () {
// Stub
});
App::setResource('consumer', function (Group $pools) {
return new BrokerPool(consumer: $pools->get('consumer'));
}, ['pools']);
App::setResource('consumerRedis', function () {
// Stub
});
App::setResource('queueForMessaging', function (Publisher $publisher) {
return new Messaging($publisher);
}, ['publisher']);
@ -222,7 +228,9 @@ App::setResource('user', function ($mode, $project, $console, $request, $respons
Auth::$unique = $session['id'] ?? '';
Auth::$secret = $session['secret'] ?? '';
if (APP_MODE_ADMIN !== $mode) {
if ($mode === APP_MODE_ADMIN) {
$user = $dbForPlatform->getDocument('users', Auth::$unique);
} else {
if ($project->isEmpty()) {
$user = new Document([]);
} else {
@ -232,8 +240,6 @@ App::setResource('user', function ($mode, $project, $console, $request, $respons
$user = $dbForProject->getDocument('users', Auth::$unique);
}
}
} else {
$user = $dbForPlatform->getDocument('users', Auth::$unique);
}
if (
@ -264,7 +270,11 @@ App::setResource('user', function ($mode, $project, $console, $request, $respons
$jwtUserId = $payload['userId'] ?? '';
if (!empty($jwtUserId)) {
$user = $dbForProject->getDocument('users', $jwtUserId);
if ($mode === APP_MODE_ADMIN) {
$user = $dbForPlatform->getDocument('users', $jwtUserId);
} else {
$user = $dbForProject->getDocument('users', $jwtUserId);
}
}
$jwtSessionId = $payload['sessionId'] ?? '';

View file

@ -247,10 +247,18 @@ Server::setResource('publisher', function (Group $pools) {
return new BrokerPool(publisher: $pools->get('publisher'));
}, ['pools']);
Server::setResource('publisherRedis', function () {
// Stub
});
Server::setResource('consumer', function (Group $pools) {
return new BrokerPool(consumer: $pools->get('consumer'));
}, ['pools']);
Server::setResource('consumerRedis', function () {
// Stub
});
Server::setResource('queueForStatsUsage', function (Publisher $publisher) {
return new StatsUsage($publisher);
}, ['publisher']);

View file

@ -26,6 +26,7 @@ abstract class ScheduleBase extends Action
protected array $schedules = [];
protected BrokerPool $publisher;
protected ?BrokerPool $publisherRedis = null;
private ?Histogram $collectSchedulesTelemetryDuration = null;
private ?Gauge $collectSchedulesTelemetryCount = null;
@ -72,6 +73,13 @@ abstract class ScheduleBase extends Action
Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started');
$this->publisher = new BrokerPool($pools->get('publisher'));
try {
$this->publisherRedis = new BrokerPool($pools->get('publisherRedis'));
} catch (\Throwable) {
$this->publisherRedis = null;
}
$this->scheduleTelemetryCount = $telemetry->createGauge('task.schedule.count');
$this->collectSchedulesTelemetryDuration = $telemetry->createHistogram('task.schedule.collect_schedules.duration', 's');
$this->collectSchedulesTelemetryCount = $telemetry->createGauge('task.schedule.collect_schedules.count');

View file

@ -6,6 +6,7 @@ use Appwrite\Event\Func;
use Swoole\Coroutine as Co;
use Utopia\Database\Database;
use Utopia\Pools\Group;
use Utopia\System\System;
class ScheduleExecutions extends ScheduleBase
{
@ -29,9 +30,16 @@ class ScheduleExecutions extends ScheduleBase
protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void
{
$queueForFunctions = new Func($this->publisher);
$intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds');
$isRedisFallback = \str_contains(System::getEnv('_APP_WORKER_REDIS_FALLBACK', ''), 'functions');
$queueForFunctions = new Func(
$isRedisFallback
? $this->publisherRedis
: $this->publisher
);
foreach ($this->schedules as $schedule) {
if (!$schedule['active']) {
$dbForPlatform->deleteDocument(

View file

@ -8,6 +8,7 @@ use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Pools\Group;
use Utopia\System\System;
class ScheduleFunctions extends ScheduleBase
{
@ -90,7 +91,13 @@ class ScheduleFunctions extends ScheduleBase
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
$queueForFunctions = new Func($this->publisher);
$isRedisFallback = \str_contains(System::getEnv('_APP_WORKER_REDIS_FALLBACK', ''), 'functions');
$queueForFunctions = new Func(
$isRedisFallback
? $this->publisherRedis
: $this->publisher
);
$queueForFunctions
->setType('schedule')