Merge pull request #9519 from appwrite/fix-realtime-deployments

Fix: Realtime events in build worker
This commit is contained in:
Matej Bačo 2025-03-17 15:30:54 +01:00 committed by GitHub
commit f1b5807174
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 101 additions and 61 deletions

View file

@ -122,6 +122,14 @@ jobs:
docker load --input /tmp/${{ env.IMAGE }}.tar
docker compose up -d
sleep 10
- name: Wait for Open Runtimes
timeout-minutes: 3
run: |
while ! docker compose logs openruntimes-executor | grep -q "Executor is ready."; do
echo "Waiting for Executor to come online"
sleep 1
done
- name: Run General Tests
run: docker compose exec -T appwrite test /usr/src/code/tests/e2e/General --debug

View file

@ -190,6 +190,12 @@ class Builds extends Action
// Realtime preparation
$event = "{$resource->getCollection()}.[{$resourceKey}].deployments.[deploymentId].update";
$queueForRealtime
->setSubscribers(['console'])
->setProject($project)
->setEvent($event)
->setParam($resourceKey, $resource->getId())
->setParam('deploymentId', $deployment->getId());
$startTime = DateTime::now();
$durationStart = \microtime(true);
@ -205,6 +211,10 @@ class Builds extends Action
$deployment->setAttribute('status', 'processing');
$deployment = $dbForProject->updateDocument('deployments', $deployment->getId(), $deployment);
$queueForRealtime
->setPayload($deployment->getArrayCopy())
->trigger();
$source = $deployment->getAttribute('sourcePath', '');
$installationId = $deployment->getAttribute('installationId', '');
$providerRepositoryId = $deployment->getAttribute('providerRepositoryId', '');
@ -277,6 +287,10 @@ class Builds extends Action
->setAttribute('sourcePath', $source)
->setAttribute('sourceSize', $directorySize);
$deployment = $dbForProject->updateDocument('deployments', $deployment->getId(), $deployment);
$queueForRealtime
->setPayload($deployment->getArrayCopy())
->trigger();
}
} elseif ($isVcsEnabled) {
// VCS and VCS+Temaplte
@ -384,15 +398,7 @@ class Builds extends Action
$deployment->setAttribute('providerCommitUrl', "https://github.com/$cloneOwner/$cloneRepository/commit/$providerCommitHash");
$deployment = $dbForProject->updateDocument('deployments', $deployment->getId(), $deployment);
/**
* Trigger Realtime Event
*/
$queueForRealtime
->setSubscribers(['console'])
->setProject($project)
->setEvent($event)
->setParam($resourceKey, $resource->getId())
->setParam('deploymentId', $deployment->getId())
->setPayload($deployment->getArrayCopy())
->trigger();
}
@ -433,6 +439,10 @@ class Builds extends Action
->setAttribute('sourceSize', $directorySize);
$deployment = $dbForProject->updateDocument('deployments', $deployment->getId(), $deployment);
$queueForRealtime
->setPayload($deployment->getArrayCopy())
->trigger();
$this->runGitAction('processing', $github, $providerCommitHash, $owner, $repositoryName, $project, $resource, $deployment->getId(), $dbForProject, $dbForPlatform);
}
@ -440,6 +450,10 @@ class Builds extends Action
$deployment->setAttribute('status', 'building');
$deployment = $dbForProject->updateDocument('deployments', $deployment->getId(), $deployment);
$queueForRealtime
->setPayload($deployment->getArrayCopy())
->trigger();
if ($isVcsEnabled) {
$this->runGitAction('building', $github, $providerCommitHash, $owner, $repositoryName, $project, $resource, $deployment->getId(), $dbForProject, $dbForPlatform);
}
@ -465,11 +479,6 @@ class Builds extends Action
/** Trigger Realtime Event */
$queueForRealtime
->setSubscribers(['console'])
->setProject($project)
->setEvent($event)
->setParam($resourceKey, $resource->getId())
->setParam('deploymentId', $deployment->getId())
->setPayload($deployment->getArrayCopy())
->trigger();
@ -577,7 +586,6 @@ class Builds extends Action
try {
$command = $version === 'v2' ? 'tar -zxf /tmp/code.tar.gz -C /usr/code && cd /usr/local/src/ && ./build.sh' : 'tar -zxf /tmp/code.tar.gz -C /mnt/code && helpers/build.sh "' . \trim(\escapeshellarg($command), "\'") . '"';
// TODO: Detect adapter if adapter is empty
$response = $executor->createRuntime(
deploymentId: $deployment->getId(),
projectId: $project->getId(),
@ -598,13 +606,13 @@ class Builds extends Action
$err = $error;
}
}),
Co\go(function () use ($executor, $project, &$deployment, &$response, $dbForProject, $event, $timeout, &$err, $queueForRealtime, &$isCanceled, $resourceKey, $resource) {
Co\go(function () use ($executor, $project, &$deployment, &$response, $dbForProject, $timeout, &$err, $queueForRealtime, &$isCanceled) {
try {
$executor->getLogs(
deploymentId: $deployment->getId(),
projectId: $project->getId(),
timeout: $timeout,
callback: function ($logs) use (&$response, &$err, $dbForProject, $event, $project, &$isCanceled, &$deployment, $queueForRealtime, $resourceKey, $resource) {
callback: function ($logs) use (&$response, &$err, $dbForProject, &$isCanceled, &$deployment, $queueForRealtime) {
if ($isCanceled) {
return;
}
@ -640,15 +648,7 @@ class Builds extends Action
$deployment = $deployment->setAttribute('buildLogs', $currentLogs);
$deployment = $dbForProject->updateDocument('deployments', $deployment->getId(), $deployment);
/**
* Trigger Realtime Event
*/
$queueForRealtime
->setSubscribers(['console'])
->setProject($project)
->setEvent($event)
->setParam($resourceKey, $resource->getId())
->setParam('deploymentId', $deployment->getId())
->setPayload($deployment->getArrayCopy())
->trigger();
}
@ -729,6 +729,10 @@ class Builds extends Action
$deployment = $dbForProject->updateDocument('deployments', $deploymentId, $deployment);
$queueForRealtime
->setPayload($deployment->getArrayCopy())
->trigger();
if ($isVcsEnabled) {
$this->runGitAction('ready', $github, $providerCommitHash, $owner, $repositoryName, $project, $resource, $deployment->getId(), $dbForProject, $dbForPlatform);
}
@ -841,6 +845,10 @@ class Builds extends Action
}
$dbForProject->updateDocument('deployments', $deployment->getId(), $deployment);
$queueForRealtime
->setPayload($deployment->getArrayCopy())
->trigger();
} catch (\Throwable $th) {
Console::warning("Screenshot failed to generate:");
Console::warning($th->getMessage());
@ -873,7 +881,9 @@ class Builds extends Action
$queries[] = Query::equal("deploymentInternalId", [$oldDeploymentInternalId]);
}
$this->listRules($project, $queries, $dbForPlatform, function (Document $rule) use ($dbForPlatform, $deployment) {
$rulesUpdated = false;
$this->listRules($project, $queries, $dbForPlatform, function (Document $rule) use ($dbForPlatform, $deployment, &$rulesUpdated) {
$rulesUpdated = true;
$rule = $rule
->setAttribute('deploymentId', $deployment->getId())
->setAttribute('deploymentInternalId', $deployment->getInternalId());
@ -886,7 +896,6 @@ class Builds extends Action
$resource->setAttribute('deploymentId', $deployment->getId());
$resource->setAttribute('deploymentInternalId', $deployment->getInternalId());
$resource = $dbForProject->updateDocument('sites', $resource->getId(), $resource);
$queries = [
Query::equal("projectInternalId", [$project->getInternalId()]),
Query::equal("type", ["deployment"]),
@ -1001,19 +1010,15 @@ class Builds extends Action
$deployment = $dbForProject->updateDocument('deployments', $deploymentId, $deployment);
$queueForRealtime
->setPayload($deployment->getArrayCopy())
->trigger();
if ($isVcsEnabled) {
$this->runGitAction('failed', $github, $providerCommitHash, $owner, $repositoryName, $project, $resource, $deployment->getId(), $dbForProject, $dbForPlatform);
}
} finally {
/**
* Trigger Realtime Event
*/
$queueForRealtime
->setSubscribers(['console'])
->setProject($project)
->setEvent($event)
->setParam($resourceKey, $resource->getId())
->setParam('deploymentId', $deployment->getId())
->setPayload($deployment->getArrayCopy())
->trigger();

View file

@ -882,29 +882,12 @@ class UsageTest extends Scope
$this->assertEquals(201, $response['headers']['status-code']);
$this->assertNotEmpty($response['body']['$id']);
$response = $this->client->call(
Client::METHOD_POST,
'/functions/' . $functionId . '/deployments',
array_merge([
'content-type' => 'multipart/form-data',
'x-appwrite-project' => $this->getProject()['$id']
], $this->getHeaders()),
[
'entrypoint' => 'index.php',
'code' => $this->packageFunction('php'),
'activate' => true,
]
);
$deploymentId = $response['body']['$id'] ?? '';
$this->assertEquals(202, $response['headers']['status-code']);
$this->assertNotEmpty($response['body']['$id']);
$this->assertEquals(true, (new DatetimeValidator())->isValid($response['body']['$createdAt']));
$this->assertEquals('index.php', $response['body']['entrypoint']);
// Wait for deployment to build.
sleep(self::WAIT + 20);
$deploymentId = $this->setupDeployment($functionId, [
'entrypoint' => 'index.php',
'code' => $this->packageFunction('php'),
'activate' => true,
]);
$this->assertNotEmpty($deploymentId);
$response = $this->client->call(
Client::METHOD_PATCH,

View file

@ -534,10 +534,11 @@ class RealtimeConsoleClientTest extends Scope
'timeout' => 10
]);
$functionId = $response1['body']['$id'] ?? '';
$this->assertEquals(201, $response1['headers']['status-code']);
$functionId = $response1['body']['$id'];
$this->assertNotEmpty($functionId);
$projectId = 'console';
$client = $this->getWebsocket(['console'], [
@ -570,6 +571,9 @@ class RealtimeConsoleClientTest extends Scope
$this->assertEquals(202, $deployment['headers']['status-code']);
$deploymentId = $deployment['body']['$id'];
$this->assertNotEmpty($deploymentId);
$response = json_decode($client->receive(), true);
$this->assertArrayHasKey('type', $response);
@ -580,8 +584,48 @@ class RealtimeConsoleClientTest extends Scope
$this->assertCount(2, $response['data']['channels']);
$this->assertContains('console', $response['data']['channels']);
$this->assertContains("projects.{$projectId}", $response['data']['channels']);
// $this->assertContains("functions.{$functionId}.deployments.{$deploymentId}.create", $response['data']['events']); TODO @christyjacob4 : enable test once we allow functions.* events
$this->assertNotEmpty($response['data']['payload']);
$this->assertEquals("waiting", $response['data']['payload']['status']);
$this->assertContains("functions.{$functionId}.deployments.{$deploymentId}.create", $response['data']['events']);
$response = json_decode($client->receive(), true);
$this->assertContains("functions.{$functionId}.deployments.{$deploymentId}.update", $response['data']['events']);
$this->assertContains('console', $response['data']['channels']);
$this->assertContains("projects.{$projectId}", $response['data']['channels']);
$this->assertEquals("processing", $response['data']['payload']['status']);
$response = json_decode($client->receive(), true);
$this->assertContains("functions.{$functionId}.deployments.{$deploymentId}.update", $response['data']['events']);
$this->assertContains('console', $response['data']['channels']);
$this->assertContains("projects.{$projectId}", $response['data']['channels']);
$this->assertEquals("building", $response['data']['payload']['status']);
$previousBuildLogs = null;
while (true) {
$response = json_decode($client->receive(), true);
$this->assertContains("functions.{$functionId}.deployments.{$deploymentId}.update", $response['data']['events']);
$this->assertContains('console', $response['data']['channels']);
$this->assertContains("projects.{$projectId}", $response['data']['channels']);
$this->assertArrayHasKey('buildLogs', $response['data']['payload']);
// Ignore comparasion for first payload
if ($previousBuildLogs !== null) {
$this->assertNotEquals($previousBuildLogs, $response['data']['payload']['buildLogs']);
}
$previousBuildLogs = $response['data']['payload']['buildLogs'];
$this->assertThat(
$response['data']['payload']['status'],
$this->logicalOr(
$this->equalTo('building'),
$this->equalTo('ready'),
),
);
if ($response['data']['payload']['status'] === 'ready') {
break;
}
}
$client->close();
}