From 7462b7ae170fce0a1bb6e6591ea107f9778e2b03 Mon Sep 17 00:00:00 2001 From: shimon Date: Sun, 7 Dec 2025 16:57:30 +0200 Subject: [PATCH] Refactor ScheduleBase class to enhance schedule loading and project management. --- src/Appwrite/Platform/Tasks/ScheduleBase.php | 163 ++++++++++++------- 1 file changed, 102 insertions(+), 61 deletions(-) diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php index 641725e722..bd5ee6bfa0 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleBase.php +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -123,9 +123,7 @@ abstract class ScheduleBase extends Action $sum = $limit; $total = 0; $latestDocument = null; - $collectionId = static::getCollectionId(); - - $schedulesToProcess = []; + $updatedProjectIds = []; // Track project IDs from updated/new schedules while ($sum === $limit) { $paginationQueries = [Query::limit($limit)]; @@ -134,6 +132,8 @@ abstract class ScheduleBase extends Action $paginationQueries[] = Query::cursorAfter($latestDocument); } + // Temporarily accepting both 'fra' and 'default' + // When all migrated, only use _APP_REGION with 'default' as default value $regions = [System::getEnv('_APP_REGION', 'default')]; if (!in_array('default', $regions)) { $regions[] = 'default'; @@ -151,97 +151,138 @@ abstract class ScheduleBase extends Action $paginationQueries[] = Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate); } + $collectionId = static::getCollectionId(); $schedules = $dbForPlatform->find('schedules', $paginationQueries); $sum = count($schedules); $total += $sum; foreach ($schedules as $schedule) { $existing = $this->schedules[$schedule->getSequence()] ?? null; - $updated = strtotime($existing['resourceUpdatedAt'] ?? '0') !== strtotime($schedule['resourceUpdatedAt'] ?? '0'); - + $updated = strtotime($existing['resourceUpdatedAt'] ?? '0') !== strtotime($schedule->getAttribute('resourceUpdatedAt') ?? 
'0'); + if ($existing === null || $updated) { - // Early filtering: skip if not active (only for updates, initial load already filters) - if (!$initialLoad && !$schedule->getAttribute('active', true)) { + try { + $candidate = [ + '$sequence' => $schedule->getSequence(), + '$id' => $schedule->getId(), + 'projectId' => $schedule->getAttribute('projectId'), + 'resourceId' => $schedule->getAttribute('resourceId'), + 'resourceType' => $schedule->getAttribute('resourceType'), + 'schedule' => $schedule->getAttribute('schedule'), + 'active' => $schedule->getAttribute('active'), + 'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'), + ]; + } catch (\Throwable $th) { + Console::error("Failed to load schedule for project {$schedule->getAttribute('projectId')} {$collectionId} {$schedule->getAttribute('resourceId')}"); + Console::error($th->getMessage()); + continue; + } + // In case the resource is not active (deleted). + if (!$candidate['active']) { + Console::error("Resource is not active: {$candidate['resourceType']}::{$candidate['resourceId']}"); unset($this->schedules[$schedule->getSequence()]); continue; } - $schedulesToProcess[] = $schedule; + Console::info("Updating: {$candidate['resourceType']}::{$candidate['resourceId']}"); + $this->schedules[$schedule->getSequence()] = $candidate; + + // Track projectId for updated/new schedules + $updatedProjectIds[] = $candidate['projectId']; } } $latestDocument = \end($schedules); } - - if (empty($schedulesToProcess)) { - $lastSyncUpdate = $time; - $duration = microtime(true) - $loadStart; - $this->collectSchedulesTelemetryDuration->record($duration, ['initial' => $initialLoad, 'resourceType' => static::getSupportedResource()]); - $this->collectSchedulesTelemetryCount->record($total, ['initial' => $initialLoad, 'resourceType' => static::getSupportedResource()]); - Console::success("{$total} resources were loaded in " . $duration . 
" seconds"); + if (empty($this->schedules)) { + Console::success("No resources found"); return; } - // Cache projects to avoid fetching the same project multiple times - $projectsCache = []; + // On initial load: load all projects from all schedules + if ($initialLoad) { + $projectIds = array_unique(array_map(fn($schedule) => $schedule['projectId'], $this->schedules)); + } else { + // Only load projects for updated/new schedules + $projectIds = array_unique($updatedProjectIds); + } + + // Build existing project map from schedules that already have projects loaded + $map = []; + foreach ($this->schedules as $schedule) { + if (isset($schedule['project'])) { + $map[$schedule['projectId']] = $schedule['project']; + } + } + + // Only load projects that we don't already have in memory + $projectIdsToLoad = array_filter($projectIds, fn($projectId) => !isset($map[$projectId])); + + if (!empty($projectIdsToLoad)) { + $projectIdsToLoad = array_values($projectIdsToLoad); + $batchSize = 10_000; + $batches = array_chunk($projectIdsToLoad, $batchSize); + $projectsLoadStart = microtime(true); + + foreach ($batches as $batch) { + $documents = $dbForPlatform->find('projects', [ + Query::equal('$id', $batch), + Query::limit(count($batch)), + ]); - foreach ($schedulesToProcess as $schedule) { - $projectId = $schedule->getAttribute('projectId'); - - // Fetch project if not already cached - if (!isset($projectsCache[$projectId])) { - try { - $projectsCache[$projectId] = $dbForPlatform->getDocument('projects', $projectId); - } catch (\Throwable $th) { - Console::error("Failed to load project {$projectId}: " . $th->getMessage()); - continue; + foreach ($documents as $document) { + $map[$document->getId()] = $document; } } - $project = $projectsCache[$projectId]; - if ($project === null) { + $projectsLoadDuration = microtime(true) - $projectsLoadStart; + Console::success("Projects map loaded in " . $projectsLoadDuration . " seconds with " . count($projectIdsToLoad) . 
" new projects (total: " . count($map) . " projects)"); + } else { + Console::success("No new projects to load (using " . count($map) . " cached projects)"); + } + + foreach ($this->schedules as $sequence => $schedule) { + $project = $map[$schedule['projectId']]; + + // In case the resource is blocked. + if ($isResourceBlocked($project, $collectionId, $schedule['resourceId'])) { + Console::error("Resource blocked: projectId::{$schedule['projectId']} resourceId::{$schedule['resourceId']}"); + unset($this->schedules[$sequence]); continue; } - try { - $dbForProject = $getProjectDB($project); - $resourceId = $schedule->getAttribute('resourceId'); - $resource = $dbForProject->getDocument($collectionId, $resourceId); - - $candidate = [ - '$sequence' => $schedule->getSequence(), - '$id' => $schedule->getId(), - 'resourceId' => $resourceId, - 'schedule' => $schedule->getAttribute('schedule'), - 'active' => $schedule->getAttribute('active'), - 'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'), - 'project' => $project, - 'resource' => $resource, - ]; - - // Early filtering checks - if (!$candidate['active']) { - unset($this->schedules[$schedule->getSequence()]); - continue; - } - - if ($isResourceBlocked($candidate['project'], $collectionId, $candidate['resourceId'])) { - unset($this->schedules[$schedule->getSequence()]); - continue; - } - - Console::info("loading: project:: " . $candidate['project']->getId() . " " . static::getSupportedResource() . "::{$candidate['resourceId']}"); - $this->schedules[$schedule->getSequence()] = $candidate; - } catch (\Throwable $th) { - Console::error("Failed to load schedule for project {$project->getId()} {$collectionId} {$schedule->getAttribute('resourceId')}: " . 
$th->getMessage()); + if (empty($project)) { + Console::error("Project not found: projetId::{$schedule['projectId']} resourceId::{$schedule['resourceId']}"); + unset($this->schedules[$sequence]); + continue; } + + $this->schedules[$sequence]['project'] = $project; + + // In case the resource is not found (project deleted). + try { + $resource = $getProjectDB($project)->getDocument(static::getCollectionId(), $schedule['resourceId']); + } catch (\Throwable $th) { + Console::error("Failed to load resource: projectId::{$schedule['projectId']} resourceId::{$schedule['resourceId']}"); + Console::error($th->getMessage()); + unset($this->schedules[$sequence]); + continue; + } + + if (empty($resource)) { + Console::error("Resource not found: projectId::{$schedule['projectId']} resourceId::{$schedule['resourceId']}"); + unset($this->schedules[$sequence]); + continue; + } + + $this->schedules[$sequence]['resource'] = $resource; } $lastSyncUpdate = $time; $duration = microtime(true) - $loadStart; $this->collectSchedulesTelemetryDuration->record($duration, ['initial' => $initialLoad, 'resourceType' => static::getSupportedResource()]); $this->collectSchedulesTelemetryCount->record($total, ['initial' => $initialLoad, 'resourceType' => static::getSupportedResource()]); - Console::success("{$total} resources were loaded in " . $duration . " seconds"); + Console::success("Timer loaded {$total} " . static::getName() . " in " . $duration . " seconds"); } protected function recordEnqueueDelay(\DateTime $expectedExecutionSchedule): void