Merge pull request #10913 from appwrite/refactor-functions-schedule

Refactor schedule base
This commit is contained in:
Jake Barnby 2025-12-16 08:33:09 +00:00 committed by GitHub
commit b986cd806c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -7,7 +7,6 @@ use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Exception;
use Utopia\Database\Query;
use Utopia\Database\Validator\Authorization;
use Utopia\Platform\Action;
@ -115,36 +114,7 @@ abstract class ScheduleBase extends Action
private function collectSchedules(Database $dbForPlatform, callable $getProjectDB, string &$lastSyncUpdate, callable $isResourceBlocked): void
{
// If we haven't synced yet, load all active schedules
$initialLoad = $lastSyncUpdate === "0";
/**
* Extract only necessary attributes to lower memory used.
*
* @return array
* @throws Exception
* @var Document $schedule
*/
$getSchedule = function (Document $schedule) use ($dbForPlatform, $getProjectDB): array {
$project = $dbForPlatform->getDocument('projects', $schedule->getAttribute('projectId'));
$resource = $getProjectDB($project)->getDocument(
static::getCollectionId(),
$schedule->getAttribute('resourceId')
);
return [
'$sequence' => $schedule->getSequence(),
'$id' => $schedule->getId(),
'resourceId' => $schedule->getAttribute('resourceId'),
'schedule' => $schedule->getAttribute('schedule'),
'active' => $schedule->getAttribute('active'),
'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'),
'project' => $project, // TODO: @Meldiron Send only ID to worker to reduce memory usage here
'resource' => $resource, // TODO: @Meldiron Send only ID to worker to reduce memory usage here
];
};
$loadStart = microtime(true);
$time = DateTime::now();
@ -152,6 +122,7 @@ abstract class ScheduleBase extends Action
$sum = $limit;
$total = 0;
$latestDocument = null;
$updatedProjectIds = []; // Track project IDs from updated/new schedules
while ($sum === $limit) {
$paginationQueries = [Query::limit($limit)];
@ -186,40 +157,130 @@ abstract class ScheduleBase extends Action
foreach ($schedules as $schedule) {
$existing = $this->schedules[$schedule->getSequence()] ?? null;
$updated = strtotime($existing['resourceUpdatedAt'] ?? '0') !== strtotime($schedule['resourceUpdatedAt'] ?? '0');
$updated = strtotime($existing['resourceUpdatedAt'] ?? '0') !== strtotime($schedule->getAttribute('resourceUpdatedAt') ?? '0');
if ($existing === null || $updated) {
try {
$candidate = $getSchedule($schedule);
$candidate = [
'$sequence' => $schedule->getSequence(),
'$id' => $schedule->getId(),
'projectId' => $schedule->getAttribute('projectId'),
'resourceId' => $schedule->getAttribute('resourceId'),
'resourceType' => $schedule->getAttribute('resourceType'),
'schedule' => $schedule->getAttribute('schedule'),
'active' => $schedule->getAttribute('active'),
'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'),
];
} catch (\Throwable $th) {
Console::error("Failed to load schedule for project {$schedule['projectId']} {$collectionId} {$schedule['resourceId']}");
Console::error("Failed to load schedule for project {$schedule->getAttribute('projectId')} {$collectionId} {$schedule->getAttribute('resourceId')}");
Console::error($th->getMessage());
continue;
}
// In case the resource is not active (deleted).
if (!$candidate['active']) {
Console::error("Resource is not active: {$candidate['resourceType']}::{$candidate['resourceId']}");
unset($this->schedules[$schedule->getSequence()]);
continue;
}
if ($isResourceBlocked($candidate['project'], $collectionId, $candidate['resourceId'])) {
unset($this->schedules[$schedule->getSequence()]);
continue;
}
Console::info("Updating: {$schedule['resourceType']}::{$schedule['resourceId']}");
Console::info("Updating: {$candidate['resourceType']}::{$candidate['resourceId']}");
$this->schedules[$schedule->getSequence()] = $candidate;
// Track projectId for updated/new schedules
$updatedProjectIds[] = $candidate['projectId'];
}
}
$latestDocument = \end($schedules);
}
if (empty($this->schedules)) {
Console::success("No resources found");
}
// On initial load: load all projects from all schedules
if ($initialLoad) {
$projectIds = array_unique(array_map(fn ($schedule) => $schedule['projectId'], $this->schedules));
} else {
// Only load projects for updated/new schedules
$projectIds = array_unique($updatedProjectIds);
}
// Build existing project map from schedules that already have projects loaded
$map = [];
foreach ($this->schedules as $schedule) {
if (isset($schedule['project'])) {
$map[$schedule['projectId']] = $schedule['project'];
}
}
// Only load projects that we don't already have in memory
$projectIdsToLoad = array_filter($projectIds, fn ($projectId) => !isset($map[$projectId]));
if (!empty($projectIdsToLoad)) {
$projectIdsToLoad = array_values($projectIdsToLoad);
$batchSize = 10_000;
$batches = array_chunk($projectIdsToLoad, $batchSize);
$projectsLoadStart = microtime(true);
foreach ($batches as $batch) {
$documents = $dbForPlatform->find('projects', [
Query::equal('$id', $batch),
Query::limit(count($batch)),
]);
foreach ($documents as $document) {
$map[$document->getId()] = $document;
}
}
$projectsLoadDuration = microtime(true) - $projectsLoadStart;
Console::success("Projects map loaded in " . $projectsLoadDuration . " seconds with " . count($projectIdsToLoad) . " new projects (total: " . count($map) . " projects)");
} else {
Console::success("No new projects to load (using " . count($map) . " cached projects)");
}
foreach ($this->schedules as $sequence => $schedule) {
$project = $map[$schedule['projectId']] ?? null;
if ($project === null || $project->isEmpty()) {
Console::error("Project not found: projectId::{$schedule['projectId']} resourceId::{$schedule['resourceId']}");
unset($this->schedules[$sequence]);
continue;
}
// In case the resource is blocked.
if ($isResourceBlocked($project, $collectionId, $schedule['resourceId'])) {
Console::error("Resource blocked: projectId::{$schedule['projectId']} resourceId::{$schedule['resourceId']}");
unset($this->schedules[$sequence]);
continue;
}
$this->schedules[$sequence]['project'] = $project;
// In case the resource is not found (project deleted).
try {
$resource = $getProjectDB($project)->getDocument(static::getCollectionId(), $schedule['resourceId']);
} catch (\Throwable $th) {
Console::error("Failed to load resource: projectId::{$schedule['projectId']} resourceId::{$schedule['resourceId']}");
Console::error($th->getMessage());
unset($this->schedules[$sequence]);
continue;
}
if ($resource->isEmpty()) {
Console::error("Resource not found: projectId::{$schedule['projectId']} resourceId::{$schedule['resourceId']}");
unset($this->schedules[$sequence]);
continue;
}
$this->schedules[$sequence]['resource'] = $resource;
}
$lastSyncUpdate = $time;
$duration = microtime(true) - $loadStart;
$this->collectSchedulesTelemetryDuration->record($duration, ['initial' => $initialLoad, 'resourceType' => static::getSupportedResource()]);
$this->collectSchedulesTelemetryCount->record($total, ['initial' => $initialLoad, 'resourceType' => static::getSupportedResource()]);
Console::success("{$total} resources were loaded in " . $duration . " seconds");
Console::success("Timer loaded {$total} " . static::getName() . " in " . $duration . " seconds");
}
protected function recordEnqueueDelay(\DateTime $expectedExecutionSchedule): void