fix: coroutine access in schedulers

This commit is contained in:
Binyamin Yawitz 2024-09-06 15:52:20 -04:00
parent b200e66bd1
commit b390485dd9
No known key found for this signature in database
4 changed files with 28 additions and 23 deletions

View file

@ -5,7 +5,6 @@ namespace Appwrite\Platform\Tasks;
use Appwrite\Utopia\Queue\Connections;
use Swoole\Timer;
use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Exception;
@ -27,7 +26,7 @@ abstract class ScheduleBase extends Action
abstract protected function enqueueResources(
array $pools,
Database $dbForConsole
callable $getConsoleDB
);
public function __construct()
@ -39,9 +38,9 @@ abstract class ScheduleBase extends Action
$this
->desc("Execute {$type}s scheduled in Appwrite")
->inject('pools')
->inject('dbForConsole')
->inject('getConsoleDB')
->inject('getProjectDB')
->callback(fn (array $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB));
->callback(fn (array $pools, callable $getConsoleDB, callable $getProjectDB) => $this->action($pools, $getConsoleDB, $getProjectDB));
});
}
@ -50,11 +49,12 @@ abstract class ScheduleBase extends Action
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker.
*/
public function action(array $pools, Database $dbForConsole, callable $getProjectDB): void
public function action(array $pools, callable $getConsoleDB, callable $getProjectDB): void
{
Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1');
Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started');
[$_, $_, $dbForConsole] = $getConsoleDB();
/**
* Extract only necessary attributes to lower memory used.
*
@ -135,7 +135,11 @@ abstract class ScheduleBase extends Action
Console::success("Starting timers at " . DateTime::now());
Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools) {
Timer::tick(static::UPDATE_TIMER * 1000, function () use ($getConsoleDB, &$lastSyncUpdate, $getSchedule, $pools) {
[$connection,$pool, $dbForConsole] = $getConsoleDB();
$connections = new Connections();
$connections->add($connection, $pool);
$time = DateTime::now();
$timerStart = \microtime(true);
@ -184,15 +188,15 @@ abstract class ScheduleBase extends Action
$lastSyncUpdate = $time;
$timerEnd = \microtime(true);
$connections->reclaim();
Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds");
});
Timer::tick(
static::ENQUEUE_TIMER * 1000,
fn () => $this->enqueueResources($pools, $dbForConsole)
fn () => $this->enqueueResources($pools, $getConsoleDB)
);
$this->enqueueResources($pools, $dbForConsole);
$this->enqueueResources($pools, $getConsoleDB);
}
}

View file

@ -4,7 +4,6 @@ namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Func;
use Swoole\Coroutine as Co;
use Utopia\Database\Database;
use Utopia\Queue\Connection\Redis;
class ScheduleExecutions extends ScheduleBase
@ -22,8 +21,11 @@ class ScheduleExecutions extends ScheduleBase
return 'execution';
}
protected function enqueueResources(array $pools, Database $dbForConsole): void
protected function enqueueResources(array $pools, callable $getConsoleDB): void
{
[$connection,$pool, $dbForConsole] = $getConsoleDB();
$this->connections->add($connection, $pool);
$pool = $pools['pools-queue-queue']['pool'];
$connection = $pool->get();
$this->connections->add($connection, $pool);
@ -50,7 +52,7 @@ class ScheduleExecutions extends ScheduleBase
$delay = $scheduledAt->getTimestamp() - (new \DateTime())->getTimestamp();
\go(function () use ($queueForFunctions, $schedule, $delay) {
\go(function () use ($queueForFunctions, $schedule, $delay, $dbForConsole) {
Co::sleep($delay);
$queueForFunctions
@ -65,12 +67,12 @@ class ScheduleExecutions extends ScheduleBase
->setBody($schedule['data']['body'] ?? '')
->setProject($schedule['project'])
->trigger();
});
$dbForConsole->deleteDocument(
'schedules',
$schedule['$id'],
);
$dbForConsole->deleteDocument(
'schedules',
$schedule['$id'],
);
});
unset($this->schedules[$schedule['resourceId']]);
}

View file

@ -5,7 +5,6 @@ namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Func;
use Cron\CronExpression;
use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Queue\Connection\Redis;
@ -26,7 +25,7 @@ class ScheduleFunctions extends ScheduleBase
return 'function';
}
protected function enqueueResources(array $pools, Database $dbForConsole): void
protected function enqueueResources(array $pools, callable $getConsoleDB): void
{
$timerStart = \microtime(true);
$time = DateTime::now();

View file

@ -3,7 +3,6 @@
namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Messaging;
use Utopia\Database\Database;
use Utopia\Queue\Connection\Redis;
class ScheduleMessages extends ScheduleBase
@ -21,8 +20,11 @@ class ScheduleMessages extends ScheduleBase
return 'message';
}
protected function enqueueResources(array $pools, Database $dbForConsole): void
protected function enqueueResources(array $pools, callable $getConsoleDB): void
{
[$connection,$pool, $dbForConsole] = $getConsoleDB();
$this->connections->add($connection, $pool);
foreach ($this->schedules as $schedule) {
if (!$schedule['active']) {
continue;
@ -57,8 +59,6 @@ class ScheduleMessages extends ScheduleBase
);
$this->connections->reclaim();
// $queue->reclaim(); // TODO: Do in try/catch/finally, or add to connectons resource
unset($this->schedules[$schedule['resourceId']]);
});
}