mirror of
https://github.com/appwrite/appwrite
synced 2026-05-23 00:49:02 +00:00
injecting poolForQueue to schedulers
This commit is contained in:
parent
cf32ba0351
commit
66bfc5748b
5 changed files with 25 additions and 18 deletions
|
|
@ -163,6 +163,10 @@ CLI::setResource('queue', function (Group $pools) {
|
|||
return $pools->get('queue')->pop()->getResource();
|
||||
}, ['pools']);
|
||||
|
||||
CLI::setResource('poolForQueue', function (Group $pools) {
|
||||
return $pools->get('queue');
|
||||
}, ['pools']);
|
||||
|
||||
CLI::setResource('queueForFunctions', function (Connection $queue) {
|
||||
return new Func($queue);
|
||||
}, ['queue']);
|
||||
|
|
|
|||
|
|
@ -11,6 +11,7 @@ use Utopia\Database\Exception;
|
|||
use Utopia\Database\Query;
|
||||
use Utopia\Platform\Action;
|
||||
use Utopia\Pools\Group;
|
||||
use Utopia\Pools\Pool;
|
||||
use Utopia\System\System;
|
||||
|
||||
use function Swoole\Coroutine\run;
|
||||
|
|
@ -25,7 +26,7 @@ abstract class ScheduleBase extends Action
|
|||
abstract public static function getName(): string;
|
||||
abstract public static function getSupportedResource(): string;
|
||||
abstract public static function getCollectionId(): string;
|
||||
abstract protected function enqueueResources(Group $pools, Database $dbForConsole, callable $getProjectDB): void;
|
||||
abstract protected function enqueueResources(Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB): void;
|
||||
|
||||
public function __construct()
|
||||
{
|
||||
|
|
@ -33,10 +34,10 @@ abstract class ScheduleBase extends Action
|
|||
|
||||
$this
|
||||
->desc("Execute {$type}s scheduled in Appwrite")
|
||||
->inject('pools')
|
||||
->inject('queuePool')
|
||||
->inject('dbForConsole')
|
||||
->inject('getProjectDB')
|
||||
->callback(fn (Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB));
|
||||
->callback(fn (Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB) => $this->action($poolForQueue, $dbForConsole, $getProjectDB));
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -44,7 +45,7 @@ abstract class ScheduleBase extends Action
|
|||
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
|
||||
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker.
|
||||
*/
|
||||
public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void
|
||||
public function action(Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB): void
|
||||
{
|
||||
Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1');
|
||||
Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started');
|
||||
|
|
@ -113,17 +114,17 @@ abstract class ScheduleBase extends Action
|
|||
$latestDocument = \end($results);
|
||||
}
|
||||
|
||||
$pools->reclaim();
|
||||
$poolForQueue->reclaim();
|
||||
|
||||
Console::success("{$total} resources were loaded in " . (\microtime(true) - $loadStart) . " seconds");
|
||||
|
||||
Console::success("Starting timers at " . DateTime::now());
|
||||
|
||||
run(function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools, $getProjectDB) {
|
||||
run(function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $poolForQueue, $getProjectDB) {
|
||||
/**
|
||||
* The timer synchronize $schedules copy with database collection.
|
||||
*/
|
||||
Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools) {
|
||||
Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $poolForQueue) {
|
||||
$time = DateTime::now();
|
||||
$timerStart = \microtime(true);
|
||||
|
||||
|
|
@ -172,17 +173,17 @@ abstract class ScheduleBase extends Action
|
|||
$lastSyncUpdate = $time;
|
||||
$timerEnd = \microtime(true);
|
||||
|
||||
$pools->reclaim();
|
||||
$poolForQueue->reclaim();
|
||||
|
||||
Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds");
|
||||
});
|
||||
|
||||
Timer::tick(
|
||||
static::ENQUEUE_TIMER * 1000,
|
||||
fn () => $this->enqueueResources($pools, $dbForConsole, $getProjectDB)
|
||||
fn () => $this->enqueueResources($poolForQueue, $dbForConsole, $getProjectDB)
|
||||
);
|
||||
|
||||
$this->enqueueResources($pools, $dbForConsole, $getProjectDB);
|
||||
$this->enqueueResources($poolForQueue, $dbForConsole, $getProjectDB);
|
||||
});
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -6,6 +6,7 @@ use Appwrite\Event\Func;
|
|||
use Swoole\Coroutine as Co;
|
||||
use Utopia\Database\Database;
|
||||
use Utopia\Pools\Group;
|
||||
use Utopia\Pools\Pool;
|
||||
|
||||
class ScheduleExecutions extends ScheduleBase
|
||||
{
|
||||
|
|
@ -27,9 +28,9 @@ class ScheduleExecutions extends ScheduleBase
|
|||
return 'executions';
|
||||
}
|
||||
|
||||
protected function enqueueResources(Group $pools, Database $dbForConsole, callable $getProjectDB): void
|
||||
protected function enqueueResources(Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB): void
|
||||
{
|
||||
$queue = $pools->get('queue')->pop();
|
||||
$queue = $poolForQueue->pop();
|
||||
$connection = $queue->getResource();
|
||||
$queueForFunctions = new Func($connection);
|
||||
$intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds');
|
||||
|
|
|
|||
|
|
@ -31,7 +31,7 @@ class ScheduleFunctions extends ScheduleBase
|
|||
return 'functions';
|
||||
}
|
||||
|
||||
protected function enqueueResources(Group $pools, Database $dbForConsole, callable $getProjectDB): void
|
||||
protected function enqueueResources(Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB): void
|
||||
{
|
||||
$timerStart = \microtime(true);
|
||||
$time = DateTime::now();
|
||||
|
|
@ -70,10 +70,10 @@ class ScheduleFunctions extends ScheduleBase
|
|||
}
|
||||
|
||||
foreach ($delayedExecutions as $delay => $scheduleKeys) {
|
||||
\go(function () use ($delay, $scheduleKeys, $pools) {
|
||||
\go(function () use ($delay, $scheduleKeys, $poolForQueue) {
|
||||
\sleep($delay); // in seconds
|
||||
|
||||
$queue = $pools->get('queue')->pop();
|
||||
$queue = $poolForQueue->pop();
|
||||
$connection = $queue->getResource();
|
||||
|
||||
foreach ($scheduleKeys as $scheduleKey) {
|
||||
|
|
|
|||
|
|
@ -5,6 +5,7 @@ namespace Appwrite\Platform\Tasks;
|
|||
use Appwrite\Event\Messaging;
|
||||
use Utopia\Database\Database;
|
||||
use Utopia\Pools\Group;
|
||||
use Utopia\Pools\Pool;
|
||||
|
||||
class ScheduleMessages extends ScheduleBase
|
||||
{
|
||||
|
|
@ -26,7 +27,7 @@ class ScheduleMessages extends ScheduleBase
|
|||
return 'messages';
|
||||
}
|
||||
|
||||
protected function enqueueResources(Group $pools, Database $dbForConsole, callable $getProjectDB): void
|
||||
protected function enqueueResources(Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB): void
|
||||
{
|
||||
foreach ($this->schedules as $schedule) {
|
||||
if (!$schedule['active']) {
|
||||
|
|
@ -40,8 +41,8 @@ class ScheduleMessages extends ScheduleBase
|
|||
continue;
|
||||
}
|
||||
|
||||
\go(function () use ($schedule, $pools, $dbForConsole) {
|
||||
$queue = $pools->get('queue')->pop();
|
||||
\go(function () use ($schedule, $poolForQueue, $dbForConsole) {
|
||||
$queue = $poolForQueue->pop();
|
||||
$connection = $queue->getResource();
|
||||
$queueForMessaging = new Messaging($connection);
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue