Merge pull request #10348 from appwrite/fix-users-events

Fix users events & missed publisher logic for Functions
This commit is contained in:
Jake Barnby 2025-08-22 19:17:43 +12:00 committed by GitHub
commit 45e31f2b1c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
9 changed files with 38 additions and 14 deletions

View file

@ -194,6 +194,9 @@ CLI::setResource('publisher', function (Group $pools) {
CLI::setResource('publisherDatabases', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
CLI::setResource('publisherFunctions', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
CLI::setResource('publisherMigrations', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);

View file

@ -688,12 +688,12 @@ App::get('/v1/health/queue/functions')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('publisher')
->inject('publisherFunctions')
->inject('response')
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
->action(function (int|string $threshold, Publisher $publisherFunctions, Response $response) {
$threshold = \intval($threshold);
$size = $publisher->getQueueSize(new Queue(Event::FUNCTIONS_QUEUE_NAME));
$size = $publisherFunctions->getQueueSize(new Queue(Event::FUNCTIONS_QUEUE_NAME));
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");

View file

@ -29,6 +29,7 @@ use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Helpers\Role;
use Utopia\Database\Validator\Authorization;
use Utopia\Queue\Broker\Pool as BrokerPool;
use Utopia\Queue\Publisher;
use Utopia\System\System;
use Utopia\Telemetry\Adapter as Telemetry;
@ -60,6 +61,12 @@ $parseLabel = function (string $label, array $responsePayload, array $requestPar
return $label;
};
/**
* This isolated event handling for `users.*.create` which is based on a `Database::EVENT_DOCUMENT_CREATE` listener may look odd, but it is **intentional**.
*
* Accounts can be created in many ways beyond `createAccount`
* (anonymous, OAuth, phone, etc.), and those flows are probably not covered in event tests; so we handle this here.
*/
$eventDatabaseListener = function (Document $project, Document $document, Response $response, Event $queueForEvents, Func $queueForFunctions, Webhook $queueForWebhooks, Realtime $queueForRealtime) {
// Only trigger events for user creation with the database listener.
if ($document->getCollection() !== 'users') {
@ -408,6 +415,7 @@ App::init()
->inject('project')
->inject('user')
->inject('publisher')
->inject('publisherFunctions')
->inject('queueForEvents')
->inject('queueForMessaging')
->inject('queueForAudits')
@ -423,7 +431,7 @@ App::init()
->inject('plan')
->inject('devKey')
->inject('telemetry')
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Publisher $publisher, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry) use ($usageDatabaseListener, $eventDatabaseListener) {
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Publisher $publisher, BrokerPool $publisherFunctions, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, StatsUsage $queueForStatsUsage, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry) use ($usageDatabaseListener, $eventDatabaseListener) {
$route = $utopia->getRoute();
@ -535,7 +543,7 @@ App::init()
// Clone the queues, to prevent events triggered by the database listener
// from overwriting the events that are supposed to be triggered in the shutdown hook.
$queueForEventsClone = new Event($publisher);
$queueForFunctions = new Func($publisher);
$queueForFunctions = new Func($publisherFunctions);
$queueForWebhooks = new Webhook($publisher);
$queueForRealtime = new Realtime();

View file

@ -87,6 +87,9 @@ App::setResource('publisher', function (Group $pools) {
App::setResource('publisherDatabases', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
App::setResource('publisherFunctions', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
App::setResource('publisherMigrations', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
@ -99,6 +102,9 @@ App::setResource('consumer', function (Group $pools) {
App::setResource('consumerDatabases', function (BrokerPool $consumer) {
return $consumer;
}, ['consumer']);
App::setResource('consumerFunctions', function (BrokerPool $consumer) {
return $consumer;
}, ['consumer']);
App::setResource('consumerMigrations', function (BrokerPool $consumer) {
return $consumer;
}, ['consumer']);

View file

@ -251,6 +251,10 @@ Server::setResource('publisherDatabases', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
Server::setResource('publisherFunctions', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);
Server::setResource('publisherMigrations', function (BrokerPool $publisher) {
return $publisher;
}, ['publisher']);

12
composer.lock generated
View file

@ -3557,16 +3557,16 @@
},
{
"name": "utopia-php/database",
"version": "1.0.3",
"version": "1.1.0",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/database.git",
"reference": "6284aaa2726afdf837bb7aac57747831e21c904d"
"reference": "670e8efe7fb91f0fe43570caa5db97a1a5223357"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/database/zipball/6284aaa2726afdf837bb7aac57747831e21c904d",
"reference": "6284aaa2726afdf837bb7aac57747831e21c904d",
"url": "https://api.github.com/repos/utopia-php/database/zipball/670e8efe7fb91f0fe43570caa5db97a1a5223357",
"reference": "670e8efe7fb91f0fe43570caa5db97a1a5223357",
"shasum": ""
},
"require": {
@ -3607,9 +3607,9 @@
],
"support": {
"issues": "https://github.com/utopia-php/database/issues",
"source": "https://github.com/utopia-php/database/tree/1.0.3"
"source": "https://github.com/utopia-php/database/tree/1.1.0"
},
"time": "2025-08-20T07:39:30+00:00"
"time": "2025-08-21T15:37:11+00:00"
},
{
"name": "utopia-php/detector",

View file

@ -26,6 +26,7 @@ abstract class ScheduleBase extends Action
protected BrokerPool $publisher;
protected BrokerPool $publisherMigrations;
protected BrokerPool $publisherFunctions;
private ?Histogram $collectSchedulesTelemetryDuration = null;
private ?Gauge $collectSchedulesTelemetryCount = null;
@ -45,6 +46,7 @@ abstract class ScheduleBase extends Action
->desc("Execute {$type}s scheduled in Appwrite")
->inject('publisher')
->inject('publisherMigrations')
->inject('publisherFunctions')
->inject('dbForPlatform')
->inject('getProjectDB')
->inject('telemetry')
@ -67,13 +69,14 @@ abstract class ScheduleBase extends Action
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker.
*/
public function action(BrokerPool $publisher, BrokerPool $publisherMigrations, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void
public function action(BrokerPool $publisher, BrokerPool $publisherMigrations, BrokerPool $publisherFunctions, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void
{
Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1');
Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started');
$this->publisher = $publisher;
$this->publisherMigrations = $publisherMigrations;
$this->publisherFunctions = $publisherFunctions;
$this->scheduleTelemetryCount = $telemetry->createGauge('task.schedule.count');
$this->collectSchedulesTelemetryDuration = $telemetry->createHistogram('task.schedule.collect_schedules.duration', 's');

View file

@ -30,7 +30,7 @@ class ScheduleExecutions extends ScheduleBase
{
$intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds');
$queueForFunctions = new Func($this->publisher);
$queueForFunctions = new Func($this->publisherFunctions);
foreach ($this->schedules as $schedule) {
if (!$schedule['active']) {

View file

@ -90,7 +90,7 @@ class ScheduleFunctions extends ScheduleBase
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
$queueForFunctions = new Func($this->publisher);
$queueForFunctions = new Func($this->publisherFunctions);
$queueForFunctions
->setType('schedule')