Merge pull request #9287 from appwrite/amqp

feat: add AMQP queues
This commit is contained in:
Christy Jacob 2025-02-05 15:01:42 +05:30 committed by GitHub
commit 1adb1e23a9
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
31 changed files with 534 additions and 474 deletions

View file

@ -19,7 +19,7 @@ use Utopia\DSN\DSN;
use Utopia\Logger\Log;
use Utopia\Platform\Service;
use Utopia\Pools\Group;
use Utopia\Queue\Connection;
use Utopia\Queue\Publisher;
use Utopia\Registry\Registry;
use Utopia\System\System;
@ -160,18 +160,18 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform
};
}, ['pools', 'dbForPlatform', 'cache']);
CLI::setResource('queue', function (Group $pools) {
return $pools->get('queue')->pop()->getResource();
CLI::setResource('publisher', function (Group $pools) {
return $pools->get('publisher')->pop()->getResource();
}, ['pools']);
CLI::setResource('queueForFunctions', function (Connection $queue) {
return new Func($queue);
}, ['queue']);
CLI::setResource('queueForDeletes', function (Connection $queue) {
return new Delete($queue);
}, ['queue']);
CLI::setResource('queueForCertificates', function (Connection $queue) {
return new Certificate($queue);
}, ['queue']);
CLI::setResource('queueForFunctions', function (Publisher $publisher) {
return new Func($publisher);
}, ['publisher']);
CLI::setResource('queueForDeletes', function (Publisher $publisher) {
return new Delete($publisher);
}, ['publisher']);
CLI::setResource('queueForCertificates', function (Publisher $publisher) {
return new Certificate($publisher);
}, ['publisher']);
CLI::setResource('logError', function (Registry $register) {
return function (Throwable $error, string $namespace, string $action) use ($register) {
$logger = $register->get('logger');

View file

@ -13,8 +13,8 @@ use Utopia\Config\Config;
use Utopia\Database\Document;
use Utopia\Domains\Validator\PublicDomain;
use Utopia\Pools\Group;
use Utopia\Queue\Client;
use Utopia\Queue\Connection;
use Utopia\Queue\Publisher;
use Utopia\Queue\Queue;
use Utopia\Registry\Registry;
use Utopia\Storage\Device;
use Utopia\Storage\Device\Local;
@ -188,69 +188,6 @@ App::get('/v1/health/cache')
]), Response::MODEL_HEALTH_STATUS_LIST);
});
App::get('/v1/health/queue')
->desc('Get queue')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
name: 'getQueue',
description: '/docs/references/health/get-queue.md',
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
model: Response::MODEL_HEALTH_STATUS,
)
],
contentType: ContentType::JSON
))
->inject('response')
->inject('pools')
->action(function (Response $response, Group $pools) {
$output = [];
$configs = [
'Queue' => Config::getParam('pools-queue'),
];
foreach ($configs as $key => $config) {
foreach ($config as $database) {
$checkStart = \microtime(true);
try {
/** @var Connection $adapter */
$adapter = $pools->get($database)->pop()->getResource();
if ($adapter->ping()) {
$output[] = new Document([
'name' => $key . " ($database)",
'status' => 'pass',
'ping' => \round((\microtime(true) - $checkStart) / 1000)
]);
} else {
$output[] = new Document([
'name' => $key . " ($database)",
'status' => 'fail',
'ping' => \round((\microtime(true) - $checkStart) / 1000)
]);
}
} catch (\Throwable $th) {
$output[] = new Document([
'name' => $key . " ($database)",
'status' => 'fail',
'ping' => \round((\microtime(true) - $checkStart) / 1000)
]);
}
}
}
$response->dynamic(new Document([
'statuses' => $output,
'total' => count($output),
]), Response::MODEL_HEALTH_STATUS_LIST);
});
App::get('/v1/health/pubsub')
->desc('Get pubsub')
->groups(['api', 'health'])
@ -396,13 +333,12 @@ App::get('/v1/health/queue/webhooks')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('queue')
->inject('publisher')
->inject('response')
->action(function (int|string $threshold, Connection $queue, Response $response) {
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
$threshold = \intval($threshold);
$client = new Client(Event::WEBHOOK_QUEUE_NAME, $queue);
$size = $client->getQueueSize();
$size = $publisher->getQueueSize(new Queue(Event::WEBHOOK_QUEUE_NAME));
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -429,13 +365,12 @@ App::get('/v1/health/queue/logs')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('queue')
->inject('publisher')
->inject('response')
->action(function (int|string $threshold, Connection $queue, Response $response) {
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
$threshold = \intval($threshold);
$client = new Client(Event::AUDITS_QUEUE_NAME, $queue);
$size = $client->getQueueSize();
$size = $publisher->getQueueSize(new Queue(Event::AUDITS_QUEUE_NAME));
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -518,13 +453,12 @@ App::get('/v1/health/queue/certificates')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('queue')
->inject('publisher')
->inject('response')
->action(function (int|string $threshold, Connection $queue, Response $response) {
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
$threshold = \intval($threshold);
$client = new Client(Event::CERTIFICATES_QUEUE_NAME, $queue);
$size = $client->getQueueSize();
$size = $publisher->getQueueSize(new Queue(Event::CERTIFICATES_QUEUE_NAME));
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -551,13 +485,12 @@ App::get('/v1/health/queue/builds')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('queue')
->inject('publisher')
->inject('response')
->action(function (int|string $threshold, Connection $queue, Response $response) {
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
$threshold = \intval($threshold);
$client = new Client(Event::BUILDS_QUEUE_NAME, $queue);
$size = $client->getQueueSize();
$size = $publisher->getQueueSize(new Queue(Event::BUILDS_QUEUE_NAME));
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -585,13 +518,12 @@ App::get('/v1/health/queue/databases')
))
->param('name', 'database_db_main', new Text(256), 'Queue name for which to check the queue size', true)
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('queue')
->inject('publisher')
->inject('response')
->action(function (string $name, int|string $threshold, Connection $queue, Response $response) {
->action(function (string $name, int|string $threshold, Publisher $publisher, Response $response) {
$threshold = \intval($threshold);
$client = new Client($name, $queue);
$size = $client->getQueueSize();
$size = $publisher->getQueueSize(new Queue($name));
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -618,13 +550,12 @@ App::get('/v1/health/queue/deletes')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('queue')
->inject('publisher')
->inject('response')
->action(function (int|string $threshold, Connection $queue, Response $response) {
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
$threshold = \intval($threshold);
$client = new Client(Event::DELETE_QUEUE_NAME, $queue);
$size = $client->getQueueSize();
$size = $publisher->getQueueSize(new Queue(Event::DELETE_QUEUE_NAME));
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -651,13 +582,12 @@ App::get('/v1/health/queue/mails')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('queue')
->inject('publisher')
->inject('response')
->action(function (int|string $threshold, Connection $queue, Response $response) {
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
$threshold = \intval($threshold);
$client = new Client(Event::MAILS_QUEUE_NAME, $queue);
$size = $client->getQueueSize();
$size = $publisher->getQueueSize(new Queue(Event::MAILS_QUEUE_NAME));
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -684,13 +614,12 @@ App::get('/v1/health/queue/messaging')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('queue')
->inject('publisher')
->inject('response')
->action(function (int|string $threshold, Connection $queue, Response $response) {
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
$threshold = \intval($threshold);
$client = new Client(Event::MESSAGING_QUEUE_NAME, $queue);
$size = $client->getQueueSize();
$size = $publisher->getQueueSize(new Queue(Event::MESSAGING_QUEUE_NAME));
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -717,13 +646,12 @@ App::get('/v1/health/queue/migrations')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('queue')
->inject('publisher')
->inject('response')
->action(function (int|string $threshold, Connection $queue, Response $response) {
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
$threshold = \intval($threshold);
$client = new Client(Event::MIGRATIONS_QUEUE_NAME, $queue);
$size = $client->getQueueSize();
$size = $publisher->getQueueSize(new Queue(Event::MIGRATIONS_QUEUE_NAME));
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -750,13 +678,12 @@ App::get('/v1/health/queue/functions')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('queue')
->inject('publisher')
->inject('response')
->action(function (int|string $threshold, Connection $queue, Response $response) {
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
$threshold = \intval($threshold);
$client = new Client(Event::FUNCTIONS_QUEUE_NAME, $queue);
$size = $client->getQueueSize();
$size = $publisher->getQueueSize(new Queue(Event::FUNCTIONS_QUEUE_NAME));
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -783,13 +710,12 @@ App::get('/v1/health/queue/usage')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('queue')
->inject('publisher')
->inject('response')
->action(function (int|string $threshold, Connection $queue, Response $response) {
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
$threshold = \intval($threshold);
$client = new Client(Event::USAGE_QUEUE_NAME, $queue);
$size = $client->getQueueSize();
$size = $publisher->getQueueSize(new Queue(Event::USAGE_QUEUE_NAME));
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -816,13 +742,12 @@ App::get('/v1/health/queue/usage-dump')
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('queue')
->inject('publisher')
->inject('response')
->action(function (int|string $threshold, Connection $queue, Response $response) {
->action(function (int|string $threshold, Publisher $publisher, Response $response) {
$threshold = \intval($threshold);
$client = new Client(Event::USAGE_DUMP_QUEUE_NAME, $queue);
$size = $client->getQueueSize();
$size = $publisher->getQueueSize(new Queue(Event::USAGE_DUMP_QUEUE_NAME));
if ($size >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue size threshold hit. Current size is {$size} and threshold is {$threshold}.");
@ -1005,12 +930,11 @@ App::get('/v1/health/queue/failed/:name')
]), 'The name of the queue')
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('response')
->inject('queue')
->action(function (string $name, int|string $threshold, Response $response, Connection $queue) {
->inject('publisher')
->action(function (string $name, int|string $threshold, Response $response, Publisher $publisher) {
$threshold = \intval($threshold);
$client = new Client($name, $queue);
$failed = $client->countFailedJobs();
$failed = $publisher->getQueueSize(new Queue($name), failedJobs: true);
if ($failed >= $threshold) {
throw new Exception(Exception::HEALTH_QUEUE_SIZE_EXCEEDED, "Queue failed jobs threshold hit. Current size is {$failed} and threshold is {$threshold}.");

View file

@ -28,7 +28,7 @@ use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Helpers\Role;
use Utopia\Database\Validator\Authorization;
use Utopia\Queue\Connection;
use Utopia\Queue\Publisher;
use Utopia\System\System;
use Utopia\Validator\WhiteList;
@ -429,7 +429,7 @@ App::init()
->inject('response')
->inject('project')
->inject('user')
->inject('queue')
->inject('publisher')
->inject('queueForEvents')
->inject('queueForMessaging')
->inject('queueForAudits')
@ -440,7 +440,7 @@ App::init()
->inject('dbForProject')
->inject('timelimit')
->inject('mode')
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Connection $queue, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Database $dbForProject, callable $timelimit, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) {
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Publisher $publisher, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Database $dbForProject, callable $timelimit, string $mode) use ($usageDatabaseListener, $eventDatabaseListener) {
$route = $utopia->getRoute();
@ -544,9 +544,9 @@ App::init()
// Clone the queues, to prevent events triggered by the database listener
// from overwriting the events that are supposed to be triggered in the shutdown hook.
$queueForEventsClone = new Event($queue);
$queueForFunctions = new Func($queue);
$queueForWebhooks = new Webhook($queue);
$queueForEventsClone = new Event($publisher);
$queueForFunctions = new Func($publisher);
$queueForWebhooks = new Webhook($publisher);
$queueForRealtime = new Realtime();
$dbForProject

View file

@ -77,7 +77,6 @@ use Utopia\Logger\Logger;
use Utopia\Pools\Group;
use Utopia\Pools\Pool;
use Utopia\Queue;
use Utopia\Queue\Connection;
use Utopia\Registry\Registry;
use Utopia\Storage\Device;
use Utopia\Storage\Device\Backblaze;
@ -890,8 +889,14 @@ $register->set('pools', function () {
'multiple' => false,
'schemes' => ['mariadb', 'mysql'],
],
'queue' => [
'type' => 'queue',
'publisher' => [
'type' => 'publisher',
'dsns' => $fallbackForRedis,
'multiple' => false,
'schemes' => ['redis'],
],
'consumer' => [
'type' => 'consumer',
'dsns' => $fallbackForRedis,
'multiple' => false,
'schemes' => ['redis'],
@ -999,31 +1004,26 @@ $register->set('pools', function () {
};
$adapter->setDatabase($dsn->getPath());
break;
return $adapter;
case 'pubsub':
$adapter = match ($dsn->getScheme()) {
return match ($dsn->getScheme()) {
'redis' => new PubSub($resource()),
default => null
};
break;
case 'queue':
$adapter = match ($dsn->getScheme()) {
'redis' => new Queue\Connection\Redis($dsn->getHost(), $dsn->getPort()),
case 'publisher':
case 'consumer':
return match ($dsn->getScheme()) {
'redis' => new Queue\Broker\Redis(new Queue\Connection\Redis($dsn->getHost(), $dsn->getPort())),
default => null
};
break;
case 'cache':
$adapter = match ($dsn->getScheme()) {
return match ($dsn->getScheme()) {
'redis' => new RedisCache($resource()),
default => null
};
break;
default:
throw new Exception(Exception::GENERAL_SERVER_ERROR, "Server error: Missing adapter implementation.");
}
return $adapter;
});
$group->add($pool);
@ -1146,48 +1146,51 @@ App::setResource('localeCodes', function () {
});
// Queues
App::setResource('queue', function (Group $pools) {
return $pools->get('queue')->pop()->getResource();
App::setResource('publisher', function (Group $pools) {
return $pools->get('publisher')->pop()->getResource();
}, ['pools']);
App::setResource('queueForMessaging', function (Connection $queue) {
return new Messaging($queue);
}, ['queue']);
App::setResource('queueForMails', function (Connection $queue) {
return new Mail($queue);
}, ['queue']);
App::setResource('queueForBuilds', function (Connection $queue) {
return new Build($queue);
}, ['queue']);
App::setResource('queueForDatabase', function (Connection $queue) {
return new EventDatabase($queue);
}, ['queue']);
App::setResource('queueForDeletes', function (Connection $queue) {
return new Delete($queue);
}, ['queue']);
App::setResource('queueForEvents', function (Connection $queue) {
return new Event($queue);
}, ['queue']);
App::setResource('queueForWebhooks', function (Connection $queue) {
return new Webhook($queue);
}, ['queue']);
App::setResource('consumer', function (Group $pools) {
return $pools->get('consumer')->pop()->getResource();
}, ['pools']);
App::setResource('queueForMessaging', function (Queue\Publisher $publisher) {
return new Messaging($publisher);
}, ['publisher']);
App::setResource('queueForMails', function (Queue\Publisher $publisher) {
return new Mail($publisher);
}, ['publisher']);
App::setResource('queueForBuilds', function (Queue\Publisher $publisher) {
return new Build($publisher);
}, ['publisher']);
App::setResource('queueForDatabase', function (Queue\Publisher $publisher) {
return new EventDatabase($publisher);
}, ['publisher']);
App::setResource('queueForDeletes', function (Queue\Publisher $publisher) {
return new Delete($publisher);
}, ['publisher']);
App::setResource('queueForEvents', function (Queue\Publisher $publisher) {
return new Event($publisher);
}, ['publisher']);
App::setResource('queueForWebhooks', function (Queue\Publisher $publisher) {
return new Webhook($publisher);
}, ['publisher']);
App::setResource('queueForRealtime', function () {
return new Realtime();
}, []);
App::setResource('queueForAudits', function (Connection $queue) {
return new Audit($queue);
}, ['queue']);
App::setResource('queueForFunctions', function (Connection $queue) {
return new Func($queue);
}, ['queue']);
App::setResource('queueForUsage', function (Connection $queue) {
return new Usage($queue);
}, ['queue']);
App::setResource('queueForCertificates', function (Connection $queue) {
return new Certificate($queue);
}, ['queue']);
App::setResource('queueForMigrations', function (Connection $queue) {
return new Migration($queue);
}, ['queue']);
App::setResource('queueForAudits', function (Queue\Publisher $publisher) {
return new Audit($publisher);
}, ['publisher']);
App::setResource('queueForFunctions', function (Queue\Publisher $publisher) {
return new Func($publisher);
}, ['publisher']);
App::setResource('queueForUsage', function (Queue\Publisher $publisher) {
return new Usage($publisher);
}, ['publisher']);
App::setResource('queueForCertificates', function (Queue\Publisher $publisher) {
return new Certificate($publisher);
}, ['publisher']);
App::setResource('queueForMigrations', function (Queue\Publisher $publisher) {
return new Migration($publisher);
}, ['publisher']);
App::setResource('clients', function ($request, $console, $project) {
$console->setAttribute('platforms', [ // Always allow current host
'$collection' => ID::custom('platforms'),

View file

@ -31,8 +31,8 @@ use Utopia\Logger\Log;
use Utopia\Logger\Logger;
use Utopia\Platform\Service;
use Utopia\Pools\Group;
use Utopia\Queue\Connection;
use Utopia\Queue\Message;
use Utopia\Queue\Publisher;
use Utopia\Queue\Server;
use Utopia\Registry\Registry;
use Utopia\System\System;
@ -224,57 +224,61 @@ Server::setResource('timelimit', function (\Redis $redis) {
Server::setResource('log', fn () => new Log());
Server::setResource('queueForUsage', function (Connection $queue) {
return new Usage($queue);
}, ['queue']);
Server::setResource('queueForUsageDump', function (Connection $queue) {
return new UsageDump($queue);
}, ['queue']);
Server::setResource('queue', function (Group $pools) {
return $pools->get('queue')->pop()->getResource();
Server::setResource('publisher', function (Group $pools) {
return $pools->get('publisher')->pop()->getResource();
}, ['pools']);
Server::setResource('queueForDatabase', function (Connection $queue) {
return new EventDatabase($queue);
}, ['queue']);
Server::setResource('consumer', function (Group $pools) {
return $pools->get('consumer')->pop()->getResource();
}, ['pools']);
Server::setResource('queueForMessaging', function (Connection $queue) {
return new Messaging($queue);
}, ['queue']);
Server::setResource('queueForUsage', function (Publisher $publisher) {
return new Usage($publisher);
}, ['publisher']);
Server::setResource('queueForMails', function (Connection $queue) {
return new Mail($queue);
}, ['queue']);
Server::setResource('queueForUsageDump', function (Publisher $publisher) {
return new UsageDump($publisher);
}, ['publisher']);
Server::setResource('queueForBuilds', function (Connection $queue) {
return new Build($queue);
}, ['queue']);
Server::setResource('queueForDatabase', function (Publisher $publisher) {
return new EventDatabase($publisher);
}, ['publisher']);
Server::setResource('queueForDeletes', function (Connection $queue) {
return new Delete($queue);
}, ['queue']);
Server::setResource('queueForMessaging', function (Publisher $publisher) {
return new Messaging($publisher);
}, ['publisher']);
Server::setResource('queueForEvents', function (Connection $queue) {
return new Event($queue);
}, ['queue']);
Server::setResource('queueForMails', function (Publisher $publisher) {
return new Mail($publisher);
}, ['publisher']);
Server::setResource('queueForAudits', function (Connection $queue) {
return new Audit($queue);
}, ['queue']);
Server::setResource('queueForBuilds', function (Publisher $publisher) {
return new Build($publisher);
}, ['publisher']);
Server::setResource('queueForFunctions', function (Connection $queue) {
return new Func($queue);
}, ['queue']);
Server::setResource('queueForDeletes', function (Publisher $publisher) {
return new Delete($publisher);
}, ['publisher']);
Server::setResource('queueForCertificates', function (Connection $queue) {
return new Certificate($queue);
}, ['queue']);
Server::setResource('queueForEvents', function (Publisher $publisher) {
return new Event($publisher);
}, ['publisher']);
Server::setResource('queueForMigrations', function (Connection $queue) {
return new Migration($queue);
}, ['queue']);
Server::setResource('queueForAudits', function (Publisher $publisher) {
return new Audit($publisher);
}, ['publisher']);
Server::setResource('queueForFunctions', function (Publisher $publisher) {
return new Func($publisher);
}, ['publisher']);
Server::setResource('queueForCertificates', function (Publisher $publisher) {
return new Certificate($publisher);
}, ['publisher']);
Server::setResource('queueForMigrations', function (Publisher $publisher) {
return new Migration($publisher);
}, ['publisher']);
Server::setResource('logger', function (Registry $register) {
return $register->get('logger');
@ -386,7 +390,7 @@ try {
*/
$platform->init(Service::TYPE_WORKER, [
'workersNum' => System::getEnv('_APP_WORKERS_NUM', 1),
'connection' => $pools->get('queue')->pop()->getResource(),
'connection' => $pools->get('consumer')->pop()->getResource(),
'workerName' => strtolower($workerName) ?? null,
'queueName' => $queueName
]);

View file

@ -62,10 +62,10 @@
"utopia-php/messaging": "0.14.*",
"utopia-php/migration": "0.6.*",
"utopia-php/orchestration": "0.9.*",
"utopia-php/platform": "0.7.1",
"utopia-php/platform": "0.7.3",
"utopia-php/pools": "0.5.*",
"utopia-php/preloader": "0.2.*",
"utopia-php/queue": "0.7.*",
"utopia-php/queue": "0.8.*",
"utopia-php/registry": "0.5.*",
"utopia-php/storage": "0.18.*",
"utopia-php/swoole": "0.8.*",

278
composer.lock generated
View file

@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "1cbdaa30797ba4f524dbcb830f5e14fa",
"content-hash": "232691925e05350c7a3831a4e43d79d1",
"packages": [
{
"name": "adhocore/jwt",
@ -1705,6 +1705,137 @@
},
"time": "2024-05-08T12:18:48+00:00"
},
{
"name": "paragonie/random_compat",
"version": "v9.99.100",
"source": {
"type": "git",
"url": "https://github.com/paragonie/random_compat.git",
"reference": "996434e5492cb4c3edcb9168db6fbb1359ef965a"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/paragonie/random_compat/zipball/996434e5492cb4c3edcb9168db6fbb1359ef965a",
"reference": "996434e5492cb4c3edcb9168db6fbb1359ef965a",
"shasum": ""
},
"require": {
"php": ">= 7"
},
"require-dev": {
"phpunit/phpunit": "4.*|5.*",
"vimeo/psalm": "^1"
},
"suggest": {
"ext-libsodium": "Provides a modern crypto API that can be used to generate random bytes."
},
"type": "library",
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Paragon Initiative Enterprises",
"email": "security@paragonie.com",
"homepage": "https://paragonie.com"
}
],
"description": "PHP 5.x polyfill for random_bytes() and random_int() from PHP 7",
"keywords": [
"csprng",
"polyfill",
"pseudorandom",
"random"
],
"support": {
"email": "info@paragonie.com",
"issues": "https://github.com/paragonie/random_compat/issues",
"source": "https://github.com/paragonie/random_compat"
},
"time": "2020-10-15T08:29:30+00:00"
},
{
"name": "php-amqplib/php-amqplib",
"version": "v3.7.2",
"source": {
"type": "git",
"url": "https://github.com/php-amqplib/php-amqplib.git",
"reference": "738a73eb0019b6c99d9bc25d7a0c0dd8f56a5199"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/php-amqplib/php-amqplib/zipball/738a73eb0019b6c99d9bc25d7a0c0dd8f56a5199",
"reference": "738a73eb0019b6c99d9bc25d7a0c0dd8f56a5199",
"shasum": ""
},
"require": {
"ext-mbstring": "*",
"ext-sockets": "*",
"php": "^7.2||^8.0",
"phpseclib/phpseclib": "^2.0|^3.0"
},
"conflict": {
"php": "7.4.0 - 7.4.1"
},
"replace": {
"videlalvaro/php-amqplib": "self.version"
},
"require-dev": {
"ext-curl": "*",
"nategood/httpful": "^0.2.20",
"phpunit/phpunit": "^7.5|^9.5",
"squizlabs/php_codesniffer": "^3.6"
},
"type": "library",
"extra": {
"branch-alias": {
"dev-master": "3.0-dev"
}
},
"autoload": {
"psr-4": {
"PhpAmqpLib\\": "PhpAmqpLib/"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"LGPL-2.1-or-later"
],
"authors": [
{
"name": "Alvaro Videla",
"role": "Original Maintainer"
},
{
"name": "Raúl Araya",
"email": "nubeiro@gmail.com",
"role": "Maintainer"
},
{
"name": "Luke Bakken",
"email": "luke@bakken.io",
"role": "Maintainer"
},
{
"name": "Ramūnas Dronga",
"email": "github@ramuno.lt",
"role": "Maintainer"
}
],
"description": "Formerly videlalvaro/php-amqplib. This library is a pure PHP implementation of the AMQP protocol. It's been tested against RabbitMQ.",
"homepage": "https://github.com/php-amqplib/php-amqplib/",
"keywords": [
"message",
"queue",
"rabbitmq"
],
"support": {
"issues": "https://github.com/php-amqplib/php-amqplib/issues",
"source": "https://github.com/php-amqplib/php-amqplib/tree/v3.7.2"
},
"time": "2024-11-21T09:21:41+00:00"
},
{
"name": "php-http/discovery",
"version": "1.20.0",
@ -1865,6 +1996,116 @@
],
"time": "2023-11-25T22:23:28+00:00"
},
{
"name": "phpseclib/phpseclib",
"version": "3.0.43",
"source": {
"type": "git",
"url": "https://github.com/phpseclib/phpseclib.git",
"reference": "709ec107af3cb2f385b9617be72af8cf62441d02"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/phpseclib/phpseclib/zipball/709ec107af3cb2f385b9617be72af8cf62441d02",
"reference": "709ec107af3cb2f385b9617be72af8cf62441d02",
"shasum": ""
},
"require": {
"paragonie/constant_time_encoding": "^1|^2|^3",
"paragonie/random_compat": "^1.4|^2.0|^9.99.99",
"php": ">=5.6.1"
},
"require-dev": {
"phpunit/phpunit": "*"
},
"suggest": {
"ext-dom": "Install the DOM extension to load XML formatted public keys.",
"ext-gmp": "Install the GMP (GNU Multiple Precision) extension in order to speed up arbitrary precision integer arithmetic operations.",
"ext-libsodium": "SSH2/SFTP can make use of some algorithms provided by the libsodium-php extension.",
"ext-mcrypt": "Install the Mcrypt extension in order to speed up a few other cryptographic operations.",
"ext-openssl": "Install the OpenSSL extension in order to speed up a wide variety of cryptographic operations."
},
"type": "library",
"autoload": {
"files": [
"phpseclib/bootstrap.php"
],
"psr-4": {
"phpseclib3\\": "phpseclib/"
}
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Jim Wigginton",
"email": "terrafrost@php.net",
"role": "Lead Developer"
},
{
"name": "Patrick Monnerat",
"email": "pm@datasphere.ch",
"role": "Developer"
},
{
"name": "Andreas Fischer",
"email": "bantu@phpbb.com",
"role": "Developer"
},
{
"name": "Hans-Jürgen Petrich",
"email": "petrich@tronic-media.com",
"role": "Developer"
},
{
"name": "Graham Campbell",
"email": "graham@alt-three.com",
"role": "Developer"
}
],
"description": "PHP Secure Communications Library - Pure-PHP implementations of RSA, AES, SSH2, SFTP, X.509 etc.",
"homepage": "http://phpseclib.sourceforge.net",
"keywords": [
"BigInteger",
"aes",
"asn.1",
"asn1",
"blowfish",
"crypto",
"cryptography",
"encryption",
"rsa",
"security",
"sftp",
"signature",
"signing",
"ssh",
"twofish",
"x.509",
"x509"
],
"support": {
"issues": "https://github.com/phpseclib/phpseclib/issues",
"source": "https://github.com/phpseclib/phpseclib/tree/3.0.43"
},
"funding": [
{
"url": "https://github.com/terrafrost",
"type": "github"
},
{
"url": "https://www.patreon.com/phpseclib",
"type": "patreon"
},
{
"url": "https://tidelift.com/funding/github/packagist/phpseclib/phpseclib",
"type": "tidelift"
}
],
"time": "2024-12-14T21:12:59+00:00"
},
{
"name": "psr/container",
"version": "2.0.2",
@ -4095,16 +4336,16 @@
},
{
"name": "utopia-php/platform",
"version": "0.7.1",
"version": "0.7.3",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/platform.git",
"reference": "3433a0f1a54988f2a59c735f507745cb2c24638a"
"reference": "463c2d817c893d7dbb678c2eac7a8291f2710e25"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/platform/zipball/3433a0f1a54988f2a59c735f507745cb2c24638a",
"reference": "3433a0f1a54988f2a59c735f507745cb2c24638a",
"url": "https://api.github.com/repos/utopia-php/platform/zipball/463c2d817c893d7dbb678c2eac7a8291f2710e25",
"reference": "463c2d817c893d7dbb678c2eac7a8291f2710e25",
"shasum": ""
},
"require": {
@ -4113,7 +4354,7 @@
"php": ">=8.0",
"utopia-php/cli": "0.15.*",
"utopia-php/framework": "0.33.*",
"utopia-php/queue": "0.7.*"
"utopia-php/queue": "0.8.*"
},
"require-dev": {
"laravel/pint": "1.2.*",
@ -4139,9 +4380,9 @@
],
"support": {
"issues": "https://github.com/utopia-php/platform/issues",
"source": "https://github.com/utopia-php/platform/tree/0.7.1"
"source": "https://github.com/utopia-php/platform/tree/0.7.3"
},
"time": "2024-10-22T10:27:49+00:00"
"time": "2025-02-04T15:09:00+00:00"
},
{
"name": "utopia-php/pools",
@ -4249,25 +4490,28 @@
},
{
"name": "utopia-php/queue",
"version": "0.7.3",
"version": "0.8.0",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/queue.git",
"reference": "16074a98ee7d6212bc1228de200e13db470c098a"
"reference": "a100316767418338350668980370a5896a98fd8c"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/queue/zipball/16074a98ee7d6212bc1228de200e13db470c098a",
"reference": "16074a98ee7d6212bc1228de200e13db470c098a",
"url": "https://api.github.com/repos/utopia-php/queue/zipball/a100316767418338350668980370a5896a98fd8c",
"reference": "a100316767418338350668980370a5896a98fd8c",
"shasum": ""
},
"require": {
"php": ">=8.1",
"php": ">=8.3",
"php-amqplib/php-amqplib": "^3.7",
"utopia-php/cli": "0.15.*",
"utopia-php/framework": "0.*.*",
"utopia-php/fetch": "^0.3.0",
"utopia-php/framework": "0.33.*",
"utopia-php/telemetry": "0.1.*"
},
"require-dev": {
"ext-redis": "*",
"laravel/pint": "^0.2.3",
"phpstan/phpstan": "^1.8",
"phpunit/phpunit": "^9.5.5",
@ -4305,9 +4549,9 @@
],
"support": {
"issues": "https://github.com/utopia-php/queue/issues",
"source": "https://github.com/utopia-php/queue/tree/0.7.3"
"source": "https://github.com/utopia-php/queue/tree/0.8.0"
},
"time": "2024-11-13T12:47:48+00:00"
"time": "2025-02-04T15:01:50+00:00"
},
{
"name": "utopia-php/registry",
@ -8503,7 +8747,7 @@
],
"aliases": [],
"minimum-stability": "stable",
"stability-flags": {},
"stability-flags": [],
"prefer-stable": false,
"prefer-lowest": false,
"platform": {

View file

@ -2,7 +2,7 @@
namespace Appwrite\Event;
use Utopia\Queue\Connection;
use Utopia\Queue\Publisher;
class Audit extends Event
{
@ -12,9 +12,9 @@ class Audit extends Event
protected string $ip = '';
protected string $hostname = '';
public function __construct(protected Connection $connection)
public function __construct(protected Publisher $publisher)
{
parent::__construct($connection);
parent::__construct($publisher);
$this
->setQueue(Event::AUDITS_QUEUE_NAME)

View file

@ -3,7 +3,7 @@
namespace Appwrite\Event;
use Utopia\Database\Document;
use Utopia\Queue\Connection;
use Utopia\Queue\Publisher;
class Build extends Event
{
@ -12,9 +12,9 @@ class Build extends Event
protected ?Document $deployment = null;
protected ?Document $template = null;
public function __construct(protected Connection $connection)
public function __construct(protected Publisher $publisher)
{
parent::__construct($connection);
parent::__construct($publisher);
$this
->setQueue(Event::BUILDS_QUEUE_NAME)

View file

@ -3,16 +3,16 @@
namespace Appwrite\Event;
use Utopia\Database\Document;
use Utopia\Queue\Connection;
use Utopia\Queue\Publisher;
class Certificate extends Event
{
protected bool $skipRenewCheck = false;
protected ?Document $domain = null;
public function __construct(protected Connection $connection)
public function __construct(protected Publisher $publisher)
{
parent::__construct($connection);
parent::__construct($publisher);
$this
->setQueue(Event::CERTIFICATES_QUEUE_NAME)

View file

@ -4,7 +4,7 @@ namespace Appwrite\Event;
use Utopia\Database\Document;
use Utopia\DSN\DSN;
use Utopia\Queue\Connection;
use Utopia\Queue\Publisher;
class Database extends Event
{
@ -13,9 +13,9 @@ class Database extends Event
protected ?Document $collection = null;
protected ?Document $document = null;
public function __construct(protected Connection $connection)
public function __construct(protected Publisher $publisher)
{
parent::__construct($connection);
parent::__construct($publisher);
$this->setClass(Event::DATABASE_CLASS_NAME);
}

View file

@ -3,7 +3,7 @@
namespace Appwrite\Event;
use Utopia\Database\Document;
use Utopia\Queue\Connection;
use Utopia\Queue\Publisher;
class Delete extends Event
{
@ -15,9 +15,9 @@ class Delete extends Event
protected ?string $hourlyUsageRetentionDatetime = null;
public function __construct(protected Connection $connection)
public function __construct(protected Publisher $publisher)
{
parent::__construct($connection);
parent::__construct($publisher);
$this
->setQueue(Event::DELETE_QUEUE_NAME)

View file

@ -4,8 +4,8 @@ namespace Appwrite\Event;
use InvalidArgumentException;
use Utopia\Database\Document;
use Utopia\Queue\Client;
use Utopia\Queue\Connection;
use Utopia\Queue\Publisher;
use Utopia\Queue\Queue;
class Event
{
@ -58,10 +58,10 @@ class Event
protected bool $paused = false;
/**
* @param Connection $connection
* @param Publisher $publisher
* @return void
*/
public function __construct(protected Connection $connection)
public function __construct(protected Publisher $publisher)
{
}
@ -345,12 +345,11 @@ class Event
}
/** The getter is required since events like Databases need to override the queue name depending on the project */
$client = new Client($this->getQueue(), $this->connection);
$queue = new Queue($this->getQueue());
// Merge the base payload with any trimmed values
$payload = array_merge($this->preparePayload(), $this->trimPayload());
return $client->enqueue($payload);
return $this->publisher->enqueue($queue, $payload);
}
/**

View file

@ -3,7 +3,7 @@
namespace Appwrite\Event;
use Utopia\Database\Document;
use Utopia\Queue\Connection;
use Utopia\Queue\Publisher;
class Func extends Event
{
@ -19,9 +19,9 @@ class Func extends Event
protected ?Document $function = null;
protected ?Document $execution = null;
public function __construct(protected Connection $connection)
public function __construct(protected Publisher $publisher)
{
parent::__construct($connection);
parent::__construct($publisher);
$this
->setQueue(Event::FUNCTIONS_QUEUE_NAME)

View file

@ -2,7 +2,7 @@
namespace Appwrite\Event;
use Utopia\Queue\Connection;
use Utopia\Queue\Publisher;
class Mail extends Event
{
@ -15,9 +15,9 @@ class Mail extends Event
protected string $bodyTemplate = '';
protected array $attachment = [];
public function __construct(protected Connection $connection)
public function __construct(protected Publisher $publisher)
{
parent::__construct($connection);
parent::__construct($publisher);
$this
->setQueue(Event::MAILS_QUEUE_NAME)

View file

@ -3,7 +3,7 @@
namespace Appwrite\Event;
use Utopia\Database\Document;
use Utopia\Queue\Connection;
use Utopia\Queue\Publisher;
class Messaging extends Event
{
@ -14,9 +14,9 @@ class Messaging extends Event
protected ?string $scheduledAt = null;
protected ?string $providerType = null;
public function __construct(protected Connection $connection)
public function __construct(protected Publisher $publisher)
{
parent::__construct($connection);
parent::__construct($publisher);
$this
->setQueue(Event::MESSAGING_QUEUE_NAME)

View file

@ -3,16 +3,16 @@
namespace Appwrite\Event;
use Utopia\Database\Document;
use Utopia\Queue\Connection;
use Utopia\Queue\Publisher;
class Migration extends Event
{
protected string $type = '';
protected ?Document $migration = null;
public function __construct(protected Connection $connection)
public function __construct(protected Publisher $publisher)
{
parent::__construct($connection);
parent::__construct($publisher);
$this
->setQueue(Event::MIGRATIONS_QUEUE_NAME)

View file

@ -3,16 +3,16 @@
namespace Appwrite\Event;
use Utopia\Database\Document;
use Utopia\Queue\Connection;
use Utopia\Queue\Publisher;
class Usage extends Event
{
protected array $metrics = [];
protected array $reduce = [];
public function __construct(protected Connection $connection)
public function __construct(protected Publisher $publisher)
{
parent::__construct($connection);
parent::__construct($publisher);
$this
->setQueue(Event::USAGE_QUEUE_NAME)

View file

@ -2,15 +2,15 @@
namespace Appwrite\Event;
use Utopia\Queue\Connection;
use Utopia\Queue\Publisher;
class UsageDump extends Event
{
protected array $stats;
public function __construct(protected Connection $connection)
public function __construct(protected Publisher $publisher)
{
parent::__construct($connection);
parent::__construct($publisher);
$this
->setQueue(Event::USAGE_DUMP_QUEUE_NAME)

View file

@ -2,13 +2,13 @@
namespace Appwrite\Event;
use Utopia\Queue\Connection;
use Utopia\Queue\Publisher;
class Webhook extends Event
{
public function __construct(protected Connection $connection)
public function __construct(protected Publisher $publisher)
{
parent::__construct($connection);
parent::__construct($publisher);
$this
->setQueue(Event::WEBHOOK_QUEUE_NAME)

View file

@ -6,7 +6,6 @@ use Appwrite\Platform\Tasks\Doctor;
use Appwrite\Platform\Tasks\Install;
use Appwrite\Platform\Tasks\Maintenance;
use Appwrite\Platform\Tasks\Migrate;
use Appwrite\Platform\Tasks\QueueCount;
use Appwrite\Platform\Tasks\QueueRetry;
use Appwrite\Platform\Tasks\ScheduleExecutions;
use Appwrite\Platform\Tasks\ScheduleFunctions;
@ -29,7 +28,6 @@ class Tasks extends Service
->addAction(Install::getName(), new Install())
->addAction(Maintenance::getName(), new Maintenance())
->addAction(Migrate::getName(), new Migrate())
->addAction(QueueCount::getName(), new QueueCount())
->addAction(QueueRetry::getName(), new QueueRetry())
->addAction(SDKs::getName(), new SDKs())
->addAction(SSL::getName(), new SSL())

View file

@ -1,57 +0,0 @@
<?php
namespace Appwrite\Platform\Tasks;
use Utopia\CLI\Console;
use Utopia\Platform\Action;
use Utopia\Queue\Client;
use Utopia\Queue\Connection;
use Utopia\Validator\Text;
use Utopia\Validator\WhiteList;
class QueueCount extends Action
{
public static function getName(): string
{
return 'queue-count';
}
public function __construct()
{
$this
->desc('Return the number of from a specific queue identified by the name parameter with a specific type')
->param('name', '', new Text(100), 'Queue name')
->param('type', '', new WhiteList([
'success',
'failed',
'processing',
]), 'Queue type')
->inject('queue')
->callback(fn ($name, $type, $queue) => $this->action($name, $type, $queue));
}
/**
* @param string $name The name of the queue to count the jobs from
* @param string $type The type of jobs to count
* @param Connection $queue
*/
public function action(string $name, string $type, Connection $queue): void
{
if (!$name) {
Console::error('Missing required parameter $name');
return;
}
$queueClient = new Client($name, $queue);
$count = match ($type) {
'success' => $queueClient->countSuccessfulJobs(),
'failed' => $queueClient->countFailedJobs(),
'processing' => $queueClient->countProcessingJobs(),
default => 0
};
Console::log("Queue: '{$name}' has {$count} {$type} jobs.");
}
}

View file

@ -4,8 +4,8 @@ namespace Appwrite\Platform\Tasks;
use Utopia\CLI\Console;
use Utopia\Platform\Action;
use Utopia\Queue\Client;
use Utopia\Queue\Connection;
use Utopia\Queue\Publisher;
use Utopia\Queue\Queue;
use Utopia\Validator\Text;
use Utopia\Validator\Wildcard;
@ -23,33 +23,24 @@ class QueueRetry extends Action
->desc('Retry failed jobs from a specific queue identified by the name parameter')
->param('name', '', new Text(100), 'Queue name')
->param('limit', 0, new Wildcard(), 'jobs limit', true)
->inject('queue')
->callback(fn ($name, $limit, $queue) => $this->action($name, $limit, $queue));
->inject('publisher')
->callback(fn ($name, $limit, $publisher) => $this->action($name, $limit, $publisher));
}
/**
* @param string $name The name of the queue to retry jobs from
* @param mixed $limit
* @param Connection $queue
* @param Publisher $publisher
*/
public function action(string $name, mixed $limit, Connection $queue): void
public function action(string $name, mixed $limit, Publisher $publisher): void
{
if (!$name) {
Console::error('Missing required parameter $name');
return;
}
$limit = (int)$limit;
$queueClient = new Client($name, $queue);
if ($queueClient->countFailedJobs() === 0) {
Console::error('No failed jobs found.');
return;
}
Console::log('Retrying failed jobs...');
$queueClient->retry($limit);
$publisher->retry(new Queue($name), $limit);
}
}

View file

@ -11,7 +11,7 @@ use Utopia\Database\Exception;
use Utopia\Database\Query;
use Utopia\Database\Validator\Authorization;
use Utopia\Platform\Action;
use Utopia\Pools\Group;
use Utopia\Queue\Publisher;
use Utopia\System\System;
use function Swoole\Coroutine\run;
@ -26,7 +26,7 @@ abstract class ScheduleBase extends Action
abstract public static function getName(): string;
abstract public static function getSupportedResource(): string;
abstract public static function getCollectionId(): string;
abstract protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void;
abstract protected function enqueueResources(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void;
public function __construct()
{
@ -34,10 +34,10 @@ abstract class ScheduleBase extends Action
$this
->desc("Execute {$type}s scheduled in Appwrite")
->inject('pools')
->inject('publisher')
->inject('dbForPlatform')
->inject('getProjectDB')
->callback(fn (Group $pools, Database $dbForPlatform, callable $getProjectDB) => $this->action($pools, $dbForPlatform, $getProjectDB));
->callback(fn (Publisher $publisher, Database $dbForPlatform, callable $getProjectDB) => $this->action($publisher, $dbForPlatform, $getProjectDB));
}
protected function updateProjectAccess(Document $project, Database $dbForPlatform): void
@ -56,7 +56,7 @@ abstract class ScheduleBase extends Action
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker.
*/
public function action(Group $pools, Database $dbForPlatform, callable $getProjectDB): void
public function action(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void
{
Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1');
Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started');
@ -125,17 +125,15 @@ abstract class ScheduleBase extends Action
$latestDocument = \end($results);
}
$pools->reclaim();
Console::success("{$total} resources were loaded in " . (\microtime(true) - $loadStart) . " seconds");
Console::success("Starting timers at " . DateTime::now());
run(function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule, $pools, $getProjectDB) {
run(function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule, $publisher, $getProjectDB) {
/**
* The timer synchronize $schedules copy with database collection.
*/
Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule, $pools) {
Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule) {
$time = DateTime::now();
$timerStart = \microtime(true);
@ -184,17 +182,15 @@ abstract class ScheduleBase extends Action
$lastSyncUpdate = $time;
$timerEnd = \microtime(true);
$pools->reclaim();
Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds");
});
Timer::tick(
static::ENQUEUE_TIMER * 1000,
fn () => $this->enqueueResources($pools, $dbForPlatform, $getProjectDB)
fn () => $this->enqueueResources($publisher, $dbForPlatform, $getProjectDB)
);
$this->enqueueResources($pools, $dbForPlatform, $getProjectDB);
$this->enqueueResources($publisher, $dbForPlatform, $getProjectDB);
});
}
}

View file

@ -5,7 +5,7 @@ namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Func;
use Swoole\Coroutine as Co;
use Utopia\Database\Database;
use Utopia\Pools\Group;
use Utopia\Queue\Publisher;
class ScheduleExecutions extends ScheduleBase
{
@ -27,11 +27,9 @@ class ScheduleExecutions extends ScheduleBase
return 'executions';
}
protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void
protected function enqueueResources(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void
{
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource();
$queueForFunctions = new Func($connection);
$queueForFunctions = new Func($publisher);
$intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds');
foreach ($this->schedules as $schedule) {
@ -83,7 +81,5 @@ class ScheduleExecutions extends ScheduleBase
unset($this->schedules[$schedule['$internalId']]);
}
$queue->reclaim();
}
}

View file

@ -7,7 +7,7 @@ use Cron\CronExpression;
use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Pools\Group;
use Utopia\Queue\Publisher;
class ScheduleFunctions extends ScheduleBase
{
@ -31,7 +31,7 @@ class ScheduleFunctions extends ScheduleBase
return 'functions';
}
protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void
protected function enqueueResources(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void
{
$timerStart = \microtime(true);
$time = DateTime::now();
@ -70,12 +70,9 @@ class ScheduleFunctions extends ScheduleBase
}
foreach ($delayedExecutions as $delay => $scheduleKeys) {
\go(function () use ($delay, $scheduleKeys, $pools, $dbForPlatform) {
\go(function () use ($delay, $scheduleKeys, $publisher, $dbForPlatform) {
\sleep($delay); // in seconds
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource();
foreach ($scheduleKeys as $scheduleKey) {
// Ensure schedule was not deleted
if (!\array_key_exists($scheduleKey, $this->schedules)) {
@ -86,8 +83,7 @@ class ScheduleFunctions extends ScheduleBase
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
$queueForFunctions = new Func($connection);
$queueForFunctions = new Func($publisher);
$queueForFunctions
->setType('schedule')
->setFunction($schedule['resource'])
@ -96,8 +92,6 @@ class ScheduleFunctions extends ScheduleBase
->setProject($schedule['project'])
->trigger();
}
$queue->reclaim();
});
}

View file

@ -4,7 +4,7 @@ namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Messaging;
use Utopia\Database\Database;
use Utopia\Pools\Group;
use Utopia\Queue\Publisher;
class ScheduleMessages extends ScheduleBase
{
@ -26,7 +26,7 @@ class ScheduleMessages extends ScheduleBase
return 'messages';
}
protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void
protected function enqueueResources(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void
{
foreach ($this->schedules as $schedule) {
if (!$schedule['active']) {
@ -40,13 +40,9 @@ class ScheduleMessages extends ScheduleBase
continue;
}
\go(function () use ($schedule, $pools, $dbForPlatform) {
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource();
$queueForMessaging = new Messaging($connection);
\go(function () use ($schedule, $publisher, $dbForPlatform) {
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
$queueForMessaging = new Messaging($publisher);
$queueForMessaging
->setType(MESSAGE_SEND_TYPE_EXTERNAL)
->setMessageId($schedule['resourceId'])
@ -58,8 +54,6 @@ class ScheduleMessages extends ScheduleBase
$schedule['$id'],
);
$queue->reclaim();
unset($this->schedules[$schedule['$internalId']]);
});
}

View file

@ -67,24 +67,6 @@ class HealthCustomServerTest extends Scope
return [];
}
public function testQueueSuccess(): array
{
/**
* Test for SUCCESS
*/
$response = $this->client->call(Client::METHOD_GET, '/health/queue', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
], $this->getHeaders()), []);
$this->assertEquals(200, $response['headers']['status-code']);
$this->assertEquals('pass', $response['body']['statuses'][0]['status']);
$this->assertIsInt($response['body']['statuses'][0]['ping']);
$this->assertLessThan(100, $response['body']['statuses'][0]['ping']);
return [];
}
public function testPubSubSuccess(): array
{
/**

View file

@ -5,7 +5,7 @@ namespace Tests\Unit\Event;
use Appwrite\Event\Event;
use InvalidArgumentException;
use PHPUnit\Framework\TestCase;
use Utopia\Queue\Client;
use Utopia\Queue\Publisher;
require_once __DIR__ . '/../../../app/init.php';
@ -13,13 +13,14 @@ class EventTest extends TestCase
{
protected ?Event $object = null;
protected string $queue = '';
protected Publisher $publisher;
public function setUp(): void
{
global $register;
$connection = $register->get('pools')->get('queue')->pop()->getResource();
$this->publisher = new MockPublisher();
$this->queue = 'v1-tests' . uniqid();
$this->object = new Event($connection);
$this->object = new Event($this->publisher);
$this->object->setClass('TestsV1');
$this->object->setQueue($this->queue);
}
@ -51,10 +52,7 @@ class EventTest extends TestCase
$this->assertEquals('eventValue1', $this->object->getParam('eventKey1'));
$this->assertEquals('eventValue2', $this->object->getParam('eventKey2'));
$this->assertEquals(null, $this->object->getParam('eventKey3'));
global $register;
$pools = $register->get('pools');
$client = new Client($this->object->getQueue(), $pools->get('queue')->pop()->getResource());
$this->assertEquals($client->getQueueSize(), 1);
$this->assertCount(1, $this->publisher->getEvents($this->object->getQueue()));
}
public function testReset(): void

View file

@ -0,0 +1,35 @@
<?php
namespace Tests\Unit\Event;
use Utopia\Queue\Publisher;
use Utopia\Queue\Queue;
class MockPublisher implements Publisher
{
private $events = [];
public function enqueue(Queue $queue, array $payload): bool
{
if (!isset($this->events[$queue->name])) {
$this->events[$queue->name] = [];
}
$this->events[$queue->name][] = $payload;
return true;
}
public function getEvents(string $queue)
{
return $this->events[$queue] ?? null;
}
public function retry(Queue $queue, int $limit = null): void
{
// TODO: Implement retry() method.
}
public function getQueueSize(Queue $queue, bool $failedJobs = false): int
{
return count($this->events[$queue->name]);
}
}

View file

@ -1,41 +0,0 @@
<?php
namespace Tests\Unit\Usage;
use PHPUnit\Framework\TestCase;
use Utopia\Queue\Client;
use Utopia\Queue\Connection;
class StatsTest extends TestCase
{
protected ?Connection $connection = null;
protected ?Client $client = null;
protected const QUEUE_NAME = 'usage-test-q';
public function setUp(): void
{
global $register;
$connection = $register->get('pools')->get('queue')->pop()->getResource();
$this->connection = $connection;
$this->client = new Client(self::QUEUE_NAME, $this->connection);
}
public function tearDown(): void
{
}
public function testSamePayload(): void
{
$inToQueue = [
'key_1' => 'value_1',
'key_2' => 'value_2',
];
$result = $this->client->enqueue($inToQueue);
$this->assertTrue($result);
$outFromQueue = $this->connection->leftPopArray('utopia-queue.queue.' . self::QUEUE_NAME, 0)['payload'];
$this->assertNotEmpty($outFromQueue);
$this->assertSame($inToQueue, $outFromQueue);
}
}