diff --git a/app/realtime.php b/app/realtime.php index eded4d79bc..7171524233 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -431,10 +431,19 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, ] ]; - $server->send($realtime->getSubscribers($event), json_encode([ - 'type' => 'event', - 'data' => $event['data'] - ])); + $subscribers = $realtime->getSubscribers($event); // [connectionId => [subId => queries]] + + // For test events, send to all connections with their matched subscription queries + foreach ($subscribers as $connectionId => $matchedSubscriptions) { + $data = $event['data']; + // Send matched subscription IDs + $data['subscriptions'] = array_keys($matchedSubscriptions); + + $server->send([$connectionId], json_encode([ + 'type' => 'event', + 'data' => $data + ])); + } } }); @@ -473,33 +482,60 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $user = $database->getDocument('users', $userId); $roles = $user->getRoles($database->getAuthorization()); - $channels = $realtime->connections[$connection]['channels']; - $queries = $realtime->connections[$connection]['queries'] ?? []; + $authorization = $realtime->connections[$connection]['authorization'] ?? null; + + $subscriptionMetadata = $realtime->getSubscriptionMetadata($connection); $realtime->unsubscribe($connection); - $realtime->subscribe($projectId, $connection, $roles, $channels, $queries); + + foreach ($subscriptionMetadata as $subscriptionId => $metadata) { + $queries = Query::parseQueries($metadata['queries'] ?? []); + $realtime->subscribe( + $projectId, + $connection, + $subscriptionId, + $roles, + $metadata['channels'] ?? [], + $queries + ); + } + + // Restore authorization after subscribe + if ($authorization !== null) { + $realtime->connections[$connection]['authorization'] = $authorization; + } } } - $receivers = $realtime->getSubscribers($event); + $receivers = $realtime->getSubscribers($event); // [connectionId => [subId => queries]] if (App::isDevelopment() && !empty($receivers)) { Console::log("[Debug][Worker {$workerId}] Receivers: " . count($receivers)); - Console::log("[Debug][Worker {$workerId}] Receivers Connection IDs: " . json_encode($receivers)); + Console::log("[Debug][Worker {$workerId}] Receivers Connection IDs: " . json_encode(array_keys($receivers))); + Console::log("[Debug][Worker {$workerId}] Event Query: " . json_encode(array_values($receivers))); Console::log("[Debug][Worker {$workerId}] Event: " . $payload); } - $server->send( - $receivers, - json_encode([ - 'type' => 'event', - 'data' => $event['data'] - ]) - ); + $totalMessages = 0; - if (($num = count($receivers)) > 0) { - $register->get('telemetry.messageSentCounter')->add($num); - $stats->incr($event['project'], 'messages', $num); + foreach ($receivers as $connectionId => $matchedSubscriptions) { + $data = $event['data']; + // Send matched subscription IDs + $data['subscriptions'] = array_keys($matchedSubscriptions); + + $server->send( + [$connectionId], + json_encode([ + 'type' => 'event', + 'data' => $data + ]) + ); + $totalMessages++; + } + + if ($totalMessages > 0) { + $register->get('telemetry.messageSentCounter')->add($totalMessages); + $stats->incr($event['project'], 'messages', $totalMessages); } }); } catch (Throwable $th) { @@ -580,11 +616,6 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $roles = $user->getRoles($authorization); $channels = Realtime::convertChannels($request->getQuery('channels', []), $user->getId()); - try { - $queries = Realtime::convertQueries($request->getQuery('queries', [])); - } catch (QueryException $e) { - throw new Exception(Exception::REALTIME_POLICY_VIOLATION, $e->getMessage()); - } /** * Channels Check @@ -593,7 +624,34 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, throw new Exception(Exception::REALTIME_POLICY_VIOLATION, 'Missing channels'); } - $realtime->subscribe($project->getId(), $connection, $roles, $channels, $queries); + // Reconstruct subscriptions from query params using helper method + $channelNames = array_keys($channels); + + try { + $subscriptionsByIndex = Realtime::constructSubscriptions( + $channelNames, + fn ($channel) => $request->getQuery($channel, null) + ); + } catch (QueryException $e) { + throw new Exception(Exception::REALTIME_POLICY_VIOLATION, $e->getMessage()); + } + + // Generate subscription IDs and subscribe + $subscriptionMapping = []; + foreach ($subscriptionsByIndex as $index => $subscription) { + $subscriptionId = ID::unique(); + + $realtime->subscribe( + $project->getId(), + $connection, + $subscriptionId, + $roles, + $subscription['channels'], + $subscription['queries'] // Query objects + ); + + $subscriptionMapping[$index] = $subscriptionId; + } $realtime->connections[$connection]['authorization'] = $authorization; @@ -602,8 +660,8 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $server->send([$connection], json_encode([ 'type' => 'connected', 'data' => [ - 'channels' => array_keys($channels), - 'queries' => $queries, + 'channels' => $channelNames, + 'subscriptions' => $subscriptionMapping, 'user' => $user ] ])); @@ -733,13 +791,30 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re } $roles = $user->getRoles($database->getAuthorization()); - $channels = Realtime::convertChannels(array_flip($realtime->connections[$connection]['channels']), $user->getId()); + $channelNames = $realtime->connections[$connection]['channels'] ?? []; + $channels = Realtime::convertChannels(array_flip($channelNames), $user->getId()); - // Preserve authorization before subscribe overwrites the connection array $authorization = $realtime->connections[$connection]['authorization'] ?? null; + $projectId = $realtime->connections[$connection]['projectId'] ?? null; - $queries = $realtime->connections[$connection]['queries']; - $realtime->subscribe($realtime->connections[$connection]['projectId'], $connection, $roles, $channels, $queries); + $subscriptionMetadata = $realtime->getSubscriptionMetadata($connection); + + $realtime->unsubscribe($connection); + + if (!empty($projectId)) { + foreach ($subscriptionMetadata as $subscriptionId => $metadata) { + $queries = Query::parseQueries($metadata['queries'] ?? []); + + $realtime->subscribe( + $projectId, + $connection, + $subscriptionId, + $roles, + $metadata['channels'] ?? [], + $queries + ); + } + } // Restore authorization after subscribe if ($authorization !== null) { diff --git a/src/Appwrite/Messaging/Adapter.php b/src/Appwrite/Messaging/Adapter.php index 40169bd1a9..d3ed2d2cf9 100644 --- a/src/Appwrite/Messaging/Adapter.php +++ b/src/Appwrite/Messaging/Adapter.php @@ -4,7 +4,7 @@ namespace Appwrite\Messaging; abstract class Adapter { - abstract public function subscribe(string $projectId, mixed $identifier, array $roles, array $channels): void; + abstract public function subscribe(string $projectId, mixed $identifier, string $subscriptionId, array $roles, array $channels, array $queryGroup = []): void; abstract public function unsubscribe(mixed $identifier): void; abstract public function send(string $projectId, array $payload, array $events, array $channels, array $roles, array $options): void; } diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index ae198e0042..3bfa58a1da 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -29,13 +29,13 @@ class Realtime extends MessagingAdapter * * [PROJECT_ID] -> * [ROLE_X] -> - * [CHANNEL_NAME_X] -> [CONNECTION_ID] - * [CHANNEL_NAME_Y] -> [CONNECTION_ID] - * [CHANNEL_NAME_Z] -> [CONNECTION_ID] - * [ROLE_Y] -> - * [CHANNEL_NAME_X] -> [CONNECTION_ID] - * [CHANNEL_NAME_Y] -> [CONNECTION_ID] - * [CHANNEL_NAME_Z] -> [CONNECTION_ID] + * [CHANNEL_NAME_X] -> + * [CONNECTION_ID] -> + * [SUB_ID] -> [query1, query2, ...] // Subscription with queries (AND logic) + * + * Each subscription ID maps to an array of query strings. + * Within a subscription: AND logic (all queries must match) + * Across subscriptions: OR logic (any subscription matching = send event) */ public array $subscriptions = []; @@ -48,41 +48,108 @@ class Realtime extends MessagingAdapter } /** - * Adds a subscription. + * Adds a subscription with a specific subscription ID. * * @param string $projectId - * @param mixed $identifier - * @param array $roles - * @param array $channels - * @param array $queries + * @param mixed $identifier Connection ID + * @param string $subscriptionId Unique subscription ID + * @param array $roles User roles + * @param array $channels Channels to subscribe to (array of channel names) + * @param array $queryGroup Array of Query objects for this subscription (AND logic within subscription) * @return void */ - public function subscribe(string $projectId, mixed $identifier, array $roles, array $channels, array $queries = []): void + public function subscribe(string $projectId, mixed $identifier, string $subscriptionId, array $roles, array $channels, array $queryGroup = []): void { if (!isset($this->subscriptions[$projectId])) { // Init Project $this->subscriptions[$projectId] = []; } - foreach ($roles as $role) { - if (!isset($this->subscriptions[$projectId][$role])) { // Add user first connection - $this->subscriptions[$projectId][$role] = []; - } - - foreach ($channels as $channel => $list) { - $this->subscriptions[$projectId][$role][$channel][$identifier] = true; + // Convert Query objects to strings for this subscription + $queryStrings = []; + if (empty($queryGroup)) { + // No queries means "listen to all events" - use select("*") + $queryStrings[] = Query::select(['*'])->toString(); + } else { + foreach ($queryGroup as $query) { + /** @var Query $query */ + $queryStrings[] = $query->toString(); } } + foreach ($roles as $role) { + if (!isset($this->subscriptions[$projectId][$role])) { + $this->subscriptions[$projectId][$role] = []; + } + + foreach ($channels as $channel) { + if (!isset($this->subscriptions[$projectId][$role][$channel])) { + $this->subscriptions[$projectId][$role][$channel] = []; + } + if (!isset($this->subscriptions[$projectId][$role][$channel][$identifier])) { + $this->subscriptions[$projectId][$role][$channel][$identifier] = []; + } + // Store subscription under subscription ID + $this->subscriptions[$projectId][$role][$channel][$identifier][$subscriptionId] = $queryStrings; + } + } + + // Update connection info $this->connections[$identifier] = [ 'projectId' => $projectId, 'roles' => $roles, - 'channels' => $channels, - 'queries' => $queries + 'channels' => $channels ]; } /** - * Removes Subscription. + * Get subscription metadata for a connection. + * Retrieves subscription data including channels and queries directly from the subscriptions tree. + * + * @param mixed $connection Connection ID + * @return array Array of [subscriptionId => ['channels' => string[], 'queries' => string[]]] + */ + public function getSubscriptionMetadata(mixed $connection): array + { + $projectId = $this->connections[$connection]['projectId'] ?? null; + $roles = $this->connections[$connection]['roles'] ?? []; + $channels = $this->connections[$connection]['channels'] ?? []; + + if (!$projectId || empty($roles) || empty($channels)) { + return []; + } + + $subscriptions = []; + + // Extract subscription data from subscriptions tree + foreach ($roles as $role) { + if (!isset($this->subscriptions[$projectId][$role])) { + continue; + } + + foreach ($channels as $channel) { + if (!isset($this->subscriptions[$projectId][$role][$channel][$connection])) { + continue; + } + + foreach ($this->subscriptions[$projectId][$role][$channel][$connection] as $subId => $queryStrings) { + if (!isset($subscriptions[$subId])) { + $subscriptions[$subId] = [ + 'channels' => [], + 'queries' => $queryStrings + ]; + } + if (!in_array($channel, $subscriptions[$subId]['channels'])) { + $subscriptions[$subId]['channels'][] = $channel; + } + } + } + } + + return $subscriptions; + } + + /** + * Removes all subscriptions for a connection. * * @param mixed $connection * @return void @@ -91,10 +158,11 @@ class Realtime extends MessagingAdapter { $projectId = $this->connections[$connection]['projectId'] ?? ''; $roles = $this->connections[$connection]['roles'] ?? []; + $channels = $this->connections[$connection]['channels'] ?? []; foreach ($roles as $role) { - foreach ($this->subscriptions[$projectId][$role] as $channel => $list) { - unset($this->subscriptions[$projectId][$role][$channel][$connection]); // Remove connection + foreach ($channels as $channel) { + unset($this->subscriptions[$projectId][$role][$channel][$connection]); // dropping connection will drop all subscriptions if (empty($this->subscriptions[$projectId][$role][$channel])) { unset($this->subscriptions[$projectId][$role][$channel]); // Remove channel when no connections @@ -110,7 +178,9 @@ class Realtime extends MessagingAdapter unset($this->subscriptions[$projectId]); } - unset($this->connections[$connection]); + if (isset($this->connections[$connection])) { + unset($this->connections[$connection]); + } } /** @@ -130,7 +200,8 @@ class Realtime extends MessagingAdapter return array_key_exists($projectId, $this->subscriptions) && array_key_exists($role, $this->subscriptions[$projectId]) - && array_key_exists($channel, $this->subscriptions[$projectId][$role]); + && array_key_exists($channel, $this->subscriptions[$projectId][$role]) + && !empty($this->subscriptions[$projectId][$role][$channel]); } /** @@ -179,11 +250,10 @@ class Realtime extends MessagingAdapter * - 1,121.328 ms (±0.84%) | 1,000,000 Connections / 10,000,000 Subscriptions * * @param array $event - * @return int[]|string[] + * @return array Map of connection IDs to matched query groups */ public function getSubscribers(array $event): array { - $receivers = []; /** * Check if project has subscriber. @@ -207,17 +277,31 @@ class Realtime extends MessagingAdapter /** * Saving all connections that are allowed to receive this event. */ - foreach (array_keys($this->subscriptions[$event['project']][$role][$channel]) as $id) { - /** - * To prevent duplicates, we save the connections as array keys. - */ - $queries = $this->connections[$id]['queries'] ?? []; - $payload = $event['data']['payload'] ?? []; - if ( - empty($queries) || - !empty(RuntimeQuery::filter($queries, $payload)) - ) { - $receivers[$id] = 0; + $payload = $event['data']['payload'] ?? []; + foreach ($this->subscriptions[$event['project']][$role][$channel] as $id => $subscriptions) { + $matchedSubscriptions = []; + + // Process each subscription (OR logic across subscriptions) + foreach ($subscriptions as $subId => $queryStrings) { + $parsedQueries = []; + foreach ($queryStrings as $queryString) { + $parsed = Query::parseQueries([$queryString]); + $parsedQueries = array_merge($parsedQueries, $parsed); + } + // Check if this subscription matches (AND logic within subscription) + // Or if empty payload and select all as filter will return empty payload out of it even if it passed + $isEmptyPayloadAndSelectAll = RuntimeQuery::isSelectAll($parsedQueries[0]) && empty($payload); + if ($isEmptyPayloadAndSelectAll || !empty(RuntimeQuery::filter($parsedQueries, $payload))) { + $matchedSubscriptions[$subId] = $queryStrings; + } + } + + // Only add connection to receivers if at least one subscription matched + if (!empty($matchedSubscriptions)) { + if (!isset($receivers[$id])) { + $receivers[$id] = []; + } + $receivers[$id] = array_merge($receivers[$id], $matchedSubscriptions); } } break; @@ -226,7 +310,7 @@ class Realtime extends MessagingAdapter } } - return array_keys($receivers); + return $receivers; } /** @@ -258,17 +342,82 @@ class Realtime extends MessagingAdapter } /** - * Converts the queries from the Query Params into an array. - * @param array $queries - * @return array + * Constructs subscriptions from query parameters. + * + * Reconstructs subscription structure from query params where subscription indices can span multiple channels. + * Format: {channel}[subscriptionIndex][]=query1&{channel}[subscriptionIndex][]=query2 + * + * Example: + * - tests[0][]=select(*) → subscription 0: channels=["tests"] + * - tests[1][]=equal(...) & prod[1][]=equal(...) → subscription 1: channels=["tests", "prod"] + * + * @param array $channelNames Array of channel names + * @param callable $getQueryParam Callable that takes a channel name and returns its query param value (null if not present) + * @return array Array indexed by subscription index: [index => ['channels' => string[], 'queries' => Query[]]] + * @throws QueryException */ - public static function convertQueries(array $queries): array + public static function constructSubscriptions(array $channelNames, callable $getQueryParam): array + { + $subscriptionsByIndex = []; + + foreach ($channelNames as $channel) { + $channelSubscriptions = $getQueryParam($channel); + + // Backward compatibility: if no channel-specific query params, treat as subscription 0 with select("*") + if ($channelSubscriptions === null) { + if (!isset($subscriptionsByIndex[0])) { + $subscriptionsByIndex[0] = [ + 'channels' => [], + 'queries' => [] + ]; + } + $subscriptionsByIndex[0]['channels'][] = $channel; + if (empty($subscriptionsByIndex[0]['queries'])) { + $subscriptionsByIndex[0]['queries'] = [Query::select(['*'])]; + } + continue; + } + + if (!is_array($channelSubscriptions)) { + $channelSubscriptions = [$channelSubscriptions]; + } + + foreach ($channelSubscriptions as $subscriptionIndex => $subscription) { + if (!isset($subscriptionsByIndex[$subscriptionIndex])) { + $subscriptionsByIndex[$subscriptionIndex] = [ + 'channels' => [], + 'queries' => [] + ]; + } + + if (!in_array($channel, $subscriptionsByIndex[$subscriptionIndex]['channels'])) { + $subscriptionsByIndex[$subscriptionIndex]['channels'][] = $channel; + } + + if (empty($subscriptionsByIndex[$subscriptionIndex]['queries'])) { + $queriesToParse = is_array($subscription) ? $subscription : [$subscription]; + $parsedQueries = self::convertQueries($queriesToParse); + $subscriptionsByIndex[$subscriptionIndex]['queries'] = $parsedQueries; + } + } + } + + return $subscriptionsByIndex; + } + + /** + * Converts the queries from the Query Params into an array. + * @param array|string $queries + * @return array + * @throws QueryException + */ + public static function convertQueries(mixed $queries): array { $queries = Query::parseQueries($queries); $stack = $queries; $allowedMethods = implode(', ', RuntimeQuery::ALLOWED_QUERIES); while (!empty($stack)) { - /** `@var` Query $query */ + /** @var Query $query */ $query = array_pop($stack); $method = $query->getMethod(); if (!in_array($method, RuntimeQuery::ALLOWED_QUERIES, true)) { @@ -277,6 +426,12 @@ class Realtime extends MessagingAdapter "Query method '{$unsupportedMethod}' is not supported in Realtime queries. Allowed query methods are: {$allowedMethods}" ); } + + // Validate select queries - only select("*") is allowed + if ($method === Query::TYPE_SELECT) { + RuntimeQuery::validateSelectQuery($query); + } + if (in_array($method, [Query::TYPE_AND, Query::TYPE_OR], true)) { $stack = array_merge($stack, $query->getValues()); } diff --git a/src/Appwrite/Utopia/Database/RuntimeQuery.php b/src/Appwrite/Utopia/Database/RuntimeQuery.php index 025d424768..f959e9b573 100644 --- a/src/Appwrite/Utopia/Database/RuntimeQuery.php +++ b/src/Appwrite/Utopia/Database/RuntimeQuery.php @@ -21,9 +21,44 @@ class RuntimeQuery extends Query // Recursive checks Query::TYPE_AND, - Query::TYPE_OR + Query::TYPE_OR, + + // Special: select("*") means "listen to all events" + Query::TYPE_SELECT ]; + /** + * Checks if a query is select("*") which means "listen to all events" + * + * @param Query $query + * @return bool + */ + public static function isSelectAll(Query $query): bool + { + return $query->getMethod() === Query::TYPE_SELECT + && count($query->getValues()) === 1 + && $query->getValues()[0] === '*'; + } + + /** + * Validates a select query - only select("*") is allowed in Realtime + * + * @param Query $query + * @throws \InvalidArgumentException + */ + public static function validateSelectQuery(Query $query): void + { + if ($query->getMethod() !== Query::TYPE_SELECT) { + return; + } + + if (!self::isSelectAll($query)) { + throw new \InvalidArgumentException( + 'Only select("*") is allowed in Realtime queries. select("*") means "listen to all events".' + ); + } + } + /** * @param array $queries * @param array $payload @@ -33,6 +68,14 @@ class RuntimeQuery extends Query if (empty($queries)) { return $payload; } + + // Check if select("*") is present - if so, return payload (match all) + foreach ($queries as $query) { + if (self::isSelectAll($query)) { + return $payload; + } + } + // multiple queries follows and condition foreach ($queries as $query) { if (!self::evaluateFilter($query, $payload)) { diff --git a/tests/e2e/Services/Realtime/RealtimeBase.php b/tests/e2e/Services/Realtime/RealtimeBase.php index ea5c3d710f..1b77c0ad4a 100644 --- a/tests/e2e/Services/Realtime/RealtimeBase.php +++ b/tests/e2e/Services/Realtime/RealtimeBase.php @@ -11,7 +11,7 @@ trait RealtimeBase array $channels = [], array $headers = [], string $projectId = null, - array $queries = [] + ?array $queries = null ): WebSocketClient { if (is_null($projectId)) { $projectId = $this->getProject()['$id']; @@ -19,12 +19,48 @@ trait RealtimeBase $query = [ "project" => $projectId, - "channels" => $channels, - "queries" => $queries + "channels" => $channels ]; + /** + * Query param encoding rules: + * - $queries === null -> only send channels (no per-channel query params) for backward compatibility. + * - $queries === [] -> explicit "select all" subscription: send Query::select(['*']) as a single group. + * - non-empty $queries -> treat as a single subscription group for the first channel: + * AND logic within the group; OR logic across multiple groups (if we ever add them). + * + * For now all E2E tests subscribe to a single channel, so we map queries to $channels[0]. + * + * Slot-based format: channel[slot][]=query1&channel[slot][]=query2 + * We need to manually build the query string to ensure the [] format is used. + */ + + // Build base query string + $queryParams = [ + "project" => $projectId, + "channels" => $channels + ]; + $queryString = http_build_query($queryParams); + + if ($queries !== null && !empty($channels)) { + $channel = $channels[0]; + $slot = 0; // All tests use slot 0 for now + + if ($queries === []) { + // Explicit select("*") group - single query in slot 0 + $queryValue = \Utopia\Database\Query::select(['*'])->toString(); + $queryString .= "&" . urlencode($channel) . "[" . $slot . "][]=" . urlencode($queryValue); + } else { + // Single subscription group for this channel - multiple queries in slot 0 + // Each query should be appended with [] format + foreach ($queries as $queryValue) { + $queryString .= "&" . urlencode($channel) . "[" . $slot . "][]=" . urlencode($queryValue); + } + } + } + return new WebSocketClient( - "ws://appwrite.test/v1/realtime?" . http_build_query($query), + "ws://appwrite.test/v1/realtime?" . $queryString, [ "headers" => $headers, "timeout" => 30, diff --git a/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php b/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php index 068736561e..e93e955f1a 100644 --- a/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php +++ b/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php @@ -86,7 +86,7 @@ class RealtimeCustomClientQueryTest extends Scope // Should timeout - no event should be received try { - $client->receive(); + $data = $client->receive(); $this->fail('Expected TimeoutException - event should be filtered'); } catch (TimeoutException $e) { $this->assertTrue(true); @@ -1352,7 +1352,7 @@ class RealtimeCustomClientQueryTest extends Scope $targetDocId = ID::unique(); - // Subscribe with multiple queries (AND logic - ALL queries must match for event to be received) + // Subscribe with multiple 'queries' (AND logic - ALL 'queries' must match for event to be received) $client = $this->getWebsocket(['documents'], [ 'origin' => 'http://localhost', 'cookie' => 'a_session_' . $projectId . '=' . $session, @@ -1364,7 +1364,7 @@ class RealtimeCustomClientQueryTest extends Scope $response = json_decode($client->receive(), true); $this->assertEquals('connected', $response['type']); - // Create document matching BOTH queries - should receive event + // Create document matching BOTH 'queries' - should receive event $document1 = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ 'content-type' => 'application/json', 'x-appwrite-project' => $projectId, @@ -1383,50 +1383,9 @@ class RealtimeCustomClientQueryTest extends Scope $this->assertEquals($targetDocId, $event['data']['payload']['$id']); $this->assertEquals('active', $event['data']['payload']['status']); - // Create document with matching ID but wrong status - should NOT receive event (only one query matches) - $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ - 'content-type' => 'application/json', - 'x-appwrite-project' => $projectId, - ], $this->getHeaders()), [ - 'documentId' => $targetDocId, - 'data' => [ - 'status' => 'inactive' - ], - 'permissions' => [ - Permission::read(Role::any()), - ], - ]); - - try { - $client->receive(); - $this->fail('Expected TimeoutException - event should be filtered (ID matches but status does not)'); - } catch (TimeoutException $e) { - $this->assertTrue(true); - } - - // Create document with matching status but wrong ID - should NOT receive event (only one query matches) - $otherDocId = ID::unique(); - $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ - 'content-type' => 'application/json', - 'x-appwrite-project' => $projectId, - ], $this->getHeaders()), [ - 'documentId' => $otherDocId, - 'data' => [ - 'status' => 'active' - ], - 'permissions' => [ - Permission::read(Role::any()), - ], - ]); - - try { - $client->receive(); - $this->fail('Expected TimeoutException - event should be filtered (status matches but ID does not)'); - } catch (TimeoutException $e) { - $this->assertTrue(true); - } - - // Create document matching NEITHER query - should NOT receive event + // Create document matching NEITHER query - should not receive event + // keeping it here as below are the documents created with status=>active + // so it will also receive it but the querykey can be used to distinction $anotherDocId = ID::unique(); $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ 'content-type' => 'application/json', @@ -1448,6 +1407,27 @@ class RealtimeCustomClientQueryTest extends Scope $this->assertTrue(true); } + // Create document with matching ID but wrong status - should NOT receive event (only one query matches) + $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + ], $this->getHeaders()), [ + 'documentId' => $targetDocId, + 'data' => [ + 'status' => 'inactive' + ], + 'permissions' => [ + Permission::read(Role::any()), + ], + ]); + + try { + $client->receive(); + $this->fail('Expected TimeoutException - event should be filtered (ID matches but status does not)'); + } catch (TimeoutException $e) { + $this->assertTrue(true); + } + $client->close(); } @@ -1521,7 +1501,7 @@ class RealtimeCustomClientQueryTest extends Scope $this->assertStringContainsString('not supported in Realtime queries', $response['data']['message']); $this->assertStringContainsString('startsWith', $response['data']['message']); - // Test 5: Multiple invalid queries in nested structure + // Test 5: Multiple invalid 'queries' in nested structure $client = $this->getWebsocket(['documents'], [ 'origin' => 'http://localhost', 'cookie' => 'a_session_' . $projectId . '=' . $session, @@ -1544,4 +1524,565 @@ class RealtimeCustomClientQueryTest extends Scope str_contains($response['data']['message'], 'endsWith') ); } + + public function testQueryKeys() + { + $user = $this->getUser(); + $session = $user['session'] ?? ''; + $projectId = $this->getProject()['$id']; + + // Setup database and collection + $database = $this->client->call(Client::METHOD_POST, '/databases', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'databaseId' => ID::unique(), + 'name' => 'Query Keys Test DB', + ]); + $databaseId = $database['body']['$id']; + + $collection = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'collectionId' => ID::unique(), + 'name' => 'Query Keys Collection', + 'permissions' => [ + Permission::create(Role::user($user['$id'])), + ], + 'documentSecurity' => true, + ]); + $collectionId = $collection['body']['$id']; + + // Attributes used by 'queries' + $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/attributes/string', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'key' => 'status', + 'size' => 256, + 'required' => false, + ]); + + sleep(2); + + $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/attributes/string', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'key' => 'category', + 'size' => 256, + 'required' => false, + ]); + + sleep(2); + + $queryStatusActive = Query::equal('status', ['active'])->toString(); + $queryStatusPending = Query::equal('status', ['pending'])->toString(); + $queryComplex = Query::and([ + Query::equal('status', ['active']), + Query::equal('category', ['gold']), + ])->toString(); + + // Subscribe with no 'queries' -> should receive all events (has select("*") subscription) + $clientAll = $this->getWebsocket(['documents'], [ + 'origin' => 'http://localhost', + 'cookie' => 'a_session_' . $projectId . '=' . $session, + ]); + + // Subscribe with query1 (status == active) + $clientQ1 = $this->getWebsocket(['documents'], [ + 'origin' => 'http://localhost', + 'cookie' => 'a_session_' . $projectId . '=' . $session, + ], null, [ + $queryStatusActive, + ]); + + // Subscribe with query2 (status == pending) + $clientQ2 = $this->getWebsocket(['documents'], [ + 'origin' => 'http://localhost', + 'cookie' => 'a_session_' . $projectId . '=' . $session, + ], null, [ + $queryStatusPending, + ]); + + // Subscribe with complex query (status == active AND category == gold) + $clientComplex = $this->getWebsocket(['documents'], [ + 'origin' => 'http://localhost', + 'cookie' => 'a_session_' . $projectId . '=' . $session, + ], null, [ + $queryComplex, + ]); + + // All clients should be connected + foreach ([$clientAll, $clientQ1, $clientQ2, $clientComplex] as $client) { + $response = json_decode($client->receive(), true); + $this->assertEquals('connected', $response['type']); + } + + // 1) Create active/gold document -> should match Q1 and complex, and be seen by all + $docActiveGoldId = ID::unique(); + $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + ], $this->getHeaders()), [ + 'documentId' => $docActiveGoldId, + 'data' => [ + 'status' => 'active', + 'category' => 'gold', + ], + 'permissions' => [ + Permission::read(Role::any()), + ], + ]); + + // clientAll: should receive event, subscriptions should not be empty (has select("*") subscription that matches) + $eventAll = json_decode($clientAll->receive(), true); + $this->assertEquals('event', $eventAll['type']); + $this->assertEquals($docActiveGoldId, $eventAll['data']['payload']['$id']); + $this->assertArrayHasKey('subscriptions', $eventAll['data']); + $this->assertIsArray($eventAll['data']['subscriptions']); + // clientAll has select("*") subscription that matches all events, so subscriptions should not be empty + $this->assertNotEmpty($eventAll['data']['subscriptions']); + + // clientQ1: should receive event, subscriptions should not be empty (query matched) + $eventQ1 = json_decode($clientQ1->receive(), true); + $this->assertEquals('event', $eventQ1['type']); + $this->assertEquals($docActiveGoldId, $eventQ1['data']['payload']['$id']); + $this->assertArrayHasKey('subscriptions', $eventQ1['data']); + $this->assertIsArray($eventQ1['data']['subscriptions']); + // clientQ1 has a query that matches, so subscriptions should not be empty + $this->assertNotEmpty($eventQ1['data']['subscriptions']); + + // clientQ2: should NOT receive event (status is active, not pending) + try { + $clientQ2->receive(); + $this->fail('Expected TimeoutException - event should be filtered for clientQ2 (active document)'); + } catch (TimeoutException $e) { + $this->assertTrue(true); + } + + // clientComplex: should receive event, subscriptions should not be empty (query matched) + $eventComplex = json_decode($clientComplex->receive(), true); + $this->assertEquals('event', $eventComplex['type']); + $this->assertEquals($docActiveGoldId, $eventComplex['data']['payload']['$id']); + $this->assertArrayHasKey('subscriptions', $eventComplex['data']); + $this->assertIsArray($eventComplex['data']['subscriptions']); + // clientComplex has a query that matches, so subscriptions should not be empty + $this->assertNotEmpty($eventComplex['data']['subscriptions']); + + // 2) Create pending/silver document -> should match Q2 only, and be seen by all + $docPendingSilverId = ID::unique(); + $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + ], $this->getHeaders()), [ + 'documentId' => $docPendingSilverId, + 'data' => [ + 'status' => 'pending', + 'category' => 'silver', + ], + 'permissions' => [ + Permission::read(Role::any()), + ], + ]); + + // clientAll: should receive event, subscriptions should not be empty (has select("*") subscription that matches) + $eventAll2 = json_decode($clientAll->receive(), true); + $this->assertEquals('event', $eventAll2['type']); + $this->assertEquals($docPendingSilverId, $eventAll2['data']['payload']['$id']); + $this->assertArrayHasKey('subscriptions', $eventAll2['data']); + $this->assertIsArray($eventAll2['data']['subscriptions']); + // clientAll has select("*") subscription that matches all events, so subscriptions should not be empty + $this->assertNotEmpty($eventAll2['data']['subscriptions']); + + // clientQ1: should NOT receive event (status is pending) + try { + $clientQ1->receive(); + $this->fail('Expected TimeoutException - event should be filtered for clientQ1 (pending document)'); + } catch (TimeoutException $e) { + $this->assertTrue(true); + } + + // clientQ2: should receive event, subscriptions should not be empty (query matched) + $eventQ2 = json_decode($clientQ2->receive(), true); + $this->assertEquals('event', $eventQ2['type']); + $this->assertEquals($docPendingSilverId, $eventQ2['data']['payload']['$id']); + $this->assertArrayHasKey('subscriptions', $eventQ2['data']); + $this->assertIsArray($eventQ2['data']['subscriptions']); + // clientQ2 has a query that matches, so subscriptions should not be empty + $this->assertNotEmpty($eventQ2['data']['subscriptions']); + + // clientComplex: should NOT receive event (status is pending, category silver) + try { + $clientComplex->receive(); + $this->fail('Expected TimeoutException - event should be filtered for complex subscription (pending document)'); + } catch (TimeoutException $e) { + $this->assertTrue(true); + } + + $clientAll->close(); + $clientQ1->close(); + $clientQ2->close(); + $clientComplex->close(); + } + + /** + * Ensure two separate subscriptions with different query keys + * only see their own matching events and expose the correct + * queryKey in queryKeys. + */ + public function testMultipleSubscriptionsDifferentQueryKeys() + { + $user = $this->getUser(); + $session = $user['session'] ?? ''; + $projectId = $this->getProject()['$id']; + + // Setup database and collection + $database = $this->client->call(Client::METHOD_POST, '/databases', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'databaseId' => ID::unique(), + 'name' => 'Multiple Query Keys Test DB', + ]); + $databaseId = $database['body']['$id']; + + $collection = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'collectionId' => ID::unique(), + 'name' => 'Multiple Query Keys Collection', + 'permissions' => [ + Permission::create(Role::user($user['$id'])), + ], + 'documentSecurity' => true, + ]); + $collectionId = $collection['body']['$id']; + + // Attribute used by 'queries' + $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/attributes/string', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'key' => 'status', + 'size' => 256, + 'required' => false, + ]); + + sleep(2); + + $queryStatusActive = Query::equal('status', ['active'])->toString(); + $queryStatusPending = Query::equal('status', ['pending'])->toString(); + + // Two subscriptions on the same channel with different query keys + $clientQ1 = $this->getWebsocket(['documents'], [ + 'origin' => 'http://localhost', + 'cookie' => 'a_session_' . $projectId . '=' . $session, + ], null, [ + $queryStatusActive, + ]); + + $clientQ2 = $this->getWebsocket(['documents'], [ + 'origin' => 'http://localhost', + 'cookie' => 'a_session_' . $projectId . '=' . $session, + ], null, [ + $queryStatusPending, + ]); + + // Both should connect + $response = json_decode($clientQ1->receive(), true); + $this->assertEquals('connected', $response['type']); + $response = json_decode($clientQ2->receive(), true); + $this->assertEquals('connected', $response['type']); + + // 1) active document -> only queryStatusActive subscription should see it + $docActiveId = ID::unique(); + $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + ], $this->getHeaders()), [ + 'documentId' => $docActiveId, + 'data' => [ + 'status' => 'active', + ], + 'permissions' => [ + Permission::read(Role::any()), + ], + ]); + + $eventQ1 = json_decode($clientQ1->receive(), true); + $this->assertEquals('event', $eventQ1['type']); + $this->assertEquals($docActiveId, $eventQ1['data']['payload']['$id']); + $this->assertArrayHasKey('subscriptions', $eventQ1['data']); + $this->assertIsArray($eventQ1['data']['subscriptions']); + // clientQ1 has a query that matches, so subscriptions should not be empty + $this->assertNotEmpty($eventQ1['data']['subscriptions']); + + try { + $clientQ2->receive(); + $this->fail('Expected TimeoutException - clientQ2 should not receive active document'); + } catch (TimeoutException $e) { + $this->assertTrue(true); + } + + // 2) pending document -> only queryStatusPending subscription should see it + $docPendingId = ID::unique(); + $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + ], $this->getHeaders()), [ + 'documentId' => $docPendingId, + 'data' => [ + 'status' => 'pending', + ], + 'permissions' => [ + Permission::read(Role::any()), + ], + ]); + + $eventQ2 = json_decode($clientQ2->receive(), true); + $this->assertEquals('event', $eventQ2['type']); + $this->assertEquals($docPendingId, $eventQ2['data']['payload']['$id']); + $this->assertArrayHasKey('subscriptions', $eventQ2['data']); + $this->assertIsArray($eventQ2['data']['subscriptions']); + // clientQ2 has a query that matches, so subscriptions should not be empty + $this->assertNotEmpty($eventQ2['data']['subscriptions']); + + try { + $clientQ1->receive(); + $this->fail('Expected TimeoutException - clientQ1 should not receive pending document'); + } catch (TimeoutException $e) { + $this->assertTrue(true); + } + + $clientQ1->close(); + $clientQ2->close(); + } + + public function testSubscriptionPreservedAfterPermissionChange() + { + $user = $this->getUser(); + $session = $user['session'] ?? ''; + $projectId = $this->getProject()['$id']; + $userId = $user['$id'] ?? ''; + + // Setup database and collection + $database = $this->client->call(Client::METHOD_POST, '/databases', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'databaseId' => ID::unique(), + 'name' => 'Permission Change Test DB', + ]); + $databaseId = $database['body']['$id']; + + $collection = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'collectionId' => ID::unique(), + 'name' => 'Permission Change Collection', + 'permissions' => [ + Permission::create(Role::user($userId)), + Permission::read(Role::user($userId)), + ], + 'documentSecurity' => true, + ]); + $collectionId = $collection['body']['$id']; + + $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/attributes/string', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'key' => 'status', + 'size' => 256, + 'required' => false, + ]); + + sleep(2); + + $targetDocumentId = ID::unique(); + + // Subscribe with query for specific document ID + $client = $this->getWebsocket(['documents'], [ + 'origin' => 'http://localhost', + 'cookie' => 'a_session_' . $projectId . '=' . $session, + ], null, [ + Query::equal('$id', [$targetDocumentId])->toString(), + ]); + + $response = json_decode($client->receive(), true); + $this->assertEquals('connected', $response['type']); + $this->assertArrayHasKey('subscriptions', $response['data']); + $this->assertIsArray($response['data']['subscriptions']); + + // Store the original subscription mapping (index => subscriptionId) + $originalSubscriptionMapping = $response['data']['subscriptions']; + $this->assertNotEmpty($originalSubscriptionMapping); + // Get the first subscription ID and its index + $originalIndex = array_key_first($originalSubscriptionMapping); + $originalSubscriptionId = $originalSubscriptionMapping[$originalIndex]; + + // Create document with matching ID - should receive event + $document = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + ], $this->getHeaders()), [ + 'documentId' => $targetDocumentId, + 'data' => [ + 'status' => 'active' + ], + 'permissions' => [ + Permission::read(Role::user($userId)), + Permission::update(Role::user($userId)), + ], + ]); + + $event = json_decode($client->receive(), true); + $this->assertEquals('event', $event['type']); + $this->assertEquals($targetDocumentId, $event['data']['payload']['$id']); + $this->assertArrayHasKey('subscriptions', $event['data']); + $this->assertContains($originalSubscriptionId, $event['data']['subscriptions']); + + // Trigger permission change by creating a team owned by a DIFFERENT user, + $teamOwnerEmail = uniqid() . 'owner@localhost.test'; + $teamOwnerPassword = 'password'; + + $teamOwner = $this->client->call(Client::METHOD_POST, '/account', [ + 'origin' => 'http://localhost', + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + ], [ + 'userId' => ID::unique(), + 'email' => $teamOwnerEmail, + 'password' => $teamOwnerPassword, + 'name' => 'Team Owner', + ]); + + $this->assertEquals(201, $teamOwner['headers']['status-code']); + + $teamOwnerSession = $this->client->call(Client::METHOD_POST, '/account/sessions/email', [ + 'origin' => 'http://localhost', + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + ], [ + 'email' => $teamOwnerEmail, + 'password' => $teamOwnerPassword, + ]); + + $teamOwnerSession = $teamOwnerSession['cookies']['a_session_' . $projectId] ?? ''; + + $team = $this->client->call(Client::METHOD_POST, '/teams', [ + 'origin' => 'http://localhost', + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'cookie' => 'a_session_' . $projectId . '=' . $teamOwnerSession, + ], [ + 'teamId' => ID::unique(), + 'name' => 'Test Team', + ]); + $teamId = $team['body']['$id']; + + $this->client->call(Client::METHOD_POST, '/teams/' . $teamId . '/memberships', [ + 'origin' => 'http://localhost', + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'x-appwrite-key' => $this->getProject()['apiKey'], + ], [ + 'email' => $user['email'], + 'roles' => ['member'], + 'url' => 'http://localhost', + ]); + + sleep(3); + + // Verify subscription is still working after permission change + $nonMatchingDocumentId = ID::unique(); + $document2 = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + ], $this->getHeaders()), [ + 'documentId' => $nonMatchingDocumentId, + 'data' => [ + 'status' => 'active' + ], + 'permissions' => [ + Permission::read(Role::user($userId)), + Permission::update(Role::user($userId)), + ], + ]); + + // This document doesn't match the query, so we shouldn't receive it + try { + $data = $client->receive(); + $this->fail('Expected TimeoutException - document does not match query after permission change'); + } catch (TimeoutException $e) { + $this->assertTrue(true); + } + + // Create a NEW document with a different ID - should NOT receive event + $targetDocumentId2 = ID::unique(); + $document3 = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + ], $this->getHeaders()), [ + 'documentId' => $targetDocumentId2, + 'data' => [ + 'status' => 'active' + ], + 'permissions' => [ + Permission::read(Role::user($userId)), + Permission::update(Role::user($userId)), + ], + ]); + + sleep(2); + + // This should NOT receive event because the query is for $targetDocumentId, not $targetDocumentId2 + // This verifies the query is preserved after permission change + try { + $data = $client->receive(); + $this->fail('Expected TimeoutException - new document does not match original query after permission change'); + } catch (TimeoutException $e) { + $this->assertTrue(true); + } + + // Create a document with the ORIGINAL matching ID - should receive event + $document4 = $this->client->call(Client::METHOD_PATCH, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents/' . $targetDocumentId, array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + ], $this->getHeaders()), [ + 'data' => [ + 'status' => 'updated-after-permission-change' + ], + ]); + + // Wait a bit for the event to be processed + sleep(3); + + // Verify the event is received with the preserved subscription + $event2 = json_decode($client->receive(), true); + $this->assertEquals('event', $event2['type']); + $this->assertEquals($targetDocumentId, $event2['data']['payload']['$id']); + $this->assertEquals('updated-after-permission-change', $event2['data']['payload']['status']); + $this->assertArrayHasKey('subscriptions', $event2['data']); + $this->assertIsArray($event2['data']['subscriptions']); + $this->assertNotEmpty($event2['data']['subscriptions']); + // Subscription ID should remain stable after permission change + $this->assertContains($originalSubscriptionId, $event2['data']['subscriptions']); + + $client->close(); + } } diff --git a/tests/unit/Messaging/MessagingChannelsTest.php b/tests/unit/Messaging/MessagingChannelsTest.php index 7df5b8d1e6..598a47a901 100644 --- a/tests/unit/Messaging/MessagingChannelsTest.php +++ b/tests/unit/Messaging/MessagingChannelsTest.php @@ -81,11 +81,13 @@ class MessagingChannelsTest extends TestCase $roles = $user->getRoles($this->getAuthorization()); - $parsedChannels = Realtime::convertChannels([0 => $channel], $user->getId()); + // Normalize channels to the format Realtime::subscribe expects (plain channel names) + $parsedChannels = array_keys(Realtime::convertChannels([0 => $channel], $user->getId())); $this->realtime->subscribe( '1', $this->connectionsCount, + ID::unique(), $roles, $parsedChannels ); @@ -105,11 +107,13 @@ class MessagingChannelsTest extends TestCase $roles = $user->getRoles($this->getAuthorization()); - $parsedChannels = Realtime::convertChannels([0 => $channel], $user->getId()); + // Normalize channels to the format Realtime::subscribe expects (plain channel names) + $parsedChannels = array_keys(Realtime::convertChannels([0 => $channel], $user->getId())); $this->realtime->subscribe( '1', $this->connectionsCount, + ID::unique(), $roles, $parsedChannels ); @@ -194,11 +198,12 @@ class MessagingChannelsTest extends TestCase */ $this->assertCount($this->connectionsTotal / count($this->allChannels), $receivers, $channel); - foreach ($receivers as $receiver) { + foreach ($receivers as $receiverId => $queryKeys) { /** * Making sure the right clients receive the event. */ - $this->assertStringEndsWith($index, $receiver); + $this->assertStringEndsWith($index, $receiverId); + $this->assertIsArray($queryKeys); } } } @@ -230,11 +235,12 @@ class MessagingChannelsTest extends TestCase */ $this->assertCount($this->connectionsPerChannel, $receivers, $channel); - foreach ($receivers as $receiver) { + foreach ($receivers as $receiverId => $queryKeys) { /** * Making sure the right clients receive the event. */ - $this->assertStringEndsWith($index, $receiver); + $this->assertStringEndsWith($index, $receiverId); + $this->assertIsArray($queryKeys); } } } @@ -257,7 +263,7 @@ class MessagingChannelsTest extends TestCase ] ]; - $receivers = $this->realtime->getSubscribers($event); + $receivers = array_keys($this->realtime->getSubscribers($event)); /** * Every Client subscribed to a Channel should receive this event. @@ -292,7 +298,7 @@ class MessagingChannelsTest extends TestCase ] ]; - $receivers = $this->realtime->getSubscribers($event); + $receivers = array_keys($this->realtime->getSubscribers($event)); /** * Every Team Member should receive this event. @@ -325,7 +331,7 @@ class MessagingChannelsTest extends TestCase ] ]; - $receivers = $this->realtime->getSubscribers($event); + $receivers = array_keys($this->realtime->getSubscribers($event)); /** * Only 1 Team Member of a role should have access to a specific channel. diff --git a/tests/unit/Messaging/MessagingGuestTest.php b/tests/unit/Messaging/MessagingGuestTest.php index 1aaa1febca..068f9de819 100644 --- a/tests/unit/Messaging/MessagingGuestTest.php +++ b/tests/unit/Messaging/MessagingGuestTest.php @@ -16,8 +16,10 @@ class MessagingGuestTest extends TestCase $realtime->subscribe( '1', 1, + ID::unique(), [Role::guests()->toString()], - ['files' => 0, 'documents' => 0, 'documents.789' => 0, 'account.123' => 0] + // Pass plain channel names, Realtime::subscribe will normalize them + ['files', 'documents', 'documents.789', 'account.123'] ); $event = [ @@ -27,93 +29,93 @@ class MessagingGuestTest extends TestCase 'channels' => [ 0 => 'documents', 1 => 'documents', - ] + ], ] ]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['roles'] = [Role::guests()->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['roles'] = [Role::users()->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['roles'] = [Role::user(ID::custom('123'))->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['roles'] = [Role::team(ID::custom('abc'))->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['roles'] = [Role::team(ID::custom('abc'), 'administrator')->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['roles'] = [Role::team(ID::custom('abc'), 'god')->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['roles'] = [Role::team(ID::custom('def'))->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['roles'] = [Role::team(ID::custom('def'), 'guest')->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['roles'] = [Role::user(ID::custom('456'))->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['roles'] = [Role::team(ID::custom('def'), 'member')->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['roles'] = [Role::any()->toString()]; $event['data']['channels'] = ['documents.123']; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['data']['channels'] = ['documents.789']; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['project'] = '2'; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); diff --git a/tests/unit/Messaging/MessagingTest.php b/tests/unit/Messaging/MessagingTest.php index c2b6490869..4b2474c760 100644 --- a/tests/unit/Messaging/MessagingTest.php +++ b/tests/unit/Messaging/MessagingTest.php @@ -26,6 +26,7 @@ class MessagingTest extends TestCase $realtime->subscribe( '1', 1, + ID::unique(), [ Role::user(ID::custom('123'))->toString(), Role::users()->toString(), @@ -35,7 +36,8 @@ class MessagingTest extends TestCase Role::team(ID::custom('def'))->toString(), Role::team(ID::custom('def'), 'guest')->toString(), ], - ['files' => 0, 'documents' => 0, 'documents.789' => 0, 'account.123' => 0] + // Pass plain channel names, Realtime::subscribe will normalize them + ['files', 'documents', 'documents.789', 'account.123'] ); $event = [ @@ -48,89 +50,89 @@ class MessagingTest extends TestCase ] ]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['roles'] = [Role::users()->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['roles'] = [Role::user(ID::custom('123'))->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['roles'] = [Role::team(ID::custom('abc'))->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['roles'] = [Role::team(ID::custom('abc'), 'administrator')->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['roles'] = [Role::team(ID::custom('abc'), 'moderator')->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['roles'] = [Role::team(ID::custom('def'))->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['roles'] = [Role::team(ID::custom('def'), 'guest')->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['roles'] = [Role::user(ID::custom('456'))->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['roles'] = [Role::team(ID::custom('def'), 'member')->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['roles'] = [Role::any()->toString()]; $event['data']['channels'] = ['documents.123']; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['data']['channels'] = ['documents.789']; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['project'] = '2'; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); diff --git a/tests/unit/Utopia/Database/Query/RuntimeQueryTest.php b/tests/unit/Utopia/Database/Query/RuntimeQueryTest.php index 7df1ca80eb..51d3a307da 100644 --- a/tests/unit/Utopia/Database/Query/RuntimeQueryTest.php +++ b/tests/unit/Utopia/Database/Query/RuntimeQueryTest.php @@ -599,4 +599,96 @@ class RuntimeQueryTest extends TestCase $result = RuntimeQuery::filter([$query], $payload); $this->assertEquals($payload, $result); } + + // TYPE_SELECT tests - select("*") means "listen to all events" + public function testSelectAllIsAllowed(): void + { + $query = Query::select(['*']); + $this->assertTrue(RuntimeQuery::isSelectAll($query)); + } + + public function testSelectSpecificFieldsNotAllowed(): void + { + $query = Query::select(['name', 'age']); + $this->assertFalse(RuntimeQuery::isSelectAll($query)); + } + + public function testSelectSingleFieldNotAllowed(): void + { + $query = Query::select(['name']); + $this->assertFalse(RuntimeQuery::isSelectAll($query)); + } + + public function testValidateSelectQueryWithWildcard(): void + { + $query = Query::select(['*']); + // Should not throw + RuntimeQuery::validateSelectQuery($query); + $this->assertTrue(true); + } + + public function testValidateSelectQueryWithSpecificFields(): void + { + $query = Query::select(['name', 'age']); + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('Only select("*") is allowed in Realtime queries'); + RuntimeQuery::validateSelectQuery($query); + } + + public function testValidateSelectQueryWithSingleField(): void + { + $query = Query::select(['name']); + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('Only select("*") is allowed in Realtime queries'); + RuntimeQuery::validateSelectQuery($query); + } + + public function testSelectInAllowedQueries(): void + { + $this->assertContains(Query::TYPE_SELECT, RuntimeQuery::ALLOWED_QUERIES); + } + + public function testIsSelectAllWithNonSelectQuery(): void + { + $query = Query::equal('name', ['John']); + $this->assertFalse(RuntimeQuery::isSelectAll($query)); + } + + public function testValidateSelectQueryWithNonSelectQuery(): void + { + $query = Query::equal('name', ['John']); + // Should not throw for non-select queries + RuntimeQuery::validateSelectQuery($query); + $this->assertTrue(true); + } + + // Filter tests with select("*") + public function testFilterWithSelectAllReturnsPayload(): void + { + $query = Query::select(['*']); + $payload = ['name' => 'John', 'age' => 30]; + $result = RuntimeQuery::filter([$query], $payload); + $this->assertEquals($payload, $result); + } + + public function testFilterWithSelectAllAndOtherQueriesReturnsPayload(): void + { + // If select("*") is present, it should return payload regardless of other queries + $queries = [ + Query::select(['*']), + Query::equal('name', ['Jane']), // This would normally fail + ]; + $payload = ['name' => 'John', 'age' => 30]; + $result = RuntimeQuery::filter($queries, $payload); + // select("*") takes precedence - returns payload + $this->assertEquals($payload, $result); + } + + public function testFilterWithSelectAllOnEmptyPayload(): void + { + $query = Query::select(['*']); + $payload = []; + $result = RuntimeQuery::filter([$query], $payload); + $this->assertEquals($payload, $result); + } }