diff --git a/app/cli.php b/app/cli.php index 504e4fb5e6..2ad37c0ce2 100644 --- a/app/cli.php +++ b/app/cli.php @@ -194,6 +194,9 @@ CLI::setResource('publisher', function (Group $pools) { CLI::setResource('publisherDatabases', function (BrokerPool $publisher) { return $publisher; }, ['publisher']); +CLI::setResource('publisherFunctions', function (BrokerPool $publisher) { + return $publisher; +}, ['publisher']); CLI::setResource('publisherMigrations', function (BrokerPool $publisher) { return $publisher; }, ['publisher']); diff --git a/app/controllers/api/health.php b/app/controllers/api/health.php index 39ebae9590..32b77433f0 100644 --- a/app/controllers/api/health.php +++ b/app/controllers/api/health.php @@ -686,12 +686,12 @@ App::get('/v1/health/queue/functions') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisher') + ->inject('publisherFunctions') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisher, Response $response) { + ->action(function (int|string $threshold, Publisher $publisherFunctions, Response $response) { $threshold = \intval($threshold); - $size = $publisher->getQueueSize(new Queue(Event::FUNCTIONS_QUEUE_NAME)); + $size = $publisherFunctions->getQueueSize(new Queue(Event::FUNCTIONS_QUEUE_NAME)); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); diff --git a/app/init/resources.php b/app/init/resources.php index 162eab1973..1d3b25062c 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -83,6 +83,9 @@ App::setResource('publisher', function (Group $pools) { App::setResource('publisherDatabases', function (BrokerPool $publisher) { return $publisher; }, ['publisher']); +App::setResource('publisherFunctions', function (BrokerPool $publisher) { + return $publisher; +}, ['publisher']); App::setResource('publisherMigrations', function (BrokerPool $publisher) { return $publisher; }, ['publisher']); @@ -95,6 +98,9 @@ App::setResource('consumer', function (Group $pools) { App::setResource('consumerDatabases', function (BrokerPool $consumer) { return $consumer; }, ['consumer']); +App::setResource('consumerFunctions', function (BrokerPool $consumer) { + return $consumer; +}, ['consumer']); App::setResource('consumerMigrations', function (BrokerPool $consumer) { return $consumer; }, ['publisher']); diff --git a/app/worker.php b/app/worker.php index 90f3368fe7..cc8aca2d8e 100644 --- a/app/worker.php +++ b/app/worker.php @@ -251,6 +251,10 @@ Server::setResource('publisherDatabases', function (BrokerPool $publisher) { return $publisher; }, ['publisher']); +Server::setResource('publisherFunctions', function (BrokerPool $publisher) { + return $publisher; +}, ['publisher']); + Server::setResource('publisherMigrations', function (BrokerPool $publisher) { return $publisher; }, ['publisher']); @@ -267,6 +271,10 @@ Server::setResource('consumerDatabases', function (BrokerPool $consumer) { return $consumer; }, ['consumer']); +Server::setResource('consumerFunctions', function (BrokerPool $consumer) { + return $consumer; +}, ['consumer']); + Server::setResource('consumerMigrations', function (BrokerPool $consumer) { return $consumer; }, ['consumer']); diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php index 222051a67f..1710f0163d 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleBase.php +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -26,6 +26,7 @@ abstract class ScheduleBase extends Action protected BrokerPool $publisher; protected BrokerPool $publisherMigrations; + protected BrokerPool $publisherFunctions; private ?Histogram $collectSchedulesTelemetryDuration = null; private ?Gauge $collectSchedulesTelemetryCount = null; @@ -45,6 +46,7 @@ abstract class ScheduleBase extends Action ->desc("Execute {$type}s scheduled in Appwrite") ->inject('publisher') ->inject('publisherMigrations') + ->inject('publisherFunctions') ->inject('dbForPlatform') ->inject('getProjectDB') ->inject('telemetry') @@ -67,13 +69,14 @@ abstract class ScheduleBase extends Action * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker. */ - public function action(BrokerPool $publisher, BrokerPool $publisherMigrations, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void + public function action(BrokerPool $publisher, BrokerPool $publisherMigrations, BrokerPool $publisherFunctions, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void { Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1'); Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started'); $this->publisher = $publisher; $this->publisherMigrations = $publisherMigrations; + $this->publisherFunctions = $publisherFunctions; $this->scheduleTelemetryCount = $telemetry->createGauge('task.schedule.count'); $this->collectSchedulesTelemetryDuration = $telemetry->createHistogram('task.schedule.collect_schedules.duration', 's'); diff --git a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php index 96a5a05f0e..14a4259e17 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php @@ -30,7 +30,7 @@ class ScheduleExecutions extends ScheduleBase { $intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds'); - $queueForFunctions = new Func($this->publisher); + $queueForFunctions = new Func($this->publisherFunctions); foreach ($this->schedules as $schedule) { if (!$schedule['active']) { diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php index 43f1025c08..6f072425e4 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -90,7 +90,7 @@ class ScheduleFunctions extends ScheduleBase $this->updateProjectAccess($schedule['project'], $dbForPlatform); - $queueForFunctions = new Func($this->publisher); + $queueForFunctions = new Func($this->publisherFunctions); $queueForFunctions ->setType('schedule')