Merge pull request #11202 from appwrite/realtime-query-subscriptions

This commit is contained in:
Jake Barnby 2026-02-03 11:44:13 +00:00 committed by GitHub
commit 952f209a44
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
10 changed files with 1122 additions and 170 deletions

View file

@ -431,10 +431,19 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
]
];
$server->send($realtime->getSubscribers($event), json_encode([
'type' => 'event',
'data' => $event['data']
]));
$subscribers = $realtime->getSubscribers($event); // [connectionId => [subId => queries]]
// For test events, send to all connections with their matched subscription queries
foreach ($subscribers as $connectionId => $matchedSubscriptions) {
$data = $event['data'];
// Send matched subscription IDs
$data['subscriptions'] = array_keys($matchedSubscriptions);
$server->send([$connectionId], json_encode([
'type' => 'event',
'data' => $data
]));
}
}
});
@ -473,33 +482,60 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
$user = $database->getDocument('users', $userId);
$roles = $user->getRoles($database->getAuthorization());
$channels = $realtime->connections[$connection]['channels'];
$queries = $realtime->connections[$connection]['queries'] ?? [];
$authorization = $realtime->connections[$connection]['authorization'] ?? null;
$subscriptionMetadata = $realtime->getSubscriptionMetadata($connection);
$realtime->unsubscribe($connection);
$realtime->subscribe($projectId, $connection, $roles, $channels, $queries);
foreach ($subscriptionMetadata as $subscriptionId => $metadata) {
$queries = Query::parseQueries($metadata['queries'] ?? []);
$realtime->subscribe(
$projectId,
$connection,
$subscriptionId,
$roles,
$metadata['channels'] ?? [],
$queries
);
}
// Restore authorization after subscribe
if ($authorization !== null) {
$realtime->connections[$connection]['authorization'] = $authorization;
}
}
}
$receivers = $realtime->getSubscribers($event);
$receivers = $realtime->getSubscribers($event); // [connectionId => [subId => queries]]
if (App::isDevelopment() && !empty($receivers)) {
Console::log("[Debug][Worker {$workerId}] Receivers: " . count($receivers));
Console::log("[Debug][Worker {$workerId}] Receivers Connection IDs: " . json_encode($receivers));
Console::log("[Debug][Worker {$workerId}] Receivers Connection IDs: " . json_encode(array_keys($receivers)));
Console::log("[Debug][Worker {$workerId}] Event Query: " . json_encode(array_values($receivers)));
Console::log("[Debug][Worker {$workerId}] Event: " . $payload);
}
$server->send(
$receivers,
json_encode([
'type' => 'event',
'data' => $event['data']
])
);
$totalMessages = 0;
if (($num = count($receivers)) > 0) {
$register->get('telemetry.messageSentCounter')->add($num);
$stats->incr($event['project'], 'messages', $num);
foreach ($receivers as $connectionId => $matchedSubscriptions) {
$data = $event['data'];
// Send matched subscription IDs
$data['subscriptions'] = array_keys($matchedSubscriptions);
$server->send(
[$connectionId],
json_encode([
'type' => 'event',
'data' => $data
])
);
$totalMessages++;
}
if ($totalMessages > 0) {
$register->get('telemetry.messageSentCounter')->add($totalMessages);
$stats->incr($event['project'], 'messages', $totalMessages);
}
});
} catch (Throwable $th) {
@ -580,11 +616,6 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
$roles = $user->getRoles($authorization);
$channels = Realtime::convertChannels($request->getQuery('channels', []), $user->getId());
try {
$queries = Realtime::convertQueries($request->getQuery('queries', []));
} catch (QueryException $e) {
throw new Exception(Exception::REALTIME_POLICY_VIOLATION, $e->getMessage());
}
/**
* Channels Check
@ -593,7 +624,34 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
throw new Exception(Exception::REALTIME_POLICY_VIOLATION, 'Missing channels');
}
$realtime->subscribe($project->getId(), $connection, $roles, $channels, $queries);
// Reconstruct subscriptions from query params using helper method
$channelNames = array_keys($channels);
try {
$subscriptionsByIndex = Realtime::constructSubscriptions(
$channelNames,
fn ($channel) => $request->getQuery($channel, null)
);
} catch (QueryException $e) {
throw new Exception(Exception::REALTIME_POLICY_VIOLATION, $e->getMessage());
}
// Generate subscription IDs and subscribe
$subscriptionMapping = [];
foreach ($subscriptionsByIndex as $index => $subscription) {
$subscriptionId = ID::unique();
$realtime->subscribe(
$project->getId(),
$connection,
$subscriptionId,
$roles,
$subscription['channels'],
$subscription['queries'] // Query objects
);
$subscriptionMapping[$index] = $subscriptionId;
}
$realtime->connections[$connection]['authorization'] = $authorization;
@ -602,8 +660,8 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
$server->send([$connection], json_encode([
'type' => 'connected',
'data' => [
'channels' => array_keys($channels),
'queries' => $queries,
'channels' => $channelNames,
'subscriptions' => $subscriptionMapping,
'user' => $user
]
]));
@ -733,13 +791,30 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re
}
$roles = $user->getRoles($database->getAuthorization());
$channels = Realtime::convertChannels(array_flip($realtime->connections[$connection]['channels']), $user->getId());
$channelNames = $realtime->connections[$connection]['channels'] ?? [];
$channels = Realtime::convertChannels(array_flip($channelNames), $user->getId());
// Preserve authorization before subscribe overwrites the connection array
$authorization = $realtime->connections[$connection]['authorization'] ?? null;
$projectId = $realtime->connections[$connection]['projectId'] ?? null;
$queries = $realtime->connections[$connection]['queries'];
$realtime->subscribe($realtime->connections[$connection]['projectId'], $connection, $roles, $channels, $queries);
$subscriptionMetadata = $realtime->getSubscriptionMetadata($connection);
$realtime->unsubscribe($connection);
if (!empty($projectId)) {
foreach ($subscriptionMetadata as $subscriptionId => $metadata) {
$queries = Query::parseQueries($metadata['queries'] ?? []);
$realtime->subscribe(
$projectId,
$connection,
$subscriptionId,
$roles,
$metadata['channels'] ?? [],
$queries
);
}
}
// Restore authorization after subscribe
if ($authorization !== null) {

View file

@ -4,7 +4,7 @@ namespace Appwrite\Messaging;
abstract class Adapter
{
abstract public function subscribe(string $projectId, mixed $identifier, array $roles, array $channels): void;
abstract public function subscribe(string $projectId, mixed $identifier, string $subscriptionId, array $roles, array $channels, array $queryGroup = []): void;
abstract public function unsubscribe(mixed $identifier): void;
abstract public function send(string $projectId, array $payload, array $events, array $channels, array $roles, array $options): void;
}

View file

@ -29,13 +29,13 @@ class Realtime extends MessagingAdapter
*
* [PROJECT_ID] ->
* [ROLE_X] ->
* [CHANNEL_NAME_X] -> [CONNECTION_ID]
* [CHANNEL_NAME_Y] -> [CONNECTION_ID]
* [CHANNEL_NAME_Z] -> [CONNECTION_ID]
* [ROLE_Y] ->
* [CHANNEL_NAME_X] -> [CONNECTION_ID]
* [CHANNEL_NAME_Y] -> [CONNECTION_ID]
* [CHANNEL_NAME_Z] -> [CONNECTION_ID]
* [CHANNEL_NAME_X] ->
* [CONNECTION_ID] ->
* [SUB_ID] -> [query1, query2, ...] // Subscription with queries (AND logic)
*
* Each subscription ID maps to an array of query strings.
* Within a subscription: AND logic (all queries must match)
* Across subscriptions: OR logic (any subscription matching = send event)
*/
public array $subscriptions = [];
@ -48,41 +48,108 @@ class Realtime extends MessagingAdapter
}
/**
* Adds a subscription.
* Adds a subscription with a specific subscription ID.
*
* @param string $projectId
* @param mixed $identifier
* @param array $roles
* @param array $channels
* @param array $queries
* @param mixed $identifier Connection ID
* @param string $subscriptionId Unique subscription ID
* @param array $roles User roles
* @param array $channels Channels to subscribe to (array of channel names)
* @param array $queryGroup Array of Query objects for this subscription (AND logic within subscription)
* @return void
*/
public function subscribe(string $projectId, mixed $identifier, array $roles, array $channels, array $queries = []): void
public function subscribe(string $projectId, mixed $identifier, string $subscriptionId, array $roles, array $channels, array $queryGroup = []): void
{
if (!isset($this->subscriptions[$projectId])) { // Init Project
$this->subscriptions[$projectId] = [];
}
foreach ($roles as $role) {
if (!isset($this->subscriptions[$projectId][$role])) { // Add user first connection
$this->subscriptions[$projectId][$role] = [];
}
foreach ($channels as $channel => $list) {
$this->subscriptions[$projectId][$role][$channel][$identifier] = true;
// Convert Query objects to strings for this subscription
$queryStrings = [];
if (empty($queryGroup)) {
// No queries means "listen to all events" - use select("*")
$queryStrings[] = Query::select(['*'])->toString();
} else {
foreach ($queryGroup as $query) {
/** @var Query $query */
$queryStrings[] = $query->toString();
}
}
foreach ($roles as $role) {
if (!isset($this->subscriptions[$projectId][$role])) {
$this->subscriptions[$projectId][$role] = [];
}
foreach ($channels as $channel) {
if (!isset($this->subscriptions[$projectId][$role][$channel])) {
$this->subscriptions[$projectId][$role][$channel] = [];
}
if (!isset($this->subscriptions[$projectId][$role][$channel][$identifier])) {
$this->subscriptions[$projectId][$role][$channel][$identifier] = [];
}
// Store subscription under subscription ID
$this->subscriptions[$projectId][$role][$channel][$identifier][$subscriptionId] = $queryStrings;
}
}
// Update connection info
$this->connections[$identifier] = [
'projectId' => $projectId,
'roles' => $roles,
'channels' => $channels,
'queries' => $queries
'channels' => $channels
];
}
/**
* Removes Subscription.
* Get subscription metadata for a connection.
* Retrieves subscription data including channels and queries directly from the subscriptions tree.
*
* @param mixed $connection Connection ID
* @return array Array of [subscriptionId => ['channels' => string[], 'queries' => string[]]]
*/
public function getSubscriptionMetadata(mixed $connection): array
{
$projectId = $this->connections[$connection]['projectId'] ?? null;
$roles = $this->connections[$connection]['roles'] ?? [];
$channels = $this->connections[$connection]['channels'] ?? [];
if (!$projectId || empty($roles) || empty($channels)) {
return [];
}
$subscriptions = [];
// Extract subscription data from subscriptions tree
foreach ($roles as $role) {
if (!isset($this->subscriptions[$projectId][$role])) {
continue;
}
foreach ($channels as $channel) {
if (!isset($this->subscriptions[$projectId][$role][$channel][$connection])) {
continue;
}
foreach ($this->subscriptions[$projectId][$role][$channel][$connection] as $subId => $queryStrings) {
if (!isset($subscriptions[$subId])) {
$subscriptions[$subId] = [
'channels' => [],
'queries' => $queryStrings
];
}
if (!in_array($channel, $subscriptions[$subId]['channels'])) {
$subscriptions[$subId]['channels'][] = $channel;
}
}
}
}
return $subscriptions;
}
/**
* Removes all subscriptions for a connection.
*
* @param mixed $connection
* @return void
@ -91,10 +158,11 @@ class Realtime extends MessagingAdapter
{
$projectId = $this->connections[$connection]['projectId'] ?? '';
$roles = $this->connections[$connection]['roles'] ?? [];
$channels = $this->connections[$connection]['channels'] ?? [];
foreach ($roles as $role) {
foreach ($this->subscriptions[$projectId][$role] as $channel => $list) {
unset($this->subscriptions[$projectId][$role][$channel][$connection]); // Remove connection
foreach ($channels as $channel) {
unset($this->subscriptions[$projectId][$role][$channel][$connection]); // dropping connection will drop all subscriptions
if (empty($this->subscriptions[$projectId][$role][$channel])) {
unset($this->subscriptions[$projectId][$role][$channel]); // Remove channel when no connections
@ -110,7 +178,9 @@ class Realtime extends MessagingAdapter
unset($this->subscriptions[$projectId]);
}
unset($this->connections[$connection]);
if (isset($this->connections[$connection])) {
unset($this->connections[$connection]);
}
}
/**
@ -130,7 +200,8 @@ class Realtime extends MessagingAdapter
return array_key_exists($projectId, $this->subscriptions)
&& array_key_exists($role, $this->subscriptions[$projectId])
&& array_key_exists($channel, $this->subscriptions[$projectId][$role]);
&& array_key_exists($channel, $this->subscriptions[$projectId][$role])
&& !empty($this->subscriptions[$projectId][$role][$channel]);
}
/**
@ -179,11 +250,10 @@ class Realtime extends MessagingAdapter
* - 1,121.328 ms (±0.84%) | 1,000,000 Connections / 10,000,000 Subscriptions
*
* @param array $event
* @return int[]|string[]
* @return array<int|string, array> Map of connection IDs to matched query groups
*/
public function getSubscribers(array $event): array
{
$receivers = [];
/**
* Check if project has subscriber.
@ -207,17 +277,31 @@ class Realtime extends MessagingAdapter
/**
* Saving all connections that are allowed to receive this event.
*/
foreach (array_keys($this->subscriptions[$event['project']][$role][$channel]) as $id) {
/**
* To prevent duplicates, we save the connections as array keys.
*/
$queries = $this->connections[$id]['queries'] ?? [];
$payload = $event['data']['payload'] ?? [];
if (
empty($queries) ||
!empty(RuntimeQuery::filter($queries, $payload))
) {
$receivers[$id] = 0;
$payload = $event['data']['payload'] ?? [];
foreach ($this->subscriptions[$event['project']][$role][$channel] as $id => $subscriptions) {
$matchedSubscriptions = [];
// Process each subscription (OR logic across subscriptions)
foreach ($subscriptions as $subId => $queryStrings) {
$parsedQueries = [];
foreach ($queryStrings as $queryString) {
$parsed = Query::parseQueries([$queryString]);
$parsedQueries = array_merge($parsedQueries, $parsed);
}
// Check if this subscription matches (AND logic within subscription)
// Or if empty payload and select all as filter will return empty payload out of it even if it passed
$isEmptyPayloadAndSelectAll = RuntimeQuery::isSelectAll($parsedQueries[0]) && empty($payload);
if ($isEmptyPayloadAndSelectAll || !empty(RuntimeQuery::filter($parsedQueries, $payload))) {
$matchedSubscriptions[$subId] = $queryStrings;
}
}
// Only add connection to receivers if at least one subscription matched
if (!empty($matchedSubscriptions)) {
if (!isset($receivers[$id])) {
$receivers[$id] = [];
}
$receivers[$id] = array_merge($receivers[$id], $matchedSubscriptions);
}
}
break;
@ -226,7 +310,7 @@ class Realtime extends MessagingAdapter
}
}
return array_keys($receivers);
return $receivers;
}
/**
@ -258,17 +342,82 @@ class Realtime extends MessagingAdapter
}
/**
* Converts the queries from the Query Params into an array.
* @param array $queries
* @return array
* Constructs subscriptions from query parameters.
*
* Reconstructs subscription structure from query params where subscription indices can span multiple channels.
* Format: {channel}[subscriptionIndex][]=query1&{channel}[subscriptionIndex][]=query2
*
* Example:
* - tests[0][]=select(*) subscription 0: channels=["tests"]
* - tests[1][]=equal(...) & prod[1][]=equal(...) subscription 1: channels=["tests", "prod"]
*
* @param array $channelNames Array of channel names
* @param callable $getQueryParam Callable that takes a channel name and returns its query param value (null if not present)
* @return array Array indexed by subscription index: [index => ['channels' => string[], 'queries' => Query[]]]
* @throws QueryException
*/
public static function convertQueries(array $queries): array
public static function constructSubscriptions(array $channelNames, callable $getQueryParam): array
{
$subscriptionsByIndex = [];
foreach ($channelNames as $channel) {
$channelSubscriptions = $getQueryParam($channel);
// Backward compatibility: if no channel-specific query params, treat as subscription 0 with select("*")
if ($channelSubscriptions === null) {
if (!isset($subscriptionsByIndex[0])) {
$subscriptionsByIndex[0] = [
'channels' => [],
'queries' => []
];
}
$subscriptionsByIndex[0]['channels'][] = $channel;
if (empty($subscriptionsByIndex[0]['queries'])) {
$subscriptionsByIndex[0]['queries'] = [Query::select(['*'])];
}
continue;
}
if (!is_array($channelSubscriptions)) {
$channelSubscriptions = [$channelSubscriptions];
}
foreach ($channelSubscriptions as $subscriptionIndex => $subscription) {
if (!isset($subscriptionsByIndex[$subscriptionIndex])) {
$subscriptionsByIndex[$subscriptionIndex] = [
'channels' => [],
'queries' => []
];
}
if (!in_array($channel, $subscriptionsByIndex[$subscriptionIndex]['channels'])) {
$subscriptionsByIndex[$subscriptionIndex]['channels'][] = $channel;
}
if (empty($subscriptionsByIndex[$subscriptionIndex]['queries'])) {
$queriesToParse = is_array($subscription) ? $subscription : [$subscription];
$parsedQueries = self::convertQueries($queriesToParse);
$subscriptionsByIndex[$subscriptionIndex]['queries'] = $parsedQueries;
}
}
}
return $subscriptionsByIndex;
}
/**
* Converts the queries from the Query Params into an array.
* @param array|string $queries
* @return array
* @throws QueryException
*/
public static function convertQueries(mixed $queries): array
{
$queries = Query::parseQueries($queries);
$stack = $queries;
$allowedMethods = implode(', ', RuntimeQuery::ALLOWED_QUERIES);
while (!empty($stack)) {
/** `@var` Query $query */
/** @var Query $query */
$query = array_pop($stack);
$method = $query->getMethod();
if (!in_array($method, RuntimeQuery::ALLOWED_QUERIES, true)) {
@ -277,6 +426,12 @@ class Realtime extends MessagingAdapter
"Query method '{$unsupportedMethod}' is not supported in Realtime queries. Allowed query methods are: {$allowedMethods}"
);
}
// Validate select queries - only select("*") is allowed
if ($method === Query::TYPE_SELECT) {
RuntimeQuery::validateSelectQuery($query);
}
if (in_array($method, [Query::TYPE_AND, Query::TYPE_OR], true)) {
$stack = array_merge($stack, $query->getValues());
}

View file

@ -21,9 +21,44 @@ class RuntimeQuery extends Query
// Recursive checks
Query::TYPE_AND,
Query::TYPE_OR
Query::TYPE_OR,
// Special: select("*") means "listen to all events"
Query::TYPE_SELECT
];
/**
* Checks if a query is select("*") which means "listen to all events"
*
* @param Query $query
* @return bool
*/
public static function isSelectAll(Query $query): bool
{
return $query->getMethod() === Query::TYPE_SELECT
&& count($query->getValues()) === 1
&& $query->getValues()[0] === '*';
}
/**
* Validates a select query - only select("*") is allowed in Realtime
*
* @param Query $query
* @throws \InvalidArgumentException
*/
public static function validateSelectQuery(Query $query): void
{
if ($query->getMethod() !== Query::TYPE_SELECT) {
return;
}
if (!self::isSelectAll($query)) {
throw new \InvalidArgumentException(
'Only select("*") is allowed in Realtime queries. select("*") means "listen to all events".'
);
}
}
/**
* @param array<Query> $queries
* @param array<string, mixed> $payload
@ -33,6 +68,14 @@ class RuntimeQuery extends Query
if (empty($queries)) {
return $payload;
}
// Check if select("*") is present - if so, return payload (match all)
foreach ($queries as $query) {
if (self::isSelectAll($query)) {
return $payload;
}
}
// multiple queries follows and condition
foreach ($queries as $query) {
if (!self::evaluateFilter($query, $payload)) {

View file

@ -11,7 +11,7 @@ trait RealtimeBase
array $channels = [],
array $headers = [],
string $projectId = null,
array $queries = []
?array $queries = null
): WebSocketClient {
if (is_null($projectId)) {
$projectId = $this->getProject()['$id'];
@ -19,12 +19,48 @@ trait RealtimeBase
$query = [
"project" => $projectId,
"channels" => $channels,
"queries" => $queries
"channels" => $channels
];
/**
* Query param encoding rules:
* - $queries === null -> only send channels (no per-channel query params) for backward compatibility.
* - $queries === [] -> explicit "select all" subscription: send Query::select(['*']) as a single group.
* - non-empty $queries -> treat as a single subscription group for the first channel:
* AND logic within the group; OR logic across multiple groups (if we ever add them).
*
* For now all E2E tests subscribe to a single channel, so we map queries to $channels[0].
*
* Slot-based format: channel[slot][]=query1&channel[slot][]=query2
* We need to manually build the query string to ensure the [] format is used.
*/
// Build base query string
$queryParams = [
"project" => $projectId,
"channels" => $channels
];
$queryString = http_build_query($queryParams);
if ($queries !== null && !empty($channels)) {
$channel = $channels[0];
$slot = 0; // All tests use slot 0 for now
if ($queries === []) {
// Explicit select("*") group - single query in slot 0
$queryValue = \Utopia\Database\Query::select(['*'])->toString();
$queryString .= "&" . urlencode($channel) . "[" . $slot . "][]=" . urlencode($queryValue);
} else {
// Single subscription group for this channel - multiple queries in slot 0
// Each query should be appended with [] format
foreach ($queries as $queryValue) {
$queryString .= "&" . urlencode($channel) . "[" . $slot . "][]=" . urlencode($queryValue);
}
}
}
return new WebSocketClient(
"ws://appwrite.test/v1/realtime?" . http_build_query($query),
"ws://appwrite.test/v1/realtime?" . $queryString,
[
"headers" => $headers,
"timeout" => 30,

View file

@ -86,7 +86,7 @@ class RealtimeCustomClientQueryTest extends Scope
// Should timeout - no event should be received
try {
$client->receive();
$data = $client->receive();
$this->fail('Expected TimeoutException - event should be filtered');
} catch (TimeoutException $e) {
$this->assertTrue(true);
@ -1352,7 +1352,7 @@ class RealtimeCustomClientQueryTest extends Scope
$targetDocId = ID::unique();
// Subscribe with multiple queries (AND logic - ALL queries must match for event to be received)
// Subscribe with multiple 'queries' (AND logic - ALL 'queries' must match for event to be received)
$client = $this->getWebsocket(['documents'], [
'origin' => 'http://localhost',
'cookie' => 'a_session_' . $projectId . '=' . $session,
@ -1364,7 +1364,7 @@ class RealtimeCustomClientQueryTest extends Scope
$response = json_decode($client->receive(), true);
$this->assertEquals('connected', $response['type']);
// Create document matching BOTH queries - should receive event
// Create document matching BOTH 'queries' - should receive event
$document1 = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
@ -1383,50 +1383,9 @@ class RealtimeCustomClientQueryTest extends Scope
$this->assertEquals($targetDocId, $event['data']['payload']['$id']);
$this->assertEquals('active', $event['data']['payload']['status']);
// Create document with matching ID but wrong status - should NOT receive event (only one query matches)
$this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
], $this->getHeaders()), [
'documentId' => $targetDocId,
'data' => [
'status' => 'inactive'
],
'permissions' => [
Permission::read(Role::any()),
],
]);
try {
$client->receive();
$this->fail('Expected TimeoutException - event should be filtered (ID matches but status does not)');
} catch (TimeoutException $e) {
$this->assertTrue(true);
}
// Create document with matching status but wrong ID - should NOT receive event (only one query matches)
$otherDocId = ID::unique();
$this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
], $this->getHeaders()), [
'documentId' => $otherDocId,
'data' => [
'status' => 'active'
],
'permissions' => [
Permission::read(Role::any()),
],
]);
try {
$client->receive();
$this->fail('Expected TimeoutException - event should be filtered (status matches but ID does not)');
} catch (TimeoutException $e) {
$this->assertTrue(true);
}
// Create document matching NEITHER query - should NOT receive event
// Create document matching NEITHER query - should not receive event
// keeping it here as below are the documents created with status=>active
// so it will also receive it but the querykey can be used to distinction
$anotherDocId = ID::unique();
$this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([
'content-type' => 'application/json',
@ -1448,6 +1407,27 @@ class RealtimeCustomClientQueryTest extends Scope
$this->assertTrue(true);
}
// Create document with matching ID but wrong status - should NOT receive event (only one query matches)
$this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
], $this->getHeaders()), [
'documentId' => $targetDocId,
'data' => [
'status' => 'inactive'
],
'permissions' => [
Permission::read(Role::any()),
],
]);
try {
$client->receive();
$this->fail('Expected TimeoutException - event should be filtered (ID matches but status does not)');
} catch (TimeoutException $e) {
$this->assertTrue(true);
}
$client->close();
}
@ -1521,7 +1501,7 @@ class RealtimeCustomClientQueryTest extends Scope
$this->assertStringContainsString('not supported in Realtime queries', $response['data']['message']);
$this->assertStringContainsString('startsWith', $response['data']['message']);
// Test 5: Multiple invalid queries in nested structure
// Test 5: Multiple invalid 'queries' in nested structure
$client = $this->getWebsocket(['documents'], [
'origin' => 'http://localhost',
'cookie' => 'a_session_' . $projectId . '=' . $session,
@ -1544,4 +1524,565 @@ class RealtimeCustomClientQueryTest extends Scope
str_contains($response['data']['message'], 'endsWith')
);
}
public function testQueryKeys()
{
$user = $this->getUser();
$session = $user['session'] ?? '';
$projectId = $this->getProject()['$id'];
// Setup database and collection
$database = $this->client->call(Client::METHOD_POST, '/databases', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
'x-appwrite-key' => $this->getProject()['apiKey']
]), [
'databaseId' => ID::unique(),
'name' => 'Query Keys Test DB',
]);
$databaseId = $database['body']['$id'];
$collection = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
'x-appwrite-key' => $this->getProject()['apiKey']
]), [
'collectionId' => ID::unique(),
'name' => 'Query Keys Collection',
'permissions' => [
Permission::create(Role::user($user['$id'])),
],
'documentSecurity' => true,
]);
$collectionId = $collection['body']['$id'];
// Attributes used by 'queries'
$this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/attributes/string', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
'x-appwrite-key' => $this->getProject()['apiKey']
]), [
'key' => 'status',
'size' => 256,
'required' => false,
]);
sleep(2);
$this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/attributes/string', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
'x-appwrite-key' => $this->getProject()['apiKey']
]), [
'key' => 'category',
'size' => 256,
'required' => false,
]);
sleep(2);
$queryStatusActive = Query::equal('status', ['active'])->toString();
$queryStatusPending = Query::equal('status', ['pending'])->toString();
$queryComplex = Query::and([
Query::equal('status', ['active']),
Query::equal('category', ['gold']),
])->toString();
// Subscribe with no 'queries' -> should receive all events (has select("*") subscription)
$clientAll = $this->getWebsocket(['documents'], [
'origin' => 'http://localhost',
'cookie' => 'a_session_' . $projectId . '=' . $session,
]);
// Subscribe with query1 (status == active)
$clientQ1 = $this->getWebsocket(['documents'], [
'origin' => 'http://localhost',
'cookie' => 'a_session_' . $projectId . '=' . $session,
], null, [
$queryStatusActive,
]);
// Subscribe with query2 (status == pending)
$clientQ2 = $this->getWebsocket(['documents'], [
'origin' => 'http://localhost',
'cookie' => 'a_session_' . $projectId . '=' . $session,
], null, [
$queryStatusPending,
]);
// Subscribe with complex query (status == active AND category == gold)
$clientComplex = $this->getWebsocket(['documents'], [
'origin' => 'http://localhost',
'cookie' => 'a_session_' . $projectId . '=' . $session,
], null, [
$queryComplex,
]);
// All clients should be connected
foreach ([$clientAll, $clientQ1, $clientQ2, $clientComplex] as $client) {
$response = json_decode($client->receive(), true);
$this->assertEquals('connected', $response['type']);
}
// 1) Create active/gold document -> should match Q1 and complex, and be seen by all
$docActiveGoldId = ID::unique();
$this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
], $this->getHeaders()), [
'documentId' => $docActiveGoldId,
'data' => [
'status' => 'active',
'category' => 'gold',
],
'permissions' => [
Permission::read(Role::any()),
],
]);
// clientAll: should receive event, subscriptions should not be empty (has select("*") subscription that matches)
$eventAll = json_decode($clientAll->receive(), true);
$this->assertEquals('event', $eventAll['type']);
$this->assertEquals($docActiveGoldId, $eventAll['data']['payload']['$id']);
$this->assertArrayHasKey('subscriptions', $eventAll['data']);
$this->assertIsArray($eventAll['data']['subscriptions']);
// clientAll has select("*") subscription that matches all events, so subscriptions should not be empty
$this->assertNotEmpty($eventAll['data']['subscriptions']);
// clientQ1: should receive event, subscriptions should not be empty (query matched)
$eventQ1 = json_decode($clientQ1->receive(), true);
$this->assertEquals('event', $eventQ1['type']);
$this->assertEquals($docActiveGoldId, $eventQ1['data']['payload']['$id']);
$this->assertArrayHasKey('subscriptions', $eventQ1['data']);
$this->assertIsArray($eventQ1['data']['subscriptions']);
// clientQ1 has a query that matches, so subscriptions should not be empty
$this->assertNotEmpty($eventQ1['data']['subscriptions']);
// clientQ2: should NOT receive event (status is active, not pending)
try {
$clientQ2->receive();
$this->fail('Expected TimeoutException - event should be filtered for clientQ2 (active document)');
} catch (TimeoutException $e) {
$this->assertTrue(true);
}
// clientComplex: should receive event, subscriptions should not be empty (query matched)
$eventComplex = json_decode($clientComplex->receive(), true);
$this->assertEquals('event', $eventComplex['type']);
$this->assertEquals($docActiveGoldId, $eventComplex['data']['payload']['$id']);
$this->assertArrayHasKey('subscriptions', $eventComplex['data']);
$this->assertIsArray($eventComplex['data']['subscriptions']);
// clientComplex has a query that matches, so subscriptions should not be empty
$this->assertNotEmpty($eventComplex['data']['subscriptions']);
// 2) Create pending/silver document -> should match Q2 only, and be seen by all
$docPendingSilverId = ID::unique();
$this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
], $this->getHeaders()), [
'documentId' => $docPendingSilverId,
'data' => [
'status' => 'pending',
'category' => 'silver',
],
'permissions' => [
Permission::read(Role::any()),
],
]);
// clientAll: should receive event, subscriptions should not be empty (has select("*") subscription that matches)
$eventAll2 = json_decode($clientAll->receive(), true);
$this->assertEquals('event', $eventAll2['type']);
$this->assertEquals($docPendingSilverId, $eventAll2['data']['payload']['$id']);
$this->assertArrayHasKey('subscriptions', $eventAll2['data']);
$this->assertIsArray($eventAll2['data']['subscriptions']);
// clientAll has select("*") subscription that matches all events, so subscriptions should not be empty
$this->assertNotEmpty($eventAll2['data']['subscriptions']);
// clientQ1: should NOT receive event (status is pending)
try {
$clientQ1->receive();
$this->fail('Expected TimeoutException - event should be filtered for clientQ1 (pending document)');
} catch (TimeoutException $e) {
$this->assertTrue(true);
}
// clientQ2: should receive event, subscriptions should not be empty (query matched)
$eventQ2 = json_decode($clientQ2->receive(), true);
$this->assertEquals('event', $eventQ2['type']);
$this->assertEquals($docPendingSilverId, $eventQ2['data']['payload']['$id']);
$this->assertArrayHasKey('subscriptions', $eventQ2['data']);
$this->assertIsArray($eventQ2['data']['subscriptions']);
// clientQ2 has a query that matches, so subscriptions should not be empty
$this->assertNotEmpty($eventQ2['data']['subscriptions']);
// clientComplex: should NOT receive event (status is pending, category silver)
try {
$clientComplex->receive();
$this->fail('Expected TimeoutException - event should be filtered for complex subscription (pending document)');
} catch (TimeoutException $e) {
$this->assertTrue(true);
}
$clientAll->close();
$clientQ1->close();
$clientQ2->close();
$clientComplex->close();
}
/**
* Ensure two separate subscriptions with different query keys
* only see their own matching events and expose the correct
* queryKey in queryKeys.
*/
public function testMultipleSubscriptionsDifferentQueryKeys()
{
$user = $this->getUser();
$session = $user['session'] ?? '';
$projectId = $this->getProject()['$id'];
// Setup database and collection
$database = $this->client->call(Client::METHOD_POST, '/databases', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
'x-appwrite-key' => $this->getProject()['apiKey']
]), [
'databaseId' => ID::unique(),
'name' => 'Multiple Query Keys Test DB',
]);
$databaseId = $database['body']['$id'];
$collection = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
'x-appwrite-key' => $this->getProject()['apiKey']
]), [
'collectionId' => ID::unique(),
'name' => 'Multiple Query Keys Collection',
'permissions' => [
Permission::create(Role::user($user['$id'])),
],
'documentSecurity' => true,
]);
$collectionId = $collection['body']['$id'];
// Attribute used by 'queries'
$this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/attributes/string', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
'x-appwrite-key' => $this->getProject()['apiKey']
]), [
'key' => 'status',
'size' => 256,
'required' => false,
]);
sleep(2);
$queryStatusActive = Query::equal('status', ['active'])->toString();
$queryStatusPending = Query::equal('status', ['pending'])->toString();
// Two subscriptions on the same channel with different query keys
$clientQ1 = $this->getWebsocket(['documents'], [
'origin' => 'http://localhost',
'cookie' => 'a_session_' . $projectId . '=' . $session,
], null, [
$queryStatusActive,
]);
$clientQ2 = $this->getWebsocket(['documents'], [
'origin' => 'http://localhost',
'cookie' => 'a_session_' . $projectId . '=' . $session,
], null, [
$queryStatusPending,
]);
// Both should connect
$response = json_decode($clientQ1->receive(), true);
$this->assertEquals('connected', $response['type']);
$response = json_decode($clientQ2->receive(), true);
$this->assertEquals('connected', $response['type']);
// 1) active document -> only queryStatusActive subscription should see it
$docActiveId = ID::unique();
$this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
], $this->getHeaders()), [
'documentId' => $docActiveId,
'data' => [
'status' => 'active',
],
'permissions' => [
Permission::read(Role::any()),
],
]);
$eventQ1 = json_decode($clientQ1->receive(), true);
$this->assertEquals('event', $eventQ1['type']);
$this->assertEquals($docActiveId, $eventQ1['data']['payload']['$id']);
$this->assertArrayHasKey('subscriptions', $eventQ1['data']);
$this->assertIsArray($eventQ1['data']['subscriptions']);
// clientQ1 has a query that matches, so subscriptions should not be empty
$this->assertNotEmpty($eventQ1['data']['subscriptions']);
try {
$clientQ2->receive();
$this->fail('Expected TimeoutException - clientQ2 should not receive active document');
} catch (TimeoutException $e) {
$this->assertTrue(true);
}
// 2) pending document -> only queryStatusPending subscription should see it
$docPendingId = ID::unique();
$this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
], $this->getHeaders()), [
'documentId' => $docPendingId,
'data' => [
'status' => 'pending',
],
'permissions' => [
Permission::read(Role::any()),
],
]);
$eventQ2 = json_decode($clientQ2->receive(), true);
$this->assertEquals('event', $eventQ2['type']);
$this->assertEquals($docPendingId, $eventQ2['data']['payload']['$id']);
$this->assertArrayHasKey('subscriptions', $eventQ2['data']);
$this->assertIsArray($eventQ2['data']['subscriptions']);
// clientQ2 has a query that matches, so subscriptions should not be empty
$this->assertNotEmpty($eventQ2['data']['subscriptions']);
try {
$clientQ1->receive();
$this->fail('Expected TimeoutException - clientQ1 should not receive pending document');
} catch (TimeoutException $e) {
$this->assertTrue(true);
}
$clientQ1->close();
$clientQ2->close();
}
public function testSubscriptionPreservedAfterPermissionChange()
{
$user = $this->getUser();
$session = $user['session'] ?? '';
$projectId = $this->getProject()['$id'];
$userId = $user['$id'] ?? '';
// Setup database and collection
$database = $this->client->call(Client::METHOD_POST, '/databases', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
'x-appwrite-key' => $this->getProject()['apiKey']
]), [
'databaseId' => ID::unique(),
'name' => 'Permission Change Test DB',
]);
$databaseId = $database['body']['$id'];
$collection = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
'x-appwrite-key' => $this->getProject()['apiKey']
]), [
'collectionId' => ID::unique(),
'name' => 'Permission Change Collection',
'permissions' => [
Permission::create(Role::user($userId)),
Permission::read(Role::user($userId)),
],
'documentSecurity' => true,
]);
$collectionId = $collection['body']['$id'];
$this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/attributes/string', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
'x-appwrite-key' => $this->getProject()['apiKey']
]), [
'key' => 'status',
'size' => 256,
'required' => false,
]);
sleep(2);
$targetDocumentId = ID::unique();
// Subscribe with query for specific document ID
$client = $this->getWebsocket(['documents'], [
'origin' => 'http://localhost',
'cookie' => 'a_session_' . $projectId . '=' . $session,
], null, [
Query::equal('$id', [$targetDocumentId])->toString(),
]);
$response = json_decode($client->receive(), true);
$this->assertEquals('connected', $response['type']);
$this->assertArrayHasKey('subscriptions', $response['data']);
$this->assertIsArray($response['data']['subscriptions']);
// Store the original subscription mapping (index => subscriptionId)
$originalSubscriptionMapping = $response['data']['subscriptions'];
$this->assertNotEmpty($originalSubscriptionMapping);
// Get the first subscription ID and its index
$originalIndex = array_key_first($originalSubscriptionMapping);
$originalSubscriptionId = $originalSubscriptionMapping[$originalIndex];
// Create document with matching ID - should receive event
$document = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
], $this->getHeaders()), [
'documentId' => $targetDocumentId,
'data' => [
'status' => 'active'
],
'permissions' => [
Permission::read(Role::user($userId)),
Permission::update(Role::user($userId)),
],
]);
$event = json_decode($client->receive(), true);
$this->assertEquals('event', $event['type']);
$this->assertEquals($targetDocumentId, $event['data']['payload']['$id']);
$this->assertArrayHasKey('subscriptions', $event['data']);
$this->assertContains($originalSubscriptionId, $event['data']['subscriptions']);
// Trigger permission change by creating a team owned by a DIFFERENT user,
$teamOwnerEmail = uniqid() . 'owner@localhost.test';
$teamOwnerPassword = 'password';
$teamOwner = $this->client->call(Client::METHOD_POST, '/account', [
'origin' => 'http://localhost',
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
], [
'userId' => ID::unique(),
'email' => $teamOwnerEmail,
'password' => $teamOwnerPassword,
'name' => 'Team Owner',
]);
$this->assertEquals(201, $teamOwner['headers']['status-code']);
$teamOwnerSession = $this->client->call(Client::METHOD_POST, '/account/sessions/email', [
'origin' => 'http://localhost',
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
], [
'email' => $teamOwnerEmail,
'password' => $teamOwnerPassword,
]);
$teamOwnerSession = $teamOwnerSession['cookies']['a_session_' . $projectId] ?? '';
$team = $this->client->call(Client::METHOD_POST, '/teams', [
'origin' => 'http://localhost',
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
'cookie' => 'a_session_' . $projectId . '=' . $teamOwnerSession,
], [
'teamId' => ID::unique(),
'name' => 'Test Team',
]);
$teamId = $team['body']['$id'];
$this->client->call(Client::METHOD_POST, '/teams/' . $teamId . '/memberships', [
'origin' => 'http://localhost',
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
'x-appwrite-key' => $this->getProject()['apiKey'],
], [
'email' => $user['email'],
'roles' => ['member'],
'url' => 'http://localhost',
]);
sleep(3);
// Verify subscription is still working after permission change
$nonMatchingDocumentId = ID::unique();
$document2 = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
], $this->getHeaders()), [
'documentId' => $nonMatchingDocumentId,
'data' => [
'status' => 'active'
],
'permissions' => [
Permission::read(Role::user($userId)),
Permission::update(Role::user($userId)),
],
]);
// This document doesn't match the query, so we shouldn't receive it
try {
$data = $client->receive();
$this->fail('Expected TimeoutException - document does not match query after permission change');
} catch (TimeoutException $e) {
$this->assertTrue(true);
}
// Create a NEW document with a different ID - should NOT receive event
$targetDocumentId2 = ID::unique();
$document3 = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
], $this->getHeaders()), [
'documentId' => $targetDocumentId2,
'data' => [
'status' => 'active'
],
'permissions' => [
Permission::read(Role::user($userId)),
Permission::update(Role::user($userId)),
],
]);
sleep(2);
// This should NOT receive event because the query is for $targetDocumentId, not $targetDocumentId2
// This verifies the query is preserved after permission change
try {
$data = $client->receive();
$this->fail('Expected TimeoutException - new document does not match original query after permission change');
} catch (TimeoutException $e) {
$this->assertTrue(true);
}
// Create a document with the ORIGINAL matching ID - should receive event
$document4 = $this->client->call(Client::METHOD_PATCH, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents/' . $targetDocumentId, array_merge([
'content-type' => 'application/json',
'x-appwrite-project' => $projectId,
], $this->getHeaders()), [
'data' => [
'status' => 'updated-after-permission-change'
],
]);
// Wait a bit for the event to be processed
sleep(3);
// Verify the event is received with the preserved subscription
$event2 = json_decode($client->receive(), true);
$this->assertEquals('event', $event2['type']);
$this->assertEquals($targetDocumentId, $event2['data']['payload']['$id']);
$this->assertEquals('updated-after-permission-change', $event2['data']['payload']['status']);
$this->assertArrayHasKey('subscriptions', $event2['data']);
$this->assertIsArray($event2['data']['subscriptions']);
$this->assertNotEmpty($event2['data']['subscriptions']);
// Subscription ID should remain stable after permission change
$this->assertContains($originalSubscriptionId, $event2['data']['subscriptions']);
$client->close();
}
}

View file

@ -81,11 +81,13 @@ class MessagingChannelsTest extends TestCase
$roles = $user->getRoles($this->getAuthorization());
$parsedChannels = Realtime::convertChannels([0 => $channel], $user->getId());
// Normalize channels to the format Realtime::subscribe expects (plain channel names)
$parsedChannels = array_keys(Realtime::convertChannels([0 => $channel], $user->getId()));
$this->realtime->subscribe(
'1',
$this->connectionsCount,
ID::unique(),
$roles,
$parsedChannels
);
@ -105,11 +107,13 @@ class MessagingChannelsTest extends TestCase
$roles = $user->getRoles($this->getAuthorization());
$parsedChannels = Realtime::convertChannels([0 => $channel], $user->getId());
// Normalize channels to the format Realtime::subscribe expects (plain channel names)
$parsedChannels = array_keys(Realtime::convertChannels([0 => $channel], $user->getId()));
$this->realtime->subscribe(
'1',
$this->connectionsCount,
ID::unique(),
$roles,
$parsedChannels
);
@ -194,11 +198,12 @@ class MessagingChannelsTest extends TestCase
*/
$this->assertCount($this->connectionsTotal / count($this->allChannels), $receivers, $channel);
foreach ($receivers as $receiver) {
foreach ($receivers as $receiverId => $queryKeys) {
/**
* Making sure the right clients receive the event.
*/
$this->assertStringEndsWith($index, $receiver);
$this->assertStringEndsWith($index, $receiverId);
$this->assertIsArray($queryKeys);
}
}
}
@ -230,11 +235,12 @@ class MessagingChannelsTest extends TestCase
*/
$this->assertCount($this->connectionsPerChannel, $receivers, $channel);
foreach ($receivers as $receiver) {
foreach ($receivers as $receiverId => $queryKeys) {
/**
* Making sure the right clients receive the event.
*/
$this->assertStringEndsWith($index, $receiver);
$this->assertStringEndsWith($index, $receiverId);
$this->assertIsArray($queryKeys);
}
}
}
@ -257,7 +263,7 @@ class MessagingChannelsTest extends TestCase
]
];
$receivers = $this->realtime->getSubscribers($event);
$receivers = array_keys($this->realtime->getSubscribers($event));
/**
* Every Client subscribed to a Channel should receive this event.
@ -292,7 +298,7 @@ class MessagingChannelsTest extends TestCase
]
];
$receivers = $this->realtime->getSubscribers($event);
$receivers = array_keys($this->realtime->getSubscribers($event));
/**
* Every Team Member should receive this event.
@ -325,7 +331,7 @@ class MessagingChannelsTest extends TestCase
]
];
$receivers = $this->realtime->getSubscribers($event);
$receivers = array_keys($this->realtime->getSubscribers($event));
/**
* Only 1 Team Member of a role should have access to a specific channel.

View file

@ -16,8 +16,10 @@ class MessagingGuestTest extends TestCase
$realtime->subscribe(
'1',
1,
ID::unique(),
[Role::guests()->toString()],
['files' => 0, 'documents' => 0, 'documents.789' => 0, 'account.123' => 0]
// Pass plain channel names, Realtime::subscribe will normalize them
['files', 'documents', 'documents.789', 'account.123']
);
$event = [
@ -27,93 +29,93 @@ class MessagingGuestTest extends TestCase
'channels' => [
0 => 'documents',
1 => 'documents',
]
],
]
];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertCount(1, $receivers);
$this->assertEquals(1, $receivers[0]);
$event['roles'] = [Role::guests()->toString()];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertCount(1, $receivers);
$this->assertEquals(1, $receivers[0]);
$event['roles'] = [Role::users()->toString()];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertEmpty($receivers);
$event['roles'] = [Role::user(ID::custom('123'))->toString()];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertEmpty($receivers);
$event['roles'] = [Role::team(ID::custom('abc'))->toString()];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertEmpty($receivers);
$event['roles'] = [Role::team(ID::custom('abc'), 'administrator')->toString()];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertEmpty($receivers);
$event['roles'] = [Role::team(ID::custom('abc'), 'god')->toString()];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertEmpty($receivers);
$event['roles'] = [Role::team(ID::custom('def'))->toString()];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertEmpty($receivers);
$event['roles'] = [Role::team(ID::custom('def'), 'guest')->toString()];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertEmpty($receivers);
$event['roles'] = [Role::user(ID::custom('456'))->toString()];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertEmpty($receivers);
$event['roles'] = [Role::team(ID::custom('def'), 'member')->toString()];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertEmpty($receivers);
$event['roles'] = [Role::any()->toString()];
$event['data']['channels'] = ['documents.123'];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertEmpty($receivers);
$event['data']['channels'] = ['documents.789'];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertCount(1, $receivers);
$this->assertEquals(1, $receivers[0]);
$event['project'] = '2';
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertEmpty($receivers);

View file

@ -26,6 +26,7 @@ class MessagingTest extends TestCase
$realtime->subscribe(
'1',
1,
ID::unique(),
[
Role::user(ID::custom('123'))->toString(),
Role::users()->toString(),
@ -35,7 +36,8 @@ class MessagingTest extends TestCase
Role::team(ID::custom('def'))->toString(),
Role::team(ID::custom('def'), 'guest')->toString(),
],
['files' => 0, 'documents' => 0, 'documents.789' => 0, 'account.123' => 0]
// Pass plain channel names, Realtime::subscribe will normalize them
['files', 'documents', 'documents.789', 'account.123']
);
$event = [
@ -48,89 +50,89 @@ class MessagingTest extends TestCase
]
];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertCount(1, $receivers);
$this->assertEquals(1, $receivers[0]);
$event['roles'] = [Role::users()->toString()];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertCount(1, $receivers);
$this->assertEquals(1, $receivers[0]);
$event['roles'] = [Role::user(ID::custom('123'))->toString()];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertCount(1, $receivers);
$this->assertEquals(1, $receivers[0]);
$event['roles'] = [Role::team(ID::custom('abc'))->toString()];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertCount(1, $receivers);
$this->assertEquals(1, $receivers[0]);
$event['roles'] = [Role::team(ID::custom('abc'), 'administrator')->toString()];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertCount(1, $receivers);
$this->assertEquals(1, $receivers[0]);
$event['roles'] = [Role::team(ID::custom('abc'), 'moderator')->toString()];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertCount(1, $receivers);
$this->assertEquals(1, $receivers[0]);
$event['roles'] = [Role::team(ID::custom('def'))->toString()];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertCount(1, $receivers);
$this->assertEquals(1, $receivers[0]);
$event['roles'] = [Role::team(ID::custom('def'), 'guest')->toString()];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertCount(1, $receivers);
$this->assertEquals(1, $receivers[0]);
$event['roles'] = [Role::user(ID::custom('456'))->toString()];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertEmpty($receivers);
$event['roles'] = [Role::team(ID::custom('def'), 'member')->toString()];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertEmpty($receivers);
$event['roles'] = [Role::any()->toString()];
$event['data']['channels'] = ['documents.123'];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertEmpty($receivers);
$event['data']['channels'] = ['documents.789'];
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertCount(1, $receivers);
$this->assertEquals(1, $receivers[0]);
$event['project'] = '2';
$receivers = $realtime->getSubscribers($event);
$receivers = array_keys($realtime->getSubscribers($event));
$this->assertEmpty($receivers);

View file

@ -599,4 +599,96 @@ class RuntimeQueryTest extends TestCase
$result = RuntimeQuery::filter([$query], $payload);
$this->assertEquals($payload, $result);
}
// TYPE_SELECT tests - select("*") means "listen to all events"
public function testSelectAllIsAllowed(): void
{
$query = Query::select(['*']);
$this->assertTrue(RuntimeQuery::isSelectAll($query));
}
public function testSelectSpecificFieldsNotAllowed(): void
{
$query = Query::select(['name', 'age']);
$this->assertFalse(RuntimeQuery::isSelectAll($query));
}
public function testSelectSingleFieldNotAllowed(): void
{
$query = Query::select(['name']);
$this->assertFalse(RuntimeQuery::isSelectAll($query));
}
public function testValidateSelectQueryWithWildcard(): void
{
$query = Query::select(['*']);
// Should not throw
RuntimeQuery::validateSelectQuery($query);
$this->assertTrue(true);
}
public function testValidateSelectQueryWithSpecificFields(): void
{
$query = Query::select(['name', 'age']);
$this->expectException(\InvalidArgumentException::class);
$this->expectExceptionMessage('Only select("*") is allowed in Realtime queries');
RuntimeQuery::validateSelectQuery($query);
}
public function testValidateSelectQueryWithSingleField(): void
{
$query = Query::select(['name']);
$this->expectException(\InvalidArgumentException::class);
$this->expectExceptionMessage('Only select("*") is allowed in Realtime queries');
RuntimeQuery::validateSelectQuery($query);
}
public function testSelectInAllowedQueries(): void
{
$this->assertContains(Query::TYPE_SELECT, RuntimeQuery::ALLOWED_QUERIES);
}
public function testIsSelectAllWithNonSelectQuery(): void
{
$query = Query::equal('name', ['John']);
$this->assertFalse(RuntimeQuery::isSelectAll($query));
}
public function testValidateSelectQueryWithNonSelectQuery(): void
{
$query = Query::equal('name', ['John']);
// Should not throw for non-select queries
RuntimeQuery::validateSelectQuery($query);
$this->assertTrue(true);
}
// Filter tests with select("*")
public function testFilterWithSelectAllReturnsPayload(): void
{
$query = Query::select(['*']);
$payload = ['name' => 'John', 'age' => 30];
$result = RuntimeQuery::filter([$query], $payload);
$this->assertEquals($payload, $result);
}
public function testFilterWithSelectAllAndOtherQueriesReturnsPayload(): void
{
// If select("*") is present, it should return payload regardless of other queries
$queries = [
Query::select(['*']),
Query::equal('name', ['Jane']), // This would normally fail
];
$payload = ['name' => 'John', 'age' => 30];
$result = RuntimeQuery::filter($queries, $payload);
// select("*") takes precedence - returns payload
$this->assertEquals($payload, $result);
}
public function testFilterWithSelectAllOnEmptyPayload(): void
{
$query = Query::select(['*']);
$payload = [];
$result = RuntimeQuery::filter([$query], $payload);
$this->assertEquals($payload, $result);
}
}