mirror of
https://github.com/appwrite/appwrite
synced 2026-05-22 00:18:25 +00:00
Merge branch '1.8.x' of https://github.com/appwrite/appwrite into worker-error-add-previous
This commit is contained in:
commit
d3e0050739
1 changed files with 102 additions and 41 deletions
|
|
@ -7,7 +7,6 @@ use Utopia\CLI\Console;
|
|||
use Utopia\Database\Database;
|
||||
use Utopia\Database\DateTime;
|
||||
use Utopia\Database\Document;
|
||||
use Utopia\Database\Exception;
|
||||
use Utopia\Database\Query;
|
||||
use Utopia\Database\Validator\Authorization;
|
||||
use Utopia\Platform\Action;
|
||||
|
|
@ -115,36 +114,7 @@ abstract class ScheduleBase extends Action
|
|||
|
||||
private function collectSchedules(Database $dbForPlatform, callable $getProjectDB, string &$lastSyncUpdate, callable $isResourceBlocked): void
|
||||
{
|
||||
// If we haven't synced yet, load all active schedules
|
||||
$initialLoad = $lastSyncUpdate === "0";
|
||||
|
||||
/**
|
||||
* Extract only necessary attributes to lower memory used.
|
||||
*
|
||||
* @return array
|
||||
* @throws Exception
|
||||
* @var Document $schedule
|
||||
*/
|
||||
$getSchedule = function (Document $schedule) use ($dbForPlatform, $getProjectDB): array {
|
||||
$project = $dbForPlatform->getDocument('projects', $schedule->getAttribute('projectId'));
|
||||
|
||||
$resource = $getProjectDB($project)->getDocument(
|
||||
static::getCollectionId(),
|
||||
$schedule->getAttribute('resourceId')
|
||||
);
|
||||
|
||||
return [
|
||||
'$sequence' => $schedule->getSequence(),
|
||||
'$id' => $schedule->getId(),
|
||||
'resourceId' => $schedule->getAttribute('resourceId'),
|
||||
'schedule' => $schedule->getAttribute('schedule'),
|
||||
'active' => $schedule->getAttribute('active'),
|
||||
'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'),
|
||||
'project' => $project, // TODO: @Meldiron Send only ID to worker to reduce memory usage here
|
||||
'resource' => $resource, // TODO: @Meldiron Send only ID to worker to reduce memory usage here
|
||||
];
|
||||
};
|
||||
|
||||
$loadStart = microtime(true);
|
||||
$time = DateTime::now();
|
||||
|
||||
|
|
@ -152,6 +122,7 @@ abstract class ScheduleBase extends Action
|
|||
$sum = $limit;
|
||||
$total = 0;
|
||||
$latestDocument = null;
|
||||
$updatedProjectIds = []; // Track project IDs from updated/new schedules
|
||||
|
||||
while ($sum === $limit) {
|
||||
$paginationQueries = [Query::limit($limit)];
|
||||
|
|
@ -186,40 +157,130 @@ abstract class ScheduleBase extends Action
|
|||
|
||||
foreach ($schedules as $schedule) {
|
||||
$existing = $this->schedules[$schedule->getSequence()] ?? null;
|
||||
$updated = strtotime($existing['resourceUpdatedAt'] ?? '0') !== strtotime($schedule['resourceUpdatedAt'] ?? '0');
|
||||
$updated = strtotime($existing['resourceUpdatedAt'] ?? '0') !== strtotime($schedule->getAttribute('resourceUpdatedAt') ?? '0');
|
||||
|
||||
if ($existing === null || $updated) {
|
||||
try {
|
||||
$candidate = $getSchedule($schedule);
|
||||
$candidate = [
|
||||
'$sequence' => $schedule->getSequence(),
|
||||
'$id' => $schedule->getId(),
|
||||
'projectId' => $schedule->getAttribute('projectId'),
|
||||
'resourceId' => $schedule->getAttribute('resourceId'),
|
||||
'resourceType' => $schedule->getAttribute('resourceType'),
|
||||
'schedule' => $schedule->getAttribute('schedule'),
|
||||
'active' => $schedule->getAttribute('active'),
|
||||
'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'),
|
||||
];
|
||||
} catch (\Throwable $th) {
|
||||
Console::error("Failed to load schedule for project {$schedule['projectId']} {$collectionId} {$schedule['resourceId']}");
|
||||
Console::error("Failed to load schedule for project {$schedule->getAttribute('projectId')} {$collectionId} {$schedule->getAttribute('resourceId')}");
|
||||
Console::error($th->getMessage());
|
||||
continue;
|
||||
}
|
||||
|
||||
// In case the resource is not active (deleted).
|
||||
if (!$candidate['active']) {
|
||||
Console::error("Resource is not active: {$candidate['resourceType']}::{$candidate['resourceId']}");
|
||||
unset($this->schedules[$schedule->getSequence()]);
|
||||
continue;
|
||||
}
|
||||
|
||||
if ($isResourceBlocked($candidate['project'], $collectionId, $candidate['resourceId'])) {
|
||||
unset($this->schedules[$schedule->getSequence()]);
|
||||
continue;
|
||||
}
|
||||
|
||||
Console::info("Updating: {$schedule['resourceType']}::{$schedule['resourceId']}");
|
||||
Console::info("Updating: {$candidate['resourceType']}::{$candidate['resourceId']}");
|
||||
$this->schedules[$schedule->getSequence()] = $candidate;
|
||||
|
||||
// Track projectId for updated/new schedules
|
||||
$updatedProjectIds[] = $candidate['projectId'];
|
||||
}
|
||||
}
|
||||
|
||||
$latestDocument = \end($schedules);
|
||||
}
|
||||
if (empty($this->schedules)) {
|
||||
Console::success("No resources found");
|
||||
}
|
||||
|
||||
// On initial load: load all projects from all schedules
|
||||
if ($initialLoad) {
|
||||
$projectIds = array_unique(array_map(fn ($schedule) => $schedule['projectId'], $this->schedules));
|
||||
} else {
|
||||
// Only load projects for updated/new schedules
|
||||
$projectIds = array_unique($updatedProjectIds);
|
||||
}
|
||||
|
||||
// Build existing project map from schedules that already have projects loaded
|
||||
$map = [];
|
||||
foreach ($this->schedules as $schedule) {
|
||||
if (isset($schedule['project'])) {
|
||||
$map[$schedule['projectId']] = $schedule['project'];
|
||||
}
|
||||
}
|
||||
|
||||
// Only load projects that we don't already have in memory
|
||||
$projectIdsToLoad = array_filter($projectIds, fn ($projectId) => !isset($map[$projectId]));
|
||||
|
||||
if (!empty($projectIdsToLoad)) {
|
||||
$projectIdsToLoad = array_values($projectIdsToLoad);
|
||||
$batchSize = 10_000;
|
||||
$batches = array_chunk($projectIdsToLoad, $batchSize);
|
||||
$projectsLoadStart = microtime(true);
|
||||
|
||||
foreach ($batches as $batch) {
|
||||
$documents = $dbForPlatform->find('projects', [
|
||||
Query::equal('$id', $batch),
|
||||
Query::limit(count($batch)),
|
||||
]);
|
||||
|
||||
foreach ($documents as $document) {
|
||||
$map[$document->getId()] = $document;
|
||||
}
|
||||
}
|
||||
|
||||
$projectsLoadDuration = microtime(true) - $projectsLoadStart;
|
||||
Console::success("Projects map loaded in " . $projectsLoadDuration . " seconds with " . count($projectIdsToLoad) . " new projects (total: " . count($map) . " projects)");
|
||||
} else {
|
||||
Console::success("No new projects to load (using " . count($map) . " cached projects)");
|
||||
}
|
||||
|
||||
foreach ($this->schedules as $sequence => $schedule) {
|
||||
$project = $map[$schedule['projectId']] ?? null;
|
||||
|
||||
if ($project === null || $project->isEmpty()) {
|
||||
Console::error("Project not found: projectId::{$schedule['projectId']} resourceId::{$schedule['resourceId']}");
|
||||
unset($this->schedules[$sequence]);
|
||||
continue;
|
||||
}
|
||||
|
||||
// In case the resource is blocked.
|
||||
if ($isResourceBlocked($project, $collectionId, $schedule['resourceId'])) {
|
||||
Console::error("Resource blocked: projectId::{$schedule['projectId']} resourceId::{$schedule['resourceId']}");
|
||||
unset($this->schedules[$sequence]);
|
||||
continue;
|
||||
}
|
||||
|
||||
$this->schedules[$sequence]['project'] = $project;
|
||||
|
||||
// In case the resource is not found (project deleted).
|
||||
try {
|
||||
$resource = $getProjectDB($project)->getDocument(static::getCollectionId(), $schedule['resourceId']);
|
||||
} catch (\Throwable $th) {
|
||||
Console::error("Failed to load resource: projectId::{$schedule['projectId']} resourceId::{$schedule['resourceId']}");
|
||||
Console::error($th->getMessage());
|
||||
unset($this->schedules[$sequence]);
|
||||
continue;
|
||||
}
|
||||
|
||||
if ($resource->isEmpty()) {
|
||||
Console::error("Resource not found: projectId::{$schedule['projectId']} resourceId::{$schedule['resourceId']}");
|
||||
unset($this->schedules[$sequence]);
|
||||
continue;
|
||||
}
|
||||
|
||||
$this->schedules[$sequence]['resource'] = $resource;
|
||||
}
|
||||
|
||||
$lastSyncUpdate = $time;
|
||||
$duration = microtime(true) - $loadStart;
|
||||
$this->collectSchedulesTelemetryDuration->record($duration, ['initial' => $initialLoad, 'resourceType' => static::getSupportedResource()]);
|
||||
$this->collectSchedulesTelemetryCount->record($total, ['initial' => $initialLoad, 'resourceType' => static::getSupportedResource()]);
|
||||
Console::success("{$total} resources were loaded in " . $duration . " seconds");
|
||||
Console::success("Timer loaded {$total} " . static::getName() . " in " . $duration . " seconds");
|
||||
}
|
||||
|
||||
protected function recordEnqueueDelay(\DateTime $expectedExecutionSchedule): void
|
||||
|
|
|
|||
Loading…
Reference in a new issue