mirror of
https://github.com/appwrite/appwrite
synced 2026-05-23 08:58:35 +00:00
Merge branch '1.6.x' into feat/index-length
This commit is contained in:
commit
9731953c09
4 changed files with 28 additions and 18 deletions
|
|
@ -91,13 +91,18 @@ abstract class ScheduleBase extends Action
|
|||
});
|
||||
|
||||
while (true) {
|
||||
$this->enqueueResources($pools, $dbForPlatform, $getProjectDB);
|
||||
$this->scheduleTelemetryCount->record(count($this->schedules), ['resourceType' => static::getSupportedResource()]);
|
||||
sleep(static::ENQUEUE_TIMER);
|
||||
try {
|
||||
go(fn () => $this->enqueueResources($pools, $dbForPlatform, $getProjectDB));
|
||||
$this->scheduleTelemetryCount->record(count($this->schedules), ['resourceType' => static::getSupportedResource()]);
|
||||
sleep(static::ENQUEUE_TIMER);
|
||||
} catch (\Throwable $th) {
|
||||
Console::error('Failed to enqueue resources: ' . $th->getMessage());
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
||||
private function collectSchedules(Group $pools, Database $dbForPlatform, callable $getProjectDB, ?string &$lastSyncUpdate): void
|
||||
private function collectSchedules(Group $pools, Database $dbForPlatform, callable $getProjectDB, string &$lastSyncUpdate): void
|
||||
{
|
||||
// If we haven't synced yet, load all active schedules
|
||||
$initialLoad = $lastSyncUpdate === "0";
|
||||
|
|
@ -202,10 +207,8 @@ abstract class ScheduleBase extends Action
|
|||
Console::success("{$total} resources were loaded in " . $duration . " seconds");
|
||||
}
|
||||
|
||||
protected function recordEnqueueDelay(string $expectedExecutionSchedule): void
|
||||
protected function recordEnqueueDelay(\DateTime $expectedExecutionSchedule): void
|
||||
{
|
||||
$now = strtotime('now');
|
||||
$scheduledAt = strtotime($expectedExecutionSchedule);
|
||||
$this->enqueueDelayTelemetry->record($now - $scheduledAt, ['resourceType' => static::getSupportedResource()]);
|
||||
$this->enqueueDelayTelemetry->record(time() - $expectedExecutionSchedule->getTimestamp(), ['resourceType' => static::getSupportedResource()]);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -59,7 +59,7 @@ class ScheduleExecutions extends ScheduleBase
|
|||
|
||||
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
|
||||
|
||||
\go(function () use ($queueForFunctions, $schedule, $delay, $data) {
|
||||
\go(function () use ($queueForFunctions, $schedule, $scheduledAt, $delay, $data) {
|
||||
Co::sleep($delay);
|
||||
|
||||
$queueForFunctions->setType('schedule')
|
||||
|
|
@ -75,7 +75,7 @@ class ScheduleExecutions extends ScheduleBase
|
|||
->setUserId($data['userId'] ?? '')
|
||||
->trigger();
|
||||
|
||||
$this->recordEnqueueDelay($schedule['schedule']);
|
||||
$this->recordEnqueueDelay($scheduledAt);
|
||||
});
|
||||
|
||||
$dbForPlatform->deleteDocument(
|
||||
|
|
|
|||
|
|
@ -46,7 +46,13 @@ class ScheduleFunctions extends ScheduleBase
|
|||
$delayedExecutions = []; // Group executions with same delay to share one coroutine
|
||||
|
||||
foreach ($this->schedules as $key => $schedule) {
|
||||
$cron = new CronExpression($schedule['schedule']);
|
||||
try {
|
||||
$cron = new CronExpression($schedule['schedule']);
|
||||
} catch (\InvalidArgumentException) {
|
||||
// ignore invalid cron expressions
|
||||
continue;
|
||||
}
|
||||
|
||||
$nextDate = $cron->getNextRunDate();
|
||||
$next = DateTime::format($nextDate);
|
||||
|
||||
|
|
@ -66,17 +72,18 @@ class ScheduleFunctions extends ScheduleBase
|
|||
$delayedExecutions[$delay] = [];
|
||||
}
|
||||
|
||||
$delayedExecutions[$delay][] = $key;
|
||||
$delayedExecutions[$delay][] = ['key' => $key, 'nextDate' => $nextDate];
|
||||
}
|
||||
|
||||
foreach ($delayedExecutions as $delay => $scheduleKeys) {
|
||||
\go(function () use ($delay, $scheduleKeys, $pools, $dbForPlatform) {
|
||||
foreach ($delayedExecutions as $delay => $schedules) {
|
||||
\go(function () use ($delay, $schedules, $pools, $dbForPlatform) {
|
||||
\sleep($delay); // in seconds
|
||||
|
||||
$queue = $pools->get('publisher')->pop();
|
||||
$connection = $queue->getResource();
|
||||
|
||||
foreach ($scheduleKeys as $scheduleKey) {
|
||||
foreach ($schedules as $delayConfig) {
|
||||
$scheduleKey = $delayConfig['key'];
|
||||
// Ensure schedule was not deleted
|
||||
if (!\array_key_exists($scheduleKey, $this->schedules)) {
|
||||
return;
|
||||
|
|
@ -96,7 +103,7 @@ class ScheduleFunctions extends ScheduleBase
|
|||
->setProject($schedule['project'])
|
||||
->trigger();
|
||||
|
||||
$this->recordEnqueueDelay($schedule['schedule']);
|
||||
$this->recordEnqueueDelay($delayConfig['nextDate']);
|
||||
}
|
||||
|
||||
$queue->reclaim();
|
||||
|
|
|
|||
|
|
@ -40,7 +40,7 @@ class ScheduleMessages extends ScheduleBase
|
|||
continue;
|
||||
}
|
||||
|
||||
\go(function () use ($schedule, $pools, $dbForPlatform) {
|
||||
\go(function () use ($schedule, $scheduledAt, $pools, $dbForPlatform) {
|
||||
$queue = $pools->get('publisher')->pop();
|
||||
$connection = $queue->getResource();
|
||||
$queueForMessaging = new Messaging($connection);
|
||||
|
|
@ -59,7 +59,7 @@ class ScheduleMessages extends ScheduleBase
|
|||
);
|
||||
|
||||
$queue->reclaim();
|
||||
$this->recordEnqueueDelay($schedule['schedule']);
|
||||
$this->recordEnqueueDelay($scheduledAt);
|
||||
unset($this->schedules[$schedule['$internalId']]);
|
||||
});
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue