diff --git a/app/cli.php b/app/cli.php index 504e4fb5e6..2ad37c0ce2 100644 --- a/app/cli.php +++ b/app/cli.php @@ -194,6 +194,9 @@ CLI::setResource('publisher', function (Group $pools) { CLI::setResource('publisherDatabases', function (BrokerPool $publisher) { return $publisher; }, ['publisher']); +CLI::setResource('publisherFunctions', function (BrokerPool $publisher) { + return $publisher; +}, ['publisher']); CLI::setResource('publisherMigrations', function (BrokerPool $publisher) { return $publisher; }, ['publisher']); diff --git a/app/controllers/api/health.php b/app/controllers/api/health.php index 1dc6ce917b..975db909c6 100644 --- a/app/controllers/api/health.php +++ b/app/controllers/api/health.php @@ -688,12 +688,12 @@ App::get('/v1/health/queue/functions') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('publisher') + ->inject('publisherFunctions') ->inject('response') - ->action(function (int|string $threshold, Publisher $publisher, Response $response) { + ->action(function (int|string $threshold, Publisher $publisherFunctions, Response $response) { $threshold = \intval($threshold); - $size = $publisher->getQueueSize(new Queue(Event::FUNCTIONS_QUEUE_NAME)); + $size = $publisherFunctions->getQueueSize(new Queue(Event::FUNCTIONS_QUEUE_NAME)); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index c2f5c415a0..8fe9e747b2 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -29,6 +29,7 @@ use Utopia\Database\DateTime; use Utopia\Database\Document; use Utopia\Database\Helpers\Role; use Utopia\Database\Validator\Authorization; +use Utopia\Queue\Broker\Pool as BrokerPool; use Utopia\Queue\Publisher; use Utopia\System\System; use Utopia\Telemetry\Adapter as Telemetry; @@ -60,6 +61,12 @@ $parseLabel = function (string $label, array $responsePayload, array $requestPar return $label; }; +/** + * This isolated event handling for `users.*.create` which is based on a `Database::EVENT_DOCUMENT_CREATE` listener may look odd, but it is **intentional**. + * + * Accounts can be created in many ways beyond `createAccount` + * (anonymous, OAuth, phone, etc.), and those flows are probably not covered in event tests; so we handle this here. + */ $eventDatabaseListener = function (Document $project, Document $document, Response $response, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime) { // Only trigger events for user creation with the database listener. if ($document->getCollection() !== 'users') { @@ -408,6 +415,7 @@ App::init() ->inject('project') ->inject('user') ->inject('publisher') + ->inject('publisherFunctions') ->inject('queueForEvents') ->inject('queueForMessaging') ->inject('queueForAudits') @@ -423,7 +431,7 @@ App::init() ->inject('plan') ->inject('devKey') ->inject('telemetry') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Publisher $publisher, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry) use ($usageDatabaseListener, $eventDatabaseListener) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Publisher $publisher, BrokerPool $publisherFunctions, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry) use ($usageDatabaseListener, $eventDatabaseListener) { $route = $utopia->getRoute(); @@ -535,7 +543,7 @@ App::init() // Clone the queues, to prevent events triggered by the database listener // from overwriting the events that are supposed to be triggered in the shutdown hook. $queueForEventsClone = new Event($publisher); - $queueForFunctions = new Func($publisher); + $queueForFunctions = new Func($publisherFunctions); $queueForWebhooks = new Webhook($publisher); $queueForRealtime = new Realtime(); diff --git a/app/init/resources.php b/app/init/resources.php index 468ca4fbd8..375c835ea9 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -87,6 +87,9 @@ App::setResource('publisher', function (Group $pools) { App::setResource('publisherDatabases', function (BrokerPool $publisher) { return $publisher; }, ['publisher']); +App::setResource('publisherFunctions', function (BrokerPool $publisher) { + return $publisher; +}, ['publisher']); App::setResource('publisherMigrations', function (BrokerPool $publisher) { return $publisher; }, ['publisher']); @@ -99,6 +102,9 @@ App::setResource('consumer', function (Group $pools) { App::setResource('consumerDatabases', function (BrokerPool $consumer) { return $consumer; }, ['consumer']); +App::setResource('consumerFunctions', function (BrokerPool $consumer) { + return $consumer; +}, ['consumer']); App::setResource('consumerMigrations', function (BrokerPool $consumer) { return $consumer; }, ['consumer']); diff --git a/app/worker.php b/app/worker.php index 90f3368fe7..d358f578ea 100644 --- a/app/worker.php +++ b/app/worker.php @@ -251,6 +251,10 @@ Server::setResource('publisherDatabases', function (BrokerPool $publisher) { return $publisher; }, ['publisher']); +Server::setResource('publisherFunctions', function (BrokerPool $publisher) { + return $publisher; +}, ['publisher']); + Server::setResource('publisherMigrations', function (BrokerPool $publisher) { return $publisher; }, ['publisher']); diff --git a/composer.lock b/composer.lock index b7e9a76088..3b0d7e724a 100644 --- a/composer.lock +++ b/composer.lock @@ -3557,16 +3557,16 @@ }, { "name": "utopia-php/database", - "version": "1.0.3", + "version": "1.1.0", "source": { "type": "git", "url": "https://github.com/utopia-php/database.git", - "reference": "6284aaa2726afdf837bb7aac57747831e21c904d" + "reference": "670e8efe7fb91f0fe43570caa5db97a1a5223357" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/database/zipball/6284aaa2726afdf837bb7aac57747831e21c904d", - "reference": "6284aaa2726afdf837bb7aac57747831e21c904d", + "url": "https://api.github.com/repos/utopia-php/database/zipball/670e8efe7fb91f0fe43570caa5db97a1a5223357", + "reference": "670e8efe7fb91f0fe43570caa5db97a1a5223357", "shasum": "" }, "require": { @@ -3607,9 +3607,9 @@ ], "support": { "issues": "https://github.com/utopia-php/database/issues", - "source": "https://github.com/utopia-php/database/tree/1.0.3" + "source": "https://github.com/utopia-php/database/tree/1.1.0" }, - "time": "2025-08-20T07:39:30+00:00" + "time": "2025-08-21T15:37:11+00:00" }, { "name": "utopia-php/detector", diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php index 222051a67f..1710f0163d 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleBase.php +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -26,6 +26,7 @@ abstract class ScheduleBase extends Action protected BrokerPool $publisher; protected BrokerPool $publisherMigrations; + protected BrokerPool $publisherFunctions; private ?Histogram $collectSchedulesTelemetryDuration = null; private ?Gauge $collectSchedulesTelemetryCount = null; @@ -45,6 +46,7 @@ abstract class ScheduleBase extends Action ->desc("Execute {$type}s scheduled in Appwrite") ->inject('publisher') ->inject('publisherMigrations') + ->inject('publisherFunctions') ->inject('dbForPlatform') ->inject('getProjectDB') ->inject('telemetry') @@ -67,13 +69,14 @@ abstract class ScheduleBase extends Action * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker. */ - public function action(BrokerPool $publisher, BrokerPool $publisherMigrations, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void + public function action(BrokerPool $publisher, BrokerPool $publisherMigrations, BrokerPool $publisherFunctions, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void { Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1'); Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started'); $this->publisher = $publisher; $this->publisherMigrations = $publisherMigrations; + $this->publisherFunctions = $publisherFunctions; $this->scheduleTelemetryCount = $telemetry->createGauge('task.schedule.count'); $this->collectSchedulesTelemetryDuration = $telemetry->createHistogram('task.schedule.collect_schedules.duration', 's'); diff --git a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php index 96a5a05f0e..14a4259e17 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php @@ -30,7 +30,7 @@ class ScheduleExecutions extends ScheduleBase { $intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds'); - $queueForFunctions = new Func($this->publisher); + $queueForFunctions = new Func($this->publisherFunctions); foreach ($this->schedules as $schedule) { if (!$schedule['active']) { diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php index 43f1025c08..6f072425e4 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -90,7 +90,7 @@ class ScheduleFunctions extends ScheduleBase $this->updateProjectAccess($schedule['project'], $dbForPlatform); - $queueForFunctions = new Func($this->publisher); + $queueForFunctions = new Func($this->publisherFunctions); $queueForFunctions ->setType('schedule')