From 5423b63c8ad4fae353d149275f9b72ee329eeb64 Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Mon, 1 Sep 2025 15:03:49 +0100 Subject: [PATCH 1/3] refactor: prevent direct publisher calls --- app/cli.php | 19 +-- app/controllers/api/health.php | 136 ++++++++++++------ app/controllers/shared/api.php | 29 ++-- app/init/resources.php | 28 +--- app/worker.php | 32 ----- src/Appwrite/Event/Event.php | 35 +++++ src/Appwrite/Event/Func.php | 19 +++ src/Appwrite/Platform/Tasks/ScheduleBase.php | 29 +--- .../Platform/Tasks/ScheduleExecutions.php | 27 +++- .../Platform/Tasks/ScheduleFunctions.php | 24 +++- .../Platform/Tasks/ScheduleMessages.php | 23 ++- 11 files changed, 237 insertions(+), 164 deletions(-) diff --git a/app/cli.php b/app/cli.php index 0f98cf3458..ecb1675567 100644 --- a/app/cli.php +++ b/app/cli.php @@ -5,6 +5,7 @@ require_once __DIR__ . '/init.php'; use Appwrite\Event\Certificate; use Appwrite\Event\Delete; use Appwrite\Event\Func; +use Appwrite\Event\Messaging; use Appwrite\Event\StatsResources; use Appwrite\Event\StatsUsage; use Appwrite\Platform\Appwrite; @@ -191,21 +192,6 @@ CLI::setResource('getLogsDB', function (Group $pools, Cache $cache) { CLI::setResource('publisher', function (Group $pools) { return new BrokerPool(publisher: $pools->get('publisher')); }, ['pools']); -CLI::setResource('publisherDatabases', function (BrokerPool $publisher) { - return $publisher; -}, ['publisher']); -CLI::setResource('publisherFunctions', function (BrokerPool $publisher) { - return $publisher; -}, ['publisher']); -CLI::setResource('publisherMigrations', function (BrokerPool $publisher) { - return $publisher; -}, ['publisher']); -CLI::setResource('publisherStatsUsage', function (BrokerPool $publisher) { - return $publisher; -}, ['publisher']); -CLI::setResource('publisherMessaging', function (BrokerPool $publisher) { - return $publisher; -}, ['publisher']); CLI::setResource('queueForStatsUsage', function (Publisher $publisher) { return new StatsUsage($publisher); }, ['publisher']); @@ -218,6 +204,9 @@ CLI::setResource('queueForFunctions', function (Publisher $publisher) { CLI::setResource('queueForDeletes', function (Publisher $publisher) { return new Delete($publisher); }, ['publisher']); +CLI::setResource('queueForMessaging', function (Publisher $publisher) { + return new Messaging($publisher); +}, ['publisher']); CLI::setResource('queueForCertificates', function (Publisher $publisher) { return new Certificate($publisher); }, ['publisher']); diff --git a/app/controllers/api/health.php b/app/controllers/api/health.php index fb084fddb3..818c431de7 100644 --- a/app/controllers/api/health.php +++ b/app/controllers/api/health.php @@ -1,7 +1,19 @@ param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisher') + ->inject('queueForWebhooks') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisher, Response $response) { + ->action(function (int|string $threshold, Webhook $queueForWebhooks, Response $response) { $threshold = \intval($threshold); - $size = $publisher->getQueueSize(new Queue(Event::WEBHOOK_QUEUE_NAME)); + $size = $queueForWebhooks->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -367,12 +377,12 @@ App::get('/v1/health/queue/logs') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisher') + ->inject('queueForAudits') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisher, Response $response) { + ->action(function (int|string $threshold, Audit $queueForAudits, Response $response) { $threshold = \intval($threshold); - $size = $publisher->getQueueSize(new Queue(Event::AUDITS_QUEUE_NAME)); + $size = $queueForAudits->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -457,12 +467,12 @@ App::get('/v1/health/queue/certificates') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisher') + ->inject('queueForCertificates') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisher, Response $response) { + ->action(function (int|string $threshold, Certificate $queueForCertificates, Response $response) { $threshold = \intval($threshold); - $size = $publisher->getQueueSize(new Queue(Event::CERTIFICATES_QUEUE_NAME)); + $size = $queueForCertificates->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -490,12 +500,12 @@ App::get('/v1/health/queue/builds') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisher') + ->inject('queueForBuilds') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisher, Response $response) { + ->action(function (int|string $threshold, Build $queueForBuilds, Response $response) { $threshold = \intval($threshold); - $size = $publisher->getQueueSize(new Queue(Event::BUILDS_QUEUE_NAME)); + $size = $queueForBuilds->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -524,11 +534,11 @@ App::get('/v1/health/queue/databases') )) ->param('name', 'database_db_main', new Text(256), 'Queue name for which to check the queue size', true) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisherDatabases') + ->inject('queueForDatabase') ->inject('response') - ->action(function (string $name, int|string $threshold, Publisher $publisherDatabases, Response $response) { + ->action(function (string $name, int|string $threshold, Database $queueForDatabase, Response $response) { $threshold = \intval($threshold); - $size = $publisherDatabases->getQueueSize(new Queue($name)); + $size = $queueForDatabase->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -556,12 +566,12 @@ App::get('/v1/health/queue/deletes') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisherDeletes') + ->inject('queueForDeletes') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisherDeletes, Response $response) { + ->action(function (int|string $threshold, Delete $queueForDeletes, Response $response) { $threshold = \intval($threshold); - $size = $publisherDeletes->getQueueSize(new Queue(Event::DELETE_QUEUE_NAME)); + $size = $queueForDeletes->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -589,12 +599,12 @@ App::get('/v1/health/queue/mails') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisherMails') + ->inject('queueForMails') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisherMails, Response $response) { + ->action(function (int|string $threshold, Mail $queueForMails, Response $response) { $threshold = \intval($threshold); - $size = $publisherMails->getQueueSize(new Queue(Event::MAILS_QUEUE_NAME)); + $size = $queueForMails->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -622,12 +632,12 @@ App::get('/v1/health/queue/messaging') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisherMessaging') + ->inject('queueForMessaging') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisherMessaging, Response $response) { + ->action(function (int|string $threshold, Messaging $queueForMessaging, Response $response) { $threshold = \intval($threshold); - $size = $publisherMessaging->getQueueSize(new Queue(Event::MESSAGING_QUEUE_NAME)); + $size = $queueForMessaging->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -655,12 +665,12 @@ App::get('/v1/health/queue/migrations') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisherMigrations') + ->inject('queueForMigrations') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisherMigrations, Response $response) { + ->action(function (int|string $threshold, Migration $queueForMigrations, Response $response) { $threshold = \intval($threshold); - $size = $publisherMigrations->getQueueSize(new Queue(Event::MIGRATIONS_QUEUE_NAME)); + $size = $queueForMigrations->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -688,12 +698,12 @@ App::get('/v1/health/queue/functions') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisherFunctions') + ->inject('queueForFunctions') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisherFunctions, Response $response) { + ->action(function (int|string $threshold, Func $queueForFunctions, Response $response) { $threshold = \intval($threshold); - $size = $publisherFunctions->getQueueSize(new Queue(Event::FUNCTIONS_QUEUE_NAME)); + $size = $queueForFunctions->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -703,7 +713,7 @@ App::get('/v1/health/queue/functions') }); App::get('/v1/health/queue/stats-resources') - ->desc('Get stats resources queue') + ->desc('Get stats resources queue') ->groups(['api', 'health']) ->label('scope', 'health.read') ->label('sdk', new Method( @@ -721,12 +731,12 @@ App::get('/v1/health/queue/stats-resources') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisher') + ->inject('queueForStatsResources') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisher, Response $response) { + ->action(function (int|string $threshold, StatsResources $queueForStatsResources, Response $response) { $threshold = \intval($threshold); - $size = $publisher->getQueueSize(new Queue(Event::STATS_RESOURCES_QUEUE_NAME)); + $size = $queueForStatsResources->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -754,12 +764,12 @@ App::get('/v1/health/queue/stats-usage') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisher') + ->inject('queueForStatsUsage') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisher, Response $response) { + ->action(function (int|string $threshold, StatsUsage $queueForStatsUsage, Response $response) { $threshold = \intval($threshold); - $size = $publisher->getQueueSize(new Queue(Event::STATS_USAGE_QUEUE_NAME)); + $size = $queueForStatsUsage->getSize(); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -950,11 +960,53 @@ App::get('/v1/health/queue/failed/:name') ]), 'The name of the queue') ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) ->inject('response') - ->inject('publisher') - ->action(function (string $name, int|string $threshold, Response $response, Publisher $publisher) { + ->inject('queueForDatabase') + ->inject('queueForDeletes') + ->inject('queueForAudits') + ->inject('queueForMails') + ->inject('queueForFunctions') + ->inject('queueForStatsResources') + ->inject('queueForStatsUsage') + ->inject('queueForWebhooks') + ->inject('queueForCertificates') + ->inject('queueForBuilds') + ->inject('queueForMessaging') + ->inject('queueForMigrations') + ->action(function ( + string $name, + int|string $threshold, + Response $response, + Database $queueForDatabase, + Database $queueForDeletes, + Database $queueForAudits, + Database $queueForMails, + Database $queueForFunctions, + Database $queueForStatsResources, + Database $queueForStatsUsage, + Database $queueForWebhooks, + Database $queueForCertificates, + Database $queueForBuilds, + Database $queueForMessaging, + Database $queueForMigrations + ) { $threshold = \intval($threshold); - $failed = $publisher->getQueueSize(new Queue($name), failedJobs: true); + /** @var Event $queue */ + $queue = match ($name) { + Event::DATABASE_QUEUE_NAME => $queueForDatabase, + Event::DELETE_QUEUE_NAME => $queueForDeletes, + Event::AUDITS_QUEUE_NAME => $queueForAudits, + Event::MAILS_QUEUE_NAME => $queueForMails, + Event::FUNCTIONS_QUEUE_NAME => $queueForFunctions, + Event::STATS_RESOURCES_QUEUE_NAME => $queueForStatsResources, + Event::STATS_USAGE_QUEUE_NAME => $queueForStatsUsage, + Event::WEBHOOK_QUEUE_NAME => $queueForWebhooks, + Event::CERTIFICATES_QUEUE_NAME => $queueForCertificates, + Event::BUILDS_QUEUE_NAME => $queueForBuilds, + Event::MESSAGING_QUEUE_NAME => $queueForMessaging, + Event::MIGRATIONS_QUEUE_NAME => $queueForMigrations, + }; + $failed = $queue->getSize(failed: true); if ($failed >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue failed jobs threshold hit. Current size is {$failed} and threshold is {$threshold}."); diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index 6dcb99b56f..f6eb5a0f3d 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -29,7 +29,6 @@ use Utopia\Database\DateTime; use Utopia\Database\Document; use Utopia\Database\Helpers\Role; use Utopia\Database\Validator\Authorization; -use Utopia\Queue\Publisher; use Utopia\System\System; use Utopia\Telemetry\Adapter as Telemetry; use Utopia\Validator\WhiteList; @@ -413,9 +412,6 @@ App::init() ->inject('response') ->inject('project') ->inject('user') - ->inject('publisher') - ->inject('publisherFunctions') - ->inject('publisherWebhooks') ->inject('queueForEvents') ->inject('queueForMessaging') ->inject('queueForAudits') @@ -423,6 +419,9 @@ App::init() ->inject('queueForDatabase') ->inject('queueForBuilds') ->inject('queueForStatsUsage') + ->inject('queueForFunctions') + ->inject('queueForWebhooks') + ->inject('queueForRealtime') ->inject('dbForProject') ->inject('timelimit') ->inject('resourceToken') @@ -431,7 +430,7 @@ App::init() ->inject('plan') ->inject('devKey') ->inject('telemetry') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Publisher $publisher, Publisher $publisherFunctions, Publisher $publisherWebhooks, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry) use ($usageDatabaseListener, $eventDatabaseListener) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry) use ($usageDatabaseListener, $eventDatabaseListener) { $route = $utopia->getRoute(); @@ -539,13 +538,9 @@ App::init() $queueForDatabase->setProject($project); $queueForBuilds->setProject($project); $queueForMessaging->setProject($project); - - // Clone the queues, to prevent events triggered by the database listener - // from overwriting the events that are supposed to be triggered in the shutdown hook. - $queueForEventsClone = new Event($publisher); - $queueForFunctions = new Func($publisherFunctions); - $queueForWebhooks = new Webhook($publisherWebhooks); - $queueForRealtime = new Realtime(); + $queueForFunctions->setProject($project); + $queueForWebhooks->setProject($project); + $queueForRealtime->setProject($project); $dbForProject ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) @@ -557,10 +552,12 @@ App::init() $project, $document, $response, - $queueForEventsClone->from($queueForEvents), - $queueForFunctions->from($queueForEvents), - $queueForWebhooks->from($queueForEvents), - $queueForRealtime->from($queueForEvents) + // Clone the queues used by database, to prevent overwriting the events that are supposed to be triggered in the shutdown hook. + // This is a hack, we should define this listener in the domain layer and allow it to inject it's own dependencies. + clone $queueForEvents, + clone $queueForFunctions, + clone $queueForWebhooks, + clone $queueForRealtime )); $useCache = $route->getLabel('cache', false); diff --git a/app/init/resources.php b/app/init/resources.php index d4f0433447..71680cf32b 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -15,6 +15,7 @@ use Appwrite\Event\Mail; use Appwrite\Event\Messaging; use Appwrite\Event\Migration; use Appwrite\Event\Realtime; +use Appwrite\Event\StatsResources; use Appwrite\Event\StatsUsage; use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; @@ -84,30 +85,6 @@ App::setResource('localeCodes', function () { App::setResource('publisher', function (Group $pools) { return new BrokerPool(publisher: $pools->get('publisher')); }, ['pools']); -App::setResource('publisherDatabases', function (Publisher $publisher) { - return $publisher; -}, ['publisher']); -App::setResource('publisherFunctions', function (Publisher $publisher) { - return $publisher; -}, ['publisher']); -App::setResource('publisherMigrations', function (Publisher $publisher) { - return $publisher; -}, ['publisher']); -App::setResource('publisherStatsUsage', function (Publisher $publisher) { - return $publisher; -}, ['publisher']); -App::setResource('publisherMails', function (Publisher $publisher) { - return $publisher; -}, ['publisher']); -App::setResource('publisherDeletes', function (Publisher $publisher) { - return $publisher; -}, ['publisher']); -App::setResource('publisherMessaging', function (Publisher $publisher) { - return $publisher; -}, ['publisher']); -App::setResource('publisherWebhooks', function (Publisher $publisher) { - return $publisher; -}, ['publisher']); App::setResource('queueForMessaging', function (Publisher $publisher) { return new Messaging($publisher); }, ['publisher']); @@ -132,6 +109,9 @@ App::setResource('queueForWebhooks', function (Publisher $publisher) { App::setResource('queueForRealtime', function () { return new Realtime(); }, []); +App::setResource('queueForStatsResources', function (Publisher $publisher) { + return new StatsResources($publisher); +}, ['publisher']); App::setResource('queueForStatsUsage', function (Publisher $publisher) { return new StatsUsage($publisher); }, ['publisher']); diff --git a/app/worker.php b/app/worker.php index bf0a6583ec..845914c923 100644 --- a/app/worker.php +++ b/app/worker.php @@ -247,42 +247,10 @@ Server::setResource('publisher', function (Group $pools) { return new BrokerPool(publisher: $pools->get('publisher')); }, ['pools']); -Server::setResource('publisherDatabases', function (BrokerPool $publisher) { - return $publisher; -}, ['publisher']); - -Server::setResource('publisherFunctions', function (BrokerPool $publisher) { - return $publisher; -}, ['publisher']); - -Server::setResource('publisherMigrations', function (BrokerPool $publisher) { - return $publisher; -}, ['publisher']); - -Server::setResource('publisherStatsUsage', function (BrokerPool $publisher) { - return $publisher; -}, ['publisher']); - -Server::setResource('publisherMessaging', function (BrokerPool $publisher) { - return $publisher; -}, ['publisher']); - Server::setResource('consumer', function (Group $pools) { return new BrokerPool(consumer: $pools->get('consumer')); }, ['pools']); -Server::setResource('consumerDatabases', function (BrokerPool $consumer) { - return $consumer; -}, ['consumer']); - -Server::setResource('consumerMigrations', function (BrokerPool $consumer) { - return $consumer; -}, ['consumer']); - -Server::setResource('consumerStatsUsage', function (BrokerPool $consumer) { - return $consumer; -}, ['consumer']); - Server::setResource('queueForStatsUsage', function (Publisher $publisher) { return new StatsUsage($publisher); }, ['publisher']); diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index 2557bf3f26..c88b886c70 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -638,4 +638,39 @@ class Event return $events; } + + /** + * Clone the event instance. + * + * Creates a deep copy of the event with all properties, + * including nested objects and arrays. + */ + public function __clone(): void + { + if ($this->project !== null) { + $this->project = clone $this->project; + } + + if ($this->user !== null) { + $this->user = clone $this->user; + } + + $clonedContext = []; + foreach ($this->context as $key => $document) { + $clonedContext[$key] = clone $document; + } + $this->context = $clonedContext; + } + + /** + * Returns the size of the queue. + * + * @param bool $failed Whether to include failed events in the count. + * @return int The size of the queue. + */ + public function getSize(bool $failed = false): int + { + $queue = new Queue($this->getQueue()); + return $this->publisher->getQueueSize($queue, $failed); + } } diff --git a/src/Appwrite/Event/Func.php b/src/Appwrite/Event/Func.php index ae316c84e5..df356b1ad5 100644 --- a/src/Appwrite/Event/Func.php +++ b/src/Appwrite/Event/Func.php @@ -219,4 +219,23 @@ class Func extends Event 'method' => $this->method, ]; } + + /** + * Clone the function event instance. + * + * Handles deep copying of Func-specific properties + * after parent cloning completes. + */ + public function __clone(): void + { + parent::__clone(); + + if ($this->function !== null) { + $this->function = clone $this->function; + } + + if ($this->execution !== null) { + $this->execution = clone $this->execution; + } + } } diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php index 5cd25b09b4..22636e578f 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleBase.php +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -11,7 +11,6 @@ use Utopia\Database\Exception; use Utopia\Database\Query; use Utopia\Database\Validator\Authorization; use Utopia\Platform\Action; -use Utopia\Queue\Broker\Pool as BrokerPool; use Utopia\System\System; use Utopia\Telemetry\Adapter as Telemetry; use Utopia\Telemetry\Gauge; @@ -24,10 +23,6 @@ abstract class ScheduleBase extends Action protected array $schedules = []; - protected BrokerPool $publisher; - protected BrokerPool $publisherMigrations; - protected BrokerPool $publisherFunctions; - protected BrokerPool $publisherMessaging; private ?Histogram $collectSchedulesTelemetryDuration = null; private ?Gauge $collectSchedulesTelemetryCount = null; @@ -39,21 +34,6 @@ abstract class ScheduleBase extends Action abstract public static function getCollectionId(): string; abstract protected function enqueueResources(Database $dbForPlatform, callable $getProjectDB): void; - public function __construct() - { - $type = static::getSupportedResource(); - - $this - ->desc("Execute {$type}s scheduled in Appwrite") - ->inject('publisher') - ->inject('publisherMigrations') - ->inject('publisherFunctions') - ->inject('publisherMessaging') - ->inject('dbForPlatform') - ->inject('getProjectDB') - ->inject('telemetry') - ->callback($this->action(...)); - } protected function updateProjectAccess(Document $project, Database $dbForPlatform): void { @@ -71,16 +51,11 @@ abstract class ScheduleBase extends Action * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker. */ - public function action(BrokerPool $publisher, BrokerPool $publisherMigrations, BrokerPool $publisherFunctions, BrokerPool $publisherMessaging, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void + protected function schedule(Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void { Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1'); Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started'); - $this->publisher = $publisher; - $this->publisherMigrations = $publisherMigrations; - $this->publisherFunctions = $publisherFunctions; - $this->publisherMessaging = $publisherMessaging; - $this->scheduleTelemetryCount = $telemetry->createGauge('task.schedule.count'); $this->collectSchedulesTelemetryDuration = $telemetry->createHistogram('task.schedule.collect_schedules.duration', 's'); $this->collectSchedulesTelemetryCount = $telemetry->createGauge('task.schedule.collect_schedules.count'); @@ -112,7 +87,7 @@ abstract class ScheduleBase extends Action } } - private function collectSchedules(Database $dbForPlatform, callable $getProjectDB, string &$lastSyncUpdate): void + protected function collectSchedules(Database $dbForPlatform, callable $getProjectDB, string &$lastSyncUpdate): void { // If we haven't synced yet, load all active schedules $initialLoad = $lastSyncUpdate === "0"; diff --git a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php index 14a4259e17..6b19ef24e6 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php @@ -5,12 +5,15 @@ namespace Appwrite\Platform\Tasks; use Appwrite\Event\Func; use Swoole\Coroutine as Co; use Utopia\Database\Database; +use Utopia\Telemetry\Adapter as Telemetry; class ScheduleExecutions extends ScheduleBase { public const UPDATE_TIMER = 3; // seconds public const ENQUEUE_TIMER = 4; // seconds + protected Func $queueForFunctions; + public static function getName(): string { return 'schedule-executions'; @@ -26,12 +29,29 @@ class ScheduleExecutions extends ScheduleBase return 'executions'; } + public function __construct() + { + $type = static::getSupportedResource(); + + $this + ->desc("Execute executions scheduled in Appwrite") + ->inject('queueForFunctions') + ->inject('dbForPlatform') + ->inject('getProjectDB') + ->inject('telemetry') + ->callback($this->action(...)); + } + + public function action(Func $queueForFunctions, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void + { + $this->queueForFunctions = $queueForFunctions; + $this->schedule($dbForPlatform, $getProjectDB, $telemetry); + } + protected function enqueueResources(Database $dbForPlatform, callable $getProjectDB): void { $intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds'); - $queueForFunctions = new Func($this->publisherFunctions); - foreach ($this->schedules as $schedule) { if (!$schedule['active']) { $dbForPlatform->deleteDocument( @@ -57,7 +77,8 @@ class ScheduleExecutions extends ScheduleBase $this->updateProjectAccess($schedule['project'], $dbForPlatform); - \go(function () use ($queueForFunctions, $schedule, $scheduledAt, $delay, $data) { + \go(function () use ($schedule, $scheduledAt, $delay, $data) { + $queueForFunctions = clone $this->queueForFunctions; Co::sleep($delay); $queueForFunctions->setType('schedule') diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php index 6f072425e4..6e45fda408 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -7,7 +7,7 @@ use Cron\CronExpression; use Utopia\CLI\Console; use Utopia\Database\Database; use Utopia\Database\DateTime; -use Utopia\Pools\Group; +use Utopia\Telemetry\Adapter as Telemetry; class ScheduleFunctions extends ScheduleBase { @@ -15,6 +15,7 @@ class ScheduleFunctions extends ScheduleBase public const ENQUEUE_TIMER = 60; // seconds private ?float $lastEnqueueUpdate = null; + protected Func $queueForFunctions; public static function getName(): string { @@ -31,6 +32,23 @@ class ScheduleFunctions extends ScheduleBase return 'functions'; } + public function __construct() + { + $this + ->desc("Execute functions scheduled in Appwrite") + ->inject('queueForFunctions') + ->inject('dbForPlatform') + ->inject('getProjectDB') + ->inject('telemetry') + ->callback($this->action(...)); + } + + public function action(Func $queueForFunctions, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void + { + $this->queueForFunctions = $queueForFunctions; + $this->schedule($dbForPlatform, $getProjectDB, $telemetry); + } + protected function enqueueResources(Database $dbForPlatform, callable $getProjectDB): void { $timerStart = \microtime(true); @@ -77,6 +95,8 @@ class ScheduleFunctions extends ScheduleBase foreach ($delayedExecutions as $delay => $schedules) { \go(function () use ($delay, $schedules, $dbForPlatform) { + $queueForFunctions = clone $this->queueForFunctions; + \sleep($delay); // in seconds foreach ($schedules as $delayConfig) { @@ -90,8 +110,6 @@ class ScheduleFunctions extends ScheduleBase $this->updateProjectAccess($schedule['project'], $dbForPlatform); - $queueForFunctions = new Func($this->publisherFunctions); - $queueForFunctions ->setType('schedule') ->setFunction($schedule['resource']) diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php index fe4afbe69c..62d5344e0d 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleMessages.php +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -4,12 +4,15 @@ namespace Appwrite\Platform\Tasks; use Appwrite\Event\Messaging; use Utopia\Database\Database; +use Utopia\Telemetry\Adapter as Telemetry; class ScheduleMessages extends ScheduleBase { public const UPDATE_TIMER = 3; // seconds public const ENQUEUE_TIMER = 4; // seconds + protected Messaging $queueForMessaging; + public static function getName(): string { return 'schedule-messages'; @@ -25,6 +28,23 @@ class ScheduleMessages extends ScheduleBase return 'messages'; } + public function __construct() + { + $this + ->desc("Execute messages scheduled in Appwrite") + ->inject('queueForMessaging') + ->inject('dbForPlatform') + ->inject('getProjectDB') + ->inject('telemetry') + ->callback($this->action(...)); + } + + public function action(Messaging $queueForMessaging, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void + { + $this->queueForMessaging = $queueForMessaging; + $this->schedule($dbForPlatform, $getProjectDB, $telemetry); + } + protected function enqueueResources(Database $dbForPlatform, callable $getProjectDB): void { foreach ($this->schedules as $schedule) { @@ -40,10 +60,9 @@ class ScheduleMessages extends ScheduleBase } \go(function () use ($schedule, $scheduledAt, $dbForPlatform) { - $queueForMessaging = new Messaging($this->publisherMessaging); - $this->updateProjectAccess($schedule['project'], $dbForPlatform); + $queueForMessaging = clone $this->queueForMessaging; $queueForMessaging ->setType(MESSAGE_SEND_TYPE_EXTERNAL) ->setMessageId($schedule['resourceId']) From e4adff4edcecea89340fef3cb58dfbb359558584 Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Wed, 3 Sep 2025 10:29:49 +0100 Subject: [PATCH 2/3] chore: jake review --- app/controllers/api/health.php | 22 +++++++-------- src/Appwrite/Event/Database.php | 28 +++++++++++++++++++ src/Appwrite/Event/Func.php | 3 -- .../Platform/Tasks/ScheduleExecutions.php | 4 +-- .../Platform/Tasks/ScheduleFunctions.php | 2 +- .../Platform/Tasks/ScheduleMessages.php | 2 +- 6 files changed, 42 insertions(+), 19 deletions(-) diff --git a/app/controllers/api/health.php b/app/controllers/api/health.php index 818c431de7..988f061ed5 100644 --- a/app/controllers/api/health.php +++ b/app/controllers/api/health.php @@ -977,17 +977,17 @@ App::get('/v1/health/queue/failed/:name') int|string $threshold, Response $response, Database $queueForDatabase, - Database $queueForDeletes, - Database $queueForAudits, - Database $queueForMails, - Database $queueForFunctions, - Database $queueForStatsResources, - Database $queueForStatsUsage, - Database $queueForWebhooks, - Database $queueForCertificates, - Database $queueForBuilds, - Database $queueForMessaging, - Database $queueForMigrations + Delete $queueForDeletes, + Audit $queueForAudits, + Mail $queueForMails, + Func $queueForFunctions, + StatsResources $queueForStatsResources, + StatsUsage $queueForStatsUsage, + Webhook $queueForWebhooks, + Certificate $queueForCertificates, + Build $queueForBuilds, + Messaging $queueForMessaging, + Migration $queueForMigrations ) { $threshold = \intval($threshold); diff --git a/src/Appwrite/Event/Database.php b/src/Appwrite/Event/Database.php index 70051f9055..26639fc60b 100644 --- a/src/Appwrite/Event/Database.php +++ b/src/Appwrite/Event/Database.php @@ -193,4 +193,32 @@ class Database extends Event 'events' => Event::generateEvents($this->getEvent(), $this->getParams()) ]; } + + /** + * Clone the database event instance. + */ + public function __clone(): void + { + parent::__clone(); + + if ($this->database !== null) { + $this->database = clone $this->database; + } + + if ($this->row !== null) { + $this->row = clone $this->row; + } + + if ($this->table !== null) { + $this->table = clone $this->table; + } + + if ($this->document !== null) { + $this->document = clone $this->document; + } + + if ($this->collection !== null) { + $this->collection = clone $this->collection; + } + } } diff --git a/src/Appwrite/Event/Func.php b/src/Appwrite/Event/Func.php index df356b1ad5..9741576ecc 100644 --- a/src/Appwrite/Event/Func.php +++ b/src/Appwrite/Event/Func.php @@ -222,9 +222,6 @@ class Func extends Event /** * Clone the function event instance. - * - * Handles deep copying of Func-specific properties - * after parent cloning completes. */ public function __clone(): void { diff --git a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php index 6b19ef24e6..cdfb188190 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php @@ -31,10 +31,8 @@ class ScheduleExecutions extends ScheduleBase public function __construct() { - $type = static::getSupportedResource(); - $this - ->desc("Execute executions scheduled in Appwrite") + ->desc('Execute executions scheduled in Appwrite') ->inject('queueForFunctions') ->inject('dbForPlatform') ->inject('getProjectDB') diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php index 6e45fda408..d8ded60ec6 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -35,7 +35,7 @@ class ScheduleFunctions extends ScheduleBase public function __construct() { $this - ->desc("Execute functions scheduled in Appwrite") + ->desc('Execute functions scheduled in Appwrite') ->inject('queueForFunctions') ->inject('dbForPlatform') ->inject('getProjectDB') diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php index 62d5344e0d..58a0a76b3e 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleMessages.php +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -31,7 +31,7 @@ class ScheduleMessages extends ScheduleBase public function __construct() { $this - ->desc("Execute messages scheduled in Appwrite") + ->desc('Execute messages scheduled in Appwrite') ->inject('queueForMessaging') ->inject('dbForPlatform') ->inject('getProjectDB') From 1b586bbc51a916d00eb17c418716e730d30abf4d Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Wed, 3 Sep 2025 12:08:07 +0100 Subject: [PATCH 3/3] chore: revert scheduler & hook changes --- app/cli.php | 19 ++++++++--- app/controllers/shared/api.php | 29 +++++++++-------- app/init/resources.php | 30 +++++++++++++++-- app/worker.php | 32 +++++++++++++++++++ src/Appwrite/Event/Database.php | 28 ---------------- src/Appwrite/Event/Event.php | 23 ------------- src/Appwrite/Event/Func.php | 16 ---------- src/Appwrite/Platform/Tasks/ScheduleBase.php | 29 +++++++++++++++-- .../Platform/Tasks/ScheduleExecutions.php | 25 ++------------- .../Platform/Tasks/ScheduleFunctions.php | 24 ++------------ .../Platform/Tasks/ScheduleMessages.php | 23 ++----------- 11 files changed, 125 insertions(+), 153 deletions(-) diff --git a/app/cli.php b/app/cli.php index ecb1675567..0f98cf3458 100644 --- a/app/cli.php +++ b/app/cli.php @@ -5,7 +5,6 @@ require_once __DIR__ . '/init.php'; use Appwrite\Event\Certificate; use Appwrite\Event\Delete; use Appwrite\Event\Func; -use Appwrite\Event\Messaging; use Appwrite\Event\StatsResources; use Appwrite\Event\StatsUsage; use Appwrite\Platform\Appwrite; @@ -192,6 +191,21 @@ CLI::setResource('getLogsDB', function (Group $pools, Cache $cache) { CLI::setResource('publisher', function (Group $pools) { return new BrokerPool(publisher: $pools->get('publisher')); }, ['pools']); +CLI::setResource('publisherDatabases', function (BrokerPool $publisher) { + return $publisher; +}, ['publisher']); +CLI::setResource('publisherFunctions', function (BrokerPool $publisher) { + return $publisher; +}, ['publisher']); +CLI::setResource('publisherMigrations', function (BrokerPool $publisher) { + return $publisher; +}, ['publisher']); +CLI::setResource('publisherStatsUsage', function (BrokerPool $publisher) { + return $publisher; +}, ['publisher']); +CLI::setResource('publisherMessaging', function (BrokerPool $publisher) { + return $publisher; +}, ['publisher']); CLI::setResource('queueForStatsUsage', function (Publisher $publisher) { return new StatsUsage($publisher); }, ['publisher']); @@ -204,9 +218,6 @@ CLI::setResource('queueForFunctions', function (Publisher $publisher) { CLI::setResource('queueForDeletes', function (Publisher $publisher) { return new Delete($publisher); }, ['publisher']); -CLI::setResource('queueForMessaging', function (Publisher $publisher) { - return new Messaging($publisher); -}, ['publisher']); CLI::setResource('queueForCertificates', function (Publisher $publisher) { return new Certificate($publisher); }, ['publisher']); diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index f6eb5a0f3d..6dcb99b56f 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -29,6 +29,7 @@ use Utopia\Database\DateTime; use Utopia\Database\Document; use Utopia\Database\Helpers\Role; use Utopia\Database\Validator\Authorization; +use Utopia\Queue\Publisher; use Utopia\System\System; use Utopia\Telemetry\Adapter as Telemetry; use Utopia\Validator\WhiteList; @@ -412,6 +413,9 @@ App::init() ->inject('response') ->inject('project') ->inject('user') + ->inject('publisher') + ->inject('publisherFunctions') + ->inject('publisherWebhooks') ->inject('queueForEvents') ->inject('queueForMessaging') ->inject('queueForAudits') @@ -419,9 +423,6 @@ App::init() ->inject('queueForDatabase') ->inject('queueForBuilds') ->inject('queueForStatsUsage') - ->inject('queueForFunctions') - ->inject('queueForWebhooks') - ->inject('queueForRealtime') ->inject('dbForProject') ->inject('timelimit') ->inject('resourceToken') @@ -430,7 +431,7 @@ App::init() ->inject('plan') ->inject('devKey') ->inject('telemetry') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry) use ($usageDatabaseListener, $eventDatabaseListener) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Publisher $publisher, Publisher $publisherFunctions, Publisher $publisherWebhooks, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry) use ($usageDatabaseListener, $eventDatabaseListener) { $route = $utopia->getRoute(); @@ -538,9 +539,13 @@ App::init() $queueForDatabase->setProject($project); $queueForBuilds->setProject($project); $queueForMessaging->setProject($project); - $queueForFunctions->setProject($project); - $queueForWebhooks->setProject($project); - $queueForRealtime->setProject($project); + + // Clone the queues, to prevent events triggered by the database listener + // from overwriting the events that are supposed to be triggered in the shutdown hook. + $queueForEventsClone = new Event($publisher); + $queueForFunctions = new Func($publisherFunctions); + $queueForWebhooks = new Webhook($publisherWebhooks); + $queueForRealtime = new Realtime(); $dbForProject ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $usageDatabaseListener($event, $document, $queueForStatsUsage)) @@ -552,12 +557,10 @@ App::init() $project, $document, $response, - // Clone the queues used by database, to prevent overwriting the events that are supposed to be triggered in the shutdown hook. - // This is a hack, we should define this listener in the domain layer and allow it to inject it's own dependencies. - clone $queueForEvents, - clone $queueForFunctions, - clone $queueForWebhooks, - clone $queueForRealtime + $queueForEventsClone->from($queueForEvents), + $queueForFunctions->from($queueForEvents), + $queueForWebhooks->from($queueForEvents), + $queueForRealtime->from($queueForEvents) )); $useCache = $route->getLabel('cache', false); diff --git a/app/init/resources.php b/app/init/resources.php index 71680cf32b..e4e8fbef5e 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -85,6 +85,30 @@ App::setResource('localeCodes', function () { App::setResource('publisher', function (Group $pools) { return new BrokerPool(publisher: $pools->get('publisher')); }, ['pools']); +App::setResource('publisherDatabases', function (Publisher $publisher) { + return $publisher; +}, ['publisher']); +App::setResource('publisherFunctions', function (Publisher $publisher) { + return $publisher; +}, ['publisher']); +App::setResource('publisherMigrations', function (Publisher $publisher) { + return $publisher; +}, ['publisher']); +App::setResource('publisherStatsUsage', function (Publisher $publisher) { + return $publisher; +}, ['publisher']); +App::setResource('publisherMails', function (Publisher $publisher) { + return $publisher; +}, ['publisher']); +App::setResource('publisherDeletes', function (Publisher $publisher) { + return $publisher; +}, ['publisher']); +App::setResource('publisherMessaging', function (Publisher $publisher) { + return $publisher; +}, ['publisher']); +App::setResource('publisherWebhooks', function (Publisher $publisher) { + return $publisher; +}, ['publisher']); App::setResource('queueForMessaging', function (Publisher $publisher) { return new Messaging($publisher); }, ['publisher']); @@ -109,9 +133,6 @@ App::setResource('queueForWebhooks', function (Publisher $publisher) { App::setResource('queueForRealtime', function () { return new Realtime(); }, []); -App::setResource('queueForStatsResources', function (Publisher $publisher) { - return new StatsResources($publisher); -}, ['publisher']); App::setResource('queueForStatsUsage', function (Publisher $publisher) { return new StatsUsage($publisher); }, ['publisher']); @@ -127,6 +148,9 @@ App::setResource('queueForCertificates', function (Publisher $publisher) { App::setResource('queueForMigrations', function (Publisher $publisher) { return new Migration($publisher); }, ['publisher']); +App::setResource('queueForStatsResources', function (Publisher $publisher) { + return new StatsResources($publisher); +}, ['publisher']); App::setResource('platforms', function (Request $request, Document $console, Document $project) { $console->setAttribute('platforms', [ // Always allow current host '$collection' => ID::custom('platforms'), diff --git a/app/worker.php b/app/worker.php index 845914c923..bf0a6583ec 100644 --- a/app/worker.php +++ b/app/worker.php @@ -247,10 +247,42 @@ Server::setResource('publisher', function (Group $pools) { return new BrokerPool(publisher: $pools->get('publisher')); }, ['pools']); +Server::setResource('publisherDatabases', function (BrokerPool $publisher) { + return $publisher; +}, ['publisher']); + +Server::setResource('publisherFunctions', function (BrokerPool $publisher) { + return $publisher; +}, ['publisher']); + +Server::setResource('publisherMigrations', function (BrokerPool $publisher) { + return $publisher; +}, ['publisher']); + +Server::setResource('publisherStatsUsage', function (BrokerPool $publisher) { + return $publisher; +}, ['publisher']); + +Server::setResource('publisherMessaging', function (BrokerPool $publisher) { + return $publisher; +}, ['publisher']); + Server::setResource('consumer', function (Group $pools) { return new BrokerPool(consumer: $pools->get('consumer')); }, ['pools']); +Server::setResource('consumerDatabases', function (BrokerPool $consumer) { + return $consumer; +}, ['consumer']); + +Server::setResource('consumerMigrations', function (BrokerPool $consumer) { + return $consumer; +}, ['consumer']); + +Server::setResource('consumerStatsUsage', function (BrokerPool $consumer) { + return $consumer; +}, ['consumer']); + Server::setResource('queueForStatsUsage', function (Publisher $publisher) { return new StatsUsage($publisher); }, ['publisher']); diff --git a/src/Appwrite/Event/Database.php b/src/Appwrite/Event/Database.php index 26639fc60b..70051f9055 100644 --- a/src/Appwrite/Event/Database.php +++ b/src/Appwrite/Event/Database.php @@ -193,32 +193,4 @@ class Database extends Event 'events' => Event::generateEvents($this->getEvent(), $this->getParams()) ]; } - - /** - * Clone the database event instance. - */ - public function __clone(): void - { - parent::__clone(); - - if ($this->database !== null) { - $this->database = clone $this->database; - } - - if ($this->row !== null) { - $this->row = clone $this->row; - } - - if ($this->table !== null) { - $this->table = clone $this->table; - } - - if ($this->document !== null) { - $this->document = clone $this->document; - } - - if ($this->collection !== null) { - $this->collection = clone $this->collection; - } - } } diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index c88b886c70..e9f3ccc2a2 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -639,29 +639,6 @@ class Event return $events; } - /** - * Clone the event instance. - * - * Creates a deep copy of the event with all properties, - * including nested objects and arrays. - */ - public function __clone(): void - { - if ($this->project !== null) { - $this->project = clone $this->project; - } - - if ($this->user !== null) { - $this->user = clone $this->user; - } - - $clonedContext = []; - foreach ($this->context as $key => $document) { - $clonedContext[$key] = clone $document; - } - $this->context = $clonedContext; - } - /** * Returns the size of the queue. * diff --git a/src/Appwrite/Event/Func.php b/src/Appwrite/Event/Func.php index 9741576ecc..ae316c84e5 100644 --- a/src/Appwrite/Event/Func.php +++ b/src/Appwrite/Event/Func.php @@ -219,20 +219,4 @@ class Func extends Event 'method' => $this->method, ]; } - - /** - * Clone the function event instance. - */ - public function __clone(): void - { - parent::__clone(); - - if ($this->function !== null) { - $this->function = clone $this->function; - } - - if ($this->execution !== null) { - $this->execution = clone $this->execution; - } - } } diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php index 22636e578f..5cd25b09b4 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleBase.php +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -11,6 +11,7 @@ use Utopia\Database\Exception; use Utopia\Database\Query; use Utopia\Database\Validator\Authorization; use Utopia\Platform\Action; +use Utopia\Queue\Broker\Pool as BrokerPool; use Utopia\System\System; use Utopia\Telemetry\Adapter as Telemetry; use Utopia\Telemetry\Gauge; @@ -23,6 +24,10 @@ abstract class ScheduleBase extends Action protected array $schedules = []; + protected BrokerPool $publisher; + protected BrokerPool $publisherMigrations; + protected BrokerPool $publisherFunctions; + protected BrokerPool $publisherMessaging; private ?Histogram $collectSchedulesTelemetryDuration = null; private ?Gauge $collectSchedulesTelemetryCount = null; @@ -34,6 +39,21 @@ abstract class ScheduleBase extends Action abstract public static function getCollectionId(): string; abstract protected function enqueueResources(Database $dbForPlatform, callable $getProjectDB): void; + public function __construct() + { + $type = static::getSupportedResource(); + + $this + ->desc("Execute {$type}s scheduled in Appwrite") + ->inject('publisher') + ->inject('publisherMigrations') + ->inject('publisherFunctions') + ->inject('publisherMessaging') + ->inject('dbForPlatform') + ->inject('getProjectDB') + ->inject('telemetry') + ->callback($this->action(...)); + } protected function updateProjectAccess(Document $project, Database $dbForPlatform): void { @@ -51,11 +71,16 @@ abstract class ScheduleBase extends Action * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker. */ - protected function schedule(Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void + public function action(BrokerPool $publisher, BrokerPool $publisherMigrations, BrokerPool $publisherFunctions, BrokerPool $publisherMessaging, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void { Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1'); Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started'); + $this->publisher = $publisher; + $this->publisherMigrations = $publisherMigrations; + $this->publisherFunctions = $publisherFunctions; + $this->publisherMessaging = $publisherMessaging; + $this->scheduleTelemetryCount = $telemetry->createGauge('task.schedule.count'); $this->collectSchedulesTelemetryDuration = $telemetry->createHistogram('task.schedule.collect_schedules.duration', 's'); $this->collectSchedulesTelemetryCount = $telemetry->createGauge('task.schedule.collect_schedules.count'); @@ -87,7 +112,7 @@ abstract class ScheduleBase extends Action } } - protected function collectSchedules(Database $dbForPlatform, callable $getProjectDB, string &$lastSyncUpdate): void + private function collectSchedules(Database $dbForPlatform, callable $getProjectDB, string &$lastSyncUpdate): void { // If we haven't synced yet, load all active schedules $initialLoad = $lastSyncUpdate === "0"; diff --git a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php index cdfb188190..14a4259e17 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php @@ -5,15 +5,12 @@ namespace Appwrite\Platform\Tasks; use Appwrite\Event\Func; use Swoole\Coroutine as Co; use Utopia\Database\Database; -use Utopia\Telemetry\Adapter as Telemetry; class ScheduleExecutions extends ScheduleBase { public const UPDATE_TIMER = 3; // seconds public const ENQUEUE_TIMER = 4; // seconds - protected Func $queueForFunctions; - public static function getName(): string { return 'schedule-executions'; @@ -29,27 +26,12 @@ class ScheduleExecutions extends ScheduleBase return 'executions'; } - public function __construct() - { - $this - ->desc('Execute executions scheduled in Appwrite') - ->inject('queueForFunctions') - ->inject('dbForPlatform') - ->inject('getProjectDB') - ->inject('telemetry') - ->callback($this->action(...)); - } - - public function action(Func $queueForFunctions, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void - { - $this->queueForFunctions = $queueForFunctions; - $this->schedule($dbForPlatform, $getProjectDB, $telemetry); - } - protected function enqueueResources(Database $dbForPlatform, callable $getProjectDB): void { $intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds'); + $queueForFunctions = new Func($this->publisherFunctions); + foreach ($this->schedules as $schedule) { if (!$schedule['active']) { $dbForPlatform->deleteDocument( @@ -75,8 +57,7 @@ class ScheduleExecutions extends ScheduleBase $this->updateProjectAccess($schedule['project'], $dbForPlatform); - \go(function () use ($schedule, $scheduledAt, $delay, $data) { - $queueForFunctions = clone $this->queueForFunctions; + \go(function () use ($queueForFunctions, $schedule, $scheduledAt, $delay, $data) { Co::sleep($delay); $queueForFunctions->setType('schedule') diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php index d8ded60ec6..6f072425e4 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -7,7 +7,7 @@ use Cron\CronExpression; use Utopia\CLI\Console; use Utopia\Database\Database; use Utopia\Database\DateTime; -use Utopia\Telemetry\Adapter as Telemetry; +use Utopia\Pools\Group; class ScheduleFunctions extends ScheduleBase { @@ -15,7 +15,6 @@ class ScheduleFunctions extends ScheduleBase public const ENQUEUE_TIMER = 60; // seconds private ?float $lastEnqueueUpdate = null; - protected Func $queueForFunctions; public static function getName(): string { @@ -32,23 +31,6 @@ class ScheduleFunctions extends ScheduleBase return 'functions'; } - public function __construct() - { - $this - ->desc('Execute functions scheduled in Appwrite') - ->inject('queueForFunctions') - ->inject('dbForPlatform') - ->inject('getProjectDB') - ->inject('telemetry') - ->callback($this->action(...)); - } - - public function action(Func $queueForFunctions, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void - { - $this->queueForFunctions = $queueForFunctions; - $this->schedule($dbForPlatform, $getProjectDB, $telemetry); - } - protected function enqueueResources(Database $dbForPlatform, callable $getProjectDB): void { $timerStart = \microtime(true); @@ -95,8 +77,6 @@ class ScheduleFunctions extends ScheduleBase foreach ($delayedExecutions as $delay => $schedules) { \go(function () use ($delay, $schedules, $dbForPlatform) { - $queueForFunctions = clone $this->queueForFunctions; - \sleep($delay); // in seconds foreach ($schedules as $delayConfig) { @@ -110,6 +90,8 @@ class ScheduleFunctions extends ScheduleBase $this->updateProjectAccess($schedule['project'], $dbForPlatform); + $queueForFunctions = new Func($this->publisherFunctions); + $queueForFunctions ->setType('schedule') ->setFunction($schedule['resource']) diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php index 58a0a76b3e..fe4afbe69c 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleMessages.php +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -4,15 +4,12 @@ namespace Appwrite\Platform\Tasks; use Appwrite\Event\Messaging; use Utopia\Database\Database; -use Utopia\Telemetry\Adapter as Telemetry; class ScheduleMessages extends ScheduleBase { public const UPDATE_TIMER = 3; // seconds public const ENQUEUE_TIMER = 4; // seconds - protected Messaging $queueForMessaging; - public static function getName(): string { return 'schedule-messages'; @@ -28,23 +25,6 @@ class ScheduleMessages extends ScheduleBase return 'messages'; } - public function __construct() - { - $this - ->desc('Execute messages scheduled in Appwrite') - ->inject('queueForMessaging') - ->inject('dbForPlatform') - ->inject('getProjectDB') - ->inject('telemetry') - ->callback($this->action(...)); - } - - public function action(Messaging $queueForMessaging, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void - { - $this->queueForMessaging = $queueForMessaging; - $this->schedule($dbForPlatform, $getProjectDB, $telemetry); - } - protected function enqueueResources(Database $dbForPlatform, callable $getProjectDB): void { foreach ($this->schedules as $schedule) { @@ -60,9 +40,10 @@ class ScheduleMessages extends ScheduleBase } \go(function () use ($schedule, $scheduledAt, $dbForPlatform) { + $queueForMessaging = new Messaging($this->publisherMessaging); + $this->updateProjectAccess($schedule['project'], $dbForPlatform); - $queueForMessaging = clone $this->queueForMessaging; $queueForMessaging ->setType(MESSAGE_SEND_TYPE_EXTERNAL) ->setMessageId($schedule['resourceId'])