diff --git a/src/Appwrite/Platform/Tasks/ScheduleBase.php b/src/Appwrite/Platform/Tasks/ScheduleBase.php index e9a0e1d333..3853f0d9b3 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleBase.php +++ b/src/Appwrite/Platform/Tasks/ScheduleBase.php @@ -7,7 +7,6 @@ use Utopia\CLI\Console; use Utopia\Database\Database; use Utopia\Database\DateTime; use Utopia\Database\Document; -use Utopia\Database\Exception; use Utopia\Database\Query; use Utopia\Database\Validator\Authorization; use Utopia\Platform\Action; @@ -115,36 +114,7 @@ abstract class ScheduleBase extends Action private function collectSchedules(Database $dbForPlatform, callable $getProjectDB, string &$lastSyncUpdate, callable $isResourceBlocked): void { - // If we haven't synced yet, load all active schedules $initialLoad = $lastSyncUpdate === "0"; - - /** - * Extract only necessary attributes to lower memory used. - * - * @return array - * @throws Exception - * @var Document $schedule - */ - $getSchedule = function (Document $schedule) use ($dbForPlatform, $getProjectDB): array { - $project = $dbForPlatform->getDocument('projects', $schedule->getAttribute('projectId')); - - $resource = $getProjectDB($project)->getDocument( - static::getCollectionId(), - $schedule->getAttribute('resourceId') - ); - - return [ - '$sequence' => $schedule->getSequence(), - '$id' => $schedule->getId(), - 'resourceId' => $schedule->getAttribute('resourceId'), - 'schedule' => $schedule->getAttribute('schedule'), - 'active' => $schedule->getAttribute('active'), - 'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'), - 'project' => $project, // TODO: @Meldiron Send only ID to worker to reduce memory usage here - 'resource' => $resource, // TODO: @Meldiron Send only ID to worker to reduce memory usage here - ]; - }; - $loadStart = microtime(true); $time = DateTime::now(); @@ -152,6 +122,7 @@ abstract class ScheduleBase extends Action $sum = $limit; $total = 0; $latestDocument = null; + $updatedProjectIds = []; // Track project IDs from updated/new schedules while ($sum === $limit) { $paginationQueries = [Query::limit($limit)]; @@ -186,40 +157,130 @@ abstract class ScheduleBase extends Action foreach ($schedules as $schedule) { $existing = $this->schedules[$schedule->getSequence()] ?? null; - $updated = strtotime($existing['resourceUpdatedAt'] ?? '0') !== strtotime($schedule['resourceUpdatedAt'] ?? '0'); + $updated = strtotime($existing['resourceUpdatedAt'] ?? '0') !== strtotime($schedule->getAttribute('resourceUpdatedAt') ?? '0'); if ($existing === null || $updated) { try { - $candidate = $getSchedule($schedule); + $candidate = [ + '$sequence' => $schedule->getSequence(), + '$id' => $schedule->getId(), + 'projectId' => $schedule->getAttribute('projectId'), + 'resourceId' => $schedule->getAttribute('resourceId'), + 'resourceType' => $schedule->getAttribute('resourceType'), + 'schedule' => $schedule->getAttribute('schedule'), + 'active' => $schedule->getAttribute('active'), + 'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'), + ]; } catch (\Throwable $th) { - Console::error("Failed to load schedule for project {$schedule['projectId']} {$collectionId} {$schedule['resourceId']}"); + Console::error("Failed to load schedule for project {$schedule->getAttribute('projectId')} {$collectionId} {$schedule->getAttribute('resourceId')}"); Console::error($th->getMessage()); continue; } - + // In case the resource is not active (deleted). if (!$candidate['active']) { + Console::error("Resource is not active: {$candidate['resourceType']}::{$candidate['resourceId']}"); unset($this->schedules[$schedule->getSequence()]); continue; } - if ($isResourceBlocked($candidate['project'], $collectionId, $candidate['resourceId'])) { - unset($this->schedules[$schedule->getSequence()]); - continue; - } - - Console::info("Updating: {$schedule['resourceType']}::{$schedule['resourceId']}"); + Console::info("Updating: {$candidate['resourceType']}::{$candidate['resourceId']}"); $this->schedules[$schedule->getSequence()] = $candidate; + + // Track projectId for updated/new schedules + $updatedProjectIds[] = $candidate['projectId']; } } $latestDocument = \end($schedules); } + if (empty($this->schedules)) { + Console::success("No resources found"); + } + + // On initial load: load all projects from all schedules + if ($initialLoad) { + $projectIds = array_unique(array_map(fn ($schedule) => $schedule['projectId'], $this->schedules)); + } else { + // Only load projects for updated/new schedules + $projectIds = array_unique($updatedProjectIds); + } + + // Build existing project map from schedules that already have projects loaded + $map = []; + foreach ($this->schedules as $schedule) { + if (isset($schedule['project'])) { + $map[$schedule['projectId']] = $schedule['project']; + } + } + + // Only load projects that we don't already have in memory + $projectIdsToLoad = array_filter($projectIds, fn ($projectId) => !isset($map[$projectId])); + + if (!empty($projectIdsToLoad)) { + $projectIdsToLoad = array_values($projectIdsToLoad); + $batchSize = 10_000; + $batches = array_chunk($projectIdsToLoad, $batchSize); + $projectsLoadStart = microtime(true); + + foreach ($batches as $batch) { + $documents = $dbForPlatform->find('projects', [ + Query::equal('$id', $batch), + Query::limit(count($batch)), + ]); + + foreach ($documents as $document) { + $map[$document->getId()] = $document; + } + } + + $projectsLoadDuration = microtime(true) - $projectsLoadStart; + Console::success("Projects map loaded in " . $projectsLoadDuration . " seconds with " . count($projectIdsToLoad) . " new projects (total: " . count($map) . " projects)"); + } else { + Console::success("No new projects to load (using " . count($map) . " cached projects)"); + } + + foreach ($this->schedules as $sequence => $schedule) { + $project = $map[$schedule['projectId']] ?? null; + + if ($project === null || $project->isEmpty()) { + Console::error("Project not found: projectId::{$schedule['projectId']} resourceId::{$schedule['resourceId']}"); + unset($this->schedules[$sequence]); + continue; + } + + // In case the resource is blocked. + if ($isResourceBlocked($project, $collectionId, $schedule['resourceId'])) { + Console::error("Resource blocked: projectId::{$schedule['projectId']} resourceId::{$schedule['resourceId']}"); + unset($this->schedules[$sequence]); + continue; + } + + $this->schedules[$sequence]['project'] = $project; + + // In case the resource is not found (project deleted). + try { + $resource = $getProjectDB($project)->getDocument(static::getCollectionId(), $schedule['resourceId']); + } catch (\Throwable $th) { + Console::error("Failed to load resource: projectId::{$schedule['projectId']} resourceId::{$schedule['resourceId']}"); + Console::error($th->getMessage()); + unset($this->schedules[$sequence]); + continue; + } + + if ($resource->isEmpty()) { + Console::error("Resource not found: projectId::{$schedule['projectId']} resourceId::{$schedule['resourceId']}"); + unset($this->schedules[$sequence]); + continue; + } + + $this->schedules[$sequence]['resource'] = $resource; + } $lastSyncUpdate = $time; $duration = microtime(true) - $loadStart; $this->collectSchedulesTelemetryDuration->record($duration, ['initial' => $initialLoad, 'resourceType' => static::getSupportedResource()]); $this->collectSchedulesTelemetryCount->record($total, ['initial' => $initialLoad, 'resourceType' => static::getSupportedResource()]); - Console::success("{$total} resources were loaded in " . $duration . " seconds"); + Console::success("Timer loaded {$total} " . static::getName() . " in " . $duration . " seconds"); } protected function recordEnqueueDelay(\DateTime $expectedExecutionSchedule): void