mirror of
https://github.com/appwrite/appwrite
synced 2026-05-06 06:48:22 +00:00
Merge pull request #8639 from appwrite/fix-scheduled-executions
Fix: scheduled executions data
This commit is contained in:
commit
af126a28ae
8 changed files with 90 additions and 29 deletions
|
|
@ -1920,7 +1920,7 @@ App::post('/v1/functions/:functionId/executions')
|
|||
'path' => $path,
|
||||
'method' => $method,
|
||||
'body' => $body,
|
||||
'jwt' => $jwt,
|
||||
'userId' => $user->getId()
|
||||
];
|
||||
|
||||
$schedule = $dbForConsole->createDocument('schedules', new Document([
|
||||
|
|
|
|||
|
|
@ -54,6 +54,7 @@ class Event
|
|||
protected array $context = [];
|
||||
protected ?Document $project = null;
|
||||
protected ?Document $user = null;
|
||||
protected ?string $userId = null;
|
||||
protected bool $paused = false;
|
||||
|
||||
/**
|
||||
|
|
@ -145,6 +146,18 @@ class Event
|
|||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set user ID for this event.
|
||||
*
|
||||
* @return self
|
||||
*/
|
||||
public function setUserId(string $userId): self
|
||||
{
|
||||
$this->userId = $userId;
|
||||
|
||||
return $this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get user responsible for triggering this event.
|
||||
*
|
||||
|
|
@ -155,6 +168,14 @@ class Event
|
|||
return $this->user;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get user responsible for triggering this event.
|
||||
*/
|
||||
public function getUserId(): ?string
|
||||
{
|
||||
return $this->userId;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set payload for this event.
|
||||
*
|
||||
|
|
@ -303,6 +324,7 @@ class Event
|
|||
return $client->enqueue([
|
||||
'project' => $this->project,
|
||||
'user' => $this->user,
|
||||
'userId' => $this->userId,
|
||||
'payload' => $this->payload,
|
||||
'context' => $this->context,
|
||||
'events' => Event::generateEvents($this->getEvent(), $this->getParams())
|
||||
|
|
|
|||
|
|
@ -222,6 +222,7 @@ class Func extends Event
|
|||
return $client->enqueue([
|
||||
'project' => $this->project,
|
||||
'user' => $this->user,
|
||||
'userId' => $this->userId,
|
||||
'function' => $this->function,
|
||||
'functionId' => $this->functionId,
|
||||
'execution' => $this->execution,
|
||||
|
|
|
|||
|
|
@ -45,23 +45,27 @@ class ScheduleExecutions extends ScheduleBase
|
|||
continue;
|
||||
}
|
||||
|
||||
$data = $dbForConsole->getDocument(
|
||||
'schedules',
|
||||
$schedule['$id'],
|
||||
)->getAttribute('data', []);
|
||||
|
||||
$delay = $scheduledAt->getTimestamp() - (new \DateTime())->getTimestamp();
|
||||
|
||||
|
||||
\go(function () use ($queueForFunctions, $schedule, $delay) {
|
||||
\go(function () use ($queueForFunctions, $schedule, $delay, $data) {
|
||||
Co::sleep($delay);
|
||||
|
||||
$queueForFunctions
|
||||
->setType('schedule')
|
||||
$queueForFunctions->setType('schedule')
|
||||
// Set functionId instead of function as we don't have $dbForProject
|
||||
// TODO: Refactor to use function instead of functionId
|
||||
->setFunctionId($schedule['resource']['functionId'])
|
||||
->setExecution($schedule['resource'])
|
||||
->setMethod($schedule['data']['method'] ?? 'POST')
|
||||
->setPath($schedule['data']['path'] ?? '/')
|
||||
->setHeaders($schedule['data']['headers'] ?? [])
|
||||
->setBody($schedule['data']['body'] ?? '')
|
||||
->setMethod($data['method'] ?? 'POST')
|
||||
->setPath($data['path'] ?? '/')
|
||||
->setHeaders($data['headers'] ?? [])
|
||||
->setBody($data['body'] ?? '')
|
||||
->setProject($schedule['project'])
|
||||
->setUserId($data['userId'] ?? '')
|
||||
->trigger();
|
||||
});
|
||||
|
||||
|
|
|
|||
|
|
@ -71,12 +71,6 @@ class Functions extends Action
|
|||
throw new Exception('Missing payload');
|
||||
}
|
||||
|
||||
$payload = $message->getPayload() ?? [];
|
||||
|
||||
if (empty($payload)) {
|
||||
throw new Exception('Missing payload');
|
||||
}
|
||||
|
||||
$type = $payload['type'] ?? '';
|
||||
$events = $payload['events'] ?? [];
|
||||
$data = $payload['body'] ?? '';
|
||||
|
|
@ -85,9 +79,23 @@ class Functions extends Action
|
|||
$function = new Document($payload['function'] ?? []);
|
||||
$functionId = $payload['functionId'] ?? '';
|
||||
$user = new Document($payload['user'] ?? []);
|
||||
$userId = $payload['userId'] ?? '';
|
||||
$method = $payload['method'] ?? 'POST';
|
||||
$headers = $payload['headers'] ?? [];
|
||||
$path = $payload['path'] ?? '/';
|
||||
$jwt = $payload['jwt'] ?? '';
|
||||
|
||||
if ($user->isEmpty() && !empty($userId)) {
|
||||
$user = $dbForProject->getDocument('users', $userId);
|
||||
}
|
||||
|
||||
if (empty($jwt) && !$user->isEmpty()) {
|
||||
$jwtExpiry = $function->getAttribute('timeout', 900);
|
||||
$jwtObj = new JWT(System::getEnv('_APP_OPENSSL_KEY_V1'), 'HS256', $jwtExpiry, 0);
|
||||
$jwt = $jwtObj->encode([
|
||||
'userId' => $user->getId(),
|
||||
]);
|
||||
}
|
||||
|
||||
if ($project->getId() === 'console') {
|
||||
return;
|
||||
|
|
@ -157,7 +165,6 @@ class Functions extends Action
|
|||
*/
|
||||
switch ($type) {
|
||||
case 'http':
|
||||
$jwt = $payload['jwt'] ?? '';
|
||||
$execution = new Document($payload['execution'] ?? []);
|
||||
$user = new Document($payload['user'] ?? []);
|
||||
$this->execute(
|
||||
|
|
@ -194,9 +201,9 @@ class Functions extends Action
|
|||
path: $path,
|
||||
method: $method,
|
||||
headers: $headers,
|
||||
data: null,
|
||||
user: null,
|
||||
jwt: null,
|
||||
data: $data,
|
||||
user: $user,
|
||||
jwt: $jwt,
|
||||
event: null,
|
||||
eventData: null,
|
||||
executionId: $execution->getId() ?? null
|
||||
|
|
|
|||
|
|
@ -164,7 +164,8 @@ class FunctionsCustomClientTest extends Scope
|
|||
$this->assertEquals($executions['body']['executions'][1]['status'], 'completed');
|
||||
$this->assertEquals($executions['body']['executions'][1]['responseStatusCode'], 200);
|
||||
$this->assertEquals($executions['body']['executions'][1]['responseBody'], '');
|
||||
$this->assertEquals($executions['body']['executions'][1]['logs'], '');
|
||||
$this->assertNotEmpty($executions['body']['executions'][1]['logs'], '');
|
||||
$this->assertNotEmpty($executions['body']['executions'][1]['errors'], '');
|
||||
$this->assertGreaterThan(0, $executions['body']['executions'][1]['duration']);
|
||||
|
||||
// Cleanup : Delete function
|
||||
|
|
@ -228,17 +229,22 @@ class FunctionsCustomClientTest extends Scope
|
|||
], $this->getHeaders()), [
|
||||
'async' => true,
|
||||
'scheduledAt' => $futureTime->format(\DateTime::ATOM),
|
||||
'path' => '/custom',
|
||||
'method' => 'GET'
|
||||
'path' => '/custom-path',
|
||||
'method' => 'PATCH',
|
||||
'body' => 'custom-body',
|
||||
'headers' => [
|
||||
'x-custom-header' => 'custom-value'
|
||||
]
|
||||
]);
|
||||
|
||||
$this->assertEquals(202, $execution['headers']['status-code']);
|
||||
$this->assertEquals('scheduled', $execution['body']['status']);
|
||||
$this->assertEquals('PATCH', $execution['body']['requestMethod']);
|
||||
$this->assertEquals('/custom-path', $execution['body']['requestPath']);
|
||||
$this->assertCount(0, $execution['body']['requestHeaders']);
|
||||
|
||||
$executionId = $execution['body']['$id'];
|
||||
|
||||
sleep(60 + 60 + 15); // up to 1 minute round up, 1 minute schedule postpone, 15s cold start safety
|
||||
|
||||
$start = \microtime(true);
|
||||
while (true) {
|
||||
$execution = $this->client->call(Client::METHOD_GET, '/functions/' . $function['body']['$id'] . '/executions/' . $executionId, [
|
||||
|
|
@ -251,7 +257,8 @@ class FunctionsCustomClientTest extends Scope
|
|||
break;
|
||||
}
|
||||
|
||||
if (\microtime(true) - $start > 10) {
|
||||
$timeout = 60 + 60 + 15; // up to 1 minute round up, 1 minute schedule postpone, 15s cold start safety
|
||||
if (\microtime(true) - $start > $timeout) {
|
||||
$this->fail('Scheduled execution did not complete with status ' . $execution['body']['status'] . ': ' . \json_encode($execution));
|
||||
}
|
||||
|
||||
|
|
@ -261,8 +268,14 @@ class FunctionsCustomClientTest extends Scope
|
|||
$this->assertEquals(200, $execution['headers']['status-code']);
|
||||
$this->assertEquals(200, $execution['body']['responseStatusCode']);
|
||||
$this->assertEquals('completed', $execution['body']['status']);
|
||||
$this->assertEquals('/custom', $execution['body']['requestPath']);
|
||||
$this->assertEquals('GET', $execution['body']['requestMethod']);
|
||||
$this->assertEquals('/custom-path', $execution['body']['requestPath']);
|
||||
$this->assertEquals('PATCH', $execution['body']['requestMethod']);
|
||||
$this->assertStringContainsString('body-is-custom-body', $execution['body']['logs']);
|
||||
$this->assertStringContainsString('custom-header-is-custom-value', $execution['body']['logs']);
|
||||
$this->assertStringContainsString('method-is-patch', $execution['body']['logs']);
|
||||
$this->assertStringContainsString('path-is-/custom-path', $execution['body']['logs']);
|
||||
$this->assertStringContainsString('user-is-' . $this->getUser()['$id'], $execution['body']['logs']);
|
||||
$this->assertStringContainsString('jwt-is-valid', $execution['body']['logs']);
|
||||
$this->assertGreaterThan(0, $execution['body']['duration']);
|
||||
|
||||
/* Test for FAILURE */
|
||||
|
|
|
|||
|
|
@ -1179,8 +1179,8 @@ class FunctionsCustomServerTest extends Scope
|
|||
$this->assertStringContainsString('8.0', $execution['body']['responseBody']);
|
||||
$this->assertStringContainsString('Global Variable Value', $execution['body']['responseBody']);
|
||||
// $this->assertStringContainsString('êä', $execution['body']['responseBody']); // tests unknown utf-8 chars
|
||||
$this->assertEquals('', $execution['body']['errors']);
|
||||
$this->assertEquals('', $execution['body']['logs']);
|
||||
$this->assertNotEmpty($execution['body']['errors']);
|
||||
$this->assertNotEmpty($execution['body']['logs']);
|
||||
$this->assertLessThan(10, $execution['body']['duration']);
|
||||
|
||||
$execution = $this->client->call(Client::METHOD_POST, '/functions/' . $data['functionId'] . '/executions', array_merge([
|
||||
|
|
|
|||
|
|
@ -1,6 +1,20 @@
|
|||
<?php
|
||||
|
||||
return function ($context) {
|
||||
$context->log('body-is-' . ($context->req->body ?? ''));
|
||||
$context->log('custom-header-is-' . ($context->req->headers['x-custom-header'] ?? ''));
|
||||
$context->log('method-is-' . \strtolower($context->req->method ?? ''));
|
||||
$context->log('path-is-' . ($context->req->path ?? ''));
|
||||
$context->log('user-is-' . $context->req->headers['x-appwrite-user-id'] ?? '');
|
||||
|
||||
if (empty($context->req->headers['x-appwrite-user-jwt'] ?? '')) {
|
||||
$context->log('jwt-is-invalid');
|
||||
} else {
|
||||
$context->log('jwt-is-valid');
|
||||
}
|
||||
|
||||
$context->error('error-log-works');
|
||||
|
||||
$statusCode = $context->req->query['code'] ?? '200';
|
||||
|
||||
return $context->res->json([
|
||||
|
|
|
|||
Loading…
Reference in a new issue