Merge remote-tracking branch 'origin/1.6.x' into feat-pool-adapter

This commit is contained in:
Jake Barnby 2025-05-14 00:36:45 +12:00
commit a4239fc5dc
No known key found for this signature in database
GPG key ID: C437A8CC85B96E9C
4 changed files with 28 additions and 18 deletions

View file

@ -91,13 +91,18 @@ abstract class ScheduleBase extends Action
});
while (true) {
$this->enqueueResources($pools, $dbForPlatform, $getProjectDB);
$this->scheduleTelemetryCount->record(count($this->schedules), ['resourceType' => static::getSupportedResource()]);
sleep(static::ENQUEUE_TIMER);
try {
go(fn () => $this->enqueueResources($pools, $dbForPlatform, $getProjectDB));
$this->scheduleTelemetryCount->record(count($this->schedules), ['resourceType' => static::getSupportedResource()]);
sleep(static::ENQUEUE_TIMER);
} catch (\Throwable $th) {
Console::error('Failed to enqueue resources: ' . $th->getMessage());
}
}
}
private function collectSchedules(Group $pools, Database $dbForPlatform, callable $getProjectDB, ?string &$lastSyncUpdate): void
private function collectSchedules(Group $pools, Database $dbForPlatform, callable $getProjectDB, string &$lastSyncUpdate): void
{
// If we haven't synced yet, load all active schedules
$initialLoad = $lastSyncUpdate === "0";
@ -202,10 +207,8 @@ abstract class ScheduleBase extends Action
Console::success("{$total} resources were loaded in " . $duration . " seconds");
}
protected function recordEnqueueDelay(string $expectedExecutionSchedule): void
protected function recordEnqueueDelay(\DateTime $expectedExecutionSchedule): void
{
$now = strtotime('now');
$scheduledAt = strtotime($expectedExecutionSchedule);
$this->enqueueDelayTelemetry->record($now - $scheduledAt, ['resourceType' => static::getSupportedResource()]);
$this->enqueueDelayTelemetry->record(time() - $expectedExecutionSchedule->getTimestamp(), ['resourceType' => static::getSupportedResource()]);
}
}

View file

@ -59,7 +59,7 @@ class ScheduleExecutions extends ScheduleBase
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
\go(function () use ($queueForFunctions, $schedule, $delay, $data) {
\go(function () use ($queueForFunctions, $schedule, $scheduledAt, $delay, $data) {
Co::sleep($delay);
$queueForFunctions->setType('schedule')
@ -75,7 +75,7 @@ class ScheduleExecutions extends ScheduleBase
->setUserId($data['userId'] ?? '')
->trigger();
$this->recordEnqueueDelay($schedule['schedule']);
$this->recordEnqueueDelay($scheduledAt);
});
$dbForPlatform->deleteDocument(

View file

@ -46,7 +46,13 @@ class ScheduleFunctions extends ScheduleBase
$delayedExecutions = []; // Group executions with same delay to share one coroutine
foreach ($this->schedules as $key => $schedule) {
$cron = new CronExpression($schedule['schedule']);
try {
$cron = new CronExpression($schedule['schedule']);
} catch (\InvalidArgumentException) {
// ignore invalid cron expressions
continue;
}
$nextDate = $cron->getNextRunDate();
$next = DateTime::format($nextDate);
@ -66,17 +72,18 @@ class ScheduleFunctions extends ScheduleBase
$delayedExecutions[$delay] = [];
}
$delayedExecutions[$delay][] = $key;
$delayedExecutions[$delay][] = ['key' => $key, 'nextDate' => $nextDate];
}
foreach ($delayedExecutions as $delay => $scheduleKeys) {
\go(function () use ($delay, $scheduleKeys, $pools, $dbForPlatform) {
foreach ($delayedExecutions as $delay => $schedules) {
\go(function () use ($delay, $schedules, $pools, $dbForPlatform) {
\sleep($delay); // in seconds
$queue = $pools->get('publisher')->pop();
$connection = $queue->getResource();
foreach ($scheduleKeys as $scheduleKey) {
foreach ($schedules as $delayConfig) {
$scheduleKey = $delayConfig['key'];
// Ensure schedule was not deleted
if (!\array_key_exists($scheduleKey, $this->schedules)) {
return;
@ -96,7 +103,7 @@ class ScheduleFunctions extends ScheduleBase
->setProject($schedule['project'])
->trigger();
$this->recordEnqueueDelay($schedule['schedule']);
$this->recordEnqueueDelay($delayConfig['nextDate']);
}
$queue->reclaim();

View file

@ -40,7 +40,7 @@ class ScheduleMessages extends ScheduleBase
continue;
}
\go(function () use ($schedule, $pools, $dbForPlatform) {
\go(function () use ($schedule, $scheduledAt, $pools, $dbForPlatform) {
$queue = $pools->get('publisher')->pop();
$connection = $queue->getResource();
$queueForMessaging = new Messaging($connection);
@ -59,7 +59,7 @@ class ScheduleMessages extends ScheduleBase
);
$queue->reclaim();
$this->recordEnqueueDelay($schedule['schedule']);
$this->recordEnqueueDelay($scheduledAt);
unset($this->schedules[$schedule['$internalId']]);
});
}