From 354464990392a05a2072be4e554e159e078aa19e Mon Sep 17 00:00:00 2001 From: loks0n <22452787+loks0n@users.noreply.github.com> Date: Thu, 9 Oct 2025 11:59:32 +0100 Subject: [PATCH] fix: block schedules --- app/cli.php | 5 ++ src/Appwrite/Platform/Tasks/ScheduleBase.php | 55 +++++++++++--------- 2 files changed, 36 insertions(+), 24 deletions(-) diff --git a/app/cli.php b/app/cli.php index 0f98cf3458..71b6464cb9 100644 --- a/app/cli.php +++ b/app/cli.php @@ -103,6 +103,11 @@ CLI::setResource('console', function () { return new Document(Config::getParam('console')); }, []); +CLI::setResource( + 'isResourceBlocked', + fn () => fn (Document $project, string $resourceType, ?string $resourceId) => false +); + CLI::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform, $cache) { $databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php index 5cd25b09b4..e9a0e1d333 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleBase.php +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -49,6 +49,7 @@ abstract class ScheduleBase extends Action ->inject('publisherMigrations') ->inject('publisherFunctions') ->inject('publisherMessaging') + ->inject('isResourceBlocked') ->inject('dbForPlatform') ->inject('getProjectDB') ->inject('telemetry') @@ -71,7 +72,7 @@ abstract class ScheduleBase extends Action * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker. */ - public function action(BrokerPool $publisher, BrokerPool $publisherMigrations, BrokerPool $publisherFunctions, BrokerPool $publisherMessaging, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void + public function action(BrokerPool $publisher, BrokerPool $publisherMigrations, BrokerPool $publisherFunctions, BrokerPool $publisherMessaging, callable $isResourceBlocked, Database $dbForPlatform, callable $getProjectDB, Telemetry $telemetry): void { Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1'); Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started'); @@ -88,16 +89,16 @@ abstract class ScheduleBase extends Action // start with "0" to load all active documents. $lastSyncUpdate = "0"; - $this->collectSchedules($dbForPlatform, $getProjectDB, $lastSyncUpdate); + $this->collectSchedules($dbForPlatform, $getProjectDB, $lastSyncUpdate, $isResourceBlocked); Console::success("Starting timers at " . DateTime::now()); /** * The timer synchronize $schedules copy with database collection. */ - Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForPlatform, $getProjectDB, &$lastSyncUpdate) { + Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForPlatform, $getProjectDB, &$lastSyncUpdate, $isResourceBlocked) { $time = DateTime::now(); Console::log("Sync tick: Running at $time"); - $this->collectSchedules($dbForPlatform, $getProjectDB, $lastSyncUpdate); + $this->collectSchedules($dbForPlatform, $getProjectDB, $lastSyncUpdate, $isResourceBlocked); }); while (true) { @@ -112,7 +113,7 @@ abstract class ScheduleBase extends Action } } - private function collectSchedules(Database $dbForPlatform, callable $getProjectDB, string &$lastSyncUpdate): void + private function collectSchedules(Database $dbForPlatform, callable $getProjectDB, string &$lastSyncUpdate, callable $isResourceBlocked): void { // If we haven't synced yet, load all active schedules $initialLoad = $lastSyncUpdate === "0"; @@ -178,34 +179,40 @@ abstract class ScheduleBase extends Action $paginationQueries[] = Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate); } - $results = $dbForPlatform->find('schedules', $paginationQueries); + $collectionId = static::getCollectionId(); + $schedules = $dbForPlatform->find('schedules', $paginationQueries); + $sum = count($schedules); + $total += $sum; - $sum = count($results); - $total = $total + $sum; + foreach ($schedules as $schedule) { + $existing = $this->schedules[$schedule->getSequence()] ?? null; + $updated = strtotime($existing['resourceUpdatedAt'] ?? '0') !== strtotime($schedule['resourceUpdatedAt'] ?? '0'); - foreach ($results as $document) { - $localDocument = $this->schedules[$document->getSequence()] ?? null; - - if ($localDocument !== null) { - if (!$document['active']) { - Console::info("Removing: {$document['resourceType']}::{$document['resourceId']}"); - unset($this->schedules[$document->getSequence()]); - } elseif (strtotime($localDocument['resourceUpdatedAt']) !== strtotime($document['resourceUpdatedAt'])) { - Console::info("Updating: {$document['resourceType']}::{$document['resourceId']}"); - $this->schedules[$document->getSequence()] = $getSchedule($document); - } - } else { + if ($existing === null || $updated) { try { - $this->schedules[$document->getSequence()] = $getSchedule($document); + $candidate = $getSchedule($schedule); } catch (\Throwable $th) { - $collectionId = static::getCollectionId(); - Console::error("Failed to load schedule for project {$document['projectId']} {$collectionId} {$document['resourceId']}"); + Console::error("Failed to load schedule for project {$schedule['projectId']} {$collectionId} {$schedule['resourceId']}"); Console::error($th->getMessage()); + continue; } + + if (!$candidate['active']) { + unset($this->schedules[$schedule->getSequence()]); + continue; + } + + if ($isResourceBlocked($candidate['project'], $collectionId, $candidate['resourceId'])) { + unset($this->schedules[$schedule->getSequence()]); + continue; + } + + Console::info("Updating: {$schedule['resourceType']}::{$schedule['resourceId']}"); + $this->schedules[$schedule->getSequence()] = $candidate; } } - $latestDocument = \end($results); + $latestDocument = \end($schedules); } $lastSyncUpdate = $time;