fix(schedule-tasks): revert back to direct pool usage

This commit is contained in:
Fabian Gruber 2025-02-06 13:18:18 +01:00
parent c02a8d3997
commit 273ea0abcc
4 changed files with 40 additions and 20 deletions

View file

@ -11,7 +11,7 @@ use Utopia\Database\Exception;
use Utopia\Database\Query;
use Utopia\Database\Validator\Authorization;
use Utopia\Platform\Action;
use Utopia\Queue\Publisher;
use Utopia\Pools\Group;
use Utopia\System\System;
use function Swoole\Coroutine\run;
@ -26,7 +26,7 @@ abstract class ScheduleBase extends Action
abstract public static function getName(): string;
abstract public static function getSupportedResource(): string;
abstract public static function getCollectionId(): string;
abstract protected function enqueueResources(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void;
abstract protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void;
public function __construct()
{
@ -34,10 +34,10 @@ abstract class ScheduleBase extends Action
$this
->desc("Execute {$type}s scheduled in Appwrite")
->inject('publisher')
->inject('pools')
->inject('dbForPlatform')
->inject('getProjectDB')
->callback(fn (Publisher $publisher, Database $dbForPlatform, callable $getProjectDB) => $this->action($publisher, $dbForPlatform, $getProjectDB));
->callback(fn (Group $pools, Database $dbForPlatform, callable $getProjectDB) => $this->action($pools, $dbForPlatform, $getProjectDB));
}
protected function updateProjectAccess(Document $project, Database $dbForPlatform): void
@ -56,7 +56,7 @@ abstract class ScheduleBase extends Action
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker.
*/
public function action(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void
public function action(Group $pools, Database $dbForPlatform, callable $getProjectDB): void
{
Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1');
Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started');
@ -125,15 +125,17 @@ abstract class ScheduleBase extends Action
$latestDocument = \end($results);
}
$pools->reclaim();
Console::success("{$total} resources were loaded in " . (\microtime(true) - $loadStart) . " seconds");
Console::success("Starting timers at " . DateTime::now());
run(function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule, $publisher, $getProjectDB) {
run(function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule, $pools, $getProjectDB) {
/**
* The timer synchronize $schedules copy with database collection.
*/
Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule) {
Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule, $pools) {
$time = DateTime::now();
$timerStart = \microtime(true);
@ -182,15 +184,17 @@ abstract class ScheduleBase extends Action
$lastSyncUpdate = $time;
$timerEnd = \microtime(true);
$pools->reclaim();
Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds");
});
Timer::tick(
static::ENQUEUE_TIMER * 1000,
fn () => $this->enqueueResources($publisher, $dbForPlatform, $getProjectDB)
fn () => $this->enqueueResources($pools, $dbForPlatform, $getProjectDB)
);
$this->enqueueResources($publisher, $dbForPlatform, $getProjectDB);
$this->enqueueResources($pools, $dbForPlatform, $getProjectDB);
});
}
}

View file

@ -5,7 +5,7 @@ namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Func;
use Swoole\Coroutine as Co;
use Utopia\Database\Database;
use Utopia\Queue\Publisher;
use Utopia\Pools\Group;
class ScheduleExecutions extends ScheduleBase
{
@ -27,9 +27,11 @@ class ScheduleExecutions extends ScheduleBase
return 'executions';
}
protected function enqueueResources(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void
protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void
{
$queueForFunctions = new Func($publisher);
$queue = $pools->get('publisher')->pop();
$connection = $queue->getResource();
$queueForFunctions = new Func($connection);
$intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds');
foreach ($this->schedules as $schedule) {
@ -81,5 +83,7 @@ class ScheduleExecutions extends ScheduleBase
unset($this->schedules[$schedule['$internalId']]);
}
$queue->reclaim();
}
}

View file

@ -7,7 +7,7 @@ use Cron\CronExpression;
use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Queue\Publisher;
use Utopia\Pools\Group;
class ScheduleFunctions extends ScheduleBase
{
@ -31,7 +31,7 @@ class ScheduleFunctions extends ScheduleBase
return 'functions';
}
protected function enqueueResources(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void
protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void
{
$timerStart = \microtime(true);
$time = DateTime::now();
@ -70,9 +70,12 @@ class ScheduleFunctions extends ScheduleBase
}
foreach ($delayedExecutions as $delay => $scheduleKeys) {
\go(function () use ($delay, $scheduleKeys, $publisher, $dbForPlatform) {
\go(function () use ($delay, $scheduleKeys, $pools, $dbForPlatform) {
\sleep($delay); // in seconds
$queue = $pools->get('publisher')->pop();
$connection = $queue->getResource();
foreach ($scheduleKeys as $scheduleKey) {
// Ensure schedule was not deleted
if (!\array_key_exists($scheduleKey, $this->schedules)) {
@ -83,7 +86,8 @@ class ScheduleFunctions extends ScheduleBase
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
$queueForFunctions = new Func($publisher);
$queueForFunctions = new Func($connection);
$queueForFunctions
->setType('schedule')
->setFunction($schedule['resource'])
@ -92,6 +96,8 @@ class ScheduleFunctions extends ScheduleBase
->setProject($schedule['project'])
->trigger();
}
$queue->reclaim();
});
}

View file

@ -4,7 +4,7 @@ namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Messaging;
use Utopia\Database\Database;
use Utopia\Queue\Publisher;
use Utopia\Pools\Group;
class ScheduleMessages extends ScheduleBase
{
@ -26,7 +26,7 @@ class ScheduleMessages extends ScheduleBase
return 'messages';
}
protected function enqueueResources(Publisher $publisher, Database $dbForPlatform, callable $getProjectDB): void
protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void
{
foreach ($this->schedules as $schedule) {
if (!$schedule['active']) {
@ -40,9 +40,13 @@ class ScheduleMessages extends ScheduleBase
continue;
}
\go(function () use ($schedule, $publisher, $dbForPlatform) {
\go(function () use ($schedule, $pools, $dbForPlatform) {
$queue = $pools->get('publisher')->pop();
$connection = $queue->getResource();
$queueForMessaging = new Messaging($connection);
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
$queueForMessaging = new Messaging($publisher);
$queueForMessaging
->setType(MESSAGE_SEND_TYPE_EXTERNAL)
->setMessageId($schedule['resourceId'])
@ -54,6 +58,8 @@ class ScheduleMessages extends ScheduleBase
$schedule['$id'],
);
$queue->reclaim();
unset($this->schedules[$schedule['$internalId']]);
});
}