From 52e148761cc82708ebd1c2658af0fb051b4132d1 Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Mon, 25 Aug 2025 17:59:32 +0100 Subject: [PATCH] chore: allow publisherMessaging override --- app/cli.php | 3 +++ app/worker.php | 4 ++++ src/Appwrite/Platform/Tasks/ScheduleBase.php | 5 ++++- src/Appwrite/Platform/Tasks/ScheduleMessages.php | 2 +- 4 files changed, 12 insertions(+), 2 deletions(-) diff --git a/app/cli.php b/app/cli.php index 2ad37c0ce2..0f98cf3458 100644 --- a/app/cli.php +++ b/app/cli.php @@ -203,6 +203,9 @@ CLI::setResource('publisherMigrations', function (BrokerPool $publisher) { CLI::setResource('publisherStatsUsage', function (BrokerPool $publisher) { return $publisher; }, ['publisher']); +CLI::setResource('publisherMessaging', function (BrokerPool $publisher) { + return $publisher; +}, ['publisher']); CLI::setResource('queueForStatsUsage', function (Publisher $publisher) { return new StatsUsage($publisher); }, ['publisher']); diff --git a/app/worker.php b/app/worker.php index cc8aca2d8e..33cada5e93 100644 --- a/app/worker.php +++ b/app/worker.php @@ -263,6 +263,10 @@ Server::setResource('publisherStatsUsage', function (BrokerPool $publisher) { return $publisher; }, ['publisher']); +Server::setResource('publisherMessaging', function (BrokerPool $publisher) { + return $publisher; +}, ['publisher']); + Server::setResource('consumer', function (Group $pools) { return new BrokerPool(consumer: $pools->get('consumer')); }, ['pools']); diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php index 1710f0163d..5cd25b09b4 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleBase.php +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -27,6 +27,7 @@ abstract class ScheduleBase extends Action protected BrokerPool $publisher; protected BrokerPool $publisherMigrations; protected BrokerPool $publisherFunctions; + protected BrokerPool $publisherMessaging; private ?Histogram $collectSchedulesTelemetryDuration = null; private ?Gauge $collectSchedulesTelemetryCount = null; @@ -47,6 +48,7 @@ abstract class ScheduleBase extends Action ->inject('publisher') ->inject('publisherMigrations') ->inject('publisherFunctions') + ->inject('publisherMessaging') ->inject('dbForPlatform') ->inject('getProjectDB') ->inject('telemetry') @@ -69,7 +71,7 @@ abstract class ScheduleBase extends Action * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker. */ - public function action(BrokerPool $publisher, BrokerPool $publisherMigrations, BrokerPool $publisherFunctions, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void + public function action(BrokerPool $publisher, BrokerPool $publisherMigrations, BrokerPool $publisherFunctions, BrokerPool $publisherMessaging, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void { Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1'); Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started'); @@ -77,6 +79,7 @@ abstract class ScheduleBase extends Action $this->publisher = $publisher; $this->publisherMigrations = $publisherMigrations; $this->publisherFunctions = $publisherFunctions; + $this->publisherMessaging = $publisherMessaging; $this->scheduleTelemetryCount = $telemetry->createGauge('task.schedule.count'); $this->collectSchedulesTelemetryDuration = $telemetry->createHistogram('task.schedule.collect_schedules.duration', 's'); diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php index c4e1376ff9..fe4afbe69c 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleMessages.php +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -40,7 +40,7 @@ class ScheduleMessages extends ScheduleBase } \go(function () use ($schedule, $scheduledAt, $dbForPlatform) { - $queueForMessaging = new Messaging($this->publisher); + $queueForMessaging = new Messaging($this->publisherMessaging); $this->updateProjectAccess($schedule['project'], $dbForPlatform);