Merge pull request #8412 from appwrite/feat-schedule-execution-improvements

feat: schedule execution improvements
This commit is contained in:
Christy Jacob 2024-08-12 19:59:56 +04:00 committed by GitHub
commit ad43e1c419
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 44 additions and 59 deletions

View file

@ -3,6 +3,7 @@
namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Func;
use Swoole\Coroutine as Co;
use Utopia\Database\Database;
use Utopia\Pools\Group;
@ -26,6 +27,7 @@ class ScheduleExecutions extends ScheduleBase
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource();
$queueForFunctions = new Func($connection);
$intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds');
foreach ($this->schedules as $schedule) {
if (!$schedule['active']) {
@ -38,25 +40,30 @@ class ScheduleExecutions extends ScheduleBase
continue;
}
$now = new \DateTime();
$scheduledAt = new \DateTime($schedule['schedule']);
if ($scheduledAt > $now) {
if ($scheduledAt <= $intervalEnd) {
continue;
}
$queueForFunctions
->setType('schedule')
// Set functionId instead of function as we don't have $dbForProject
// TODO: Refactor to use function instead of functionId
->setFunctionId($schedule['resource']['functionId'])
->setExecution($schedule['resource'])
->setMethod($schedule['data']['method'] ?? 'POST')
->setPath($schedule['data']['path'] ?? '/')
->setHeaders($schedule['data']['headers'] ?? [])
->setBody($schedule['data']['body'] ?? '')
->setProject($schedule['project'])
->trigger();
$delay = $scheduledAt->getTimestamp() - (new \DateTime())->getTimestamp();
\go(function () use ($queueForFunctions, $schedule, $delay) {
Co::sleep($delay);
$queueForFunctions
->setType('schedule')
// Set functionId instead of function as we don't have $dbForProject
// TODO: Refactor to use function instead of functionId
->setFunctionId($schedule['resource']['functionId'])
->setExecution($schedule['resource'])
->setMethod($schedule['data']['method'] ?? 'POST')
->setPath($schedule['data']['path'] ?? '/')
->setHeaders($schedule['data']['headers'] ?? [])
->setBody($schedule['data']['body'] ?? '')
->setProject($schedule['project'])
->trigger();
});
$dbForConsole->deleteDocument(
'schedules',

View file

@ -9,7 +9,6 @@ use Tests\E2E\Scopes\ProjectCustom;
use Tests\E2E\Scopes\Scope;
use Tests\E2E\Scopes\SideClient;
use Utopia\Config\Config;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Helpers\ID;
use Utopia\Database\Helpers\Role;
@ -195,10 +194,6 @@ class FunctionsCustomClientTest extends Scope
'execute' => [Role::user($this->getUser()['$id'])->toString()],
'runtime' => 'php-8.0',
'entrypoint' => 'index.php',
'events' => [
'users.*.create',
'users.*.delete',
],
'timeout' => 10,
]);
@ -217,66 +212,50 @@ class FunctionsCustomClientTest extends Scope
'code' => new CURLFile($code, 'application/x-gzip', \basename($code)),
'activate' => true
]);
$deploymentId = $deployment['body']['$id'] ?? '';
$this->assertEquals(202, $deployment['headers']['status-code']);
$this->awaitDeploymentIsBuilt($function['body']['$id'], $deploymentId);
$function = $this->client->call(Client::METHOD_PATCH, '/functions/' . $function['body']['$id'] . '/deployments/' . $deploymentId, [
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
'x-appwrite-key' => $this->getProject()['apiKey'],
], []);
$this->assertEquals(200, $function['headers']['status-code']);
$this->awaitDeploymentIsBuilt($function['body']['$id'], $deploymentId, true);
// Schedule execution for the future
\date_default_timezone_set('UTC');
$futureTime = (new \DateTime())->add(new \DateInterval('PT10S'))->format('Y-m-d H:i:s');
$futureTimeIso = DateTime::formatTz($futureTime);
$futureTime = (new \DateTime())->add(new \DateInterval('PT10S'));
$execution = $this->client->call(Client::METHOD_POST, '/functions/' . $function['body']['$id'] . '/executions', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
], $this->getHeaders()), [
'async' => true,
'scheduledAt' => $futureTime,
'scheduledAt' => $futureTime->format(\DateTime::ATOM),
'path' => '/custom',
'method' => 'GET',
'body' => 'hello',
'headers' => [
'content-type' => 'application/plain',
],
'method' => 'GET'
]);
$this->assertEquals(202, $execution['headers']['status-code']);
$this->assertEquals('scheduled', $execution['body']['status']);
$this->assertEquals($futureTimeIso, $execution['body']['scheduledAt']);
$executionId = $execution['body']['$id'];
// List executions and ensure it has schedule date
$response = $this->client->call(Client::METHOD_GET, '/functions/' . $function['body']['$id'] . '/executions', [
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
'x-appwrite-key' => $this->getProject()['apiKey'],
]);
sleep(10);
$this->assertEquals(200, $response['headers']['status-code']);
$this->assertGreaterThan(0, \count($response['body']['executions']));
$recentExecution = $response['body']['executions'][0];
$this->assertEquals($executionId, $recentExecution['$id']);
$this->assertEquals($futureTimeIso, $recentExecution['scheduledAt']);
$start = \microtime(true);
while (true) {
$execution = $this->client->call(Client::METHOD_GET, '/functions/' . $function['body']['$id'] . '/executions/' . $executionId, [
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
'x-appwrite-key' => $this->getProject()['apiKey'],
]);
sleep(20);
if ($execution['body']['status'] === 'completed') {
break;
}
$execution = $this->client->call(Client::METHOD_GET, '/functions/' . $function['body']['$id'] . '/executions/' . $executionId, [
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
'x-appwrite-key' => $this->getProject()['apiKey'],
]);
if (\microtime(true) - $start > 5) {
$this->fail('Execution did not complete within 5 seconds of schedule');
}
usleep(500000); // 0.5 seconds
}
$this->assertEquals(200, $execution['headers']['status-code']);
$this->assertEquals(200, $execution['body']['responseStatusCode']);
@ -284,7 +263,6 @@ class FunctionsCustomClientTest extends Scope
$this->assertEquals('/custom', $execution['body']['requestPath']);
$this->assertEquals('GET', $execution['body']['requestMethod']);
$this->assertGreaterThan(0, $execution['body']['duration']);
$this->assertEquals($futureTimeIso, $execution['body']['scheduledAt']);
/* Test for FAILURE */
@ -295,7 +273,7 @@ class FunctionsCustomClientTest extends Scope
'x-appwrite-project' => $this->getProject()['$id'],
], $this->getHeaders()), [
'async' => false,
'scheduledAt' => $futureTime,
'scheduledAt' => $futureTime->format(\DateTime::ATOM),
]);
$this->assertEquals(400, $execution['headers']['status-code']);