Refactor ScheduleBase class to enhance schedule loading and project management.

This commit is contained in:
shimon 2025-12-07 16:57:30 +02:00
parent d483fbd43b
commit 7462b7ae17

View file

@ -123,9 +123,7 @@ abstract class ScheduleBase extends Action
$sum = $limit;
$total = 0;
$latestDocument = null;
$collectionId = static::getCollectionId();
$schedulesToProcess = [];
$updatedProjectIds = []; // Track project IDs from updated/new schedules
while ($sum === $limit) {
$paginationQueries = [Query::limit($limit)];
@ -134,6 +132,8 @@ abstract class ScheduleBase extends Action
$paginationQueries[] = Query::cursorAfter($latestDocument);
}
// Temporarily accepting both 'fra' and 'default'
// When all migrated, only use _APP_REGION with 'default' as default value
$regions = [System::getEnv('_APP_REGION', 'default')];
if (!in_array('default', $regions)) {
$regions[] = 'default';
@ -151,97 +151,138 @@ abstract class ScheduleBase extends Action
$paginationQueries[] = Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate);
}
$collectionId = static::getCollectionId();
$schedules = $dbForPlatform->find('schedules', $paginationQueries);
$sum = count($schedules);
$total += $sum;
foreach ($schedules as $schedule) {
$existing = $this->schedules[$schedule->getSequence()] ?? null;
$updated = strtotime($existing['resourceUpdatedAt'] ?? '0') !== strtotime($schedule['resourceUpdatedAt'] ?? '0');
$updated = strtotime($existing['resourceUpdatedAt'] ?? '0') !== strtotime($schedule->getAttribute('resourceUpdatedAt') ?? '0');
if ($existing === null || $updated) {
// Early filtering: skip if not active (only for updates, initial load already filters)
if (!$initialLoad && !$schedule->getAttribute('active', true)) {
try {
$candidate = [
'$sequence' => $schedule->getSequence(),
'$id' => $schedule->getId(),
'projectId' => $schedule->getAttribute('projectId'),
'resourceId' => $schedule->getAttribute('resourceId'),
'resourceType' => $schedule->getAttribute('resourceType'),
'schedule' => $schedule->getAttribute('schedule'),
'active' => $schedule->getAttribute('active'),
'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'),
];
} catch (\Throwable $th) {
Console::error("Failed to load schedule for project {$schedule->getAttribute('projectId')} {$collectionId} {$schedule->getAttribute('resourceId')}");
Console::error($th->getMessage());
continue;
}
// In case the resource is not active (deleted).
if (!$candidate['active']) {
Console::error("Resource is not active: {$candidate['resourceType']}::{$candidate['resourceId']}");
unset($this->schedules[$schedule->getSequence()]);
continue;
}
$schedulesToProcess[] = $schedule;
Console::info("Updating: {$candidate['resourceType']}::{$candidate['resourceId']}");
$this->schedules[$schedule->getSequence()] = $candidate;
// Track projectId for updated/new schedules
$updatedProjectIds[] = $candidate['projectId'];
}
}
$latestDocument = \end($schedules);
}
if (empty($schedulesToProcess)) {
$lastSyncUpdate = $time;
$duration = microtime(true) - $loadStart;
$this->collectSchedulesTelemetryDuration->record($duration, ['initial' => $initialLoad, 'resourceType' => static::getSupportedResource()]);
$this->collectSchedulesTelemetryCount->record($total, ['initial' => $initialLoad, 'resourceType' => static::getSupportedResource()]);
Console::success("{$total} resources were loaded in " . $duration . " seconds");
if (empty($this->schedules)) {
Console::success("No resources found");
return;
}
// Cache projects to avoid fetching the same project multiple times
$projectsCache = [];
// On initial load: load all projects from all schedules
if ($initialLoad) {
$projectIds = array_unique(array_map(fn($schedule) => $schedule['projectId'], $this->schedules));
} else {
// Only load projects for updated/new schedules
$projectIds = array_unique($updatedProjectIds);
}
// Build existing project map from schedules that already have projects loaded
$map = [];
foreach ($this->schedules as $schedule) {
if (isset($schedule['project'])) {
$map[$schedule['projectId']] = $schedule['project'];
}
}
// Only load projects that we don't already have in memory
$projectIdsToLoad = array_filter($projectIds, fn($projectId) => !isset($map[$projectId]));
if (!empty($projectIdsToLoad)) {
$projectIdsToLoad = array_values($projectIdsToLoad);
$batchSize = 10_000;
$batches = array_chunk($projectIdsToLoad, $batchSize);
$projectsLoadStart = microtime(true);
foreach ($batches as $batch) {
$documents = $dbForPlatform->find('projects', [
Query::equal('$id', $batch),
Query::limit(count($batch)),
]);
foreach ($schedulesToProcess as $schedule) {
$projectId = $schedule->getAttribute('projectId');
// Fetch project if not already cached
if (!isset($projectsCache[$projectId])) {
try {
$projectsCache[$projectId] = $dbForPlatform->getDocument('projects', $projectId);
} catch (\Throwable $th) {
Console::error("Failed to load project {$projectId}: " . $th->getMessage());
continue;
foreach ($documents as $document) {
$map[$document->getId()] = $document;
}
}
$project = $projectsCache[$projectId];
if ($project === null) {
$projectsLoadDuration = microtime(true) - $projectsLoadStart;
Console::success("Projects map loaded in " . $projectsLoadDuration . " seconds with " . count($projectIdsToLoad) . " new projects (total: " . count($map) . " projects)");
} else {
Console::success("No new projects to load (using " . count($map) . " cached projects)");
}
foreach ($this->schedules as $sequence => $schedule) {
$project = $map[$schedule['projectId']];
// In case the resource is blocked.
if ($isResourceBlocked($project, $collectionId, $schedule['resourceId'])) {
Console::error("Resource blocked: projectId::{$schedule['projectId']} resourceId::{$schedule['resourceId']}");
unset($this->schedules[$sequence]);
continue;
}
try {
$dbForProject = $getProjectDB($project);
$resourceId = $schedule->getAttribute('resourceId');
$resource = $dbForProject->getDocument($collectionId, $resourceId);
$candidate = [
'$sequence' => $schedule->getSequence(),
'$id' => $schedule->getId(),
'resourceId' => $resourceId,
'schedule' => $schedule->getAttribute('schedule'),
'active' => $schedule->getAttribute('active'),
'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'),
'project' => $project,
'resource' => $resource,
];
// Early filtering checks
if (!$candidate['active']) {
unset($this->schedules[$schedule->getSequence()]);
continue;
}
if ($isResourceBlocked($candidate['project'], $collectionId, $candidate['resourceId'])) {
unset($this->schedules[$schedule->getSequence()]);
continue;
}
Console::info("loading: project:: " . $candidate['project']->getId() . " " . static::getSupportedResource() . "::{$candidate['resourceId']}");
$this->schedules[$schedule->getSequence()] = $candidate;
} catch (\Throwable $th) {
Console::error("Failed to load schedule for project {$project->getId()} {$collectionId} {$schedule->getAttribute('resourceId')}: " . $th->getMessage());
if (empty($project)) {
Console::error("Project not found: projetId::{$schedule['projectId']} resourceId::{$schedule['resourceId']}");
unset($this->schedules[$sequence]);
continue;
}
$this->schedules[$sequence]['project'] = $project;
// In case the resource is not found (project deleted).
try {
$resource = $getProjectDB($project)->getDocument(static::getCollectionId(), $schedule['resourceId']);
} catch (\Throwable $th) {
Console::error("Failed to load resource: projectId::{$schedule['projectId']} resourceId::{$schedule['resourceId']}");
Console::error($th->getMessage());
unset($this->schedules[$sequence]);
continue;
}
if (empty($resource)) {
Console::error("Resource not found: projectId::{$schedule['projectId']} resourceId::{$schedule['resourceId']}");
unset($this->schedules[$sequence]);
continue;
}
$this->schedules[$sequence]['resource'] = $resource;
}
$lastSyncUpdate = $time;
$duration = microtime(true) - $loadStart;
$this->collectSchedulesTelemetryDuration->record($duration, ['initial' => $initialLoad, 'resourceType' => static::getSupportedResource()]);
$this->collectSchedulesTelemetryCount->record($total, ['initial' => $initialLoad, 'resourceType' => static::getSupportedResource()]);
Console::success("{$total} resources were loaded in " . $duration . " seconds");
Console::success("Timer loaded {$total} " . static::getName() . " in " . $duration . " seconds");
}
protected function recordEnqueueDelay(\DateTime $expectedExecutionSchedule): void