diff --git a/app/controllers/api/health.php b/app/controllers/api/health.php index fb084fddb3..988f061ed5 100644 --- a/app/controllers/api/health.php +++ b/app/controllers/api/health.php @@ -1,7 +1,19 @@ param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisher') + ->inject('queueForWebhooks') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisher, Response $response) { + ->action(function (int|string $threshold, Webhook $queueForWebhooks, Response $response) { $threshold = \intval($threshold); - $size = $publisher->getQueueSize(new Queue(Event::WEBHOOK_QUEUE_NAME)); + $size = $queueForWebhooks->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -367,12 +377,12 @@ App::get('/v1/health/queue/logs') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisher') + ->inject('queueForAudits') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisher, Response $response) { + ->action(function (int|string $threshold, Audit $queueForAudits, Response $response) { $threshold = \intval($threshold); - $size = $publisher->getQueueSize(new Queue(Event::AUDITS_QUEUE_NAME)); + $size = $queueForAudits->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -457,12 +467,12 @@ App::get('/v1/health/queue/certificates') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisher') + ->inject('queueForCertificates') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisher, Response $response) { + ->action(function (int|string $threshold, Certificate $queueForCertificates, Response $response) { $threshold = \intval($threshold); - $size = $publisher->getQueueSize(new Queue(Event::CERTIFICATES_QUEUE_NAME)); + $size = $queueForCertificates->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -490,12 +500,12 @@ App::get('/v1/health/queue/builds') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisher') + ->inject('queueForBuilds') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisher, Response $response) { + ->action(function (int|string $threshold, Build $queueForBuilds, Response $response) { $threshold = \intval($threshold); - $size = $publisher->getQueueSize(new Queue(Event::BUILDS_QUEUE_NAME)); + $size = $queueForBuilds->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -524,11 +534,11 @@ App::get('/v1/health/queue/databases') )) ->param('name', 'database_db_main', new Text(256), 'Queue name for which to check the queue size', true) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisherDatabases') + ->inject('queueForDatabase') ->inject('response') - ->action(function (string $name, int|string $threshold, Publisher $publisherDatabases, Response $response) { + ->action(function (string $name, int|string $threshold, Database $queueForDatabase, Response $response) { $threshold = \intval($threshold); - $size = $publisherDatabases->getQueueSize(new Queue($name)); + $size = $queueForDatabase->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -556,12 +566,12 @@ App::get('/v1/health/queue/deletes') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisherDeletes') + ->inject('queueForDeletes') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisherDeletes, Response $response) { + ->action(function (int|string $threshold, Delete $queueForDeletes, Response $response) { $threshold = \intval($threshold); - $size = $publisherDeletes->getQueueSize(new Queue(Event::DELETE_QUEUE_NAME)); + $size = $queueForDeletes->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -589,12 +599,12 @@ App::get('/v1/health/queue/mails') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisherMails') + ->inject('queueForMails') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisherMails, Response $response) { + ->action(function (int|string $threshold, Mail $queueForMails, Response $response) { $threshold = \intval($threshold); - $size = $publisherMails->getQueueSize(new Queue(Event::MAILS_QUEUE_NAME)); + $size = $queueForMails->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -622,12 +632,12 @@ App::get('/v1/health/queue/messaging') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisherMessaging') + ->inject('queueForMessaging') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisherMessaging, Response $response) { + ->action(function (int|string $threshold, Messaging $queueForMessaging, Response $response) { $threshold = \intval($threshold); - $size = $publisherMessaging->getQueueSize(new Queue(Event::MESSAGING_QUEUE_NAME)); + $size = $queueForMessaging->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -655,12 +665,12 @@ App::get('/v1/health/queue/migrations') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisherMigrations') + ->inject('queueForMigrations') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisherMigrations, Response $response) { + ->action(function (int|string $threshold, Migration $queueForMigrations, Response $response) { $threshold = \intval($threshold); - $size = $publisherMigrations->getQueueSize(new Queue(Event::MIGRATIONS_QUEUE_NAME)); + $size = $queueForMigrations->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -688,12 +698,12 @@ App::get('/v1/health/queue/functions') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisherFunctions') + ->inject('queueForFunctions') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisherFunctions, Response $response) { + ->action(function (int|string $threshold, Func $queueForFunctions, Response $response) { $threshold = \intval($threshold); - $size = $publisherFunctions->getQueueSize(new Queue(Event::FUNCTIONS_QUEUE_NAME)); + $size = $queueForFunctions->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -703,7 +713,7 @@ App::get('/v1/health/queue/functions') }); App::get('/v1/health/queue/stats-resources') - ->desc('Get stats resources queue') + ->desc('Get stats resources queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( @@ -721,12 +731,12 @@ App::get('/v1/health/queue/stats-resources') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisher') + ->inject('queueForStatsResources') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisher, Response $response) { + ->action(function (int|string $threshold, StatsResources $queueForStatsResources, Response $response) { $threshold = \intval($threshold); - $size = $publisher->getQueueSize(new Queue(Event::STATS_RESOURCES_QUEUE_NAME)); + $size = $queueForStatsResources->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -754,12 +764,12 @@ App::get('/v1/health/queue/stats-usage') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisher') + ->inject('queueForStatsUsage') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisher, Response $response) { + ->action(function (int|string $threshold, StatsUsage $queueForStatsUsage, Response $response) { $threshold = \intval($threshold); - $size = $publisher->getQueueSize(new Queue(Event::STATS_USAGE_QUEUE_NAME)); + $size = $queueForStatsUsage->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -950,11 +960,53 @@ App::get('/v1/health/queue/failed/:name') ]), 'The name of the queue') ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) ->inject('response') - ->inject('publisher') - ->action(function (string $name, int|string $threshold, Response $response, Publisher $publisher) { + ->inject('queueForDatabase') + ->inject('queueForDeletes') + ->inject('queueForAudits') + ->inject('queueForMails') + ->inject('queueForFunctions') + ->inject('queueForStatsResources') + ->inject('queueForStatsUsage') + ->inject('queueForWebhooks') + ->inject('queueForCertificates') + ->inject('queueForBuilds') + ->inject('queueForMessaging') + ->inject('queueForMigrations') + ->action(function ( + string $name, + int|string $threshold, + Response $response, + Database $queueForDatabase, + Delete $queueForDeletes, + Audit $queueForAudits, + Mail $queueForMails, + Func $queueForFunctions, + StatsResources $queueForStatsResources, + StatsUsage $queueForStatsUsage, + Webhook $queueForWebhooks, + Certificate $queueForCertificates, + Build $queueForBuilds, + Messaging $queueForMessaging, + Migration $queueForMigrations + ) { $threshold = \intval($threshold); - $failed = $publisher->getQueueSize(new Queue($name), failedJobs: true); + /** @var Event $queue */ + $queue = match ($name) { + Event::DATABASE_QUEUE_NAME => $queueForDatabase, + Event::DELETE_QUEUE_NAME => $queueForDeletes, + Event::AUDITS_QUEUE_NAME => $queueForAudits, + Event::MAILS_QUEUE_NAME => $queueForMails, + Event::FUNCTIONS_QUEUE_NAME => $queueForFunctions, + Event::STATS_RESOURCES_QUEUE_NAME => $queueForStatsResources, + Event::STATS_USAGE_QUEUE_NAME => $queueForStatsUsage, + Event::WEBHOOK_QUEUE_NAME => $queueForWebhooks, + Event::CERTIFICATES_QUEUE_NAME => $queueForCertificates, + Event::BUILDS_QUEUE_NAME => $queueForBuilds, + Event::MESSAGING_QUEUE_NAME => $queueForMessaging, + Event::MIGRATIONS_QUEUE_NAME => $queueForMigrations, + }; + $failed = $queue->getSize(failed: true); if ($failed >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue failed jobs threshold hit. Current size is {$failed} and threshold is {$threshold}."); diff --git a/app/init/resources.php b/app/init/resources.php index d4f0433447..e4e8fbef5e 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -15,6 +15,7 @@ use Appwrite\Event\Mail; use Appwrite\Event\Messaging; use Appwrite\Event\Migration; use Appwrite\Event\Realtime; +use Appwrite\Event\StatsResources; use Appwrite\Event\StatsUsage; use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; @@ -147,6 +148,9 @@ App::setResource('queueForCertificates', function (Publisher $publisher) { App::setResource('queueForMigrations', function (Publisher $publisher) { return new Migration($publisher); }, ['publisher']); +App::setResource('queueForStatsResources', function (Publisher $publisher) { + return new StatsResources($publisher); +}, ['publisher']); App::setResource('platforms', function (Request $request, Document $console, Document $project) { $console->setAttribute('platforms', [ // Always allow current host '$collection' => ID::custom('platforms'), diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index 2557bf3f26..e9f3ccc2a2 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -638,4 +638,16 @@ class Event return $events; } + + /** + * Returns the size of the queue. + * + * @param bool $failed Whether to include failed events in the count. + * @return int The size of the queue. + */ + public function getSize(bool $failed = false): int + { + $queue = new Queue($this->getQueue()); + return $this->publisher->getQueueSize($queue, $failed); + } }