Merge pull request #10370 from appwrite/chore-allow-publisherMessaging-override

chore: allow publisher messaging override in scheduler
This commit is contained in:
Luke B. Silver 2025-08-25 18:43:21 +01:00 committed by GitHub
commit 5436c16f85
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 12 additions and 2 deletions

View file

@ -203,6 +203,9 @@ CLI::setResource('publisherMigrations', function (BrokerPool $publisher) {
CLI::setResource('publisherStatsUsage', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
CLI::setResource('publisherMessaging', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
CLI::setResource('queueForStatsUsage', function (Publisher $publisher) {
return new StatsUsage($publisher);
}, ['publisher']);

View file

@ -263,6 +263,10 @@ Server::setResource('publisherStatsUsage', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
Server::setResource('publisherMessaging', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
Server::setResource('consumer', function (Group $pools) {
return new BrokerPool(consumer: $pools->get('consumer'));
}, ['pools']);

View file

@ -27,6 +27,7 @@ abstract class ScheduleBase extends Action
protected BrokerPool $publisher;
protected BrokerPool $publisherMigrations;
protected BrokerPool $publisherFunctions;
protected BrokerPool $publisherMessaging;
private ?Histogram $collectSchedulesTelemetryDuration = null;
private ?Gauge $collectSchedulesTelemetryCount = null;
@ -47,6 +48,7 @@ abstract class ScheduleBase extends Action
->inject('publisher')
->inject('publisherMigrations')
->inject('publisherFunctions')
->inject('publisherMessaging')
->inject('dbForPlatform')
->inject('getProjectDB')
->inject('telemetry')
@ -69,7 +71,7 @@ abstract class ScheduleBase extends Action
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker.
*/
public function action(BrokerPool $publisher, BrokerPool $publisherMigrations, BrokerPool $publisherFunctions, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void
public function action(BrokerPool $publisher, BrokerPool $publisherMigrations, BrokerPool $publisherFunctions, BrokerPool $publisherMessaging, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void
{
Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1');
Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started');
@ -77,6 +79,7 @@ abstract class ScheduleBase extends Action
$this->publisher = $publisher;
$this->publisherMigrations = $publisherMigrations;
$this->publisherFunctions = $publisherFunctions;
$this->publisherMessaging = $publisherMessaging;
$this->scheduleTelemetryCount = $telemetry->createGauge('task.schedule.count');
$this->collectSchedulesTelemetryDuration = $telemetry->createHistogram('task.schedule.collect_schedules.duration', 's');

View file

@ -40,7 +40,7 @@ class ScheduleMessages extends ScheduleBase
}
\go(function () use ($schedule, $scheduledAt, $dbForPlatform) {
$queueForMessaging = new Messaging($this->publisher);
$queueForMessaging = new Messaging($this->publisherMessaging);
$this->updateProjectAccess($schedule['project'], $dbForPlatform);