<?php

namespace Appwrite\Platform\Tasks;

use Swoole\Timer;
use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Exception;
use Utopia\Database\Query;
use Utopia\Database\Validator\Authorization;
use Utopia\Platform\Action;
use Utopia\Pools\Group;
use Utopia\Queue\Broker\Pool as BrokerPool;
use Utopia\System\System;

use function Swoole\Coroutine\run;

/**
 * Base task for resource schedulers.
 *
 * Keeps an in-memory copy of the platform 'schedules' collection (filtered by
 * region and by the resource type of the concrete subclass), refreshes that
 * copy on a timer, and periodically asks the subclass to enqueue whatever is
 * due via {@see enqueueResources()}.
 */
abstract class ScheduleBase extends Action
{
    protected const UPDATE_TIMER = 10; //seconds
    protected const ENQUEUE_TIMER = 60; //seconds

    /**
     * Local copy of the schedules, keyed by the schedule document's internal ID.
     * Each entry has the shape produced by the $getSchedule closure in action().
     */
    protected array $schedules = [];

    /**
     * Broker pool built from the 'publisher' pool; assigned at the start of action().
     */
    protected BrokerPool $publisher;

    abstract public static function getName(): string;

    abstract public static function getSupportedResource(): string;

    abstract public static function getCollectionId(): string;

    abstract protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void;

    /**
     * Registers the task description, its injected dependencies and the callback
     * that forwards to action().
     */
    public function __construct()
    {
        $type = static::getSupportedResource();

        $this
            ->desc("Execute {$type}s scheduled in Appwrite")
            ->inject('pools')
            ->inject('dbForPlatform')
            ->inject('getProjectDB')
            ->callback(fn (Group $pools, Database $dbForPlatform, callable $getProjectDB) => $this->action($pools, $dbForPlatform, $getProjectDB));
    }

    /**
     * Refreshes the project's 'accessedAt' attribute when the stored value is older
     * than APP_PROJECT_ACCESS seconds. Empty projects and the 'console' project are
     * left untouched.
     *
     * @param Document $project Project document to touch.
     * @param Database $dbForPlatform Platform database holding the 'projects' collection.
     */
    protected function updateProjectAccess(Document $project, Database $dbForPlatform): void
    {
        if ($project->isEmpty() || $project->getId() === 'console') {
            return;
        }

        $accessedAt = $project->getAttribute('accessedAt', '');
        $threshold = DateTime::formatTz(DateTime::addSeconds(new \DateTime(), -APP_PROJECT_ACCESS));

        if ($threshold > $accessedAt) {
            $project->setAttribute('accessedAt', DateTime::now());
            // The write is performed with authorization checks skipped.
            Authorization::skip(fn () => $dbForPlatform->updateDocument('projects', $project->getId(), $project));
        }
    }

    /**
     * 1. Load all documents from 'schedules' collection to create local copy
     * 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
     * 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker.
     *
     * @param Group $pools Connection pools; the 'publisher' pool backs $this->publisher.
     * @param Database $dbForPlatform Platform database ('schedules' and 'projects' collections).
     * @param callable $getProjectDB Called with a project Document; its result is used as that project's database.
     */
    public function action(Group $pools, Database $dbForPlatform, callable $getProjectDB): void
    {
        Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1');
        Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started');

        $this->publisher = new BrokerPool($pools->get('publisher'));

        /**
         * Extract only necessary attributes to lower memory used.
         *
         * @param Document $schedule
         * @return array
         * @throws Exception
         */
        $getSchedule = function (Document $schedule) use ($dbForPlatform, $getProjectDB): array {
            $project = $dbForPlatform->getDocument('projects', $schedule->getAttribute('projectId'));

            $resource = $getProjectDB($project)->getDocument(
                static::getCollectionId(),
                $schedule->getAttribute('resourceId')
            );

            return [
                '$internalId' => $schedule->getInternalId(),
                '$id' => $schedule->getId(),
                'resourceId' => $schedule->getAttribute('resourceId'),
                'schedule' => $schedule->getAttribute('schedule'),
                'active' => $schedule->getAttribute('active'),
                'resourceUpdatedAt' => $schedule->getAttribute('resourceUpdatedAt'),
                'project' => $project, // TODO: @Meldiron Send only ID to worker to reduce memory usage here
                'resource' => $resource, // TODO: @Meldiron Send only ID to worker to reduce memory usage here
            ];
        };

        // Taken before the initial load so that changes made while loading are
        // picked up by the first sync tick.
        $lastSyncUpdate = DateTime::now();

        $loadStart = \microtime(true);
        $total = $this->loadActiveSchedules($dbForPlatform, $getSchedule);

        Console::success("{$total} resources were loaded in " . (\microtime(true) - $loadStart) . " seconds");
        Console::success("Starting timers at " . DateTime::now());

        run(function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule, $pools, $getProjectDB) {
            /**
             * The timer synchronize $schedules copy with database collection.
             */
            Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForPlatform, &$lastSyncUpdate, $getSchedule) {
                // Only advanced when the whole sync pass returns normally.
                $lastSyncUpdate = $this->syncSchedules($dbForPlatform, $getSchedule, $lastSyncUpdate);
            });

            Timer::tick(
                static::ENQUEUE_TIMER * 1000,
                fn () => $this->enqueueResources($pools, $dbForPlatform, $getProjectDB)
            );

            // Run once immediately instead of waiting for the first enqueue tick.
            $this->enqueueResources($pools, $dbForPlatform, $getProjectDB);
        });
    }

    /**
     * Loads every active schedule of the supported resource type into $this->schedules.
     *
     * A schedule whose project or resource cannot be loaded is logged and skipped.
     *
     * @param Database $dbForPlatform Platform database.
     * @param callable $getSchedule Maps a schedule Document to its local array form.
     * @return int Number of schedule documents returned by the database.
     */
    private function loadActiveSchedules(Database $dbForPlatform, callable $getSchedule): int
    {
        $limit = 10_000;
        $total = 0;
        $cursor = null;

        do {
            $results = $this->findSchedulePage($dbForPlatform, $limit, $cursor, [
                Query::equal('active', [true]),
            ]);

            $sum = \count($results);
            $total += $sum;

            foreach ($results as $document) {
                try {
                    $this->schedules[$document->getInternalId()] = $getSchedule($document);
                } catch (\Throwable $th) {
                    $this->logScheduleLoadFailure($document, $th);
                }
            }

            // end() yields false on an empty page; the loop stops in that case anyway.
            $cursor = \end($results) ?: null;
        } while ($sum === $limit); // a full page means there may be more

        return $total;
    }

    /**
     * Applies to the local copy every schedule whose 'resourceUpdatedAt' is at or
     * after $lastSyncUpdate: inactive schedules are removed, changed ones reloaded.
     *
     * A schedule that fails to reload is logged and its previous local copy (if
     * any) is kept, so that a single bad document does not abort the timer callback.
     *
     * @param Database $dbForPlatform Platform database.
     * @param callable $getSchedule Maps a schedule Document to its local array form.
     * @param string $lastSyncUpdate Start time of the previous sync pass.
     * @return string Start time of this pass, to be used as the next $lastSyncUpdate.
     */
    private function syncSchedules(Database $dbForPlatform, callable $getSchedule, string $lastSyncUpdate): string
    {
        $time = DateTime::now();
        $timerStart = \microtime(true);

        $limit = 1000;
        $total = 0;
        $cursor = null;

        Console::log("Sync tick: Running at $time");

        do {
            $results = $this->findSchedulePage($dbForPlatform, $limit, $cursor, [
                Query::greaterThanEqual('resourceUpdatedAt', $lastSyncUpdate),
            ]);

            $sum = \count($results);
            $total += $sum;

            foreach ($results as $document) {
                $key = $document->getInternalId();
                $localDocument = $this->schedules[$key] ?? null;

                // Check if resource has been updated since last sync
                $org = $localDocument !== null ? \strtotime($localDocument['resourceUpdatedAt']) : null;
                $new = \strtotime($document['resourceUpdatedAt']);

                if (!$document['active']) {
                    Console::info("Removing: {$document['resourceType']}::{$document['resourceId']}");
                    unset($this->schedules[$key]);
                } elseif ($new !== $org) {
                    Console::info("Updating: {$document['resourceType']}::{$document['resourceId']}");
                    try {
                        $this->schedules[$key] = $getSchedule($document);
                    } catch (\Throwable $th) {
                        // Same handling as the initial load: report and move on.
                        $this->logScheduleLoadFailure($document, $th);
                    }
                }
            }

            // end() yields false on an empty page; the loop stops in that case anyway.
            $cursor = \end($results) ?: null;
        } while ($sum === $limit); // a full page means there may be more

        $timerEnd = \microtime(true);

        Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds");

        return $time;
    }

    /**
     * Fetches one page of 'schedules' documents for the supported resource type
     * and the accepted regions.
     *
     * @param Database $dbForPlatform Platform database.
     * @param int $limit Page size.
     * @param Document|null $cursor Last document of the previous page, or null for the first page.
     * @param array $filters Extra queries appended after the region and resource type filters.
     * @return array Documents of the page.
     */
    private function findSchedulePage(Database $dbForPlatform, int $limit, ?Document $cursor, array $filters): array
    {
        $queries = [Query::limit($limit)];

        if ($cursor !== null) {
            $queries[] = Query::cursorAfter($cursor);
        }

        $queries[] = Query::equal('region', $this->resolveScheduleRegions());
        $queries[] = Query::equal('resourceType', [static::getSupportedResource()]);

        return $dbForPlatform->find('schedules', \array_merge($queries, $filters));
    }

    /**
     * Returns the regions whose schedules this scheduler accepts.
     *
     * Temporarily accepting both the configured region and 'default'.
     * When all migrated, only use _APP_REGION with 'default' as default value.
     *
     * @return array The _APP_REGION value, plus 'default' when it differs.
     */
    private function resolveScheduleRegions(): array
    {
        $regions = [System::getEnv('_APP_REGION', 'default')];

        if (!\in_array('default', $regions, true)) {
            $regions[] = 'default';
        }

        return $regions;
    }

    /**
     * Reports a schedule that could not be turned into its local array form.
     *
     * @param Document $document The schedule document that failed.
     * @param \Throwable $th The failure raised while loading it.
     */
    private function logScheduleLoadFailure(Document $document, \Throwable $th): void
    {
        $collectionId = static::getCollectionId();

        Console::error("Failed to load schedule for project {$document['projectId']} {$collectionId} {$document['resourceId']}");
        Console::error($th->getMessage());
    }
}