Merge branch '1.7.x' into feat-extend-function-headers

This commit is contained in:
Khushboo Verma 2025-08-26 12:56:22 +05:30
commit 2ef79d5300
6 changed files with 28 additions and 24 deletions

View file

@ -203,6 +203,9 @@ CLI::setResource('publisherMigrations', function (BrokerPool $publisher) {
CLI::setResource('publisherStatsUsage', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
CLI::setResource('publisherMessaging', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
CLI::setResource('queueForStatsUsage', function (Publisher $publisher) {
return new StatsUsage($publisher);
}, ['publisher']);

View file

@ -556,12 +556,12 @@ App::get('/v1/health/queue/deletes')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('publisher')
->inject('publisherDeletes')
->inject('response')
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
->action(function (int|string $threshold, Publisher $publisherDeletes, Response $response) {
$threshold = \intval($threshold);
$size = $publisher->getQueueSize(new Queue(Event::DELETE_QUEUE_NAME));
$size = $publisherDeletes->getQueueSize(new Queue(Event::DELETE_QUEUE_NAME));
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -589,12 +589,12 @@ App::get('/v1/health/queue/mails')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('publisher')
->inject('publisherMails')
->inject('response')
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
->action(function (int|string $threshold, Publisher $publisherMails, Response $response) {
$threshold = \intval($threshold);
$size = $publisher->getQueueSize(new Queue(Event::MAILS_QUEUE_NAME));
$size = $publisherMails->getQueueSize(new Queue(Event::MAILS_QUEUE_NAME));
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -622,12 +622,12 @@ App::get('/v1/health/queue/messaging')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('publisher')
->inject('publisherMessaging')
->inject('response')
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
->action(function (int|string $threshold, Publisher $publisherMessaging, Response $response) {
$threshold = \intval($threshold);
$size = $publisher->getQueueSize(new Queue(Event::MESSAGING_QUEUE_NAME));
$size = $publisherMessaging->getQueueSize(new Queue(Event::MESSAGING_QUEUE_NAME));
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");

View file

@ -96,20 +96,14 @@ App::setResource('publisherMigrations', function (BrokerPool $publisher) {
App::setResource('publisherStatsUsage', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
App::setResource('consumer', function (Group $pools) {
return new BrokerPool(consumer: $pools->get('consumer'));
}, ['pools']);
App::setResource('consumerDatabases', function (BrokerPool $consumer) {
return $consumer;
}, ['consumer']);
App::setResource('consumerFunctions', function (BrokerPool $consumer) {
return $consumer;
}, ['consumer']);
App::setResource('consumerMigrations', function (BrokerPool $consumer) {
return $consumer;
App::setResource('publisherMails', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
App::setResource('consumerStatsUsage', function (BrokerPool $consumer) {
return $consumer;
App::setResource('publisherDeletes', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
App::setResource('publisherMessaging', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
App::setResource('queueForMessaging', function (Publisher $publisher) {
return new Messaging($publisher);

View file

@ -263,6 +263,10 @@ Server::setResource('publisherStatsUsage', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
Server::setResource('publisherMessaging', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
Server::setResource('consumer', function (Group $pools) {
return new BrokerPool(consumer: $pools->get('consumer'));
}, ['pools']);

View file

@ -27,6 +27,7 @@ abstract class ScheduleBase extends Action
protected BrokerPool $publisher;
protected BrokerPool $publisherMigrations;
protected BrokerPool $publisherFunctions;
protected BrokerPool $publisherMessaging;
private ?Histogram $collectSchedulesTelemetryDuration = null;
private ?Gauge $collectSchedulesTelemetryCount = null;
@ -47,6 +48,7 @@ abstract class ScheduleBase extends Action
->inject('publisher')
->inject('publisherMigrations')
->inject('publisherFunctions')
->inject('publisherMessaging')
->inject('dbForPlatform')
->inject('getProjectDB')
->inject('telemetry')
@ -69,7 +71,7 @@ abstract class ScheduleBase extends Action
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker.
*/
public function action(BrokerPool $publisher, BrokerPool $publisherMigrations, BrokerPool $publisherFunctions, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void
public function action(BrokerPool $publisher, BrokerPool $publisherMigrations, BrokerPool $publisherFunctions, BrokerPool $publisherMessaging, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void
{
Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1');
Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started');
@ -77,6 +79,7 @@ abstract class ScheduleBase extends Action
$this->publisher = $publisher;
$this->publisherMigrations = $publisherMigrations;
$this->publisherFunctions = $publisherFunctions;
$this->publisherMessaging = $publisherMessaging;
$this->scheduleTelemetryCount = $telemetry->createGauge('task.schedule.count');
$this->collectSchedulesTelemetryDuration = $telemetry->createHistogram('task.schedule.collect_schedules.duration', 's');

View file

@ -40,7 +40,7 @@ class ScheduleMessages extends ScheduleBase
}
\go(function () use ($schedule, $scheduledAt, $dbForPlatform) {
$queueForMessaging = new Messaging($this->publisher);
$queueForMessaging = new Messaging($this->publisherMessaging);
$this->updateProjectAccess($schedule['project'], $dbForPlatform);