Merge pull request #8735 from appwrite/feat-realtime-projects-channel

feat(realtime): projects channels
This commit is contained in:
Torsten Dittmann 2024-09-30 17:37:24 +02:00 committed by GitHub
commit 80449c5c10
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 85 additions and 46 deletions

View file

@ -266,6 +266,7 @@ class Realtime extends Adapter
break;
case 'rules':
$channels[] = 'console';
$channels[] = 'projects.' . $project->getId();
$projectId = 'console';
$roles = [Role::team($project->getAttribute('teamId'))->toString()];
break;
@ -284,6 +285,7 @@ class Realtime extends Adapter
case 'databases':
if (in_array($parts[4] ?? [], ['attributes', 'indexes'])) {
$channels[] = 'console';
$channels[] = 'projects.' . $project->getId();
$projectId = 'console';
$roles = [Role::team($project->getAttribute('teamId'))->toString()];
} elseif (($parts[4] ?? '') === 'documents') {
@ -323,6 +325,7 @@ class Realtime extends Adapter
if ($parts[2] === 'executions') {
if (!empty($payload->getRead())) {
$channels[] = 'console';
$channels[] = 'projects.' . $project->getId();
$channels[] = 'executions';
$channels[] = 'executions.' . $payload->getId();
$channels[] = 'functions.' . $payload->getAttribute('functionId');
@ -330,6 +333,7 @@ class Realtime extends Adapter
}
} elseif ($parts[2] === 'deployments') {
$channels[] = 'console';
$channels[] = 'projects.' . $project->getId();
$projectId = 'console';
$roles = [Role::team($project->getAttribute('teamId'))->toString()];
}
@ -337,6 +341,7 @@ class Realtime extends Adapter
break;
case 'migrations':
$channels[] = 'console';
$channels[] = 'projects.' . $project->getId();
$projectId = 'console';
$roles = [Role::team($project->getAttribute('teamId'))->toString()];
break;

View file

@ -587,7 +587,8 @@ class Functions extends Action
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $allEvents[0],
payload: $execution
payload: $execution,
project: $project
);
Realtime::send(
projectId: 'console',

View file

@ -17,7 +17,6 @@ use Utopia\Database\Exception\Restricted;
use Utopia\Database\Exception\Structure;
use Utopia\Database\Helpers\ID;
use Utopia\Logger\Log;
use Utopia\Logger\Log\Breadcrumb;
use Utopia\Migration\Destination;
use Utopia\Migration\Destinations\Appwrite as DestinationAppwrite;
use Utopia\Migration\Exception as MigrationException;
@ -330,7 +329,7 @@ class Migrations extends Action
foreach ($sourceErrors as $error) {
/** @var $sourceErrors $error */
$message = "Error occurred while fetching '{$error->getResourceName()}:{$error->getResourceId()}' from source with message: '{$error->getMessage()}'";
if($error->getPrevious()){
if ($error->getPrevious()) {
$message .= " Message: ".$error->getPrevious()->getMessage() . " File: ".$error->getPrevious()->getFile() . " Line: ".$error->getPrevious()->getLine();
}
@ -339,7 +338,7 @@ class Migrations extends Action
foreach ($destinationErrors as $error) {
$message = "Error occurred while pushing '{$error->getResourceName()}:{$error->getResourceId()}' to destination with message: '{$error->getMessage()}'";
if($error->getPrevious()){
if ($error->getPrevious()) {
$message .= " Message: ".$error->getPrevious()->getMessage() . " File: ".$error->getPrevious()->getFile() . " Line: ".$error->getPrevious()->getLine();
}

View file

@ -300,7 +300,7 @@ class Swagger2 extends Format
break;
}
if($class === 'Utopia\Validator\AnyOf') {
if ($class === 'Utopia\Validator\AnyOf') {
$validator = $param['validator']->getValidators()[0];
$class = \get_class($validator);
}

View file

@ -7,25 +7,34 @@ use WebSocket\ConnectionException;
trait RealtimeBase
{
private function getWebsocket($channels = [], $headers = [], $projectId = null): WebSocketClient
{
private function getWebsocket(
array $channels = [],
array $headers = [],
string $projectId = null
): WebSocketClient {
if (is_null($projectId)) {
$projectId = $this->getProject()['$id'];
}
$headers = array_merge([
'Origin' => 'appwrite.test'
], $headers);
$headers = array_merge(
[
"Origin" => "appwrite.test",
],
$headers
);
$query = [
'project' => $projectId,
'channels' => $channels
"project" => $projectId,
"channels" => $channels,
];
return new WebSocketClient('ws://appwrite-traefik/v1/realtime?' . http_build_query($query), [
'headers' => $headers,
'timeout' => 30,
]);
return new WebSocketClient(
"ws://appwrite-traefik/v1/realtime?" . http_build_query($query),
[
"headers" => $headers,
"timeout" => 30,
]
);
}
public function testConnection(): void
@ -33,7 +42,7 @@ trait RealtimeBase
/**
* Test for SUCCESS
*/
$client = $this->getWebsocket(['documents']);
$client = $this->getWebsocket(["documents"]);
$this->assertNotEmpty($client->receive());
$client->close();
}
@ -43,11 +52,11 @@ trait RealtimeBase
$client = $this->getWebsocket();
$payload = json_decode($client->receive(), true);
$this->assertArrayHasKey('type', $payload);
$this->assertArrayHasKey('data', $payload);
$this->assertEquals('error', $payload['type']);
$this->assertEquals(1008, $payload['data']['code']);
$this->assertEquals('Missing channels', $payload['data']['message']);
$this->assertArrayHasKey("type", $payload);
$this->assertArrayHasKey("data", $payload);
$this->assertEquals("error", $payload["type"]);
$this->assertEquals(1008, $payload["data"]["code"]);
$this->assertEquals("Missing channels", $payload["data"]["message"]);
\usleep(250000); // 250ms
$this->expectException(ConnectionException::class); // Check if server disconnnected client
$client->close();
@ -55,18 +64,24 @@ trait RealtimeBase
public function testConnectionFailureUnknownProject(): void
{
$client = new WebSocketClient('ws://appwrite-traefik/v1/realtime?project=123', [
'headers' => [
'Origin' => 'appwrite.test'
$client = new WebSocketClient(
"ws://appwrite-traefik/v1/realtime?project=123",
[
"headers" => [
"Origin" => "appwrite.test",
],
]
]);
);
$payload = json_decode($client->receive(), true);
$this->assertArrayHasKey('type', $payload);
$this->assertArrayHasKey('data', $payload);
$this->assertEquals('error', $payload['type']);
$this->assertEquals(1008, $payload['data']['code']);
$this->assertEquals('Missing or unknown project ID', $payload['data']['message']);
$this->assertArrayHasKey("type", $payload);
$this->assertArrayHasKey("data", $payload);
$this->assertEquals("error", $payload["type"]);
$this->assertEquals(1008, $payload["data"]["code"]);
$this->assertEquals(
"Missing or unknown project ID",
$payload["data"]["message"]
);
\usleep(250000); // 250ms
$this->expectException(ConnectionException::class); // Check if server disconnnected client
$client->close();

View file

@ -19,7 +19,7 @@ class RealtimeConsoleClientTest extends Scope
use ProjectCustom;
use SideConsole;
public function testManualAuthentication()
public function testManualAuthentication(): void
{
$user = $this->getUser();
$userId = $user['$id'] ?? '';
@ -124,7 +124,7 @@ class RealtimeConsoleClientTest extends Scope
$client->close();
}
public function testAttributes()
public function testAttributes(): array
{
$user = $this->getUser();
$projectId = 'console';
@ -184,6 +184,7 @@ class RealtimeConsoleClientTest extends Scope
'required' => true,
]);
$projectId = $this->getProject()['$id'];
$attributeKey = $name['body']['key'];
$this->assertEquals($name['headers']['status-code'], 202);
@ -198,8 +199,9 @@ class RealtimeConsoleClientTest extends Scope
$this->assertEquals('event', $response['type']);
$this->assertNotEmpty($response['data']);
$this->assertArrayHasKey('timestamp', $response['data']);
$this->assertCount(1, $response['data']['channels']);
$this->assertCount(2, $response['data']['channels']);
$this->assertContains('console', $response['data']['channels']);
$this->assertContains("projects.{$projectId}", $response['data']['channels']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}.attributes.*.create", $response['data']['events']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}.attributes.*", $response['data']['events']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}", $response['data']['events']);
@ -218,8 +220,9 @@ class RealtimeConsoleClientTest extends Scope
$this->assertEquals('event', $response['type']);
$this->assertNotEmpty($response['data']);
$this->assertArrayHasKey('timestamp', $response['data']);
$this->assertCount(1, $response['data']['channels']);
$this->assertCount(2, $response['data']['channels']);
$this->assertContains('console', $response['data']['channels']);
$this->assertContains("projects.{$projectId}", $response['data']['channels']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}.attributes.*.update", $response['data']['events']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}.attributes.*", $response['data']['events']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}", $response['data']['events']);
@ -276,6 +279,8 @@ class RealtimeConsoleClientTest extends Scope
]);
$this->assertEquals($index['headers']['status-code'], 202);
$projectId = $this->getProject()['$id'];
$indexKey = $index['body']['key'];
$response = json_decode($client->receive(), true);
@ -285,8 +290,9 @@ class RealtimeConsoleClientTest extends Scope
$this->assertEquals('event', $response['type']);
$this->assertNotEmpty($response['data']);
$this->assertArrayHasKey('timestamp', $response['data']);
$this->assertCount(1, $response['data']['channels']);
$this->assertCount(2, $response['data']['channels']);
$this->assertContains('console', $response['data']['channels']);
$this->assertContains("projects.{$projectId}", $response['data']['channels']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}.indexes.*.create", $response['data']['events']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}.indexes.*", $response['data']['events']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}", $response['data']['events']);
@ -303,8 +309,9 @@ class RealtimeConsoleClientTest extends Scope
$this->assertEquals('event', $response['type']);
$this->assertNotEmpty($response['data']);
$this->assertArrayHasKey('timestamp', $response['data']);
$this->assertCount(1, $response['data']['channels']);
$this->assertCount(2, $response['data']['channels']);
$this->assertContains('console', $response['data']['channels']);
$this->assertContains("projects.{$projectId}", $response['data']['channels']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}.indexes.*.update", $response['data']['events']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}.indexes.*", $response['data']['events']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}", $response['data']['events']);
@ -343,6 +350,8 @@ class RealtimeConsoleClientTest extends Scope
$this->assertContains('console', $response['data']['channels']);
$this->assertNotEmpty($response['data']['user']);
$projectId = $this->getProject()['$id'];
/**
* Test Delete Index
*/
@ -353,6 +362,7 @@ class RealtimeConsoleClientTest extends Scope
], $this->getHeaders()));
$this->assertEquals($attribute['headers']['status-code'], 204);
$response = json_decode($client->receive(), true);
$this->assertArrayHasKey('type', $response);
@ -360,8 +370,9 @@ class RealtimeConsoleClientTest extends Scope
$this->assertEquals('event', $response['type']);
$this->assertNotEmpty($response['data']);
$this->assertArrayHasKey('timestamp', $response['data']);
$this->assertCount(1, $response['data']['channels']);
$this->assertCount(2, $response['data']['channels']);
$this->assertContains('console', $response['data']['channels']);
$this->assertContains("projects.{$projectId}", $response['data']['channels']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}.indexes.*.update", $response['data']['events']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}.indexes.*", $response['data']['events']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}", $response['data']['events']);
@ -377,8 +388,9 @@ class RealtimeConsoleClientTest extends Scope
$this->assertEquals('event', $response['type']);
$this->assertNotEmpty($response['data']);
$this->assertArrayHasKey('timestamp', $response['data']);
$this->assertCount(1, $response['data']['channels']);
$this->assertCount(2, $response['data']['channels']);
$this->assertContains('console', $response['data']['channels']);
$this->assertContains("projects.{$projectId}", $response['data']['channels']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}.indexes.*.delete", $response['data']['events']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}.indexes.*", $response['data']['events']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}", $response['data']['events']);
@ -416,10 +428,12 @@ class RealtimeConsoleClientTest extends Scope
$this->assertContains('console', $response['data']['channels']);
$this->assertNotEmpty($response['data']['user']);
$attributeKey = 'name';
$projectId = $this->getProject()['$id'];
/**
* Test Delete Attribute
*/
$attributeKey = 'name';
$attribute = $this->client->call(Client::METHOD_DELETE, '/databases/' . $databaseId . '/collections/' . $data['actorsId'] . '/attributes/' . $attributeKey, array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $this->getProject()['$id'],
@ -433,8 +447,9 @@ class RealtimeConsoleClientTest extends Scope
$this->assertEquals('event', $response['type']);
$this->assertNotEmpty($response['data']);
$this->assertArrayHasKey('timestamp', $response['data']);
$this->assertCount(1, $response['data']['channels']);
$this->assertCount(2, $response['data']['channels']);
$this->assertContains('console', $response['data']['channels']);
$this->assertContains("projects.{$projectId}", $response['data']['channels']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}.attributes.*.update", $response['data']['events']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}.attributes.*", $response['data']['events']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}", $response['data']['events']);
@ -450,8 +465,9 @@ class RealtimeConsoleClientTest extends Scope
$this->assertEquals('event', $response['type']);
$this->assertNotEmpty($response['data']);
$this->assertArrayHasKey('timestamp', $response['data']);
$this->assertCount(1, $response['data']['channels']);
$this->assertCount(2, $response['data']['channels']);
$this->assertContains('console', $response['data']['channels']);
$this->assertContains("projects.{$projectId}", $response['data']['channels']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}.attributes.*.delete", $response['data']['events']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}.attributes.*", $response['data']['events']);
$this->assertContains("databases.{$databaseId}.collections.{$actorsId}", $response['data']['events']);
@ -505,10 +521,10 @@ class RealtimeConsoleClientTest extends Scope
/**
* Test Create Deployment
*/
$folder = 'php';
$code = realpath(__DIR__ . '/../../../resources/functions') . "/$folder/code.tar.gz";
$this->packageCode($folder);
$projectId = $this->getProject()['$id'];
$deployment = $this->client->call(Client::METHOD_POST, '/functions/' . $functionId . '/deployments', array_merge([
'content-type' => 'multipart/form-data',
@ -530,8 +546,9 @@ class RealtimeConsoleClientTest extends Scope
$this->assertEquals('event', $response['type']);
$this->assertNotEmpty($response['data']);
$this->assertArrayHasKey('timestamp', $response['data']);
$this->assertCount(1, $response['data']['channels']);
$this->assertCount(2, $response['data']['channels']);
$this->assertContains('console', $response['data']['channels']);
$this->assertContains("projects.{$projectId}", $response['data']['channels']);
// $this->assertContains("functions.{$functionId}.deployments.{$deploymentId}.create", $response['data']['events']); TODO @christyjacob4 : enable test once we allow functions.* events
$this->assertNotEmpty($response['data']['payload']);

View file

@ -1340,8 +1340,9 @@ class RealtimeCustomClientTest extends Scope
$this->assertEquals('event', $response['type']);
$this->assertNotEmpty($response['data']);
$this->assertArrayHasKey('timestamp', $response['data']);
$this->assertCount(4, $response['data']['channels']);
$this->assertCount(5, $response['data']['channels']);
$this->assertContains('console', $response['data']['channels']);
$this->assertContains("projects.{$this->getProject()['$id']}", $response['data']['channels']);
$this->assertContains('executions', $response['data']['channels']);
$this->assertContains("executions.{$executionId}", $response['data']['channels']);
$this->assertContains("functions.{$functionId}", $response['data']['channels']);
@ -1362,8 +1363,9 @@ class RealtimeCustomClientTest extends Scope
$this->assertEquals('event', $responseUpdate['type']);
$this->assertNotEmpty($responseUpdate['data']);
$this->assertArrayHasKey('timestamp', $responseUpdate['data']);
$this->assertCount(4, $responseUpdate['data']['channels']);
$this->assertCount(5, $responseUpdate['data']['channels']);
$this->assertContains('console', $responseUpdate['data']['channels']);
$this->assertContains("projects.{$this->getProject()['$id']}", $response['data']['channels']);
$this->assertContains('executions', $responseUpdate['data']['channels']);
$this->assertContains("executions.{$executionId}", $responseUpdate['data']['channels']);
$this->assertContains("functions.{$functionId}", $responseUpdate['data']['channels']);