appwrite/src/Appwrite/Platform/Tasks/ScheduleMessages.php

72 lines
1.9 KiB
PHP
Raw Normal View History

2024-01-11 03:06:59 +00:00
<?php
namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Delete;
use Swoole\Timer;
2024-01-15 06:31:42 +00:00
use Utopia\Database\Document;
2024-01-11 03:06:59 +00:00
use Utopia\Platform\Action;
use Utopia\CLI\Console;
use Utopia\Database\DateTime;
use Utopia\Database\Query;
use Utopia\Database\Database;
use Utopia\Pools\Group;
use Appwrite\Event\Messaging;
use function Swoole\Coroutine\run;
/**
 * Scheduler task that dispatches scheduled messages once they are due.
 *
 * For every entry in $this->schedules whose `scheduledAt` time has been
 * reached, a job is pushed onto the messaging queue, the schedule document is
 * updated with `active => false`, and a delete job is queued for the schedule.
 */
class ScheduleMessages extends ScheduleBase
{
    // NOTE(review): presumably read by ScheduleBase as the interval between
    // schedule refreshes — confirm against the base class.
    public const UPDATE_TIMER = 10; // seconds

    // NOTE(review): presumably read by ScheduleBase as the interval between
    // calls to enqueueResources() — confirm against the base class.
    public const ENQUEUE_TIMER = 60; // seconds

    /**
     * Name under which this task is registered.
     *
     * @return string
     */
    public static function getName(): string
    {
        return 'schedule-messages';
    }

    /**
     * Resource type this scheduler is responsible for.
     *
     * @return string
     */
    public static function getSupportedResource(): string
    {
        return 'message';
    }

    /**
     * Enqueue every schedule whose `scheduledAt` time has been reached.
     *
     * Each due schedule is processed in its own coroutine: the message job is
     * triggered, the schedule document is updated with `active => false`, a
     * delete job is queued for the schedule, and finally the entry is dropped
     * from the in-memory schedule list.
     *
     * @param Group    $pools        Connection pools; the 'queue' pool is used.
     * @param Database $dbForConsole Database holding the 'schedules' collection.
     *
     * @return void
     *
     * @throws \Exception If a schedule's `scheduledAt` value cannot be parsed as a date.
     */
    protected function enqueueResources(Group $pools, Database $dbForConsole): void
    {
        foreach ($this->schedules as $schedule) {
            $scheduledAt = $schedule['scheduledAt'];

            // Compare real date objects rather than formatted strings: the
            // string forms of "now" and of the scheduled time use different
            // layouts, so a lexicographic comparison gives wrong answers.
            // A missing (null) time is treated as already due, as before.
            if ($scheduledAt !== null && new \DateTimeImmutable($scheduledAt) > new \DateTimeImmutable()) {
                continue;
            }

            \go(function () use ($schedule, $pools, $dbForConsole) {
                $queue = $pools->get('queue')->pop();

                try {
                    $connection = $queue->getResource();

                    $queueForMessaging = new Messaging($connection);
                    $queueForDeletes = new Delete($connection);

                    $queueForMessaging
                        ->setMessageId($schedule['resourceId'])
                        ->setProject($schedule['project'])
                        ->trigger();

                    $dbForConsole->updateDocument(
                        'schedules',
                        $schedule['$id'],
                        new Document(['active' => false])
                    );

                    $queueForDeletes
                        ->setType(DELETE_TYPE_SCHEDULES)
                        ->setDocument($schedule)
                        ->trigger();
                } finally {
                    // Always hand the connection back to the pool, even when
                    // one of the steps above throws; otherwise it is leaked.
                    $queue->reclaim();
                }

                // Reached only on success, so a failed schedule stays in the
                // list and is picked up again on a later pass.
                unset($this->schedules[$schedule['resourceId']]);
            });
        }
    }
}