Merge pull request #10620 from appwrite/fix-block-schedules

Fix block schedules
This commit is contained in:
Luke B. Silver 2025-10-13 12:51:38 +01:00 committed by GitHub
commit 08d661d8c3
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 36 additions and 24 deletions

View file

@ -103,6 +103,11 @@ CLI::setResource('console', function () {
return new Document(Config::getParam('console'));
}, []);
CLI::setResource(
'isResourceBlocked',
fn () => fn (Document $project, string $resourceType, ?string $resourceId) => false
);
CLI::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform, $cache) {
$databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools

View file

@ -49,6 +49,7 @@ abstract class ScheduleBase extends Action
->inject('publisherMigrations')
->inject('publisherFunctions')
->inject('publisherMessaging')
->inject('isResourceBlocked')
->inject('dbForPlatform')
->inject('getProjectDB')
->inject('telemetry')
@ -71,7 +72,7 @@ abstract class ScheduleBase extends Action
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker.
*/
public function action(BrokerPool $publisher, BrokerPool $publisherMigrations, BrokerPool $publisherFunctions, BrokerPool $publisherMessaging, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void
public function action(BrokerPool $publisher, BrokerPool $publisherMigrations, BrokerPool $publisherFunctions, BrokerPool $publisherMessaging, callable $isResourceBlocked, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void
{
Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1');
Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started');
@ -88,16 +89,16 @@ abstract class ScheduleBase extends Action
// start with "0" to load all active documents.
$lastSyncUpdate = "0";
$this->collectSchedules($dbForPlatform, $getProjectDB, $lastSyncUpdate);
$this->collectSchedules($dbForPlatform, $getProjectDB, $lastSyncUpdate, $isResourceBlocked);
Console::success("Starting timers at " . DateTime::now());
/**
* The timer synchronize $schedules copy with database collection.
*/
Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForPlatform, $getProjectDB, &$lastSyncUpdate) {
Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForPlatform, $getProjectDB, &$lastSyncUpdate, $isResourceBlocked) {
$time = DateTime::now();
Console::log("Sync tick: Running at $time");
$this->collectSchedules($dbForPlatform, $getProjectDB, $lastSyncUpdate);
$this->collectSchedules($dbForPlatform, $getProjectDB, $lastSyncUpdate, $isResourceBlocked);
});
while (true) {
@ -112,7 +113,7 @@ abstract class ScheduleBase extends Action
}
}
private function collectSchedules(Database $dbForPlatform, callable $getProjectDB, string &$lastSyncUpdate): void
private function collectSchedules(Database $dbForPlatform, callable $getProjectDB, string &$lastSyncUpdate, callable $isResourceBlocked): void
{
// If we haven't synced yet, load all active schedules
$initialLoad = $lastSyncUpdate === "0";
@ -178,34 +179,40 @@ abstract class ScheduleBase extends Action
$paginationQueries[] = Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate);
}
$results = $dbForPlatform->find('schedules', $paginationQueries);
$collectionId = static::getCollectionId();
$schedules = $dbForPlatform->find('schedules', $paginationQueries);
$sum = count($schedules);
$total += $sum;
$sum = count($results);
$total = $total + $sum;
foreach ($schedules as $schedule) {
$existing = $this->schedules[$schedule->getSequence()] ?? null;
$updated = strtotime($existing['resourceUpdatedAt'] ?? '0') !== strtotime($schedule['resourceUpdatedAt'] ?? '0');
foreach ($results as $document) {
$localDocument = $this->schedules[$document->getSequence()] ?? null;
if ($localDocument !== null) {
if (!$document['active']) {
Console::info("Removing: {$document['resourceType']}::{$document['resourceId']}");
unset($this->schedules[$document->getSequence()]);
} elseif (strtotime($localDocument['resourceUpdatedAt']) !== strtotime($document['resourceUpdatedAt'])) {
Console::info("Updating: {$document['resourceType']}::{$document['resourceId']}");
$this->schedules[$document->getSequence()] = $getSchedule($document);
}
} else {
if ($existing === null || $updated) {
try {
$this->schedules[$document->getSequence()] = $getSchedule($document);
$candidate = $getSchedule($schedule);
} catch (\Throwable $th) {
$collectionId = static::getCollectionId();
Console::error("Failed to load schedule for project {$document['projectId']} {$collectionId} {$document['resourceId']}");
Console::error("Failed to load schedule for project {$schedule['projectId']} {$collectionId} {$schedule['resourceId']}");
Console::error($th->getMessage());
continue;
}
if (!$candidate['active']) {
unset($this->schedules[$schedule->getSequence()]);
continue;
}
if ($isResourceBlocked($candidate['project'], $collectionId, $candidate['resourceId'])) {
unset($this->schedules[$schedule->getSequence()]);
continue;
}
Console::info("Updating: {$schedule['resourceType']}::{$schedule['resourceId']}");
$this->schedules[$schedule->getSequence()] = $candidate;
}
}
$latestDocument = \end($results);
$latestDocument = \end($schedules);
}
$lastSyncUpdate = $time;