Merge pull request #10420 from appwrite/refactor-prevent-direct-publisher-clls

refactor: remove direct publisher calls
This commit is contained in:
Luke B. Silver 2025-09-03 13:14:23 +01:00 committed by GitHub
commit e54e7fbcd7
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 110 additions and 42 deletions

View file

@ -1,7 +1,19 @@
<?php
use Appwrite\ClamAV\Network;
use Appwrite\Event\Audit;
use Appwrite\Event\Build;
use Appwrite\Event\Certificate;
use Appwrite\Event\Database;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Event\Messaging;
use Appwrite\Event\Migration;
use Appwrite\Event\StatsResources;
use Appwrite\Event\StatsUsage;
use Appwrite\Event\Webhook;
use Appwrite\Extend\Exception;
use Appwrite\PubSub\Adapter\Pool as PubSubPool;
use Appwrite\SDK\AuthType;
@ -16,8 +28,6 @@ use Utopia\Database\Adapter\Pool as DatabasePool;
use Utopia\Database\Document;
use Utopia\Domains\Validator\PublicDomain;
use Utopia\Pools\Group;
use Utopia\Queue\Publisher;
use Utopia\Queue\Queue;
use Utopia\Registry\Registry;
use Utopia\Storage\Device;
use Utopia\Storage\Device\Local;
@ -334,12 +344,12 @@ App::get('/v1/health/queue/webhooks')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('publisher')
->inject('queueForWebhooks')
->inject('response')
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
->action(function (int|string $threshold, Webhook $queueForWebhooks, Response $response) {
$threshold = \intval($threshold);
$size = $publisher->getQueueSize(new Queue(Event::WEBHOOK_QUEUE_NAME));
$size = $queueForWebhooks->getSize();
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -367,12 +377,12 @@ App::get('/v1/health/queue/logs')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('publisher')
->inject('queueForAudits')
->inject('response')
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
->action(function (int|string $threshold, Audit $queueForAudits, Response $response) {
$threshold = \intval($threshold);
$size = $publisher->getQueueSize(new Queue(Event::AUDITS_QUEUE_NAME));
$size = $queueForAudits->getSize();
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -457,12 +467,12 @@ App::get('/v1/health/queue/certificates')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('publisher')
->inject('queueForCertificates')
->inject('response')
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
->action(function (int|string $threshold, Certificate $queueForCertificates, Response $response) {
$threshold = \intval($threshold);
$size = $publisher->getQueueSize(new Queue(Event::CERTIFICATES_QUEUE_NAME));
$size = $queueForCertificates->getSize();
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -490,12 +500,12 @@ App::get('/v1/health/queue/builds')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('publisher')
->inject('queueForBuilds')
->inject('response')
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
->action(function (int|string $threshold, Build $queueForBuilds, Response $response) {
$threshold = \intval($threshold);
$size = $publisher->getQueueSize(new Queue(Event::BUILDS_QUEUE_NAME));
$size = $queueForBuilds->getSize();
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -524,11 +534,11 @@ App::get('/v1/health/queue/databases')
))
->param('name', 'database_db_main', new Text(256), 'Queue name for which to check the queue size', true)
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('publisherDatabases')
->inject('queueForDatabase')
->inject('response')
->action(function (string $name, int|string $threshold, Publisher $publisherDatabases, Response $response) {
->action(function (string $name, int|string $threshold, Database $queueForDatabase, Response $response) {
$threshold = \intval($threshold);
$size = $publisherDatabases->getQueueSize(new Queue($name));
$size = $queueForDatabase->getSize();
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -556,12 +566,12 @@ App::get('/v1/health/queue/deletes')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('publisherDeletes')
->inject('queueForDeletes')
->inject('response')
->action(function (int|string $threshold, Publisher $publisherDeletes, Response $response) {
->action(function (int|string $threshold, Delete $queueForDeletes, Response $response) {
$threshold = \intval($threshold);
$size = $publisherDeletes->getQueueSize(new Queue(Event::DELETE_QUEUE_NAME));
$size = $queueForDeletes->getSize();
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -589,12 +599,12 @@ App::get('/v1/health/queue/mails')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('publisherMails')
->inject('queueForMails')
->inject('response')
->action(function (int|string $threshold, Publisher $publisherMails, Response $response) {
->action(function (int|string $threshold, Mail $queueForMails, Response $response) {
$threshold = \intval($threshold);
$size = $publisherMails->getQueueSize(new Queue(Event::MAILS_QUEUE_NAME));
$size = $queueForMails->getSize();
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -622,12 +632,12 @@ App::get('/v1/health/queue/messaging')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('publisherMessaging')
->inject('queueForMessaging')
->inject('response')
->action(function (int|string $threshold, Publisher $publisherMessaging, Response $response) {
->action(function (int|string $threshold, Messaging $queueForMessaging, Response $response) {
$threshold = \intval($threshold);
$size = $publisherMessaging->getQueueSize(new Queue(Event::MESSAGING_QUEUE_NAME));
$size = $queueForMessaging->getSize();
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -655,12 +665,12 @@ App::get('/v1/health/queue/migrations')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('publisherMigrations')
->inject('queueForMigrations')
->inject('response')
->action(function (int|string $threshold, Publisher $publisherMigrations, Response $response) {
->action(function (int|string $threshold, Migration $queueForMigrations, Response $response) {
$threshold = \intval($threshold);
$size = $publisherMigrations->getQueueSize(new Queue(Event::MIGRATIONS_QUEUE_NAME));
$size = $queueForMigrations->getSize();
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -688,12 +698,12 @@ App::get('/v1/health/queue/functions')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('publisherFunctions')
->inject('queueForFunctions')
->inject('response')
->action(function (int|string $threshold, Publisher $publisherFunctions, Response $response) {
->action(function (int|string $threshold, Func $queueForFunctions, Response $response) {
$threshold = \intval($threshold);
$size = $publisherFunctions->getQueueSize(new Queue(Event::FUNCTIONS_QUEUE_NAME));
$size = $queueForFunctions->getSize();
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -703,7 +713,7 @@ App::get('/v1/health/queue/functions')
});
App::get('/v1/health/queue/stats-resources')
->desc('Get stats resources queue')
->desc('Get stats resources queue')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
@ -721,12 +731,12 @@ App::get('/v1/health/queue/stats-resources')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('publisher')
->inject('queueForStatsResources')
->inject('response')
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
->action(function (int|string $threshold, StatsResources $queueForStatsResources, Response $response) {
$threshold = \intval($threshold);
$size = $publisher->getQueueSize(new Queue(Event::STATS_RESOURCES_QUEUE_NAME));
$size = $queueForStatsResources->getSize();
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -754,12 +764,12 @@ App::get('/v1/health/queue/stats-usage')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('publisher')
->inject('queueForStatsUsage')
->inject('response')
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
->action(function (int|string $threshold, StatsUsage $queueForStatsUsage, Response $response) {
$threshold = \intval($threshold);
$size = $publisher->getQueueSize(new Queue(Event::STATS_USAGE_QUEUE_NAME));
$size = $queueForStatsUsage->getSize();
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -950,11 +960,53 @@ App::get('/v1/health/queue/failed/:name')
]), 'The name of the queue')
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('response')
->inject('publisher')
->action(function (string $name, int|string $threshold, Response $response, Publisher $publisher) {
->inject('queueForDatabase')
->inject('queueForDeletes')
->inject('queueForAudits')
->inject('queueForMails')
->inject('queueForFunctions')
->inject('queueForStatsResources')
->inject('queueForStatsUsage')
->inject('queueForWebhooks')
->inject('queueForCertificates')
->inject('queueForBuilds')
->inject('queueForMessaging')
->inject('queueForMigrations')
->action(function (
string $name,
int|string $threshold,
Response $response,
Database $queueForDatabase,
Delete $queueForDeletes,
Audit $queueForAudits,
Mail $queueForMails,
Func $queueForFunctions,
StatsResources $queueForStatsResources,
StatsUsage $queueForStatsUsage,
Webhook $queueForWebhooks,
Certificate $queueForCertificates,
Build $queueForBuilds,
Messaging $queueForMessaging,
Migration $queueForMigrations
) {
$threshold = \intval($threshold);
$failed = $publisher->getQueueSize(new Queue($name), failedJobs: true);
/** @var Event $queue */
$queue = match ($name) {
Event::DATABASE_QUEUE_NAME => $queueForDatabase,
Event::DELETE_QUEUE_NAME => $queueForDeletes,
Event::AUDITS_QUEUE_NAME => $queueForAudits,
Event::MAILS_QUEUE_NAME => $queueForMails,
Event::FUNCTIONS_QUEUE_NAME => $queueForFunctions,
Event::STATS_RESOURCES_QUEUE_NAME => $queueForStatsResources,
Event::STATS_USAGE_QUEUE_NAME => $queueForStatsUsage,
Event::WEBHOOK_QUEUE_NAME => $queueForWebhooks,
Event::CERTIFICATES_QUEUE_NAME => $queueForCertificates,
Event::BUILDS_QUEUE_NAME => $queueForBuilds,
Event::MESSAGING_QUEUE_NAME => $queueForMessaging,
Event::MIGRATIONS_QUEUE_NAME => $queueForMigrations,
};
$failed = $queue->getSize(failed: true);
if ($failed >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue failed jobs threshold hit. Current size is {$failed} and threshold is {$threshold}.");

View file

@ -15,6 +15,7 @@ use Appwrite\Event\Mail;
use Appwrite\Event\Messaging;
use Appwrite\Event\Migration;
use Appwrite\Event\Realtime;
use Appwrite\Event\StatsResources;
use Appwrite\Event\StatsUsage;
use Appwrite\Event\Webhook;
use Appwrite\Extend\Exception;
@ -147,6 +148,9 @@ App::setResource('queueForCertificates', function (Publisher $publisher) {
App::setResource('queueForMigrations', function (Publisher $publisher) {
return new Migration($publisher);
}, ['publisher']);
App::setResource('queueForStatsResources', function (Publisher $publisher) {
return new StatsResources($publisher);
}, ['publisher']);
App::setResource('platforms', function (Request $request, Document $console, Document $project) {
$console->setAttribute('platforms', [ // Always allow current host
'$collection' => ID::custom('platforms'),

View file

@ -638,4 +638,16 @@ class Event
return $events;
}
/**
* Returns the size of the queue.
*
* @param bool $failed Whether to include failed events in the count.
* @return int The size of the queue.
*/
public function getSize(bool $failed = false): int
{
$queue = new Queue($this->getQueue());
return $this->publisher->getQueueSize($queue, $failed);
}
}