From a16f26d378bb1b5b2383c3c40f3c78eb000754f1 Mon Sep 17 00:00:00 2001 From: Fabian Gruber Date: Wed, 29 Jan 2025 15:13:58 +0100 Subject: [PATCH] feat: add AMQP queues --- app/cli.php | 24 +- app/controllers/api/health.php | 158 +++------- app/controllers/shared/api.php | 12 +- app/init.php | 109 +++---- app/worker.php | 88 +++--- composer.json | 4 +- composer.lock | 278 ++++++++++++++++-- src/Appwrite/Event/Audit.php | 6 +- src/Appwrite/Event/Build.php | 6 +- src/Appwrite/Event/Certificate.php | 6 +- src/Appwrite/Event/Database.php | 6 +- src/Appwrite/Event/Delete.php | 6 +- src/Appwrite/Event/Event.php | 13 +- src/Appwrite/Event/Func.php | 6 +- src/Appwrite/Event/Mail.php | 6 +- src/Appwrite/Event/Messaging.php | 6 +- src/Appwrite/Event/Migration.php | 6 +- src/Appwrite/Event/Usage.php | 6 +- src/Appwrite/Event/UsageDump.php | 6 +- src/Appwrite/Event/Webhook.php | 6 +- src/Appwrite/Platform/Services/Tasks.php | 2 - src/Appwrite/Platform/Tasks/QueueCount.php | 57 ---- src/Appwrite/Platform/Tasks/QueueRetry.php | 23 +- src/Appwrite/Platform/Tasks/ScheduleBase.php | 22 +- .../Platform/Tasks/ScheduleExecutions.php | 10 +- .../Platform/Tasks/ScheduleFunctions.php | 14 +- .../Platform/Tasks/ScheduleMessages.php | 14 +- .../Health/HealthCustomServerTest.php | 18 -- tests/unit/Event/EventTest.php | 14 +- tests/unit/Event/MockPublisher.php | 35 +++ tests/unit/Usage/StatsTest.php | 41 --- 31 files changed, 534 insertions(+), 474 deletions(-) delete mode 100644 src/Appwrite/Platform/Tasks/QueueCount.php create mode 100644 tests/unit/Event/MockPublisher.php delete mode 100644 tests/unit/Usage/StatsTest.php diff --git a/app/cli.php b/app/cli.php index 47f4525f0b..abf75c9608 100644 --- a/app/cli.php +++ b/app/cli.php @@ -19,7 +19,7 @@ use Utopia\DSN\DSN; use Utopia\Logger\Log; use Utopia\Platform\Service; use Utopia\Pools\Group; -use Utopia\Queue\Connection; +use Utopia\Queue\Publisher; use Utopia\Registry\Registry; use Utopia\System\System; @@ -160,18 +160,18 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform }; }, ['pools', 'dbForPlatform', 'cache']); -CLI::setResource('queue', function (Group $pools) { - return $pools->get('queue')->pop()->getResource(); +CLI::setResource('publisher', function (Group $pools) { + return $pools->get('publisher')->pop()->getResource(); }, ['pools']); -CLI::setResource('queueForFunctions', function (Connection $queue) { - return new Func($queue); -}, ['queue']); -CLI::setResource('queueForDeletes', function (Connection $queue) { - return new Delete($queue); -}, ['queue']); -CLI::setResource('queueForCertificates', function (Connection $queue) { - return new Certificate($queue); -}, ['queue']); +CLI::setResource('queueForFunctions', function (Publisher $publisher) { + return new Func($publisher); +}, ['publisher']); +CLI::setResource('queueForDeletes', function (Publisher $publisher) { + return new Delete($publisher); +}, ['publisher']); +CLI::setResource('queueForCertificates', function (Publisher $publisher) { + return new Certificate($publisher); +}, ['publisher']); CLI::setResource('logError', function (Registry $register) { return function (Throwable $error, string $namespace, string $action) use ($register) { $logger = $register->get('logger'); diff --git a/app/controllers/api/health.php b/app/controllers/api/health.php index 1db4713311..305444abc5 100644 --- a/app/controllers/api/health.php +++ b/app/controllers/api/health.php @@ -13,8 +13,8 @@ use Utopia\Config\Config; use Utopia\Database\Document; use Utopia\Domains\Validator\PublicDomain; use Utopia\Pools\Group; -use Utopia\Queue\Client; -use Utopia\Queue\Connection; +use Utopia\Queue\Publisher; +use Utopia\Queue\Queue; use Utopia\Registry\Registry; use Utopia\Storage\Device; use Utopia\Storage\Device\Local; @@ -188,69 +188,6 @@ App::get('/v1/health/cache') ]), Response::MODEL_HEALTH_STATUS_LIST); }); -App::get('/v1/health/queue') - ->desc('Get queue') - ->groups(['api', 'health']) - ->label('scope', 'health.read') - ->label('sdk', new Method( - auth: [AuthType::KEY], - namespace: 'health', - name: 'getQueue', - description: '/docs/references/health/get-queue.md', - responses: [ - new SDKResponse( - code: Response::STATUS_CODE_OK, - model: Response::MODEL_HEALTH_STATUS, - ) - ], - contentType: ContentType::JSON - )) - ->inject('response') - ->inject('pools') - ->action(function (Response $response, Group $pools) { - - $output = []; - - $configs = [ - 'Queue' => Config::getParam('pools-queue'), - ]; - - foreach ($configs as $key => $config) { - foreach ($config as $database) { - $checkStart = \microtime(true); - try { - /** @var Connection $adapter */ - $adapter = $pools->get($database)->pop()->getResource(); - - if ($adapter->ping()) { - $output[] = new Document([ - 'name' => $key . " ($database)", - 'status' => 'pass', - 'ping' => \round((\microtime(true) - $checkStart) / 1000) - ]); - } else { - $output[] = new Document([ - 'name' => $key . " ($database)", - 'status' => 'fail', - 'ping' => \round((\microtime(true) - $checkStart) / 1000) - ]); - } - } catch (\Throwable $th) { - $output[] = new Document([ - 'name' => $key . " ($database)", - 'status' => 'fail', - 'ping' => \round((\microtime(true) - $checkStart) / 1000) - ]); - } - } - } - - $response->dynamic(new Document([ - 'statuses' => $output, - 'total' => count($output), - ]), Response::MODEL_HEALTH_STATUS_LIST); - }); - App::get('/v1/health/pubsub') ->desc('Get pubsub') ->groups(['api', 'health']) @@ -396,13 +333,12 @@ App::get('/v1/health/queue/webhooks') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('queue') + ->inject('publisher') ->inject('response') - ->action(function (int|string $threshold, Connection $queue, Response $response) { + ->action(function (int|string $threshold, Publisher $publisher, Response $response) { $threshold = \intval($threshold); - $client = new Client(Event::WEBHOOK_QUEUE_NAME, $queue); - $size = $client->getQueueSize(); + $size = $publisher->getQueueSize(new Queue(Event::WEBHOOK_QUEUE_NAME)); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -429,13 +365,12 @@ App::get('/v1/health/queue/logs') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('queue') + ->inject('publisher') ->inject('response') - ->action(function (int|string $threshold, Connection $queue, Response $response) { + ->action(function (int|string $threshold, Publisher $publisher, Response $response) { $threshold = \intval($threshold); - $client = new Client(Event::AUDITS_QUEUE_NAME, $queue); - $size = $client->getQueueSize(); + $size = $publisher->getQueueSize(new Queue(Event::AUDITS_QUEUE_NAME)); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -518,13 +453,12 @@ App::get('/v1/health/queue/certificates') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('queue') + ->inject('publisher') ->inject('response') - ->action(function (int|string $threshold, Connection $queue, Response $response) { + ->action(function (int|string $threshold, Publisher $publisher, Response $response) { $threshold = \intval($threshold); - $client = new Client(Event::CERTIFICATES_QUEUE_NAME, $queue); - $size = $client->getQueueSize(); + $size = $publisher->getQueueSize(new Queue(Event::CERTIFICATES_QUEUE_NAME)); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -551,13 +485,12 @@ App::get('/v1/health/queue/builds') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('queue') + ->inject('publisher') ->inject('response') - ->action(function (int|string $threshold, Connection $queue, Response $response) { + ->action(function (int|string $threshold, Publisher $publisher, Response $response) { $threshold = \intval($threshold); - $client = new Client(Event::BUILDS_QUEUE_NAME, $queue); - $size = $client->getQueueSize(); + $size = $publisher->getQueueSize(new Queue(Event::BUILDS_QUEUE_NAME)); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -585,13 +518,12 @@ App::get('/v1/health/queue/databases') )) ->param('name', 'database_db_main', new Text(256), 'Queue name for which to check the queue size', true) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('queue') + ->inject('publisher') ->inject('response') - ->action(function (string $name, int|string $threshold, Connection $queue, Response $response) { + ->action(function (string $name, int|string $threshold, Publisher $publisher, Response $response) { $threshold = \intval($threshold); - $client = new Client($name, $queue); - $size = $client->getQueueSize(); + $size = $publisher->getQueueSize(new Queue($name)); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -618,13 +550,12 @@ App::get('/v1/health/queue/deletes') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('queue') + ->inject('publisher') ->inject('response') - ->action(function (int|string $threshold, Connection $queue, Response $response) { + ->action(function (int|string $threshold, Publisher $publisher, Response $response) { $threshold = \intval($threshold); - $client = new Client(Event::DELETE_QUEUE_NAME, $queue); - $size = $client->getQueueSize(); + $size = $publisher->getQueueSize(new Queue(Event::DELETE_QUEUE_NAME)); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -651,13 +582,12 @@ App::get('/v1/health/queue/mails') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('queue') + ->inject('publisher') ->inject('response') - ->action(function (int|string $threshold, Connection $queue, Response $response) { + ->action(function (int|string $threshold, Publisher $publisher, Response $response) { $threshold = \intval($threshold); - $client = new Client(Event::MAILS_QUEUE_NAME, $queue); - $size = $client->getQueueSize(); + $size = $publisher->getQueueSize(new Queue(Event::MAILS_QUEUE_NAME)); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -684,13 +614,12 @@ App::get('/v1/health/queue/messaging') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('queue') + ->inject('publisher') ->inject('response') - ->action(function (int|string $threshold, Connection $queue, Response $response) { + ->action(function (int|string $threshold, Publisher $publisher, Response $response) { $threshold = \intval($threshold); - $client = new Client(Event::MESSAGING_QUEUE_NAME, $queue); - $size = $client->getQueueSize(); + $size = $publisher->getQueueSize(new Queue(Event::MESSAGING_QUEUE_NAME)); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -717,13 +646,12 @@ App::get('/v1/health/queue/migrations') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('queue') + ->inject('publisher') ->inject('response') - ->action(function (int|string $threshold, Connection $queue, Response $response) { + ->action(function (int|string $threshold, Publisher $publisher, Response $response) { $threshold = \intval($threshold); - $client = new Client(Event::MIGRATIONS_QUEUE_NAME, $queue); - $size = $client->getQueueSize(); + $size = $publisher->getQueueSize(new Queue(Event::MIGRATIONS_QUEUE_NAME)); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -750,13 +678,12 @@ App::get('/v1/health/queue/functions') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('queue') + ->inject('publisher') ->inject('response') - ->action(function (int|string $threshold, Connection $queue, Response $response) { + ->action(function (int|string $threshold, Publisher $publisher, Response $response) { $threshold = \intval($threshold); - $client = new Client(Event::FUNCTIONS_QUEUE_NAME, $queue); - $size = $client->getQueueSize(); + $size = $publisher->getQueueSize(new Queue(Event::FUNCTIONS_QUEUE_NAME)); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -783,13 +710,12 @@ App::get('/v1/health/queue/usage') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('queue') + ->inject('publisher') ->inject('response') - ->action(function (int|string $threshold, Connection $queue, Response $response) { + ->action(function (int|string $threshold, Publisher $publisher, Response $response) { $threshold = \intval($threshold); - $client = new Client(Event::USAGE_QUEUE_NAME, $queue); - $size = $client->getQueueSize(); + $size = $publisher->getQueueSize(new Queue(Event::USAGE_QUEUE_NAME)); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -816,13 +742,12 @@ App::get('/v1/health/queue/usage-dump') contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('queue') + ->inject('publisher') ->inject('response') - ->action(function (int|string $threshold, Connection $queue, Response $response) { + ->action(function (int|string $threshold, Publisher $publisher, Response $response) { $threshold = \intval($threshold); - $client = new Client(Event::USAGE_DUMP_QUEUE_NAME, $queue); - $size = $client->getQueueSize(); + $size = $publisher->getQueueSize(new Queue(Event::USAGE_DUMP_QUEUE_NAME)); if ($size >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}."); @@ -1005,12 +930,11 @@ App::get('/v1/health/queue/failed/:name') ]), 'The name of the queue') ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) ->inject('response') - ->inject('queue') - ->action(function (string $name, int|string $threshold, Response $response, Connection $queue) { + ->inject('publisher') + ->action(function (string $name, int|string $threshold, Response $response, Publisher $publisher) { $threshold = \intval($threshold); - $client = new Client($name, $queue); - $failed = $client->countFailedJobs(); + $failed = $publisher->getQueueSize(new Queue($name), failedJobs: true); if ($failed >= $threshold) { throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue failed jobs threshold hit. Current size is {$failed} and threshold is {$threshold}."); diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index d8f1eb59ef..bce36e9303 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -28,7 +28,7 @@ use Utopia\Database\DateTime; use Utopia\Database\Document; use Utopia\Database\Helpers\Role; use Utopia\Database\Validator\Authorization; -use Utopia\Queue\Connection; +use Utopia\Queue\Publisher; use Utopia\System\System; use Utopia\Validator\WhiteList; @@ -429,7 +429,7 @@ App::init() ->inject('response') ->inject('project') ->inject('user') - ->inject('queue') + ->inject('publisher') ->inject('queueForEvents') ->inject('queueForMessaging') ->inject('queueForAudits') @@ -440,7 +440,7 @@ App::init() ->inject('dbForProject') ->inject('timelimit') ->inject('mode') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Connection $queue, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Database $dbForProject, callable $timelimit, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Publisher $publisher, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Database $dbForProject, callable $timelimit, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) { $route = $utopia->getRoute(); @@ -544,9 +544,9 @@ App::init() // Clone the queues, to prevent events triggered by the database listener // from overwriting the events that are supposed to be triggered in the shutdown hook. - $queueForEventsClone = new Event($queue); - $queueForFunctions = new Func($queue); - $queueForWebhooks = new Webhook($queue); + $queueForEventsClone = new Event($publisher); + $queueForFunctions = new Func($publisher); + $queueForWebhooks = new Webhook($publisher); $queueForRealtime = new Realtime(); $dbForProject diff --git a/app/init.php b/app/init.php index e912ffd8f4..941e253260 100644 --- a/app/init.php +++ b/app/init.php @@ -77,7 +77,6 @@ use Utopia\Logger\Logger; use Utopia\Pools\Group; use Utopia\Pools\Pool; use Utopia\Queue; -use Utopia\Queue\Connection; use Utopia\Registry\Registry; use Utopia\Storage\Device; use Utopia\Storage\Device\Backblaze; @@ -890,8 +889,14 @@ $register->set('pools', function () { 'multiple' => false, 'schemes' => ['mariadb', 'mysql'], ], - 'queue' => [ - 'type' => 'queue', + 'publisher' => [ + 'type' => 'publisher', + 'dsns' => $fallbackForRedis, + 'multiple' => false, + 'schemes' => ['redis'], + ], + 'consumer' => [ + 'type' => 'consumer', 'dsns' => $fallbackForRedis, 'multiple' => false, 'schemes' => ['redis'], @@ -999,31 +1004,26 @@ $register->set('pools', function () { }; $adapter->setDatabase($dsn->getPath()); - break; + return $adapter; case 'pubsub': - $adapter = match ($dsn->getScheme()) { + return match ($dsn->getScheme()) { 'redis' => new PubSub($resource()), default => null }; - break; - case 'queue': - $adapter = match ($dsn->getScheme()) { - 'redis' => new Queue\Connection\Redis($dsn->getHost(), $dsn->getPort()), + case 'publisher': + case 'consumer': + return match ($dsn->getScheme()) { + 'redis' => new Queue\Broker\Redis(new Queue\Connection\Redis($dsn->getHost(), $dsn->getPort())), default => null }; - break; case 'cache': - $adapter = match ($dsn->getScheme()) { + return match ($dsn->getScheme()) { 'redis' => new RedisCache($resource()), default => null }; - break; - default: throw new Exception(Exception::GENERAL_SERVER_ERROR, "Server error: Missing adapter implementation."); } - - return $adapter; }); $group->add($pool); @@ -1146,48 +1146,51 @@ App::setResource('localeCodes', function () { }); // Queues -App::setResource('queue', function (Group $pools) { - return $pools->get('queue')->pop()->getResource(); +App::setResource('publisher', function (Group $pools) { + return $pools->get('publisher')->pop()->getResource(); }, ['pools']); -App::setResource('queueForMessaging', function (Connection $queue) { - return new Messaging($queue); -}, ['queue']); -App::setResource('queueForMails', function (Connection $queue) { - return new Mail($queue); -}, ['queue']); -App::setResource('queueForBuilds', function (Connection $queue) { - return new Build($queue); -}, ['queue']); -App::setResource('queueForDatabase', function (Connection $queue) { - return new EventDatabase($queue); -}, ['queue']); -App::setResource('queueForDeletes', function (Connection $queue) { - return new Delete($queue); -}, ['queue']); -App::setResource('queueForEvents', function (Connection $queue) { - return new Event($queue); -}, ['queue']); -App::setResource('queueForWebhooks', function (Connection $queue) { - return new Webhook($queue); -}, ['queue']); +App::setResource('consumer', function (Group $pools) { + return $pools->get('consumer')->pop()->getResource(); +}, ['pools']); +App::setResource('queueForMessaging', function (Queue\Publisher $publisher) { + return new Messaging($publisher); +}, ['publisher']); +App::setResource('queueForMails', function (Queue\Publisher $publisher) { + return new Mail($publisher); +}, ['publisher']); +App::setResource('queueForBuilds', function (Queue\Publisher $publisher) { + return new Build($publisher); +}, ['publisher']); +App::setResource('queueForDatabase', function (Queue\Publisher $publisher) { + return new EventDatabase($publisher); +}, ['publisher']); +App::setResource('queueForDeletes', function (Queue\Publisher $publisher) { + return new Delete($publisher); +}, ['publisher']); +App::setResource('queueForEvents', function (Queue\Publisher $publisher) { + return new Event($publisher); +}, ['publisher']); +App::setResource('queueForWebhooks', function (Queue\Publisher $publisher) { + return new Webhook($publisher); +}, ['publisher']); App::setResource('queueForRealtime', function () { return new Realtime(); }, []); -App::setResource('queueForAudits', function (Connection $queue) { - return new Audit($queue); -}, ['queue']); -App::setResource('queueForFunctions', function (Connection $queue) { - return new Func($queue); -}, ['queue']); -App::setResource('queueForUsage', function (Connection $queue) { - return new Usage($queue); -}, ['queue']); -App::setResource('queueForCertificates', function (Connection $queue) { - return new Certificate($queue); -}, ['queue']); -App::setResource('queueForMigrations', function (Connection $queue) { - return new Migration($queue); -}, ['queue']); +App::setResource('queueForAudits', function (Queue\Publisher $publisher) { + return new Audit($publisher); +}, ['publisher']); +App::setResource('queueForFunctions', function (Queue\Publisher $publisher) { + return new Func($publisher); +}, ['publisher']); +App::setResource('queueForUsage', function (Queue\Publisher $publisher) { + return new Usage($publisher); +}, ['publisher']); +App::setResource('queueForCertificates', function (Queue\Publisher $publisher) { + return new Certificate($publisher); +}, ['publisher']); +App::setResource('queueForMigrations', function (Queue\Publisher $publisher) { + return new Migration($publisher); +}, ['publisher']); App::setResource('clients', function ($request, $console, $project) { $console->setAttribute('platforms', [ // Always allow current host '$collection' => ID::custom('platforms'), diff --git a/app/worker.php b/app/worker.php index 6eb1363e9b..854d9cb8ae 100644 --- a/app/worker.php +++ b/app/worker.php @@ -31,8 +31,8 @@ use Utopia\Logger\Log; use Utopia\Logger\Logger; use Utopia\Platform\Service; use Utopia\Pools\Group; -use Utopia\Queue\Connection; use Utopia\Queue\Message; +use Utopia\Queue\Publisher; use Utopia\Queue\Server; use Utopia\Registry\Registry; use Utopia\System\System; @@ -224,57 +224,61 @@ Server::setResource('timelimit', function (\Redis $redis) { Server::setResource('log', fn () => new Log()); -Server::setResource('queueForUsage', function (Connection $queue) { - return new Usage($queue); -}, ['queue']); - -Server::setResource('queueForUsageDump', function (Connection $queue) { - return new UsageDump($queue); -}, ['queue']); - -Server::setResource('queue', function (Group $pools) { - return $pools->get('queue')->pop()->getResource(); +Server::setResource('publisher', function (Group $pools) { + return $pools->get('publisher')->pop()->getResource(); }, ['pools']); -Server::setResource('queueForDatabase', function (Connection $queue) { - return new EventDatabase($queue); -}, ['queue']); +Server::setResource('consumer', function (Group $pools) { + return $pools->get('consumer')->pop()->getResource(); +}, ['pools']); -Server::setResource('queueForMessaging', function (Connection $queue) { - return new Messaging($queue); -}, ['queue']); +Server::setResource('queueForUsage', function (Publisher $publisher) { + return new Usage($publisher); +}, ['publisher']); -Server::setResource('queueForMails', function (Connection $queue) { - return new Mail($queue); -}, ['queue']); +Server::setResource('queueForUsageDump', function (Publisher $publisher) { + return new UsageDump($publisher); +}, ['publisher']); -Server::setResource('queueForBuilds', function (Connection $queue) { - return new Build($queue); -}, ['queue']); +Server::setResource('queueForDatabase', function (Publisher $publisher) { + return new EventDatabase($publisher); +}, ['publisher']); -Server::setResource('queueForDeletes', function (Connection $queue) { - return new Delete($queue); -}, ['queue']); +Server::setResource('queueForMessaging', function (Publisher $publisher) { + return new Messaging($publisher); +}, ['publisher']); -Server::setResource('queueForEvents', function (Connection $queue) { - return new Event($queue); -}, ['queue']); +Server::setResource('queueForMails', function (Publisher $publisher) { + return new Mail($publisher); +}, ['publisher']); -Server::setResource('queueForAudits', function (Connection $queue) { - return new Audit($queue); -}, ['queue']); +Server::setResource('queueForBuilds', function (Publisher $publisher) { + return new Build($publisher); +}, ['publisher']); -Server::setResource('queueForFunctions', function (Connection $queue) { - return new Func($queue); -}, ['queue']); +Server::setResource('queueForDeletes', function (Publisher $publisher) { + return new Delete($publisher); +}, ['publisher']); -Server::setResource('queueForCertificates', function (Connection $queue) { - return new Certificate($queue); -}, ['queue']); +Server::setResource('queueForEvents', function (Publisher $publisher) { + return new Event($publisher); +}, ['publisher']); -Server::setResource('queueForMigrations', function (Connection $queue) { - return new Migration($queue); -}, ['queue']); +Server::setResource('queueForAudits', function (Publisher $publisher) { + return new Audit($publisher); +}, ['publisher']); + +Server::setResource('queueForFunctions', function (Publisher $publisher) { + return new Func($publisher); +}, ['publisher']); + +Server::setResource('queueForCertificates', function (Publisher $publisher) { + return new Certificate($publisher); +}, ['publisher']); + +Server::setResource('queueForMigrations', function (Publisher $publisher) { + return new Migration($publisher); +}, ['publisher']); Server::setResource('logger', function (Registry $register) { return $register->get('logger'); @@ -386,7 +390,7 @@ try { */ $platform->init(Service::TYPE_WORKER, [ 'workersNum' => System::getEnv('_APP_WORKERS_NUM', 1), - 'connection' => $pools->get('queue')->pop()->getResource(), + 'connection' => $pools->get('consumer')->pop()->getResource(), 'workerName' => strtolower($workerName) ?? null, 'queueName' => $queueName ]); diff --git a/composer.json b/composer.json index 5d36e45431..fe3400cfe5 100644 --- a/composer.json +++ b/composer.json @@ -62,10 +62,10 @@ "utopia-php/messaging": "0.14.*", "utopia-php/migration": "0.6.*", "utopia-php/orchestration": "0.9.*", - "utopia-php/platform": "0.7.1", + "utopia-php/platform": "0.7.3", "utopia-php/pools": "0.5.*", "utopia-php/preloader": "0.2.*", - "utopia-php/queue": "0.7.*", + "utopia-php/queue": "0.8.*", "utopia-php/registry": "0.5.*", "utopia-php/storage": "0.18.*", "utopia-php/swoole": "0.8.*", diff --git a/composer.lock b/composer.lock index 43821e4bff..9e957ef4bf 100644 --- a/composer.lock +++ b/composer.lock @@ -4,7 +4,7 @@ "Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies", "This file is @generated automatically" ], - "content-hash": "1cbdaa30797ba4f524dbcb830f5e14fa", + "content-hash": "232691925e05350c7a3831a4e43d79d1", "packages": [ { "name": "adhocore/jwt", @@ -1705,6 +1705,137 @@ }, "time": "2024-05-08T12:18:48+00:00" }, + { + "name": "paragonie/random_compat", + "version": "v9.99.100", + "source": { + "type": "git", + "url": "https://github.com/paragonie/random_compat.git", + "reference": "996434e5492cb4c3edcb9168db6fbb1359ef965a" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/paragonie/random_compat/zipball/996434e5492cb4c3edcb9168db6fbb1359ef965a", + "reference": "996434e5492cb4c3edcb9168db6fbb1359ef965a", + "shasum": "" + }, + "require": { + "php": ">= 7" + }, + "require-dev": { + "phpunit/phpunit": "4.*|5.*", + "vimeo/psalm": "^1" + }, + "suggest": { + "ext-libsodium": "Provides a modern crypto API that can be used to generate random bytes." + }, + "type": "library", + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Paragon Initiative Enterprises", + "email": "security@paragonie.com", + "homepage": "https://paragonie.com" + } + ], + "description": "PHP 5.x polyfill for random_bytes() and random_int() from PHP 7", + "keywords": [ + "csprng", + "polyfill", + "pseudorandom", + "random" + ], + "support": { + "email": "info@paragonie.com", + "issues": "https://github.com/paragonie/random_compat/issues", + "source": "https://github.com/paragonie/random_compat" + }, + "time": "2020-10-15T08:29:30+00:00" + }, + { + "name": "php-amqplib/php-amqplib", + "version": "v3.7.2", + "source": { + "type": "git", + "url": "https://github.com/php-amqplib/php-amqplib.git", + "reference": "738a73eb0019b6c99d9bc25d7a0c0dd8f56a5199" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/php-amqplib/php-amqplib/zipball/738a73eb0019b6c99d9bc25d7a0c0dd8f56a5199", + "reference": "738a73eb0019b6c99d9bc25d7a0c0dd8f56a5199", + "shasum": "" + }, + "require": { + "ext-mbstring": "*", + "ext-sockets": "*", + "php": "^7.2||^8.0", + "phpseclib/phpseclib": "^2.0|^3.0" + }, + "conflict": { + "php": "7.4.0 - 7.4.1" + }, + "replace": { + "videlalvaro/php-amqplib": "self.version" + }, + "require-dev": { + "ext-curl": "*", + "nategood/httpful": "^0.2.20", + "phpunit/phpunit": "^7.5|^9.5", + "squizlabs/php_codesniffer": "^3.6" + }, + "type": "library", + "extra": { + "branch-alias": { + "dev-master": "3.0-dev" + } + }, + "autoload": { + "psr-4": { + "PhpAmqpLib\\": "PhpAmqpLib/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "LGPL-2.1-or-later" + ], + "authors": [ + { + "name": "Alvaro Videla", + "role": "Original Maintainer" + }, + { + "name": "Raúl Araya", + "email": "nubeiro@gmail.com", + "role": "Maintainer" + }, + { + "name": "Luke Bakken", + "email": "luke@bakken.io", + "role": "Maintainer" + }, + { + "name": "Ramūnas Dronga", + "email": "github@ramuno.lt", + "role": "Maintainer" + } + ], + "description": "Formerly videlalvaro/php-amqplib. This library is a pure PHP implementation of the AMQP protocol. It's been tested against RabbitMQ.", + "homepage": "https://github.com/php-amqplib/php-amqplib/", + "keywords": [ + "message", + "queue", + "rabbitmq" + ], + "support": { + "issues": "https://github.com/php-amqplib/php-amqplib/issues", + "source": "https://github.com/php-amqplib/php-amqplib/tree/v3.7.2" + }, + "time": "2024-11-21T09:21:41+00:00" + }, { "name": "php-http/discovery", "version": "1.20.0", @@ -1865,6 +1996,116 @@ ], "time": "2023-11-25T22:23:28+00:00" }, + { + "name": "phpseclib/phpseclib", + "version": "3.0.43", + "source": { + "type": "git", + "url": "https://github.com/phpseclib/phpseclib.git", + "reference": "709ec107af3cb2f385b9617be72af8cf62441d02" + }, + "dist": { + "type": "zip", + "url": "https://api.github.com/repos/phpseclib/phpseclib/zipball/709ec107af3cb2f385b9617be72af8cf62441d02", + "reference": "709ec107af3cb2f385b9617be72af8cf62441d02", + "shasum": "" + }, + "require": { + "paragonie/constant_time_encoding": "^1|^2|^3", + "paragonie/random_compat": "^1.4|^2.0|^9.99.99", + "php": ">=5.6.1" + }, + "require-dev": { + "phpunit/phpunit": "*" + }, + "suggest": { + "ext-dom": "Install the DOM extension to load XML formatted public keys.", + "ext-gmp": "Install the GMP (GNU Multiple Precision) extension in order to speed up arbitrary precision integer arithmetic operations.", + "ext-libsodium": "SSH2/SFTP can make use of some algorithms provided by the libsodium-php extension.", + "ext-mcrypt": "Install the Mcrypt extension in order to speed up a few other cryptographic operations.", + "ext-openssl": "Install the OpenSSL extension in order to speed up a wide variety of cryptographic operations." + }, + "type": "library", + "autoload": { + "files": [ + "phpseclib/bootstrap.php" + ], + "psr-4": { + "phpseclib3\\": "phpseclib/" + } + }, + "notification-url": "https://packagist.org/downloads/", + "license": [ + "MIT" + ], + "authors": [ + { + "name": "Jim Wigginton", + "email": "terrafrost@php.net", + "role": "Lead Developer" + }, + { + "name": "Patrick Monnerat", + "email": "pm@datasphere.ch", + "role": "Developer" + }, + { + "name": "Andreas Fischer", + "email": "bantu@phpbb.com", + "role": "Developer" + }, + { + "name": "Hans-Jürgen Petrich", + "email": "petrich@tronic-media.com", + "role": "Developer" + }, + { + "name": "Graham Campbell", + "email": "graham@alt-three.com", + "role": "Developer" + } + ], + "description": "PHP Secure Communications Library - Pure-PHP implementations of RSA, AES, SSH2, SFTP, X.509 etc.", + "homepage": "http://phpseclib.sourceforge.net", + "keywords": [ + "BigInteger", + "aes", + "asn.1", + "asn1", + "blowfish", + "crypto", + "cryptography", + "encryption", + "rsa", + "security", + "sftp", + "signature", + "signing", + "ssh", + "twofish", + "x.509", + "x509" + ], + "support": { + "issues": "https://github.com/phpseclib/phpseclib/issues", + "source": "https://github.com/phpseclib/phpseclib/tree/3.0.43" + }, + "funding": [ + { + "url": "https://github.com/terrafrost", + "type": "github" + }, + { + "url": "https://www.patreon.com/phpseclib", + "type": "patreon" + }, + { + "url": "https://tidelift.com/funding/github/packagist/phpseclib/phpseclib", + "type": "tidelift" + } + ], + "time": "2024-12-14T21:12:59+00:00" + }, { "name": "psr/container", "version": "2.0.2", @@ -4095,16 +4336,16 @@ }, { "name": "utopia-php/platform", - "version": "0.7.1", + "version": "0.7.3", "source": { "type": "git", "url": "https://github.com/utopia-php/platform.git", - "reference": "3433a0f1a54988f2a59c735f507745cb2c24638a" + "reference": "463c2d817c893d7dbb678c2eac7a8291f2710e25" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/platform/zipball/3433a0f1a54988f2a59c735f507745cb2c24638a", - "reference": "3433a0f1a54988f2a59c735f507745cb2c24638a", + "url": "https://api.github.com/repos/utopia-php/platform/zipball/463c2d817c893d7dbb678c2eac7a8291f2710e25", + "reference": "463c2d817c893d7dbb678c2eac7a8291f2710e25", "shasum": "" }, "require": { @@ -4113,7 +4354,7 @@ "php": ">=8.0", "utopia-php/cli": "0.15.*", "utopia-php/framework": "0.33.*", - "utopia-php/queue": "0.7.*" + "utopia-php/queue": "0.8.*" }, "require-dev": { "laravel/pint": "1.2.*", @@ -4139,9 +4380,9 @@ ], "support": { "issues": "https://github.com/utopia-php/platform/issues", - "source": "https://github.com/utopia-php/platform/tree/0.7.1" + "source": "https://github.com/utopia-php/platform/tree/0.7.3" }, - "time": "2024-10-22T10:27:49+00:00" + "time": "2025-02-04T15:09:00+00:00" }, { "name": "utopia-php/pools", @@ -4249,25 +4490,28 @@ }, { "name": "utopia-php/queue", - "version": "0.7.3", + "version": "0.8.0", "source": { "type": "git", "url": "https://github.com/utopia-php/queue.git", - "reference": "16074a98ee7d6212bc1228de200e13db470c098a" + "reference": "a100316767418338350668980370a5896a98fd8c" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/queue/zipball/16074a98ee7d6212bc1228de200e13db470c098a", - "reference": "16074a98ee7d6212bc1228de200e13db470c098a", + "url": "https://api.github.com/repos/utopia-php/queue/zipball/a100316767418338350668980370a5896a98fd8c", + "reference": "a100316767418338350668980370a5896a98fd8c", "shasum": "" }, "require": { - "php": ">=8.1", + "php": ">=8.3", + "php-amqplib/php-amqplib": "^3.7", "utopia-php/cli": "0.15.*", - "utopia-php/framework": "0.*.*", + "utopia-php/fetch": "^0.3.0", + "utopia-php/framework": "0.33.*", "utopia-php/telemetry": "0.1.*" }, "require-dev": { + "ext-redis": "*", "laravel/pint": "^0.2.3", "phpstan/phpstan": "^1.8", "phpunit/phpunit": "^9.5.5", @@ -4305,9 +4549,9 @@ ], "support": { "issues": "https://github.com/utopia-php/queue/issues", - "source": "https://github.com/utopia-php/queue/tree/0.7.3" + "source": "https://github.com/utopia-php/queue/tree/0.8.0" }, - "time": "2024-11-13T12:47:48+00:00" + "time": "2025-02-04T15:01:50+00:00" }, { "name": "utopia-php/registry", @@ -8503,7 +8747,7 @@ ], "aliases": [], "minimum-stability": "stable", - "stability-flags": {}, + "stability-flags": [], "prefer-stable": false, "prefer-lowest": false, "platform": { diff --git a/src/Appwrite/Event/Audit.php b/src/Appwrite/Event/Audit.php index 4b9aa9f5c5..6c2a9c3086 100644 --- a/src/Appwrite/Event/Audit.php +++ b/src/Appwrite/Event/Audit.php @@ -2,7 +2,7 @@ namespace Appwrite\Event; -use Utopia\Queue\Connection; +use Utopia\Queue\Publisher; class Audit extends Event { @@ -12,9 +12,9 @@ class Audit extends Event protected string $ip = ''; protected string $hostname = ''; - public function __construct(protected Connection $connection) + public function __construct(protected Publisher $publisher) { - parent::__construct($connection); + parent::__construct($publisher); $this ->setQueue(Event::AUDITS_QUEUE_NAME) diff --git a/src/Appwrite/Event/Build.php b/src/Appwrite/Event/Build.php index 831adf8e41..9ea163174f 100644 --- a/src/Appwrite/Event/Build.php +++ b/src/Appwrite/Event/Build.php @@ -3,7 +3,7 @@ namespace Appwrite\Event; use Utopia\Database\Document; -use Utopia\Queue\Connection; +use Utopia\Queue\Publisher; class Build extends Event { @@ -12,9 +12,9 @@ class Build extends Event protected ?Document $deployment = null; protected ?Document $template = null; - public function __construct(protected Connection $connection) + public function __construct(protected Publisher $publisher) { - parent::__construct($connection); + parent::__construct($publisher); $this ->setQueue(Event::BUILDS_QUEUE_NAME) diff --git a/src/Appwrite/Event/Certificate.php b/src/Appwrite/Event/Certificate.php index 6a395417ed..827472ae37 100644 --- a/src/Appwrite/Event/Certificate.php +++ b/src/Appwrite/Event/Certificate.php @@ -3,16 +3,16 @@ namespace Appwrite\Event; use Utopia\Database\Document; -use Utopia\Queue\Connection; +use Utopia\Queue\Publisher; class Certificate extends Event { protected bool $skipRenewCheck = false; protected ?Document $domain = null; - public function __construct(protected Connection $connection) + public function __construct(protected Publisher $publisher) { - parent::__construct($connection); + parent::__construct($publisher); $this ->setQueue(Event::CERTIFICATES_QUEUE_NAME) diff --git a/src/Appwrite/Event/Database.php b/src/Appwrite/Event/Database.php index 24123de6c1..d2f70dddf2 100644 --- a/src/Appwrite/Event/Database.php +++ b/src/Appwrite/Event/Database.php @@ -4,7 +4,7 @@ namespace Appwrite\Event; use Utopia\Database\Document; use Utopia\DSN\DSN; -use Utopia\Queue\Connection; +use Utopia\Queue\Publisher; class Database extends Event { @@ -13,9 +13,9 @@ class Database extends Event protected ?Document $collection = null; protected ?Document $document = null; - public function __construct(protected Connection $connection) + public function __construct(protected Publisher $publisher) { - parent::__construct($connection); + parent::__construct($publisher); $this->setClass(Event::DATABASE_CLASS_NAME); } diff --git a/src/Appwrite/Event/Delete.php b/src/Appwrite/Event/Delete.php index f0af20f21b..450be306d7 100644 --- a/src/Appwrite/Event/Delete.php +++ b/src/Appwrite/Event/Delete.php @@ -3,7 +3,7 @@ namespace Appwrite\Event; use Utopia\Database\Document; -use Utopia\Queue\Connection; +use Utopia\Queue\Publisher; class Delete extends Event { @@ -15,9 +15,9 @@ class Delete extends Event protected ?string $hourlyUsageRetentionDatetime = null; - public function __construct(protected Connection $connection) + public function __construct(protected Publisher $publisher) { - parent::__construct($connection); + parent::__construct($publisher); $this ->setQueue(Event::DELETE_QUEUE_NAME) diff --git a/src/Appwrite/Event/Event.php b/src/Appwrite/Event/Event.php index 5cd5f8e7d6..f56aeeb757 100644 --- a/src/Appwrite/Event/Event.php +++ b/src/Appwrite/Event/Event.php @@ -4,8 +4,8 @@ namespace Appwrite\Event; use InvalidArgumentException; use Utopia\Database\Document; -use Utopia\Queue\Client; -use Utopia\Queue\Connection; +use Utopia\Queue\Publisher; +use Utopia\Queue\Queue; class Event { @@ -58,10 +58,10 @@ class Event protected bool $paused = false; /** - * @param Connection $connection + * @param Publisher $publisher * @return void */ - public function __construct(protected Connection $connection) + public function __construct(protected Publisher $publisher) { } @@ -345,12 +345,11 @@ class Event } /** The getter is required since events like Databases need to override the queue name depending on the project */ - $client = new Client($this->getQueue(), $this->connection); + $queue = new Queue($this->getQueue()); // Merge the base payload with any trimmed values $payload = array_merge($this->preparePayload(), $this->trimPayload()); - - return $client->enqueue($payload); + return $this->publisher->enqueue($queue, $payload); } /** diff --git a/src/Appwrite/Event/Func.php b/src/Appwrite/Event/Func.php index b3945fccb8..ae316c84e5 100644 --- a/src/Appwrite/Event/Func.php +++ b/src/Appwrite/Event/Func.php @@ -3,7 +3,7 @@ namespace Appwrite\Event; use Utopia\Database\Document; -use Utopia\Queue\Connection; +use Utopia\Queue\Publisher; class Func extends Event { @@ -19,9 +19,9 @@ class Func extends Event protected ?Document $function = null; protected ?Document $execution = null; - public function __construct(protected Connection $connection) + public function __construct(protected Publisher $publisher) { - parent::__construct($connection); + parent::__construct($publisher); $this ->setQueue(Event::FUNCTIONS_QUEUE_NAME) diff --git a/src/Appwrite/Event/Mail.php b/src/Appwrite/Event/Mail.php index 1c9e539cdb..87312182ea 100644 --- a/src/Appwrite/Event/Mail.php +++ b/src/Appwrite/Event/Mail.php @@ -2,7 +2,7 @@ namespace Appwrite\Event; -use Utopia\Queue\Connection; +use Utopia\Queue\Publisher; class Mail extends Event { @@ -15,9 +15,9 @@ class Mail extends Event protected string $bodyTemplate = ''; protected array $attachment = []; - public function __construct(protected Connection $connection) + public function __construct(protected Publisher $publisher) { - parent::__construct($connection); + parent::__construct($publisher); $this ->setQueue(Event::MAILS_QUEUE_NAME) diff --git a/src/Appwrite/Event/Messaging.php b/src/Appwrite/Event/Messaging.php index 61dbe9c427..3ddbac1040 100644 --- a/src/Appwrite/Event/Messaging.php +++ b/src/Appwrite/Event/Messaging.php @@ -3,7 +3,7 @@ namespace Appwrite\Event; use Utopia\Database\Document; -use Utopia\Queue\Connection; +use Utopia\Queue\Publisher; class Messaging extends Event { @@ -14,9 +14,9 @@ class Messaging extends Event protected ?string $scheduledAt = null; protected ?string $providerType = null; - public function __construct(protected Connection $connection) + public function __construct(protected Publisher $publisher) { - parent::__construct($connection); + parent::__construct($publisher); $this ->setQueue(Event::MESSAGING_QUEUE_NAME) diff --git a/src/Appwrite/Event/Migration.php b/src/Appwrite/Event/Migration.php index 5fb2d5a106..bbb8d77c73 100644 --- a/src/Appwrite/Event/Migration.php +++ b/src/Appwrite/Event/Migration.php @@ -3,16 +3,16 @@ namespace Appwrite\Event; use Utopia\Database\Document; -use Utopia\Queue\Connection; +use Utopia\Queue\Publisher; class Migration extends Event { protected string $type = ''; protected ?Document $migration = null; - public function __construct(protected Connection $connection) + public function __construct(protected Publisher $publisher) { - parent::__construct($connection); + parent::__construct($publisher); $this ->setQueue(Event::MIGRATIONS_QUEUE_NAME) diff --git a/src/Appwrite/Event/Usage.php b/src/Appwrite/Event/Usage.php index 5609859f37..c70cea5c73 100644 --- a/src/Appwrite/Event/Usage.php +++ b/src/Appwrite/Event/Usage.php @@ -3,16 +3,16 @@ namespace Appwrite\Event; use Utopia\Database\Document; -use Utopia\Queue\Connection; +use Utopia\Queue\Publisher; class Usage extends Event { protected array $metrics = []; protected array $reduce = []; - public function __construct(protected Connection $connection) + public function __construct(protected Publisher $publisher) { - parent::__construct($connection); + parent::__construct($publisher); $this ->setQueue(Event::USAGE_QUEUE_NAME) diff --git a/src/Appwrite/Event/UsageDump.php b/src/Appwrite/Event/UsageDump.php index 6f44de4eda..a70716e94f 100644 --- a/src/Appwrite/Event/UsageDump.php +++ b/src/Appwrite/Event/UsageDump.php @@ -2,15 +2,15 @@ namespace Appwrite\Event; -use Utopia\Queue\Connection; +use Utopia\Queue\Publisher; class UsageDump extends Event { protected array $stats; - public function __construct(protected Connection $connection) + public function __construct(protected Publisher $publisher) { - parent::__construct($connection); + parent::__construct($publisher); $this ->setQueue(Event::USAGE_DUMP_QUEUE_NAME) diff --git a/src/Appwrite/Event/Webhook.php b/src/Appwrite/Event/Webhook.php index 3e0dbe446f..5cc65758ee 100644 --- a/src/Appwrite/Event/Webhook.php +++ b/src/Appwrite/Event/Webhook.php @@ -2,13 +2,13 @@ namespace Appwrite\Event; -use Utopia\Queue\Connection; +use Utopia\Queue\Publisher; class Webhook extends Event { - public function __construct(protected Connection $connection) + public function __construct(protected Publisher $publisher) { - parent::__construct($connection); + parent::__construct($publisher); $this ->setQueue(Event::WEBHOOK_QUEUE_NAME) diff --git a/src/Appwrite/Platform/Services/Tasks.php b/src/Appwrite/Platform/Services/Tasks.php index 7a0d5b60ac..c09f961fc0 100644 --- a/src/Appwrite/Platform/Services/Tasks.php +++ b/src/Appwrite/Platform/Services/Tasks.php @@ -6,7 +6,6 @@ use Appwrite\Platform\Tasks\Doctor; use Appwrite\Platform\Tasks\Install; use Appwrite\Platform\Tasks\Maintenance; use Appwrite\Platform\Tasks\Migrate; -use Appwrite\Platform\Tasks\QueueCount; use Appwrite\Platform\Tasks\QueueRetry; use Appwrite\Platform\Tasks\ScheduleExecutions; use Appwrite\Platform\Tasks\ScheduleFunctions; @@ -29,7 +28,6 @@ class Tasks extends Service ->addAction(Install::getName(), new Install()) ->addAction(Maintenance::getName(), new Maintenance()) ->addAction(Migrate::getName(), new Migrate()) - ->addAction(QueueCount::getName(), new QueueCount()) ->addAction(QueueRetry::getName(), new QueueRetry()) ->addAction(SDKs::getName(), new SDKs()) ->addAction(SSL::getName(), new SSL()) diff --git a/src/Appwrite/Platform/Tasks/QueueCount.php b/src/Appwrite/Platform/Tasks/QueueCount.php deleted file mode 100644 index b02165c1d2..0000000000 --- a/src/Appwrite/Platform/Tasks/QueueCount.php +++ /dev/null @@ -1,57 +0,0 @@ -desc('Return the number of from a specific queue identified by the name parameter with a specific type') - ->param('name', '', new Text(100), 'Queue name') - ->param('type', '', new WhiteList([ - 'success', - 'failed', - 'processing', - ]), 'Queue type') - ->inject('queue') - ->callback(fn ($name, $type, $queue) => $this->action($name, $type, $queue)); - } - - /** - * @param string $name The name of the queue to count the jobs from - * @param string $type The type of jobs to count - * @param Connection $queue - */ - public function action(string $name, string $type, Connection $queue): void - { - if (!$name) { - Console::error('Missing required parameter $name'); - return; - } - - $queueClient = new Client($name, $queue); - - $count = match ($type) { - 'success' => $queueClient->countSuccessfulJobs(), - 'failed' => $queueClient->countFailedJobs(), - 'processing' => $queueClient->countProcessingJobs(), - default => 0 - }; - - Console::log("Queue: '{$name}' has {$count} {$type} jobs."); - } -} diff --git a/src/Appwrite/Platform/Tasks/QueueRetry.php b/src/Appwrite/Platform/Tasks/QueueRetry.php index b6139dc177..9fe4aed799 100644 --- a/src/Appwrite/Platform/Tasks/QueueRetry.php +++ b/src/Appwrite/Platform/Tasks/QueueRetry.php @@ -4,8 +4,8 @@ namespace Appwrite\Platform\Tasks; use Utopia\CLI\Console; use Utopia\Platform\Action; -use Utopia\Queue\Client; -use Utopia\Queue\Connection; +use Utopia\Queue\Publisher; +use Utopia\Queue\Queue; use Utopia\Validator\Text; use Utopia\Validator\Wildcard; @@ -23,33 +23,24 @@ class QueueRetry extends Action ->desc('Retry failed jobs from a specific queue identified by the name parameter') ->param('name', '', new Text(100), 'Queue name') ->param('limit', 0, new Wildcard(), 'jobs limit', true) - ->inject('queue') - ->callback(fn ($name, $limit, $queue) => $this->action($name, $limit, $queue)); + ->inject('publisher') + ->callback(fn ($name, $limit, $publisher) => $this->action($name, $limit, $publisher)); } /** * @param string $name The name of the queue to retry jobs from * @param mixed $limit - * @param Connection $queue + * @param Publisher $publisher */ - public function action(string $name, mixed $limit, Connection $queue): void + public function action(string $name, mixed $limit, Publisher $publisher): void { - if (!$name) { Console::error('Missing required parameter $name'); return; } $limit = (int)$limit; - $queueClient = new Client($name, $queue); - - if ($queueClient->countFailedJobs() === 0) { - Console::error('No failed jobs found.'); - return; - } - Console::log('Retrying failed jobs...'); - - $queueClient->retry($limit); + $publisher->retry(new Queue($name), $limit); } } diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php index dad2db0d9a..abcad8c02e 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleBase.php +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -11,7 +11,7 @@ use Utopia\Database\Exception; use Utopia\Database\Query; use Utopia\Database\Validator\Authorization; use Utopia\Platform\Action; -use Utopia\Pools\Group; +use Utopia\Queue\Publisher; use Utopia\System\System; use function Swoole\Coroutine\run; @@ -26,7 +26,7 @@ abstract class ScheduleBase extends Action abstract public static function getName(): string; abstract public static function getSupportedResource(): string; abstract public static function getCollectionId(): string; - abstract protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void; + abstract protected function enqueueResources(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void; public function __construct() { @@ -34,10 +34,10 @@ abstract class ScheduleBase extends Action $this ->desc("Execute {$type}s scheduled in Appwrite") - ->inject('pools') + ->inject('publisher') ->inject('dbForPlatform') ->inject('getProjectDB') - ->callback(fn (Group $pools, Database $dbForPlatform, callable $getProjectDB) => $this->action($pools, $dbForPlatform, $getProjectDB)); + ->callback(fn (Publisher $publisher, Database $dbForPlatform, callable $getProjectDB) => $this->action($publisher, $dbForPlatform, $getProjectDB)); } protected function updateProjectAccess(Document $project, Database $dbForPlatform): void @@ -56,7 +56,7 @@ abstract class ScheduleBase extends Action * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker. */ - public function action(Group $pools, Database $dbForPlatform, callable $getProjectDB): void + public function action(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void { Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1'); Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started'); @@ -125,17 +125,15 @@ abstract class ScheduleBase extends Action $latestDocument = \end($results); } - $pools->reclaim(); - Console::success("{$total} resources were loaded in " . (\microtime(true) - $loadStart) . " seconds"); Console::success("Starting timers at " . DateTime::now()); - run(function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule, $pools, $getProjectDB) { + run(function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule, $publisher, $getProjectDB) { /** * The timer synchronize $schedules copy with database collection. */ - Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule, $pools) { + Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule) { $time = DateTime::now(); $timerStart = \microtime(true); @@ -184,17 +182,15 @@ abstract class ScheduleBase extends Action $lastSyncUpdate = $time; $timerEnd = \microtime(true); - $pools->reclaim(); - Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds"); }); Timer::tick( static::ENQUEUE_TIMER * 1000, - fn () => $this->enqueueResources($pools, $dbForPlatform, $getProjectDB) + fn () => $this->enqueueResources($publisher, $dbForPlatform, $getProjectDB) ); - $this->enqueueResources($pools, $dbForPlatform, $getProjectDB); + $this->enqueueResources($publisher, $dbForPlatform, $getProjectDB); }); } } diff --git a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php index 086bad513e..25de721b38 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleExecutions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleExecutions.php @@ -5,7 +5,7 @@ namespace Appwrite\Platform\Tasks; use Appwrite\Event\Func; use Swoole\Coroutine as Co; use Utopia\Database\Database; -use Utopia\Pools\Group; +use Utopia\Queue\Publisher; class ScheduleExecutions extends ScheduleBase { @@ -27,11 +27,9 @@ class ScheduleExecutions extends ScheduleBase return 'executions'; } - protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void + protected function enqueueResources(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void { - $queue = $pools->get('queue')->pop(); - $connection = $queue->getResource(); - $queueForFunctions = new Func($connection); + $queueForFunctions = new Func($publisher); $intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds'); foreach ($this->schedules as $schedule) { @@ -83,7 +81,5 @@ class ScheduleExecutions extends ScheduleBase unset($this->schedules[$schedule['$internalId']]); } - - $queue->reclaim(); } } diff --git a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php index c443bb6c2d..4f8579862f 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleFunctions.php +++ b/src/Appwrite/Platform/Tasks/ScheduleFunctions.php @@ -7,7 +7,7 @@ use Cron\CronExpression; use Utopia\CLI\Console; use Utopia\Database\Database; use Utopia\Database\DateTime; -use Utopia\Pools\Group; +use Utopia\Queue\Publisher; class ScheduleFunctions extends ScheduleBase { @@ -31,7 +31,7 @@ class ScheduleFunctions extends ScheduleBase return 'functions'; } - protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void + protected function enqueueResources(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void { $timerStart = \microtime(true); $time = DateTime::now(); @@ -70,12 +70,9 @@ class ScheduleFunctions extends ScheduleBase } foreach ($delayedExecutions as $delay => $scheduleKeys) { - \go(function () use ($delay, $scheduleKeys, $pools, $dbForPlatform) { + \go(function () use ($delay, $scheduleKeys, $publisher, $dbForPlatform) { \sleep($delay); // in seconds - $queue = $pools->get('queue')->pop(); - $connection = $queue->getResource(); - foreach ($scheduleKeys as $scheduleKey) { // Ensure schedule was not deleted if (!\array_key_exists($scheduleKey, $this->schedules)) { @@ -86,8 +83,7 @@ class ScheduleFunctions extends ScheduleBase $this->updateProjectAccess($schedule['project'], $dbForPlatform); - $queueForFunctions = new Func($connection); - + $queueForFunctions = new Func($publisher); $queueForFunctions ->setType('schedule') ->setFunction($schedule['resource']) @@ -96,8 +92,6 @@ class ScheduleFunctions extends ScheduleBase ->setProject($schedule['project']) ->trigger(); } - - $queue->reclaim(); }); } diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php index 5d997fc5bb..a575fb819a 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleMessages.php +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -4,7 +4,7 @@ namespace Appwrite\Platform\Tasks; use Appwrite\Event\Messaging; use Utopia\Database\Database; -use Utopia\Pools\Group; +use Utopia\Queue\Publisher; class ScheduleMessages extends ScheduleBase { @@ -26,7 +26,7 @@ class ScheduleMessages extends ScheduleBase return 'messages'; } - protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void + protected function enqueueResources(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void { foreach ($this->schedules as $schedule) { if (!$schedule['active']) { @@ -40,13 +40,9 @@ class ScheduleMessages extends ScheduleBase continue; } - \go(function () use ($schedule, $pools, $dbForPlatform) { - $queue = $pools->get('queue')->pop(); - $connection = $queue->getResource(); - $queueForMessaging = new Messaging($connection); - + \go(function () use ($schedule, $publisher, $dbForPlatform) { $this->updateProjectAccess($schedule['project'], $dbForPlatform); - + $queueForMessaging = new Messaging($publisher); $queueForMessaging ->setType(MESSAGE_SEND_TYPE_EXTERNAL) ->setMessageId($schedule['resourceId']) @@ -58,8 +54,6 @@ class ScheduleMessages extends ScheduleBase $schedule['$id'], ); - $queue->reclaim(); - unset($this->schedules[$schedule['$internalId']]); }); } diff --git a/tests/e2e/Services/Health/HealthCustomServerTest.php b/tests/e2e/Services/Health/HealthCustomServerTest.php index 9d6a04abe6..f2c6a2e5c2 100644 --- a/tests/e2e/Services/Health/HealthCustomServerTest.php +++ b/tests/e2e/Services/Health/HealthCustomServerTest.php @@ -67,24 +67,6 @@ class HealthCustomServerTest extends Scope return []; } - public function testQueueSuccess(): array - { - /** - * Test for SUCCESS - */ - $response = $this->client->call(Client::METHOD_GET, '/health/queue', array_merge([ - 'content-type' => 'application/json', - 'x-appwrite-project' => $this->getProject()['$id'], - ], $this->getHeaders()), []); - - $this->assertEquals(200, $response['headers']['status-code']); - $this->assertEquals('pass', $response['body']['statuses'][0]['status']); - $this->assertIsInt($response['body']['statuses'][0]['ping']); - $this->assertLessThan(100, $response['body']['statuses'][0]['ping']); - - return []; - } - public function testPubSubSuccess(): array { /** diff --git a/tests/unit/Event/EventTest.php b/tests/unit/Event/EventTest.php index 079bb47b65..c852cf2757 100644 --- a/tests/unit/Event/EventTest.php +++ b/tests/unit/Event/EventTest.php @@ -5,7 +5,7 @@ namespace Tests\Unit\Event; use Appwrite\Event\Event; use InvalidArgumentException; use PHPUnit\Framework\TestCase; -use Utopia\Queue\Client; +use Utopia\Queue\Publisher; require_once __DIR__ . '/../../../app/init.php'; @@ -13,13 +13,14 @@ class EventTest extends TestCase { protected ?Event $object = null; protected string $queue = ''; + protected Publisher $publisher; public function setUp(): void { - global $register; - $connection = $register->get('pools')->get('queue')->pop()->getResource(); + $this->publisher = new MockPublisher(); + $this->queue = 'v1-tests' . uniqid(); - $this->object = new Event($connection); + $this->object = new Event($this->publisher); $this->object->setClass('TestsV1'); $this->object->setQueue($this->queue); } @@ -51,10 +52,7 @@ class EventTest extends TestCase $this->assertEquals('eventValue1', $this->object->getParam('eventKey1')); $this->assertEquals('eventValue2', $this->object->getParam('eventKey2')); $this->assertEquals(null, $this->object->getParam('eventKey3')); - global $register; - $pools = $register->get('pools'); - $client = new Client($this->object->getQueue(), $pools->get('queue')->pop()->getResource()); - $this->assertEquals($client->getQueueSize(), 1); + $this->assertCount(1, $this->publisher->getEvents($this->object->getQueue())); } public function testReset(): void diff --git a/tests/unit/Event/MockPublisher.php b/tests/unit/Event/MockPublisher.php new file mode 100644 index 0000000000..54fcc89358 --- /dev/null +++ b/tests/unit/Event/MockPublisher.php @@ -0,0 +1,35 @@ +events[$queue->name])) { + $this->events[$queue->name] = []; + } + $this->events[$queue->name][] = $payload; + return true; + } + + public function getEvents(string $queue) + { + return $this->events[$queue] ?? null; + } + + public function retry(Queue $queue, int $limit = null): void + { + // TODO: Implement retry() method. + } + + public function getQueueSize(Queue $queue, bool $failedJobs = false): int + { + return count($this->events[$queue->name]); + } +} diff --git a/tests/unit/Usage/StatsTest.php b/tests/unit/Usage/StatsTest.php deleted file mode 100644 index 79fa1f58ec..0000000000 --- a/tests/unit/Usage/StatsTest.php +++ /dev/null @@ -1,41 +0,0 @@ -get('pools')->get('queue')->pop()->getResource(); - $this->connection = $connection; - $this->client = new Client(self::QUEUE_NAME, $this->connection); - } - - public function tearDown(): void - { - } - - public function testSamePayload(): void - { - $inToQueue = [ - 'key_1' => 'value_1', - 'key_2' => 'value_2', - ]; - - $result = $this->client->enqueue($inToQueue); - $this->assertTrue($result); - $outFromQueue = $this->connection->leftPopArray('utopia-queue.queue.' . self::QUEUE_NAME, 0)['payload']; - $this->assertNotEmpty($outFromQueue); - $this->assertSame($inToQueue, $outFromQueue); - } -}