diff --git a/app/cli.php b/app/cli.php index 2ad37c0ce2..0f98cf3458 100644 --- a/app/cli.php +++ b/app/cli.php @@ -203,6 +203,9 @@ CLI::setResource('publisherMigrations', function (BrokerPool $publisher) { CLI::setResource('publisherStatsUsage', function (BrokerPool $publisher) { return $publisher; }, ['publisher']); +CLI::setResource('publisherMessaging', function (BrokerPool $publisher) { + return $publisher; +}, ['publisher']); CLI::setResource('queueForStatsUsage', function (Publisher $publisher) { return new StatsUsage($publisher); }, ['publisher']); diff --git a/app/controllers/api/health.php b/app/controllers/api/health.php index 975db909c6..fb084fddb3 100644 --- a/app/controllers/api/health.php +++ b/app/controllers/api/health.php @@ -556,12 +556,12 @@ App::get('/v1/health/queue/deletes') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisher') + ->inject('publisherDeletes') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisher, Response $response) { + ->action(function (int|string $threshold, Publisher $publisherDeletes, Response $response) { $threshold = \intval($threshold); - $size = $publisher->getQueueSize(new Queue(Event::DELETE_QUEUE_NAME)); + $size = $publisherDeletes->getQueueSize(new Queue(Event::DELETE_QUEUE_NAME)); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -589,12 +589,12 @@ App::get('/v1/health/queue/mails') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisher') + ->inject('publisherMails') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisher, Response $response) { + ->action(function (int|string $threshold, Publisher $publisherMails, Response $response) { $threshold = \intval($threshold); - $size = $publisher->getQueueSize(new Queue(Event::MAILS_QUEUE_NAME)); + $size = $publisherMails->getQueueSize(new Queue(Event::MAILS_QUEUE_NAME)); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -622,12 +622,12 @@ App::get('/v1/health/queue/messaging') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisher') + ->inject('publisherMessaging') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisher, Response $response) { + ->action(function (int|string $threshold, Publisher $publisherMessaging, Response $response) { $threshold = \intval($threshold); - $size = $publisher->getQueueSize(new Queue(Event::MESSAGING_QUEUE_NAME)); + $size = $publisherMessaging->getQueueSize(new Queue(Event::MESSAGING_QUEUE_NAME)); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); diff --git a/app/init/resources.php b/app/init/resources.php index 9ae132d30c..9a91e50491 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -96,20 +96,14 @@ App::setResource('publisherMigrations', function (BrokerPool $publisher) { App::setResource('publisherStatsUsage', function (BrokerPool $publisher) { return $publisher; }, ['publisher']); -App::setResource('consumer', function (Group $pools) { - return new BrokerPool(consumer: $pools->get('consumer')); -}, ['pools']); -App::setResource('consumerDatabases', function (BrokerPool $consumer) { - return $consumer; -}, ['consumer']); -App::setResource('consumerFunctions', function (BrokerPool $consumer) { - return $consumer; -}, ['consumer']); -App::setResource('consumerMigrations', function (BrokerPool $consumer) { - return $consumer; +App::setResource('publisherMails', function (BrokerPool $publisher) { + return $publisher; }, ['publisher']); -App::setResource('consumerStatsUsage', function (BrokerPool $consumer) { - return $consumer; +App::setResource('publisherDeletes', function (BrokerPool $publisher) { + return $publisher; +}, ['publisher']); +App::setResource('publisherMessaging', function (BrokerPool $publisher) { + return $publisher; }, ['publisher']); App::setResource('queueForMessaging', function (Publisher $publisher) { return new Messaging($publisher); diff --git a/app/worker.php b/app/worker.php index cc8aca2d8e..33cada5e93 100644 --- a/app/worker.php +++ b/app/worker.php @@ -263,6 +263,10 @@ Server::setResource('publisherStatsUsage', function (BrokerPool $publisher) { return $publisher; }, ['publisher']); +Server::setResource('publisherMessaging', function (BrokerPool $publisher) { + return $publisher; +}, ['publisher']); + Server::setResource('consumer', function (Group $pools) { return new BrokerPool(consumer: $pools->get('consumer')); }, ['pools']); diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php index 1710f0163d..5cd25b09b4 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleBase.php +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -27,6 +27,7 @@ abstract class ScheduleBase extends Action protected BrokerPool $publisher; protected BrokerPool $publisherMigrations; protected BrokerPool $publisherFunctions; + protected BrokerPool $publisherMessaging; private ?Histogram $collectSchedulesTelemetryDuration = null; private ?Gauge $collectSchedulesTelemetryCount = null; @@ -47,6 +48,7 @@ abstract class ScheduleBase extends Action ->inject('publisher') ->inject('publisherMigrations') ->inject('publisherFunctions') + ->inject('publisherMessaging') ->inject('dbForPlatform') ->inject('getProjectDB') ->inject('telemetry') @@ -69,7 +71,7 @@ abstract class ScheduleBase extends Action * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker. */ - public function action(BrokerPool $publisher, BrokerPool $publisherMigrations, BrokerPool $publisherFunctions, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void + public function action(BrokerPool $publisher, BrokerPool $publisherMigrations, BrokerPool $publisherFunctions, BrokerPool $publisherMessaging, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void { Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1'); Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started'); @@ -77,6 +79,7 @@ abstract class ScheduleBase extends Action $this->publisher = $publisher; $this->publisherMigrations = $publisherMigrations; $this->publisherFunctions = $publisherFunctions; + $this->publisherMessaging = $publisherMessaging; $this->scheduleTelemetryCount = $telemetry->createGauge('task.schedule.count'); $this->collectSchedulesTelemetryDuration = $telemetry->createHistogram('task.schedule.collect_schedules.duration', 's'); diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php index c4e1376ff9..fe4afbe69c 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleMessages.php +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -40,7 +40,7 @@ class ScheduleMessages extends ScheduleBase } \go(function () use ($schedule, $scheduledAt, $dbForPlatform) { - $queueForMessaging = new Messaging($this->publisher); + $queueForMessaging = new Messaging($this->publisherMessaging); $this->updateProjectAccess($schedule['project'], $dbForPlatform);