Scheduled task fix

This commit is contained in:
shimon 2024-09-19 20:03:15 +03:00
parent 812d8a34cf
commit d1b062a418
5 changed files with 26 additions and 33 deletions

View file

@ -156,15 +156,19 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole,
CLI::setResource('queue', function (Group $pools) { CLI::setResource('queue', function (Group $pools) {
return $pools->get('queue')->pop()->getResource(); return $pools->get('queue')->pop()->getResource();
}, ['pools']); }, ['pools']);
CLI::setResource('queueForFunctions', function (Connection $queue) { CLI::setResource('queueForFunctions', function (Connection $queue) {
return new Func($queue); return new Func($queue);
}, ['queue']); }, ['queue']);
CLI::setResource('queueForDeletes', function (Connection $queue) { CLI::setResource('queueForDeletes', function (Connection $queue) {
return new Delete($queue); return new Delete($queue);
}, ['queue']); }, ['queue']);
CLI::setResource('queueForCertificates', function (Connection $queue) { CLI::setResource('queueForCertificates', function (Connection $queue) {
return new Certificate($queue); return new Certificate($queue);
}, ['queue']); }, ['queue']);
CLI::setResource('logError', function (Registry $register) { CLI::setResource('logError', function (Registry $register) {
return function (Throwable $error, string $namespace, string $action) use ($register) { return function (Throwable $error, string $namespace, string $action) use ($register) {
$logger = $register->get('logger'); $logger = $register->get('logger');

View file

@ -11,6 +11,7 @@ use Utopia\Database\Exception;
use Utopia\Database\Query; use Utopia\Database\Query;
use Utopia\Platform\Action; use Utopia\Platform\Action;
use Utopia\Pools\Group; use Utopia\Pools\Group;
use Utopia\Queue\Connection;
use Utopia\System\System; use Utopia\System\System;
use function Swoole\Coroutine\run; use function Swoole\Coroutine\run;
@ -26,7 +27,7 @@ abstract class ScheduleBase extends Action
abstract public static function getSupportedResource(): string; abstract public static function getSupportedResource(): string;
abstract protected function enqueueResources( abstract protected function enqueueResources(
Group $pools, Connection $queue,
Database $dbForConsole Database $dbForConsole
); );
@ -36,10 +37,10 @@ abstract class ScheduleBase extends Action
$this $this
->desc("Execute {$type}s scheduled in Appwrite") ->desc("Execute {$type}s scheduled in Appwrite")
->inject('pools') ->inject('queue')
->inject('dbForConsole') ->inject('dbForConsole')
->inject('getProjectDB') ->inject('getProjectDB')
->callback(fn (Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB)); ->callback(fn (Connection $queue, Database $dbForConsole, callable $getProjectDB) => $this->action($queue, $dbForConsole, $getProjectDB));
} }
/** /**
@ -47,7 +48,7 @@ abstract class ScheduleBase extends Action
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker. * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker.
*/ */
public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void public function action(Connection $queue, Database $dbForConsole, callable $getProjectDB): void
{ {
Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1'); Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1');
Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started'); Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started');
@ -127,17 +128,15 @@ abstract class ScheduleBase extends Action
$latestDocument = \end($results); $latestDocument = \end($results);
} }
$pools->reclaim();
Console::success("{$total} resources were loaded in " . (\microtime(true) - $loadStart) . " seconds"); Console::success("{$total} resources were loaded in " . (\microtime(true) - $loadStart) . " seconds");
Console::success("Starting timers at " . DateTime::now()); Console::success("Starting timers at " . DateTime::now());
run(function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools) { run(function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $queue) {
/** /**
* The timer synchronize $schedules copy with database collection. * The timer synchronize $schedules copy with database collection.
*/ */
Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools) { Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $queue) {
$time = DateTime::now(); $time = DateTime::now();
$timerStart = \microtime(true); $timerStart = \microtime(true);
@ -186,17 +185,15 @@ abstract class ScheduleBase extends Action
$lastSyncUpdate = $time; $lastSyncUpdate = $time;
$timerEnd = \microtime(true); $timerEnd = \microtime(true);
$pools->reclaim();
Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds"); Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds");
}); });
Timer::tick( Timer::tick(
static::ENQUEUE_TIMER * 1000, static::ENQUEUE_TIMER * 1000,
fn () => $this->enqueueResources($pools, $dbForConsole) fn () => $this->enqueueResources($queue, $dbForConsole)
); );
$this->enqueueResources($pools, $dbForConsole); $this->enqueueResources($queue, $dbForConsole);
}); });
} }
} }

View file

@ -6,6 +6,7 @@ use Appwrite\Event\Func;
use Swoole\Coroutine as Co; use Swoole\Coroutine as Co;
use Utopia\Database\Database; use Utopia\Database\Database;
use Utopia\Pools\Group; use Utopia\Pools\Group;
use Utopia\Queue\Connection;
class ScheduleExecutions extends ScheduleBase class ScheduleExecutions extends ScheduleBase
{ {
@ -22,11 +23,10 @@ class ScheduleExecutions extends ScheduleBase
return 'execution'; return 'execution';
} }
protected function enqueueResources(Group $pools, Database $dbForConsole): void protected function enqueueResources(Connection $queue, Database $dbForConsole): void
{ {
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource(); $queueForFunctions = new Func($queue);
$queueForFunctions = new Func($connection);
$intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds'); $intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds');
foreach ($this->schedules as $schedule) { foreach ($this->schedules as $schedule) {
@ -72,7 +72,5 @@ class ScheduleExecutions extends ScheduleBase
unset($this->schedules[$schedule['$internalId']]); unset($this->schedules[$schedule['$internalId']]);
} }
$queue->reclaim();
} }
} }

View file

@ -8,6 +8,7 @@ use Utopia\CLI\Console;
use Utopia\Database\Database; use Utopia\Database\Database;
use Utopia\Database\DateTime; use Utopia\Database\DateTime;
use Utopia\Pools\Group; use Utopia\Pools\Group;
use Utopia\Queue\Connection;
class ScheduleFunctions extends ScheduleBase class ScheduleFunctions extends ScheduleBase
{ {
@ -26,7 +27,7 @@ class ScheduleFunctions extends ScheduleBase
return 'function'; return 'function';
} }
protected function enqueueResources(Group $pools, Database $dbForConsole): void protected function enqueueResources(Connection $queue, Database $dbForConsole): void
{ {
$timerStart = \microtime(true); $timerStart = \microtime(true);
$time = DateTime::now(); $time = DateTime::now();
@ -65,12 +66,9 @@ class ScheduleFunctions extends ScheduleBase
} }
foreach ($delayedExecutions as $delay => $scheduleKeys) { foreach ($delayedExecutions as $delay => $scheduleKeys) {
\go(function () use ($delay, $scheduleKeys, $pools) { \go(function () use ($delay, $scheduleKeys, $queue) {
\sleep($delay); // in seconds \sleep($delay); // in seconds
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource();
foreach ($scheduleKeys as $scheduleKey) { foreach ($scheduleKeys as $scheduleKey) {
// Ensure schedule was not deleted // Ensure schedule was not deleted
if (!\array_key_exists($scheduleKey, $this->schedules)) { if (!\array_key_exists($scheduleKey, $this->schedules)) {
@ -79,7 +77,7 @@ class ScheduleFunctions extends ScheduleBase
$schedule = $this->schedules[$scheduleKey]; $schedule = $this->schedules[$scheduleKey];
$queueForFunctions = new Func($connection); $queueForFunctions = new Func($queue);
$queueForFunctions $queueForFunctions
->setType('schedule') ->setType('schedule')
@ -89,8 +87,6 @@ class ScheduleFunctions extends ScheduleBase
->setProject($schedule['project']) ->setProject($schedule['project'])
->trigger(); ->trigger();
} }
$queue->reclaim();
}); });
} }

View file

@ -5,6 +5,7 @@ namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Messaging; use Appwrite\Event\Messaging;
use Utopia\Database\Database; use Utopia\Database\Database;
use Utopia\Pools\Group; use Utopia\Pools\Group;
use Utopia\Queue\Connection;
class ScheduleMessages extends ScheduleBase class ScheduleMessages extends ScheduleBase
{ {
@ -21,7 +22,7 @@ class ScheduleMessages extends ScheduleBase
return 'message'; return 'message';
} }
protected function enqueueResources(Group $pools, Database $dbForConsole): void protected function enqueueResources(Connection $queue, Database $dbForConsole): void
{ {
foreach ($this->schedules as $schedule) { foreach ($this->schedules as $schedule) {
if (!$schedule['active']) { if (!$schedule['active']) {
@ -35,10 +36,9 @@ class ScheduleMessages extends ScheduleBase
continue; continue;
} }
\go(function () use ($schedule, $pools, $dbForConsole) { \go(function () use ($schedule, $queue, $dbForConsole) {
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource(); $queueForMessaging = new Messaging($queue);
$queueForMessaging = new Messaging($connection);
$queueForMessaging $queueForMessaging
->setType(MESSAGE_SEND_TYPE_EXTERNAL) ->setType(MESSAGE_SEND_TYPE_EXTERNAL)
@ -51,8 +51,6 @@ class ScheduleMessages extends ScheduleBase
$schedule['$id'], $schedule['$id'],
); );
$queue->reclaim();
unset($this->schedules[$schedule['$internalId']]); unset($this->schedules[$schedule['$internalId']]);
}); });
} }