From b785dea7acccb1f801a22f12de87d5251bccf4ae Mon Sep 17 00:00:00 2001 From: ArnabChatterjee20k Date: Wed, 28 Jan 2026 18:40:30 +0530 Subject: [PATCH 01/11] added query per subscription and queryKeys along with the messages --- app/realtime.php | 32 +- src/Appwrite/Messaging/Adapter/Realtime.php | 74 ++-- .../RealtimeCustomClientQueryTest.php | 328 ++++++++++++++++++ 3 files changed, 397 insertions(+), 37 deletions(-) diff --git a/app/realtime.php b/app/realtime.php index eded4d79bc..b22427c239 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -481,25 +481,33 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, } } - $receivers = $realtime->getSubscribers($event); + $receivers = $realtime->getSubscribers($event); // [connectionId => matchedQueryKeys[]] if (App::isDevelopment() && !empty($receivers)) { Console::log("[Debug][Worker {$workerId}] Receivers: " . count($receivers)); - Console::log("[Debug][Worker {$workerId}] Receivers Connection IDs: " . json_encode($receivers)); + Console::log("[Debug][Worker {$workerId}] Receivers Connection IDs: " . json_encode(array_keys($receivers))); Console::log("[Debug][Worker {$workerId}] Event: " . $payload); } - $server->send( - $receivers, - json_encode([ - 'type' => 'event', - 'data' => $event['data'] - ]) - ); + $totalMessages = 0; - if (($num = count($receivers)) > 0) { - $register->get('telemetry.messageSentCounter')->add($num); - $stats->incr($event['project'], 'messages', $num); + foreach ($receivers as $connectionId => $matchedQueryKeys) { + $data = $event['data']; + $data['queryKeys'] = $matchedQueryKeys; + + $server->send( + [$connectionId], + json_encode([ + 'type' => 'event', + 'data' => $data + ]) + ); + $totalMessages++; + } + + if ($totalMessages > 0) { + $register->get('telemetry.messageSentCounter')->add($totalMessages); + $stats->incr($event['project'], 'messages', $totalMessages); } }); } catch (Throwable $th) { diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index 9e03a7aaf7..35c6e9c710 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -29,13 +29,19 @@ class Realtime extends MessagingAdapter * * [PROJECT_ID] -> * [ROLE_X] -> - * [CHANNEL_NAME_X] -> [CONNECTION_ID] - * [CHANNEL_NAME_Y] -> [CONNECTION_ID] - * [CHANNEL_NAME_Z] -> [CONNECTION_ID] + * [CHANNEL_NAME_X] -> + * [CONNECTION_ID] -> [QUERY_KEY] => true + * [CHANNEL_NAME_Y] -> + * [CONNECTION_ID] -> [QUERY_KEY] => true + * [CHANNEL_NAME_Z] -> + * [CONNECTION_ID] -> [QUERY_KEY] => true * [ROLE_Y] -> - * [CHANNEL_NAME_X] -> [CONNECTION_ID] - * [CHANNEL_NAME_Y] -> [CONNECTION_ID] - * [CHANNEL_NAME_Z] -> [CONNECTION_ID] + * [CHANNEL_NAME_X] -> + * [CONNECTION_ID] -> [QUERY_KEY] => true + * [CHANNEL_NAME_Y] -> + * [CONNECTION_ID] -> [QUERY_KEY] => true + * [CHANNEL_NAME_Z] -> + * [CONNECTION_ID] -> [QUERY_KEY] => true */ public array $subscriptions = []; @@ -63,21 +69,34 @@ class Realtime extends MessagingAdapter $this->subscriptions[$projectId] = []; } + $queryKeys = []; + if (empty($queries)) { + $queryKeys[] = ''; + } else { + foreach ($queries as $query) { + /** @var Query $query */ + $queryKeys[] = $query->toString(); + } + } + foreach ($roles as $role) { if (!isset($this->subscriptions[$projectId][$role])) { // Add user first connection $this->subscriptions[$projectId][$role] = []; } foreach ($channels as $channel => $list) { - $this->subscriptions[$projectId][$role][$channel][$identifier] = true; + if (!isset($this->subscriptions[$projectId][$role][$channel][$identifier])) { + $this->subscriptions[$projectId][$role][$channel][$identifier] = []; + } + foreach ($queryKeys as $queryKey) { + $this->subscriptions[$projectId][$role][$channel][$identifier][$queryKey] = true; + } } } - $this->connections[$identifier] = [ 'projectId' => $projectId, 'roles' => $roles, - 'channels' => $channels, - 'queries' => $queries + 'channels' => $channels ]; } @@ -91,10 +110,11 @@ class Realtime extends MessagingAdapter { $projectId = $this->connections[$connection]['projectId'] ?? ''; $roles = $this->connections[$connection]['roles'] ?? []; + $channels = $this->connections[$connection]['channels'] ?? []; foreach ($roles as $role) { - foreach ($this->subscriptions[$projectId][$role] as $channel => $list) { - unset($this->subscriptions[$projectId][$role][$channel][$connection]); // Remove connection + foreach ($channels as $channel => $list) { + unset($this->subscriptions[$projectId][$role][$channel][$connection]); // dropping connection will drop the queries as well if (empty($this->subscriptions[$projectId][$role][$channel])) { unset($this->subscriptions[$projectId][$role][$channel]); // Remove channel when no connections @@ -130,7 +150,8 @@ class Realtime extends MessagingAdapter return array_key_exists($projectId, $this->subscriptions) && array_key_exists($role, $this->subscriptions[$projectId]) - && array_key_exists($channel, $this->subscriptions[$projectId][$role]); + && array_key_exists($channel, $this->subscriptions[$projectId][$role]) + && !empty($this->subscriptions[$projectId][$role][$channel]); } /** @@ -207,18 +228,21 @@ class Realtime extends MessagingAdapter /** * Saving all connections that are allowed to receive this event. */ - foreach (array_keys($this->subscriptions[$event['project']][$role][$channel]) as $id) { - /** - * To prevent duplicates, we save the connections as array keys. - */ - $queries = $this->connections[$id]['queries'] ?? []; - $payload = $event['data']['payload'] ?? []; - if ( - empty($queries) || - !empty(RuntimeQuery::filter($queries, $payload)) - ) { - $receivers[$id] = 0; + $payload = $event['data']['payload'] ?? []; + foreach ($this->subscriptions[$event['project']][$role][$channel] as $id => $queryMap) { + $matchedQueryKeys = []; + // for representing a all query subscribed channel + if (isset($queryMap[''])) { + $matchedQueryKeys[] = ''; + } else { + foreach (array_keys($queryMap) as $queryKey) { + $parsed = Query::parseQueries([$queryKey]); + if (!empty(RuntimeQuery::filter($parsed, $payload))) { + $matchedQueryKeys[] = $queryKey; + } + } } + $receivers[$id] = $matchedQueryKeys; } break; } @@ -226,7 +250,7 @@ class Realtime extends MessagingAdapter } } - return array_keys($receivers); + return $receivers; } /** diff --git a/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php b/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php index 068736561e..f69ef187e5 100644 --- a/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php +++ b/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php @@ -1544,4 +1544,332 @@ class RealtimeCustomClientQueryTest extends Scope str_contains($response['data']['message'], 'endsWith') ); } + + public function testQueryKeys() + { + $user = $this->getUser(); + $session = $user['session'] ?? ''; + $projectId = $this->getProject()['$id']; + + // Setup database and collection + $database = $this->client->call(Client::METHOD_POST, '/databases', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'databaseId' => ID::unique(), + 'name' => 'Query Keys Test DB', + ]); + $databaseId = $database['body']['$id']; + + $collection = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'collectionId' => ID::unique(), + 'name' => 'Query Keys Collection', + 'permissions' => [ + Permission::create(Role::user($user['$id'])), + ], + 'documentSecurity' => true, + ]); + $collectionId = $collection['body']['$id']; + + // Attributes used by queries + $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/attributes/string', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'key' => 'status', + 'size' => 256, + 'required' => false, + ]); + + sleep(2); + + $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/attributes/string', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'key' => 'category', + 'size' => 256, + 'required' => false, + ]); + + sleep(2); + + $queryStatusActive = Query::equal('status', ['active'])->toString(); + $queryStatusPending = Query::equal('status', ['pending'])->toString(); + $queryComplex = Query::and([ + Query::equal('status', ['active']), + Query::equal('category', ['gold']), + ])->toString(); + + // Subscribe with no queries -> should receive all events, queryKeys = [''] + $clientAll = $this->getWebsocket(['documents'], [ + 'origin' => 'http://localhost', + 'cookie' => 'a_session_' . $projectId . '=' . $session, + ]); + + // Subscribe with query1 (status == active) + $clientQ1 = $this->getWebsocket(['documents'], [ + 'origin' => 'http://localhost', + 'cookie' => 'a_session_' . $projectId . '=' . $session, + ], null, [ + $queryStatusActive, + ]); + + // Subscribe with query2 (status == pending) + $clientQ2 = $this->getWebsocket(['documents'], [ + 'origin' => 'http://localhost', + 'cookie' => 'a_session_' . $projectId . '=' . $session, + ], null, [ + $queryStatusPending, + ]); + + // Subscribe with complex query (status == active AND category == gold) + $clientComplex = $this->getWebsocket(['documents'], [ + 'origin' => 'http://localhost', + 'cookie' => 'a_session_' . $projectId . '=' . $session, + ], null, [ + $queryComplex, + ]); + + // All clients should be connected + foreach ([$clientAll, $clientQ1, $clientQ2, $clientComplex] as $client) { + $response = json_decode($client->receive(), true); + $this->assertEquals('connected', $response['type']); + } + + // 1) Create active/gold document -> should match Q1 and complex, and be seen by all + $docActiveGoldId = ID::unique(); + $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + ], $this->getHeaders()), [ + 'documentId' => $docActiveGoldId, + 'data' => [ + 'status' => 'active', + 'category' => 'gold', + ], + 'permissions' => [ + Permission::read(Role::any()), + ], + ]); + + // clientAll: should receive event, queryKeys = [''] + $eventAll = json_decode($clientAll->receive(), true); + $this->assertEquals('event', $eventAll['type']); + $this->assertEquals($docActiveGoldId, $eventAll['data']['payload']['$id']); + $this->assertArrayHasKey('queryKeys', $eventAll['data']); + $this->assertIsArray($eventAll['data']['queryKeys']); + $this->assertEquals([''], $eventAll['data']['queryKeys']); + + // clientQ1: should receive event, queryKeys contains queryStatusActive + $eventQ1 = json_decode($clientQ1->receive(), true); + $this->assertEquals('event', $eventQ1['type']); + $this->assertEquals($docActiveGoldId, $eventQ1['data']['payload']['$id']); + $this->assertContains($queryStatusActive, $eventQ1['data']['queryKeys']); + + // clientQ2: should NOT receive event (status is active, not pending) + try { + $clientQ2->receive(); + $this->fail('Expected TimeoutException - event should be filtered for clientQ2 (active document)'); + } catch (TimeoutException $e) { + $this->assertTrue(true); + } + + // clientComplex: should receive event, queryKeys contains queryComplex + $eventComplex = json_decode($clientComplex->receive(), true); + $this->assertEquals('event', $eventComplex['type']); + $this->assertEquals($docActiveGoldId, $eventComplex['data']['payload']['$id']); + $this->assertContains($queryComplex, $eventComplex['data']['queryKeys']); + + // 2) Create pending/silver document -> should match Q2 only, and be seen by all + $docPendingSilverId = ID::unique(); + $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + ], $this->getHeaders()), [ + 'documentId' => $docPendingSilverId, + 'data' => [ + 'status' => 'pending', + 'category' => 'silver', + ], + 'permissions' => [ + Permission::read(Role::any()), + ], + ]); + + // clientAll: should receive event, queryKeys = [''] + $eventAll2 = json_decode($clientAll->receive(), true); + $this->assertEquals('event', $eventAll2['type']); + $this->assertEquals($docPendingSilverId, $eventAll2['data']['payload']['$id']); + $this->assertArrayHasKey('queryKeys', $eventAll2['data']); + $this->assertIsArray($eventAll2['data']['queryKeys']); + $this->assertEquals([''], $eventAll2['data']['queryKeys']); + + // clientQ1: should NOT receive event (status is pending) + try { + $clientQ1->receive(); + $this->fail('Expected TimeoutException - event should be filtered for clientQ1 (pending document)'); + } catch (TimeoutException $e) { + $this->assertTrue(true); + } + + // clientQ2: should receive event, queryKeys contains queryStatusPending + $eventQ2 = json_decode($clientQ2->receive(), true); + $this->assertEquals('event', $eventQ2['type']); + $this->assertEquals($docPendingSilverId, $eventQ2['data']['payload']['$id']); + $this->assertContains($queryStatusPending, $eventQ2['data']['queryKeys']); + + // clientComplex: should NOT receive event (status is pending, category silver) + try { + $clientComplex->receive(); + $this->fail('Expected TimeoutException - event should be filtered for complex subscription (pending document)'); + } catch (TimeoutException $e) { + $this->assertTrue(true); + } + + $clientAll->close(); + $clientQ1->close(); + $clientQ2->close(); + $clientComplex->close(); + } + + /** + * Ensure two separate subscriptions with different query keys + * only see their own matching events and expose the correct + * queryKey in queryKeys. + */ + public function testMultipleSubscriptionsDifferentQueryKeys() + { + $user = $this->getUser(); + $session = $user['session'] ?? ''; + $projectId = $this->getProject()['$id']; + + // Setup database and collection + $database = $this->client->call(Client::METHOD_POST, '/databases', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'databaseId' => ID::unique(), + 'name' => 'Multiple Query Keys Test DB', + ]); + $databaseId = $database['body']['$id']; + + $collection = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'collectionId' => ID::unique(), + 'name' => 'Multiple Query Keys Collection', + 'permissions' => [ + Permission::create(Role::user($user['$id'])), + ], + 'documentSecurity' => true, + ]); + $collectionId = $collection['body']['$id']; + + // Attribute used by queries + $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/attributes/string', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'key' => 'status', + 'size' => 256, + 'required' => false, + ]); + + sleep(2); + + $queryStatusActive = Query::equal('status', ['active'])->toString(); + $queryStatusPending = Query::equal('status', ['pending'])->toString(); + + // Two subscriptions on the same channel with different query keys + $clientQ1 = $this->getWebsocket(['documents'], [ + 'origin' => 'http://localhost', + 'cookie' => 'a_session_' . $projectId . '=' . $session, + ], null, [ + $queryStatusActive, + ]); + + $clientQ2 = $this->getWebsocket(['documents'], [ + 'origin' => 'http://localhost', + 'cookie' => 'a_session_' . $projectId . '=' . $session, + ], null, [ + $queryStatusPending, + ]); + + // Both should connect + $response = json_decode($clientQ1->receive(), true); + $this->assertEquals('connected', $response['type']); + $response = json_decode($clientQ2->receive(), true); + $this->assertEquals('connected', $response['type']); + + // 1) active document -> only queryStatusActive subscription should see it + $docActiveId = ID::unique(); + $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + ], $this->getHeaders()), [ + 'documentId' => $docActiveId, + 'data' => [ + 'status' => 'active', + ], + 'permissions' => [ + Permission::read(Role::any()), + ], + ]); + + $eventQ1 = json_decode($clientQ1->receive(), true); + $this->assertEquals('event', $eventQ1['type']); + $this->assertEquals($docActiveId, $eventQ1['data']['payload']['$id']); + $this->assertArrayHasKey('queryKeys', $eventQ1['data']); + $this->assertContains($queryStatusActive, $eventQ1['data']['queryKeys']); + + try { + $clientQ2->receive(); + $this->fail('Expected TimeoutException - clientQ2 should not receive active document'); + } catch (TimeoutException $e) { + $this->assertTrue(true); + } + + // 2) pending document -> only queryStatusPending subscription should see it + $docPendingId = ID::unique(); + $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + ], $this->getHeaders()), [ + 'documentId' => $docPendingId, + 'data' => [ + 'status' => 'pending', + ], + 'permissions' => [ + Permission::read(Role::any()), + ], + ]); + + $eventQ2 = json_decode($clientQ2->receive(), true); + $this->assertEquals('event', $eventQ2['type']); + $this->assertEquals($docPendingId, $eventQ2['data']['payload']['$id']); + $this->assertArrayHasKey('queryKeys', $eventQ2['data']); + $this->assertContains($queryStatusPending, $eventQ2['data']['queryKeys']); + + try { + $clientQ1->receive(); + $this->fail('Expected TimeoutException - clientQ1 should not receive pending document'); + } catch (TimeoutException $e) { + $this->assertTrue(true); + } + + $clientQ1->close(); + $clientQ2->close(); + } } From cbe13d71f1146cdc0dd47043ea731a6d0d76d445 Mon Sep 17 00:00:00 2001 From: ArnabChatterjee20k Date: Wed, 28 Jan 2026 19:04:42 +0530 Subject: [PATCH 02/11] removed redundant key --- src/Appwrite/Messaging/Adapter/Realtime.php | 19 ++++++++++--------- .../RealtimeCustomClientQueryTest.php | 10 +++++----- 2 files changed, 15 insertions(+), 14 deletions(-) diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index 35c6e9c710..6dbfa7c7dc 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -231,18 +231,19 @@ class Realtime extends MessagingAdapter $payload = $event['data']['payload'] ?? []; foreach ($this->subscriptions[$event['project']][$role][$channel] as $id => $queryMap) { $matchedQueryKeys = []; - // for representing a all query subscribed channel if (isset($queryMap[''])) { - $matchedQueryKeys[] = ''; - } else { - foreach (array_keys($queryMap) as $queryKey) { - $parsed = Query::parseQueries([$queryKey]); - if (!empty(RuntimeQuery::filter($parsed, $payload))) { - $matchedQueryKeys[] = $queryKey; - } + $receivers[$id] = $matchedQueryKeys; + continue; + } + foreach (array_keys($queryMap) as $queryKey) { + $parsed = Query::parseQueries([$queryKey]); + if (!empty(RuntimeQuery::filter($parsed, $payload))) { + $matchedQueryKeys[] = $queryKey; } } - $receivers[$id] = $matchedQueryKeys; + if (!empty($matchedQueryKeys)) { + $receivers[$id] = $matchedQueryKeys; + } } break; } diff --git a/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php b/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php index f69ef187e5..72a589062b 100644 --- a/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php +++ b/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php @@ -1608,7 +1608,7 @@ class RealtimeCustomClientQueryTest extends Scope Query::equal('category', ['gold']), ])->toString(); - // Subscribe with no queries -> should receive all events, queryKeys = [''] + // Subscribe with no queries -> should receive all events, queryKeys = [] $clientAll = $this->getWebsocket(['documents'], [ 'origin' => 'http://localhost', 'cookie' => 'a_session_' . $projectId . '=' . $session, @@ -1660,13 +1660,13 @@ class RealtimeCustomClientQueryTest extends Scope ], ]); - // clientAll: should receive event, queryKeys = [''] + // clientAll: should receive event, queryKeys = [] $eventAll = json_decode($clientAll->receive(), true); $this->assertEquals('event', $eventAll['type']); $this->assertEquals($docActiveGoldId, $eventAll['data']['payload']['$id']); $this->assertArrayHasKey('queryKeys', $eventAll['data']); $this->assertIsArray($eventAll['data']['queryKeys']); - $this->assertEquals([''], $eventAll['data']['queryKeys']); + $this->assertCount(0, $eventAll['data']['queryKeys']); // clientQ1: should receive event, queryKeys contains queryStatusActive $eventQ1 = json_decode($clientQ1->receive(), true); @@ -1704,13 +1704,13 @@ class RealtimeCustomClientQueryTest extends Scope ], ]); - // clientAll: should receive event, queryKeys = [''] + // clientAll: should receive event, queryKeys = [] $eventAll2 = json_decode($clientAll->receive(), true); $this->assertEquals('event', $eventAll2['type']); $this->assertEquals($docPendingSilverId, $eventAll2['data']['payload']['$id']); $this->assertArrayHasKey('queryKeys', $eventAll2['data']); $this->assertIsArray($eventAll2['data']['queryKeys']); - $this->assertEquals([''], $eventAll2['data']['queryKeys']); + $this->assertCount(0, $eventAll2['data']['queryKeys']); // clientQ1: should NOT receive event (status is pending) try { From 64392c1520312dafc7f15d19b5ba95f3b01185b0 Mon Sep 17 00:00:00 2001 From: ArnabChatterjee20k Date: Thu, 29 Jan 2026 11:38:20 +0530 Subject: [PATCH 03/11] fixed failing tests --- app/realtime.php | 3 +- .../unit/Messaging/MessagingChannelsTest.php | 16 ++++++----- tests/unit/Messaging/MessagingGuestTest.php | 28 +++++++++---------- tests/unit/Messaging/MessagingTest.php | 26 ++++++++--------- 4 files changed, 37 insertions(+), 36 deletions(-) diff --git a/app/realtime.php b/app/realtime.php index b22427c239..676d530fee 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -746,8 +746,7 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re // Preserve authorization before subscribe overwrites the connection array $authorization = $realtime->connections[$connection]['authorization'] ?? null; - $queries = $realtime->connections[$connection]['queries']; - $realtime->subscribe($realtime->connections[$connection]['projectId'], $connection, $roles, $channels, $queries); + $realtime->subscribe($realtime->connections[$connection]['projectId'], $connection, $roles, $channels); // Restore authorization after subscribe if ($authorization !== null) { diff --git a/tests/unit/Messaging/MessagingChannelsTest.php b/tests/unit/Messaging/MessagingChannelsTest.php index 7df5b8d1e6..fc3fa802ad 100644 --- a/tests/unit/Messaging/MessagingChannelsTest.php +++ b/tests/unit/Messaging/MessagingChannelsTest.php @@ -194,11 +194,12 @@ class MessagingChannelsTest extends TestCase */ $this->assertCount($this->connectionsTotal / count($this->allChannels), $receivers, $channel); - foreach ($receivers as $receiver) { + foreach ($receivers as $receiverId => $queryKeys) { /** * Making sure the right clients receive the event. */ - $this->assertStringEndsWith($index, $receiver); + $this->assertStringEndsWith($index, $receiverId); + $this->assertIsArray($queryKeys); } } } @@ -230,11 +231,12 @@ class MessagingChannelsTest extends TestCase */ $this->assertCount($this->connectionsPerChannel, $receivers, $channel); - foreach ($receivers as $receiver) { + foreach ($receivers as $receiverId => $queryKeys) { /** * Making sure the right clients receive the event. */ - $this->assertStringEndsWith($index, $receiver); + $this->assertStringEndsWith($index, $receiverId); + $this->assertIsArray($queryKeys); } } } @@ -257,7 +259,7 @@ class MessagingChannelsTest extends TestCase ] ]; - $receivers = $this->realtime->getSubscribers($event); + $receivers = array_keys($this->realtime->getSubscribers($event)); /** * Every Client subscribed to a Channel should receive this event. @@ -292,7 +294,7 @@ class MessagingChannelsTest extends TestCase ] ]; - $receivers = $this->realtime->getSubscribers($event); + $receivers = array_keys($this->realtime->getSubscribers($event)); /** * Every Team Member should receive this event. @@ -325,7 +327,7 @@ class MessagingChannelsTest extends TestCase ] ]; - $receivers = $this->realtime->getSubscribers($event); + $receivers = array_keys($this->realtime->getSubscribers($event)); /** * Only 1 Team Member of a role should have access to a specific channel. diff --git a/tests/unit/Messaging/MessagingGuestTest.php b/tests/unit/Messaging/MessagingGuestTest.php index 1aaa1febca..f6e2cb67b3 100644 --- a/tests/unit/Messaging/MessagingGuestTest.php +++ b/tests/unit/Messaging/MessagingGuestTest.php @@ -31,89 +31,89 @@ class MessagingGuestTest extends TestCase ] ]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['roles'] = [Role::guests()->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['roles'] = [Role::users()->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['roles'] = [Role::user(ID::custom('123'))->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['roles'] = [Role::team(ID::custom('abc'))->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['roles'] = [Role::team(ID::custom('abc'), 'administrator')->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['roles'] = [Role::team(ID::custom('abc'), 'god')->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['roles'] = [Role::team(ID::custom('def'))->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['roles'] = [Role::team(ID::custom('def'), 'guest')->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['roles'] = [Role::user(ID::custom('456'))->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['roles'] = [Role::team(ID::custom('def'), 'member')->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['roles'] = [Role::any()->toString()]; $event['data']['channels'] = ['documents.123']; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['data']['channels'] = ['documents.789']; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['project'] = '2'; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); diff --git a/tests/unit/Messaging/MessagingTest.php b/tests/unit/Messaging/MessagingTest.php index c2b6490869..dfcb7f2fff 100644 --- a/tests/unit/Messaging/MessagingTest.php +++ b/tests/unit/Messaging/MessagingTest.php @@ -48,89 +48,89 @@ class MessagingTest extends TestCase ] ]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['roles'] = [Role::users()->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['roles'] = [Role::user(ID::custom('123'))->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['roles'] = [Role::team(ID::custom('abc'))->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['roles'] = [Role::team(ID::custom('abc'), 'administrator')->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['roles'] = [Role::team(ID::custom('abc'), 'moderator')->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['roles'] = [Role::team(ID::custom('def'))->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['roles'] = [Role::team(ID::custom('def'), 'guest')->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['roles'] = [Role::user(ID::custom('456'))->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['roles'] = [Role::team(ID::custom('def'), 'member')->toString()]; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['roles'] = [Role::any()->toString()]; $event['data']['channels'] = ['documents.123']; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); $event['data']['channels'] = ['documents.789']; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertCount(1, $receivers); $this->assertEquals(1, $receivers[0]); $event['project'] = '2'; - $receivers = $realtime->getSubscribers($event); + $receivers = array_keys($realtime->getSubscribers($event)); $this->assertEmpty($receivers); From 077068e9fcf53935f065f3cd84d16eb6995e3025 Mon Sep 17 00:00:00 2001 From: ArnabChatterjee20k Date: Thu, 29 Jan 2026 14:20:05 +0530 Subject: [PATCH 04/11] updated tests --- app/realtime.php | 1 + .../RealtimeCustomClientQueryTest.php | 48 +++++++++++++------ 2 files changed, 34 insertions(+), 15 deletions(-) diff --git a/app/realtime.php b/app/realtime.php index 676d530fee..6479f64a13 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -486,6 +486,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, if (App::isDevelopment() && !empty($receivers)) { Console::log("[Debug][Worker {$workerId}] Receivers: " . count($receivers)); Console::log("[Debug][Worker {$workerId}] Receivers Connection IDs: " . json_encode(array_keys($receivers))); + Console::log("[Debug][Worker {$workerId}] QueryKeys: " . array_values($receivers)); Console::log("[Debug][Worker {$workerId}] Event: " . $payload); } diff --git a/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php b/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php index 72a589062b..2bddc27bfc 100644 --- a/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php +++ b/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php @@ -1383,6 +1383,30 @@ class RealtimeCustomClientQueryTest extends Scope $this->assertEquals($targetDocId, $event['data']['payload']['$id']); $this->assertEquals('active', $event['data']['payload']['status']); + // Create document matching NEITHER query - should not receive event + // keeping it here as below are the documents created with status=>active + // so it will also receive it but the querykey can be used to distinction + $anotherDocId = ID::unique(); + $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + ], $this->getHeaders()), [ + 'documentId' => $anotherDocId, + 'data' => [ + 'status' => 'inactive' + ], + 'permissions' => [ + Permission::read(Role::any()), + ], + ]); + + try { + $client->receive(); + $this->fail('Expected TimeoutException - event should be filtered (neither query matches)'); + } catch (TimeoutException $e) { + $this->assertTrue(true); + } + // Create document with matching ID but wrong status - should NOT receive event (only one query matches) $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ 'content-type' => 'application/json', @@ -1404,7 +1428,7 @@ class RealtimeCustomClientQueryTest extends Scope $this->assertTrue(true); } - // Create document with matching status but wrong ID - should NOT receive event (only one query matches) + // Create document with matching status but wrong ID - should receive event but the queryKeys should be only status matching as the model is subscription based similar to channels(only one query matches) $otherDocId = ID::unique(); $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ 'content-type' => 'application/json', @@ -1419,14 +1443,9 @@ class RealtimeCustomClientQueryTest extends Scope ], ]); - try { - $client->receive(); - $this->fail('Expected TimeoutException - event should be filtered (status matches but ID does not)'); - } catch (TimeoutException $e) { - $this->assertTrue(true); - } - - // Create document matching NEITHER query - should NOT receive event + // Create document matching NEITHER query + // above document created with status=>active + // so it will also receive it but the querykey can be used to distinction $anotherDocId = ID::unique(); $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ 'content-type' => 'application/json', @@ -1441,12 +1460,11 @@ class RealtimeCustomClientQueryTest extends Scope ], ]); - try { - $client->receive(); - $this->fail('Expected TimeoutException - event should be filtered (neither query matches)'); - } catch (TimeoutException $e) { - $this->assertTrue(true); - } + $data = json_decode($client->receive(), true); + $this->assertIsArray($data['data']['queryKeys']); + $this->assertEquals(1, count($data['data']['queryKeys'])); + $this->assertNotContains(Query::equal('status', ['inactive'])->toString(), $data['data']['queryKeys']); + $this->assertContains(Query::equal('status', ['active'])->toString(), $data['data']['queryKeys']); $client->close(); } From 6e576b51406a45f73097d3f1fe6a838a4dda57e5 Mon Sep 17 00:00:00 2001 From: ArnabChatterjee20k Date: Mon, 2 Feb 2026 19:45:21 +0530 Subject: [PATCH 05/11] added realtime query new channel and channel wise filters --- app/realtime.php | 82 +++++++++-- src/Appwrite/Messaging/Adapter/Realtime.php | 131 +++++++++++------- src/Appwrite/Utopia/Database/RuntimeQuery.php | 45 +++++- tests/e2e/Services/Realtime/RealtimeBase.php | 27 +++- .../RealtimeCustomClientQueryTest.php | 104 +++++++------- .../Database/Query/RuntimeQueryTest.php | 92 ++++++++++++ 6 files changed, 358 insertions(+), 123 deletions(-) diff --git a/app/realtime.php b/app/realtime.php index 6479f64a13..cd57fbd686 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -6,6 +6,7 @@ use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Network\Validator\Origin; use Appwrite\PubSub\Adapter\Pool as PubSubPool; use Appwrite\Utopia\Database\Documents\User; +use Appwrite\Utopia\Database\RuntimeQuery; use Appwrite\Utopia\Request; use Appwrite\Utopia\Response; use Swoole\Coroutine; @@ -431,7 +432,12 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, ] ]; - $server->send($realtime->getSubscribers($event), json_encode([ + $subscribers = $realtime->getSubscribers($event); + $connections = array_keys($subscribers); + $queries = array_values($subscribers); + $event['data']['queries'] = $queries; + + $server->send($connections, json_encode([ 'type' => 'event', 'data' => $event['data'] ])); @@ -474,10 +480,24 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $roles = $user->getRoles($database->getAuthorization()); $channels = $realtime->connections[$connection]['channels']; - $queries = $realtime->connections[$connection]['queries'] ?? []; + $queries = []; + $authorization = $realtime->connections[$connection]['authorization'] ?? null; $realtime->unsubscribe($connection); - $realtime->subscribe($projectId, $connection, $roles, $channels, $queries); + + // Re-subscribe with the same queries for each channel + foreach ($channels as $channel => $identifierKey) { + $channelQueries = $queries[$channel] ?? [[Query::select(['*'])->toString()]]; + foreach ($channelQueries as $subscription) { + $currentChannelQueries = Realtime::convertQueries($subscription); + $realtime->subscribe($projectId, $connection, $roles, [$channel => $identifierKey], $currentChannelQueries); + } + } + + // Restore authorization and queries after subscribe + if ($authorization !== null) { + $realtime->connections[$connection]['authorization'] = $authorization; + } } } @@ -494,7 +514,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, foreach ($receivers as $connectionId => $matchedQueryKeys) { $data = $event['data']; - $data['queryKeys'] = $matchedQueryKeys; + $data['queries'] = $matchedQueryKeys; $server->send( [$connectionId], @@ -589,11 +609,6 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $roles = $user->getRoles($authorization); $channels = Realtime::convertChannels($request->getQuery('channels', []), $user->getId()); - try { - $queries = Realtime::convertQueries($request->getQuery('queries', [])); - } catch (QueryException $e) { - throw new Exception(Exception::REALTIME_POLICY_VIOLATION, $e->getMessage()); - } /** * Channels Check @@ -602,9 +617,39 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, throw new Exception(Exception::REALTIME_POLICY_VIOLATION, 'Missing channels'); } - $realtime->subscribe($project->getId(), $connection, $roles, $channels, $queries); + $allQueries = []; + foreach ($channels as $channel => $identifierKey) { + // Get the channel's query parameter (array of subscriptions) + // Format: {channel}[0][]=select("*") (all events), {channel}[1][]={query1}&{channel}[1][]={query2} + $channelSubscriptions = $request->getQuery($channel, null); + + // Backward compatibility: if no channel-specific query params, treat as single subscription with select("*") + if ($channelSubscriptions === null) { + $channelSubscriptions = [[Query::select(['*'])->toString()]]; + } + + // Ensure it's an array + if (!is_array($channelSubscriptions)) { + $channelSubscriptions = [$channelSubscriptions]; + } + + $channelQueries = []; + foreach ($channelSubscriptions as $subscriptionIndex => $subscription) { + try { + // Parse and validate the queries + $currentChannelQueries = Realtime::convertQueries($subscription); + + $realtime->subscribe($project->getId(), $connection, $roles, [$channel => $identifierKey], $currentChannelQueries); + $channelQueries[] = $subscription; + } catch (QueryException $e) { + throw new Exception(Exception::REALTIME_POLICY_VIOLATION, $e->getMessage()); + } + } + $allQueries[$channel] = $channelQueries; + } $realtime->connections[$connection]['authorization'] = $authorization; + $realtime->connections[$connection]['queries'] = $allQueries; $user = empty($user->getId()) ? null : $response->output($user, Response::MODEL_ACCOUNT); @@ -612,7 +657,7 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, 'type' => 'connected', 'data' => [ 'channels' => array_keys($channels), - 'queries' => $queries, + 'queries' => $allQueries, 'user' => $user ] ])); @@ -744,15 +789,24 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re $roles = $user->getRoles($database->getAuthorization()); $channels = Realtime::convertChannels(array_flip($realtime->connections[$connection]['channels']), $user->getId()); - // Preserve authorization before subscribe overwrites the connection array + // Preserve authorization and queries before subscribe overwrites the connection array $authorization = $realtime->connections[$connection]['authorization'] ?? null; + $queries = $realtime->connections[$connection]['queries'] ?? []; - $realtime->subscribe($realtime->connections[$connection]['projectId'], $connection, $roles, $channels); + // Re-subscribe with the same queries for each channel + foreach ($channels as $channel => $identifierKey) { + $channelQueries = $queries[$channel] ?? [[Query::select(['*'])->toString()]]; + foreach ($channelQueries as $subscription) { + $currentChannelQueries = Realtime::convertQueries($subscription); + $realtime->subscribe($realtime->connections[$connection]['projectId'], $connection, $roles, [$channel => $identifierKey], $currentChannelQueries); + } + } - // Restore authorization after subscribe + // Restore authorization and queries after subscribe if ($authorization !== null) { $realtime->connections[$connection]['authorization'] = $authorization; } + $realtime->connections[$connection]['queries'] = $queries; $user = $response->output($user, Response::MODEL_ACCOUNT); $server->send([$connection], json_encode([ diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index 6dbfa7c7dc..5aa39dde65 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -30,18 +30,15 @@ class Realtime extends MessagingAdapter * [PROJECT_ID] -> * [ROLE_X] -> * [CHANNEL_NAME_X] -> - * [CONNECTION_ID] -> [QUERY_KEY] => true - * [CHANNEL_NAME_Y] -> - * [CONNECTION_ID] -> [QUERY_KEY] => true - * [CHANNEL_NAME_Z] -> - * [CONNECTION_ID] -> [QUERY_KEY] => true - * [ROLE_Y] -> - * [CHANNEL_NAME_X] -> - * [CONNECTION_ID] -> [QUERY_KEY] => true - * [CHANNEL_NAME_Y] -> - * [CONNECTION_ID] -> [QUERY_KEY] => true - * [CHANNEL_NAME_Z] -> - * [CONNECTION_ID] -> [QUERY_KEY] => true + * [CONNECTION_ID] -> [ + * [query1, query2], // Subscription group 0 - AND logic within group + * [query3], // Subscription group 1 + * [query4, query5], // Subscription group 2 - OR logic across groups + * ] + * + * Each subscription group is an array of query strings. + * Within a group: AND logic (all queries must match) + * Across groups: OR logic (any group matching = send event) */ public array $subscriptions = []; @@ -54,28 +51,30 @@ class Realtime extends MessagingAdapter } /** - * Adds a subscription. + * Adds a subscription group. * * @param string $projectId - * @param mixed $identifier - * @param array $roles - * @param array $channels - * @param array $queries + * @param mixed $identifier Connection ID + * @param array $roles User roles + * @param array $channels Channels to subscribe to + * @param array $queryGroup Array of Query objects for this subscription group (AND logic within group) * @return void */ - public function subscribe(string $projectId, mixed $identifier, array $roles, array $channels, array $queries = []): void + public function subscribe(string $projectId, mixed $identifier, array $roles, array $channels, array $queryGroup = []): void { if (!isset($this->subscriptions[$projectId])) { // Init Project $this->subscriptions[$projectId] = []; } - $queryKeys = []; - if (empty($queries)) { - $queryKeys[] = ''; + // Convert Query objects to strings for this subscription group + $queryStrings = []; + if (empty($queryGroup)) { + // No queries means "listen to all events" - use select("*") + $queryStrings[] = Query::select(['*'])->toString(); } else { - foreach ($queries as $query) { + foreach ($queryGroup as $query) { /** @var Query $query */ - $queryKeys[] = $query->toString(); + $queryStrings[] = $query->toString(); } } @@ -88,16 +87,23 @@ class Realtime extends MessagingAdapter if (!isset($this->subscriptions[$projectId][$role][$channel][$identifier])) { $this->subscriptions[$projectId][$role][$channel][$identifier] = []; } - foreach ($queryKeys as $queryKey) { - $this->subscriptions[$projectId][$role][$channel][$identifier][$queryKey] = true; - } + // Add this query group as a new subscription group (array of query strings) + $this->subscriptions[$projectId][$role][$channel][$identifier][] = $queryStrings; } } - $this->connections[$identifier] = [ - 'projectId' => $projectId, - 'roles' => $roles, - 'channels' => $channels - ]; + + // Keep a complete view of channels for unsubscribe(), even if subscribe() is called repeatedly + if (!isset($this->connections[$identifier])) { + $this->connections[$identifier] = [ + 'projectId' => $projectId, + 'roles' => $roles, + 'channels' => $channels + ]; + } else { + $this->connections[$identifier]['projectId'] = $projectId; + $this->connections[$identifier]['roles'] = $roles; + $this->connections[$identifier]['channels'] = \array_merge($this->connections[$identifier]['channels'], $channels); + } } /** @@ -200,11 +206,10 @@ class Realtime extends MessagingAdapter * - 1,121.328 ms (±0.84%) | 1,000,000 Connections / 10,000,000 Subscriptions * * @param array $event - * @return int[]|string[] + * @return array Map of connection IDs to matched query groups */ public function getSubscribers(array $event): array { - $receivers = []; /** * Check if project has subscriber. @@ -229,20 +234,33 @@ class Realtime extends MessagingAdapter * Saving all connections that are allowed to receive this event. */ $payload = $event['data']['payload'] ?? []; - foreach ($this->subscriptions[$event['project']][$role][$channel] as $id => $queryMap) { - $matchedQueryKeys = []; - if (isset($queryMap[''])) { - $receivers[$id] = $matchedQueryKeys; - continue; - } - foreach (array_keys($queryMap) as $queryKey) { - $parsed = Query::parseQueries([$queryKey]); - if (!empty(RuntimeQuery::filter($parsed, $payload))) { - $matchedQueryKeys[] = $queryKey; + foreach ($this->subscriptions[$event['project']][$role][$channel] as $id => $subscriptionGroups) { + $matchedGroups = []; + + // Process each subscription group (OR logic across groups) + foreach ($subscriptionGroups as $queryGroup) { + // Parse all queries in this group + $parsedQueries = []; + foreach ($queryGroup as $queryString) { + $parsed = Query::parseQueries([$queryString]); + $parsedQueries = array_merge($parsedQueries, $parsed); + } + + // Check if this group matches (AND logic within group) + // RuntimeQuery::filter handles select("*") - returns payload if present + if (!empty(RuntimeQuery::filter($parsedQueries, $payload))) { + // This group matched - add it to matched groups + $matchedGroups[] = $queryGroup; } } - if (!empty($matchedQueryKeys)) { - $receivers[$id] = $matchedQueryKeys; + + // Only add to receivers if at least one group matched + if (!empty($matchedGroups)) { + if (!isset($receivers[$id])) { + $receivers[$id] = []; + } + // Store matched groups (each group is an array of query strings) + $receivers[$id] = array_merge($receivers[$id], $matchedGroups); } } break; @@ -251,6 +269,16 @@ class Realtime extends MessagingAdapter } } + // De-duplicate groups per connection (same connection can match via multiple roles) + foreach ($receivers as $id => $groups) { + $unique = []; + foreach ($groups as $group) { + $key = \json_encode($group); + $unique[$key] = $group; + } + $receivers[$id] = \array_values($unique); + } + return $receivers; } @@ -284,16 +312,17 @@ class Realtime extends MessagingAdapter /** * Converts the queries from the Query Params into an array. - * @param array $queries + * @param array|string $queries * @return array + * @throws QueryException */ - public static function convertQueries(array $queries): array + public static function convertQueries(mixed $queries): array { $queries = Query::parseQueries($queries); $stack = $queries; $allowedMethods = implode(', ', RuntimeQuery::ALLOWED_QUERIES); while (!empty($stack)) { - /** `@var` Query $query */ + /** @var Query $query */ $query = array_pop($stack); $method = $query->getMethod(); if (!in_array($method, RuntimeQuery::ALLOWED_QUERIES, true)) { @@ -302,6 +331,12 @@ class Realtime extends MessagingAdapter "Query method '{$unsupportedMethod}' is not supported in Realtime queries. Allowed query methods are: {$allowedMethods}" ); } + + // Validate select queries - only select("*") is allowed + if ($method === Query::TYPE_SELECT) { + RuntimeQuery::validateSelectQuery($query); + } + if (in_array($method, [Query::TYPE_AND, Query::TYPE_OR], true)) { $stack = array_merge($stack, $query->getValues()); } diff --git a/src/Appwrite/Utopia/Database/RuntimeQuery.php b/src/Appwrite/Utopia/Database/RuntimeQuery.php index 025d424768..f959e9b573 100644 --- a/src/Appwrite/Utopia/Database/RuntimeQuery.php +++ b/src/Appwrite/Utopia/Database/RuntimeQuery.php @@ -21,9 +21,44 @@ class RuntimeQuery extends Query // Recursive checks Query::TYPE_AND, - Query::TYPE_OR + Query::TYPE_OR, + + // Special: select("*") means "listen to all events" + Query::TYPE_SELECT ]; + /** + * Checks if a query is select("*") which means "listen to all events" + * + * @param Query $query + * @return bool + */ + public static function isSelectAll(Query $query): bool + { + return $query->getMethod() === Query::TYPE_SELECT + && count($query->getValues()) === 1 + && $query->getValues()[0] === '*'; + } + + /** + * Validates a select query - only select("*") is allowed in Realtime + * + * @param Query $query + * @throws \InvalidArgumentException + */ + public static function validateSelectQuery(Query $query): void + { + if ($query->getMethod() !== Query::TYPE_SELECT) { + return; + } + + if (!self::isSelectAll($query)) { + throw new \InvalidArgumentException( + 'Only select("*") is allowed in Realtime queries. select("*") means "listen to all events".' + ); + } + } + /** * @param array $queries * @param array $payload @@ -33,6 +68,14 @@ class RuntimeQuery extends Query if (empty($queries)) { return $payload; } + + // Check if select("*") is present - if so, return payload (match all) + foreach ($queries as $query) { + if (self::isSelectAll($query)) { + return $payload; + } + } + // multiple queries follows and condition foreach ($queries as $query) { if (!self::evaluateFilter($query, $payload)) { diff --git a/tests/e2e/Services/Realtime/RealtimeBase.php b/tests/e2e/Services/Realtime/RealtimeBase.php index ea5c3d710f..46eb7e660b 100644 --- a/tests/e2e/Services/Realtime/RealtimeBase.php +++ b/tests/e2e/Services/Realtime/RealtimeBase.php @@ -11,7 +11,7 @@ trait RealtimeBase array $channels = [], array $headers = [], string $projectId = null, - array $queries = [] + ?array $queries = null ): WebSocketClient { if (is_null($projectId)) { $projectId = $this->getProject()['$id']; @@ -19,10 +19,31 @@ trait RealtimeBase $query = [ "project" => $projectId, - "channels" => $channels, - "queries" => $queries + "channels" => $channels ]; + /** + * Query param encoding rules: + * - $queries === null -> only send channels (no per-channel query params) for backward compatibility. + * - $queries === [] -> explicit "select all" subscription: send Query::select(['*']) as a single group. + * - non-empty $queries -> treat as a single subscription group for the first channel: + * AND logic within the group; OR logic across multiple groups (if we ever add them). + * + * For now all E2E tests subscribe to a single channel, so we map queries to $channels[0]. + */ + + if ($queries !== null && !empty($channels)) { + $channel = $channels[0]; + + if ($queries === []) { + // Explicit select("*") group + $query[$channel][0] = [\Utopia\Database\Query::select(['*'])->toString()]; + } else { + // Single subscription group for this channel + $query[$channel][0] = $queries; + } + } + return new WebSocketClient( "ws://appwrite.test/v1/realtime?" . http_build_query($query), [ diff --git a/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php b/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php index 2bddc27bfc..e2864730aa 100644 --- a/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php +++ b/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php @@ -1352,7 +1352,7 @@ class RealtimeCustomClientQueryTest extends Scope $targetDocId = ID::unique(); - // Subscribe with multiple queries (AND logic - ALL queries must match for event to be received) + // Subscribe with multiple 'queries' (AND logic - ALL 'queries' must match for event to be received) $client = $this->getWebsocket(['documents'], [ 'origin' => 'http://localhost', 'cookie' => 'a_session_' . $projectId . '=' . $session, @@ -1364,7 +1364,7 @@ class RealtimeCustomClientQueryTest extends Scope $response = json_decode($client->receive(), true); $this->assertEquals('connected', $response['type']); - // Create document matching BOTH queries - should receive event + // Create document matching BOTH 'queries' - should receive event $document1 = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ 'content-type' => 'application/json', 'x-appwrite-project' => $projectId, @@ -1428,44 +1428,6 @@ class RealtimeCustomClientQueryTest extends Scope $this->assertTrue(true); } - // Create document with matching status but wrong ID - should receive event but the queryKeys should be only status matching as the model is subscription based similar to channels(only one query matches) - $otherDocId = ID::unique(); - $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ - 'content-type' => 'application/json', - 'x-appwrite-project' => $projectId, - ], $this->getHeaders()), [ - 'documentId' => $otherDocId, - 'data' => [ - 'status' => 'active' - ], - 'permissions' => [ - Permission::read(Role::any()), - ], - ]); - - // Create document matching NEITHER query - // above document created with status=>active - // so it will also receive it but the querykey can be used to distinction - $anotherDocId = ID::unique(); - $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ - 'content-type' => 'application/json', - 'x-appwrite-project' => $projectId, - ], $this->getHeaders()), [ - 'documentId' => $anotherDocId, - 'data' => [ - 'status' => 'inactive' - ], - 'permissions' => [ - Permission::read(Role::any()), - ], - ]); - - $data = json_decode($client->receive(), true); - $this->assertIsArray($data['data']['queryKeys']); - $this->assertEquals(1, count($data['data']['queryKeys'])); - $this->assertNotContains(Query::equal('status', ['inactive'])->toString(), $data['data']['queryKeys']); - $this->assertContains(Query::equal('status', ['active'])->toString(), $data['data']['queryKeys']); - $client->close(); } @@ -1539,7 +1501,7 @@ class RealtimeCustomClientQueryTest extends Scope $this->assertStringContainsString('not supported in Realtime queries', $response['data']['message']); $this->assertStringContainsString('startsWith', $response['data']['message']); - // Test 5: Multiple invalid queries in nested structure + // Test 5: Multiple invalid 'queries' in nested structure $client = $this->getWebsocket(['documents'], [ 'origin' => 'http://localhost', 'cookie' => 'a_session_' . $projectId . '=' . $session, @@ -1594,7 +1556,7 @@ class RealtimeCustomClientQueryTest extends Scope ]); $collectionId = $collection['body']['$id']; - // Attributes used by queries + // Attributes used by 'queries' $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/attributes/string', array_merge([ 'content-type' => 'application/json', 'x-appwrite-project' => $projectId, @@ -1626,7 +1588,7 @@ class RealtimeCustomClientQueryTest extends Scope Query::equal('category', ['gold']), ])->toString(); - // Subscribe with no queries -> should receive all events, queryKeys = [] + // Subscribe with no 'queries' -> should receive all events, queryKeys = [] $clientAll = $this->getWebsocket(['documents'], [ 'origin' => 'http://localhost', 'cookie' => 'a_session_' . $projectId . '=' . $session, @@ -1682,15 +1644,20 @@ class RealtimeCustomClientQueryTest extends Scope $eventAll = json_decode($clientAll->receive(), true); $this->assertEquals('event', $eventAll['type']); $this->assertEquals($docActiveGoldId, $eventAll['data']['payload']['$id']); - $this->assertArrayHasKey('queryKeys', $eventAll['data']); - $this->assertIsArray($eventAll['data']['queryKeys']); - $this->assertCount(0, $eventAll['data']['queryKeys']); + $this->assertArrayHasKey('queries', $eventAll['data']); + $this->assertIsArray($eventAll['data']['queries']); // clientQ1: should receive event, queryKeys contains queryStatusActive $eventQ1 = json_decode($clientQ1->receive(), true); $this->assertEquals('event', $eventQ1['type']); $this->assertEquals($docActiveGoldId, $eventQ1['data']['payload']['$id']); - $this->assertContains($queryStatusActive, $eventQ1['data']['queryKeys']); + $flatQueriesQ1 = []; + foreach ($eventQ1['data']['queries'] as $group) { + foreach ($group as $q) { + $flatQueriesQ1[] = $q; + } + } + $this->assertContains($queryStatusActive, $flatQueriesQ1); // clientQ2: should NOT receive event (status is active, not pending) try { @@ -1704,7 +1671,13 @@ class RealtimeCustomClientQueryTest extends Scope $eventComplex = json_decode($clientComplex->receive(), true); $this->assertEquals('event', $eventComplex['type']); $this->assertEquals($docActiveGoldId, $eventComplex['data']['payload']['$id']); - $this->assertContains($queryComplex, $eventComplex['data']['queryKeys']); + $flatQueriesComplex = []; + foreach ($eventComplex['data']['queries'] as $group) { + foreach ($group as $q) { + $flatQueriesComplex[] = $q; + } + } + $this->assertContains($queryComplex, $flatQueriesComplex); // 2) Create pending/silver document -> should match Q2 only, and be seen by all $docPendingSilverId = ID::unique(); @@ -1726,9 +1699,8 @@ class RealtimeCustomClientQueryTest extends Scope $eventAll2 = json_decode($clientAll->receive(), true); $this->assertEquals('event', $eventAll2['type']); $this->assertEquals($docPendingSilverId, $eventAll2['data']['payload']['$id']); - $this->assertArrayHasKey('queryKeys', $eventAll2['data']); - $this->assertIsArray($eventAll2['data']['queryKeys']); - $this->assertCount(0, $eventAll2['data']['queryKeys']); + $this->assertArrayHasKey('queries', $eventAll2['data']); + $this->assertIsArray($eventAll2['data']['queries']); // clientQ1: should NOT receive event (status is pending) try { @@ -1742,7 +1714,13 @@ class RealtimeCustomClientQueryTest extends Scope $eventQ2 = json_decode($clientQ2->receive(), true); $this->assertEquals('event', $eventQ2['type']); $this->assertEquals($docPendingSilverId, $eventQ2['data']['payload']['$id']); - $this->assertContains($queryStatusPending, $eventQ2['data']['queryKeys']); + $flatQueriesQ2 = []; + foreach ($eventQ2['data']['queries'] as $group) { + foreach ($group as $q) { + $flatQueriesQ2[] = $q; + } + } + $this->assertContains($queryStatusPending, $flatQueriesQ2); // clientComplex: should NOT receive event (status is pending, category silver) try { @@ -1794,7 +1772,7 @@ class RealtimeCustomClientQueryTest extends Scope ]); $collectionId = $collection['body']['$id']; - // Attribute used by queries + // Attribute used by 'queries' $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/attributes/string', array_merge([ 'content-type' => 'application/json', 'x-appwrite-project' => $projectId, @@ -1849,8 +1827,14 @@ class RealtimeCustomClientQueryTest extends Scope $eventQ1 = json_decode($clientQ1->receive(), true); $this->assertEquals('event', $eventQ1['type']); $this->assertEquals($docActiveId, $eventQ1['data']['payload']['$id']); - $this->assertArrayHasKey('queryKeys', $eventQ1['data']); - $this->assertContains($queryStatusActive, $eventQ1['data']['queryKeys']); + $this->assertArrayHasKey('queries', $eventQ1['data']); + $flatQ1 = []; + foreach ($eventQ1['data']['queries'] as $group) { + foreach ($group as $q) { + $flatQ1[] = $q; + } + } + $this->assertContains($queryStatusActive, $flatQ1); try { $clientQ2->receive(); @@ -1877,8 +1861,14 @@ class RealtimeCustomClientQueryTest extends Scope $eventQ2 = json_decode($clientQ2->receive(), true); $this->assertEquals('event', $eventQ2['type']); $this->assertEquals($docPendingId, $eventQ2['data']['payload']['$id']); - $this->assertArrayHasKey('queryKeys', $eventQ2['data']); - $this->assertContains($queryStatusPending, $eventQ2['data']['queryKeys']); + $this->assertArrayHasKey('queries', $eventQ2['data']); + $flatQ2 = []; + foreach ($eventQ2['data']['queries'] as $group) { + foreach ($group as $q) { + $flatQ2[] = $q; + } + } + $this->assertContains($queryStatusPending, $flatQ2); try { $clientQ1->receive(); diff --git a/tests/unit/Utopia/Database/Query/RuntimeQueryTest.php b/tests/unit/Utopia/Database/Query/RuntimeQueryTest.php index 7df1ca80eb..51d3a307da 100644 --- a/tests/unit/Utopia/Database/Query/RuntimeQueryTest.php +++ b/tests/unit/Utopia/Database/Query/RuntimeQueryTest.php @@ -599,4 +599,96 @@ class RuntimeQueryTest extends TestCase $result = RuntimeQuery::filter([$query], $payload); $this->assertEquals($payload, $result); } + + // TYPE_SELECT tests - select("*") means "listen to all events" + public function testSelectAllIsAllowed(): void + { + $query = Query::select(['*']); + $this->assertTrue(RuntimeQuery::isSelectAll($query)); + } + + public function testSelectSpecificFieldsNotAllowed(): void + { + $query = Query::select(['name', 'age']); + $this->assertFalse(RuntimeQuery::isSelectAll($query)); + } + + public function testSelectSingleFieldNotAllowed(): void + { + $query = Query::select(['name']); + $this->assertFalse(RuntimeQuery::isSelectAll($query)); + } + + public function testValidateSelectQueryWithWildcard(): void + { + $query = Query::select(['*']); + // Should not throw + RuntimeQuery::validateSelectQuery($query); + $this->assertTrue(true); + } + + public function testValidateSelectQueryWithSpecificFields(): void + { + $query = Query::select(['name', 'age']); + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('Only select("*") is allowed in Realtime queries'); + RuntimeQuery::validateSelectQuery($query); + } + + public function testValidateSelectQueryWithSingleField(): void + { + $query = Query::select(['name']); + $this->expectException(\InvalidArgumentException::class); + $this->expectExceptionMessage('Only select("*") is allowed in Realtime queries'); + RuntimeQuery::validateSelectQuery($query); + } + + public function testSelectInAllowedQueries(): void + { + $this->assertContains(Query::TYPE_SELECT, RuntimeQuery::ALLOWED_QUERIES); + } + + public function testIsSelectAllWithNonSelectQuery(): void + { + $query = Query::equal('name', ['John']); + $this->assertFalse(RuntimeQuery::isSelectAll($query)); + } + + public function testValidateSelectQueryWithNonSelectQuery(): void + { + $query = Query::equal('name', ['John']); + // Should not throw for non-select queries + RuntimeQuery::validateSelectQuery($query); + $this->assertTrue(true); + } + + // Filter tests with select("*") + public function testFilterWithSelectAllReturnsPayload(): void + { + $query = Query::select(['*']); + $payload = ['name' => 'John', 'age' => 30]; + $result = RuntimeQuery::filter([$query], $payload); + $this->assertEquals($payload, $result); + } + + public function testFilterWithSelectAllAndOtherQueriesReturnsPayload(): void + { + // If select("*") is present, it should return payload regardless of other queries + $queries = [ + Query::select(['*']), + Query::equal('name', ['Jane']), // This would normally fail + ]; + $payload = ['name' => 'John', 'age' => 30]; + $result = RuntimeQuery::filter($queries, $payload); + // select("*") takes precedence - returns payload + $this->assertEquals($payload, $result); + } + + public function testFilterWithSelectAllOnEmptyPayload(): void + { + $query = Query::select(['*']); + $payload = []; + $result = RuntimeQuery::filter([$query], $payload); + $this->assertEquals($payload, $result); + } } From 3c2f0cc982780fde8097e99603d79b727a335ee5 Mon Sep 17 00:00:00 2001 From: ArnabChatterjee20k Date: Mon, 2 Feb 2026 21:50:23 +0530 Subject: [PATCH 06/11] updated to new slot based filtering for realtime --- app/realtime.php | 149 +++++++++------- src/Appwrite/Messaging/Adapter.php | 2 +- src/Appwrite/Messaging/Adapter/Realtime.php | 167 ++++++++++++------ tests/e2e/Services/Realtime/RealtimeBase.php | 25 ++- .../RealtimeCustomClientQueryTest.php | 3 +- 5 files changed, 220 insertions(+), 126 deletions(-) diff --git a/app/realtime.php b/app/realtime.php index cd57fbd686..fdd6f8a5ec 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -432,15 +432,19 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, ] ]; - $subscribers = $realtime->getSubscribers($event); - $connections = array_keys($subscribers); - $queries = array_values($subscribers); - $event['data']['queries'] = $queries; + $subscribers = $realtime->getSubscribers($event); // [connectionId => [subId => queries]] + + // For test events, send to all connections with their matched subscription queries + foreach ($subscribers as $connectionId => $matchedSubscriptions) { + $data = $event['data']; + // Send matched subscription IDs + $data['subscriptions'] = array_keys($matchedSubscriptions); - $server->send($connections, json_encode([ - 'type' => 'event', - 'data' => $event['data'] - ])); + $server->send([$connectionId], json_encode([ + 'type' => 'event', + 'data' => $data + ])); + } } }); @@ -479,42 +483,52 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $user = $database->getDocument('users', $userId); $roles = $user->getRoles($database->getAuthorization()); - $channels = $realtime->connections[$connection]['channels']; - $queries = []; + $channels = $realtime->connections[$connection]['channels'] ?? []; + $subscriptionMapping = $realtime->connections[$connection]['subscriptions'] ?? []; $authorization = $realtime->connections[$connection]['authorization'] ?? null; $realtime->unsubscribe($connection); - // Re-subscribe with the same queries for each channel - foreach ($channels as $channel => $identifierKey) { - $channelQueries = $queries[$channel] ?? [[Query::select(['*'])->toString()]]; - foreach ($channelQueries as $subscription) { - $currentChannelQueries = Realtime::convertQueries($subscription); - $realtime->subscribe($projectId, $connection, $roles, [$channel => $identifierKey], $currentChannelQueries); - } + // Re-subscribe with the same subscription structure + // Note: We can't fully reconstruct the original subscription indices from stored data, + // so we'll create new subscriptions. In practice, permissions changes should preserve + // the subscription structure, but this is a limitation of the current design. + if (!empty($channels)) { + // Create a single subscription with select("*") for all channels + $subscriptionId = ID::unique(); + $realtime->subscribe( + $projectId, + $connection, + $subscriptionId, + $roles, + $channels, + [Query::select(['*'])] + ); + $realtime->connections[$connection]['subscriptions'] = [0 => $subscriptionId]; } - // Restore authorization and queries after subscribe + // Restore authorization after subscribe if ($authorization !== null) { $realtime->connections[$connection]['authorization'] = $authorization; } } } - $receivers = $realtime->getSubscribers($event); // [connectionId => matchedQueryKeys[]] + $receivers = $realtime->getSubscribers($event); // [connectionId => [subId => queries]] if (App::isDevelopment() && !empty($receivers)) { Console::log("[Debug][Worker {$workerId}] Receivers: " . count($receivers)); Console::log("[Debug][Worker {$workerId}] Receivers Connection IDs: " . json_encode(array_keys($receivers))); - Console::log("[Debug][Worker {$workerId}] QueryKeys: " . array_values($receivers)); + Console::log("[Debug][Worker {$workerId}] Event Query: " . json_encode($receivers)); Console::log("[Debug][Worker {$workerId}] Event: " . $payload); } $totalMessages = 0; - foreach ($receivers as $connectionId => $matchedQueryKeys) { + foreach ($receivers as $connectionId => $matchedSubscriptions) { $data = $event['data']; - $data['queries'] = $matchedQueryKeys; + // Send matched subscription IDs + $data['subscriptions'] = array_keys($matchedSubscriptions); $server->send( [$connectionId], @@ -617,47 +631,47 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, throw new Exception(Exception::REALTIME_POLICY_VIOLATION, 'Missing channels'); } - $allQueries = []; - foreach ($channels as $channel => $identifierKey) { - // Get the channel's query parameter (array of subscriptions) - // Format: {channel}[0][]=select("*") (all events), {channel}[1][]={query1}&{channel}[1][]={query2} - $channelSubscriptions = $request->getQuery($channel, null); + // Reconstruct subscriptions from query params using helper method + $channelNames = array_keys($channels); + try { + $subscriptionsByIndex = Realtime::constructSubscriptions( + $channelNames, + fn($channel) => $request->getQuery($channel, null) + ); + } catch (QueryException $e) { + throw new Exception(Exception::REALTIME_POLICY_VIOLATION, $e->getMessage()); + } - // Backward compatibility: if no channel-specific query params, treat as single subscription with select("*") - if ($channelSubscriptions === null) { - $channelSubscriptions = [[Query::select(['*'])->toString()]]; - } + // Generate subscription IDs and subscribe + $subscriptionMapping = []; + foreach ($subscriptionsByIndex as $index => $subscription) { + // Generate unique subscription ID + $subscriptionId = ID::unique(); - // Ensure it's an array - if (!is_array($channelSubscriptions)) { - $channelSubscriptions = [$channelSubscriptions]; - } + // Subscribe with all channels for this subscription + $realtime->subscribe( + $project->getId(), + $connection, + $subscriptionId, + $roles, + $subscription['channels'], + $subscription['queries'] // Query objects + ); - $channelQueries = []; - foreach ($channelSubscriptions as $subscriptionIndex => $subscription) { - try { - // Parse and validate the queries - $currentChannelQueries = Realtime::convertQueries($subscription); - - $realtime->subscribe($project->getId(), $connection, $roles, [$channel => $identifierKey], $currentChannelQueries); - $channelQueries[] = $subscription; - } catch (QueryException $e) { - throw new Exception(Exception::REALTIME_POLICY_VIOLATION, $e->getMessage()); - } - } - $allQueries[$channel] = $channelQueries; + // Store mapping: subscription index -> subscription ID + $subscriptionMapping[$index] = $subscriptionId; } $realtime->connections[$connection]['authorization'] = $authorization; - $realtime->connections[$connection]['queries'] = $allQueries; + $realtime->connections[$connection]['subscriptions'] = $subscriptionMapping; $user = empty($user->getId()) ? null : $response->output($user, Response::MODEL_ACCOUNT); $server->send([$connection], json_encode([ 'type' => 'connected', 'data' => [ - 'channels' => array_keys($channels), - 'queries' => $allQueries, + 'channels' => $channelNames, + 'subscriptions' => $subscriptionMapping, 'user' => $user ] ])); @@ -787,26 +801,35 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re } $roles = $user->getRoles($database->getAuthorization()); - $channels = Realtime::convertChannels(array_flip($realtime->connections[$connection]['channels']), $user->getId()); + $channelNames = $realtime->connections[$connection]['channels'] ?? []; + $channels = Realtime::convertChannels(array_flip($channelNames), $user->getId()); - // Preserve authorization and queries before subscribe overwrites the connection array + // Preserve authorization and subscription mapping before subscribe overwrites the connection array $authorization = $realtime->connections[$connection]['authorization'] ?? null; - $queries = $realtime->connections[$connection]['queries'] ?? []; + $subscriptionMapping = $realtime->connections[$connection]['subscriptions'] ?? []; - // Re-subscribe with the same queries for each channel - foreach ($channels as $channel => $identifierKey) { - $channelQueries = $queries[$channel] ?? [[Query::select(['*'])->toString()]]; - foreach ($channelQueries as $subscription) { - $currentChannelQueries = Realtime::convertQueries($subscription); - $realtime->subscribe($realtime->connections[$connection]['projectId'], $connection, $roles, [$channel => $identifierKey], $currentChannelQueries); - } + // Re-subscribe with the same subscription structure + // Note: We can't fully reconstruct the original subscription indices from stored data, + // so we'll create new subscriptions. In practice, authentication should preserve + // the subscription structure, but this is a limitation of the current design. + if (!empty($channelNames)) { + // Create a single subscription with select("*") for all channels + $subscriptionId = ID::unique(); + $realtime->subscribe( + $realtime->connections[$connection]['projectId'], + $connection, + $subscriptionId, + $roles, + $channelNames, + [Query::select(['*'])] + ); + $realtime->connections[$connection]['subscriptions'] = [0 => $subscriptionId]; } - // Restore authorization and queries after subscribe + // Restore authorization after subscribe if ($authorization !== null) { $realtime->connections[$connection]['authorization'] = $authorization; } - $realtime->connections[$connection]['queries'] = $queries; $user = $response->output($user, Response::MODEL_ACCOUNT); $server->send([$connection], json_encode([ diff --git a/src/Appwrite/Messaging/Adapter.php b/src/Appwrite/Messaging/Adapter.php index 40169bd1a9..d3ed2d2cf9 100644 --- a/src/Appwrite/Messaging/Adapter.php +++ b/src/Appwrite/Messaging/Adapter.php @@ -4,7 +4,7 @@ namespace Appwrite\Messaging; abstract class Adapter { - abstract public function subscribe(string $projectId, mixed $identifier, array $roles, array $channels): void; + abstract public function subscribe(string $projectId, mixed $identifier, string $subscriptionId, array $roles, array $channels, array $queryGroup = []): void; abstract public function unsubscribe(mixed $identifier): void; abstract public function send(string $projectId, array $payload, array $events, array $channels, array $roles, array $options): void; } diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index 5aa39dde65..cc1e5c03f2 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -30,15 +30,12 @@ class Realtime extends MessagingAdapter * [PROJECT_ID] -> * [ROLE_X] -> * [CHANNEL_NAME_X] -> - * [CONNECTION_ID] -> [ - * [query1, query2], // Subscription group 0 - AND logic within group - * [query3], // Subscription group 1 - * [query4, query5], // Subscription group 2 - OR logic across groups - * ] + * [CONNECTION_ID] -> + * [SUB_ID] -> [query1, query2, ...] // Subscription with queries (AND logic) * - * Each subscription group is an array of query strings. - * Within a group: AND logic (all queries must match) - * Across groups: OR logic (any group matching = send event) + * Each subscription ID maps to an array of query strings. + * Within a subscription: AND logic (all queries must match) + * Across subscriptions: OR logic (any subscription matching = send event) */ public array $subscriptions = []; @@ -51,22 +48,23 @@ class Realtime extends MessagingAdapter } /** - * Adds a subscription group. + * Adds a subscription with a specific subscription ID. * * @param string $projectId * @param mixed $identifier Connection ID + * @param string $subscriptionId Unique subscription ID * @param array $roles User roles - * @param array $channels Channels to subscribe to - * @param array $queryGroup Array of Query objects for this subscription group (AND logic within group) + * @param array $channels Channels to subscribe to (array of channel names) + * @param array $queryGroup Array of Query objects for this subscription (AND logic within subscription) * @return void */ - public function subscribe(string $projectId, mixed $identifier, array $roles, array $channels, array $queryGroup = []): void + public function subscribe(string $projectId, mixed $identifier, string $subscriptionId, array $roles, array $channels, array $queryGroup = []): void { if (!isset($this->subscriptions[$projectId])) { // Init Project $this->subscriptions[$projectId] = []; } - // Convert Query objects to strings for this subscription group + // Convert Query objects to strings for this subscription $queryStrings = []; if (empty($queryGroup)) { // No queries means "listen to all events" - use select("*") @@ -79,35 +77,32 @@ class Realtime extends MessagingAdapter } foreach ($roles as $role) { - if (!isset($this->subscriptions[$projectId][$role])) { // Add user first connection + if (!isset($this->subscriptions[$projectId][$role])) { $this->subscriptions[$projectId][$role] = []; } - foreach ($channels as $channel => $list) { + foreach ($channels as $channel) { + if (!isset($this->subscriptions[$projectId][$role][$channel])) { + $this->subscriptions[$projectId][$role][$channel] = []; + } if (!isset($this->subscriptions[$projectId][$role][$channel][$identifier])) { $this->subscriptions[$projectId][$role][$channel][$identifier] = []; } - // Add this query group as a new subscription group (array of query strings) - $this->subscriptions[$projectId][$role][$channel][$identifier][] = $queryStrings; + // Store subscription under subscription ID + $this->subscriptions[$projectId][$role][$channel][$identifier][$subscriptionId] = $queryStrings; } } - // Keep a complete view of channels for unsubscribe(), even if subscribe() is called repeatedly - if (!isset($this->connections[$identifier])) { - $this->connections[$identifier] = [ - 'projectId' => $projectId, - 'roles' => $roles, - 'channels' => $channels - ]; - } else { - $this->connections[$identifier]['projectId'] = $projectId; - $this->connections[$identifier]['roles'] = $roles; - $this->connections[$identifier]['channels'] = \array_merge($this->connections[$identifier]['channels'], $channels); - } + // Update connection info + $this->connections[$identifier] = [ + 'projectId' => $projectId, + 'roles' => $roles, + 'channels' => $channels + ]; } /** - * Removes Subscription. + * Removes all subscriptions for a connection. * * @param mixed $connection * @return void @@ -119,8 +114,8 @@ class Realtime extends MessagingAdapter $channels = $this->connections[$connection]['channels'] ?? []; foreach ($roles as $role) { - foreach ($channels as $channel => $list) { - unset($this->subscriptions[$projectId][$role][$channel][$connection]); // dropping connection will drop the queries as well + foreach ($channels as $channel) { + unset($this->subscriptions[$projectId][$role][$channel][$connection]); // dropping connection will drop all subscriptions if (empty($this->subscriptions[$projectId][$role][$channel])) { unset($this->subscriptions[$projectId][$role][$channel]); // Remove channel when no connections @@ -234,33 +229,31 @@ class Realtime extends MessagingAdapter * Saving all connections that are allowed to receive this event. */ $payload = $event['data']['payload'] ?? []; - foreach ($this->subscriptions[$event['project']][$role][$channel] as $id => $subscriptionGroups) { - $matchedGroups = []; + foreach ($this->subscriptions[$event['project']][$role][$channel] as $id => $subscriptions) { + $matchedSubscriptions = []; - // Process each subscription group (OR logic across groups) - foreach ($subscriptionGroups as $queryGroup) { - // Parse all queries in this group + // Process each subscription (OR logic across subscriptions) + foreach ($subscriptions as $subId => $queryStrings) { + // Parse all queries in this subscription $parsedQueries = []; - foreach ($queryGroup as $queryString) { + foreach ($queryStrings as $queryString) { $parsed = Query::parseQueries([$queryString]); $parsedQueries = array_merge($parsedQueries, $parsed); } - // Check if this group matches (AND logic within group) - // RuntimeQuery::filter handles select("*") - returns payload if present + // Check if this subscription matches (AND logic within subscription) if (!empty(RuntimeQuery::filter($parsedQueries, $payload))) { - // This group matched - add it to matched groups - $matchedGroups[] = $queryGroup; + // This subscription matched - add subscription ID to matched subscriptions + $matchedSubscriptions[$subId] = $queryStrings; } } - // Only add to receivers if at least one group matched - if (!empty($matchedGroups)) { + // Only add connection to receivers if at least one subscription matched + if (!empty($matchedSubscriptions)) { if (!isset($receivers[$id])) { $receivers[$id] = []; } - // Store matched groups (each group is an array of query strings) - $receivers[$id] = array_merge($receivers[$id], $matchedGroups); + $receivers[$id] = array_merge($receivers[$id], $matchedSubscriptions); } } break; @@ -269,16 +262,6 @@ class Realtime extends MessagingAdapter } } - // De-duplicate groups per connection (same connection can match via multiple roles) - foreach ($receivers as $id => $groups) { - $unique = []; - foreach ($groups as $group) { - $key = \json_encode($group); - $unique[$key] = $group; - } - $receivers[$id] = \array_values($unique); - } - return $receivers; } @@ -310,6 +293,78 @@ class Realtime extends MessagingAdapter return $channels; } + /** + * Constructs subscriptions from query parameters. + * + * Reconstructs subscription structure from query params where subscription indices can span multiple channels. + * Format: {channel}[subscriptionIndex][]=query1&{channel}[subscriptionIndex][]=query2 + * + * Example: + * - tests[0][]=select(*) → subscription 0: channels=["tests"] + * - tests[1][]=equal(...) & prod[1][]=equal(...) → subscription 1: channels=["tests", "prod"] + * + * @param array $channelNames Array of channel names + * @param callable $getQueryParam Callable that takes a channel name and returns its query param value (null if not present) + * @return array Array indexed by subscription index: [index => ['channels' => string[], 'queries' => Query[]]] + * @throws QueryException + */ + public static function constructSubscriptions(array $channelNames, callable $getQueryParam): array + { + $subscriptionsByIndex = []; + + foreach ($channelNames as $channel) { + $channelSubscriptions = $getQueryParam($channel); + + // Backward compatibility: if no channel-specific query params, treat as subscription 0 with select("*") + if ($channelSubscriptions === null) { + if (!isset($subscriptionsByIndex[0])) { + $subscriptionsByIndex[0] = [ + 'channels' => [], + 'queries' => [] + ]; + } + $subscriptionsByIndex[0]['channels'][] = $channel; + if (empty($subscriptionsByIndex[0]['queries'])) { + $subscriptionsByIndex[0]['queries'] = [Query::select(['*'])]; + } + continue; + } + + // Ensure it's an array + if (!is_array($channelSubscriptions)) { + $channelSubscriptions = [$channelSubscriptions]; + } + + // Process each subscription index for this channel + foreach ($channelSubscriptions as $subscriptionIndex => $subscription) { + if (!isset($subscriptionsByIndex[$subscriptionIndex])) { + $subscriptionsByIndex[$subscriptionIndex] = [ + 'channels' => [], + 'queries' => [] + ]; + } + + // Add this channel to the subscription + if (!in_array($channel, $subscriptionsByIndex[$subscriptionIndex]['channels'])) { + $subscriptionsByIndex[$subscriptionIndex]['channels'][] = $channel; + } + + // Set queries for this subscription (queries are the same across all channels in a subscription) + if (empty($subscriptionsByIndex[$subscriptionIndex]['queries'])) { + // Ensure $subscription is an array (handle both array and string inputs) + $queriesToParse = is_array($subscription) ? $subscription : [$subscription]; + + // Parse and validate the queries + $parsedQueries = self::convertQueries($queriesToParse); + // Store Query objects + $subscriptionsByIndex[$subscriptionIndex]['queries'] = $parsedQueries; + } + } + } + + return $subscriptionsByIndex; + } + /** * Converts the queries from the Query Params into an array. * @param array|string $queries diff --git a/tests/e2e/Services/Realtime/RealtimeBase.php b/tests/e2e/Services/Realtime/RealtimeBase.php index 46eb7e660b..36bd5ef2bb 100644 --- a/tests/e2e/Services/Realtime/RealtimeBase.php +++ b/tests/e2e/Services/Realtime/RealtimeBase.php @@ -30,22 +30,37 @@ trait RealtimeBase * AND logic within the group; OR logic across multiple groups (if we ever add them). * * For now all E2E tests subscribe to a single channel, so we map queries to $channels[0]. + * + * Slot-based format: channel[slot][]=query1&channel[slot][]=query2 + * We need to manually build the query string to ensure the [] format is used. */ + // Build base query string + $queryParams = [ + "project" => $projectId, + "channels" => $channels + ]; + $queryString = http_build_query($queryParams); + if ($queries !== null && !empty($channels)) { $channel = $channels[0]; + $slot = 0; // All tests use slot 0 for now if ($queries === []) { - // Explicit select("*") group - $query[$channel][0] = [\Utopia\Database\Query::select(['*'])->toString()]; + // Explicit select("*") group - single query in slot 0 + $queryValue = \Utopia\Database\Query::select(['*'])->toString(); + $queryString .= "&" . urlencode($channel) . "[" . $slot . "][]=" . urlencode($queryValue); } else { - // Single subscription group for this channel - $query[$channel][0] = $queries; + // Single subscription group for this channel - multiple queries in slot 0 + // Each query should be appended with [] format + foreach ($queries as $queryValue) { + $queryString .= "&" . urlencode($channel) . "[" . $slot . "][]=" . urlencode($queryValue); + } } } return new WebSocketClient( - "ws://appwrite.test/v1/realtime?" . http_build_query($query), + "ws://appwrite.test/v1/realtime?" . $queryString, [ "headers" => $headers, "timeout" => 30, diff --git a/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php b/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php index e2864730aa..add51cf7e3 100644 --- a/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php +++ b/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php @@ -86,7 +86,8 @@ class RealtimeCustomClientQueryTest extends Scope // Should timeout - no event should be received try { - $client->receive(); + $data = $client->receive(); + var_dump($data); $this->fail('Expected TimeoutException - event should be filtered'); } catch (TimeoutException $e) { $this->assertTrue(true); From 10e971adedf07ba5e2af4dbf131f90d98f0f6672 Mon Sep 17 00:00:00 2001 From: ArnabChatterjee20k Date: Tue, 3 Feb 2026 10:46:01 +0530 Subject: [PATCH 07/11] updated tests and empty subscriptionsid issue --- app/realtime.php | 3 +- src/Appwrite/Messaging/Adapter/Realtime.php | 16 ++-- .../RealtimeCustomClientQueryTest.php | 81 ++++++++----------- 3 files changed, 42 insertions(+), 58 deletions(-) diff --git a/app/realtime.php b/app/realtime.php index fdd6f8a5ec..44c934f8dd 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -519,7 +519,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, if (App::isDevelopment() && !empty($receivers)) { Console::log("[Debug][Worker {$workerId}] Receivers: " . count($receivers)); Console::log("[Debug][Worker {$workerId}] Receivers Connection IDs: " . json_encode(array_keys($receivers))); - Console::log("[Debug][Worker {$workerId}] Event Query: " . json_encode($receivers)); + Console::log("[Debug][Worker {$workerId}] Event Query: " . json_encode(array_values($receivers))); Console::log("[Debug][Worker {$workerId}] Event: " . $payload); } @@ -633,6 +633,7 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, // Reconstruct subscriptions from query params using helper method $channelNames = array_keys($channels); + try { $subscriptionsByIndex = Realtime::constructSubscriptions( $channelNames, diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index cc1e5c03f2..c3434c40cc 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -230,7 +230,9 @@ class Realtime extends MessagingAdapter */ $payload = $event['data']['payload'] ?? []; foreach ($this->subscriptions[$event['project']][$role][$channel] as $id => $subscriptions) { - $matchedSubscriptions = []; + if (!isset($receivers[$id])) { + $receivers[$id] = []; + } // Process each subscription (OR logic across subscriptions) foreach ($subscriptions as $subId => $queryStrings) { @@ -244,17 +246,11 @@ class Realtime extends MessagingAdapter // Check if this subscription matches (AND logic within subscription) if (!empty(RuntimeQuery::filter($parsedQueries, $payload))) { // This subscription matched - add subscription ID to matched subscriptions - $matchedSubscriptions[$subId] = $queryStrings; + if (!isset($receivers[$id][$subId])) { + $receivers[$id][$subId] = $queryStrings; + } } } - - // Only add connection to receivers if at least one subscription matched - if (!empty($matchedSubscriptions)) { - if (!isset($receivers[$id])) { - $receivers[$id] = []; - } - $receivers[$id] = array_merge($receivers[$id], $matchedSubscriptions); - } } break; } diff --git a/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php b/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php index add51cf7e3..296644eabf 100644 --- a/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php +++ b/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php @@ -1589,7 +1589,7 @@ class RealtimeCustomClientQueryTest extends Scope Query::equal('category', ['gold']), ])->toString(); - // Subscribe with no 'queries' -> should receive all events, queryKeys = [] + // Subscribe with no 'queries' -> should receive all events (has select("*") subscription) $clientAll = $this->getWebsocket(['documents'], [ 'origin' => 'http://localhost', 'cookie' => 'a_session_' . $projectId . '=' . $session, @@ -1641,24 +1641,23 @@ class RealtimeCustomClientQueryTest extends Scope ], ]); - // clientAll: should receive event, queryKeys = [] + // clientAll: should receive event, subscriptions should not be empty (has select("*") subscription that matches) $eventAll = json_decode($clientAll->receive(), true); $this->assertEquals('event', $eventAll['type']); $this->assertEquals($docActiveGoldId, $eventAll['data']['payload']['$id']); - $this->assertArrayHasKey('queries', $eventAll['data']); - $this->assertIsArray($eventAll['data']['queries']); + $this->assertArrayHasKey('subscriptions', $eventAll['data']); + $this->assertIsArray($eventAll['data']['subscriptions']); + // clientAll has select("*") subscription that matches all events, so subscriptions should not be empty + $this->assertNotEmpty($eventAll['data']['subscriptions']); - // clientQ1: should receive event, queryKeys contains queryStatusActive + // clientQ1: should receive event, subscriptions should not be empty (query matched) $eventQ1 = json_decode($clientQ1->receive(), true); $this->assertEquals('event', $eventQ1['type']); $this->assertEquals($docActiveGoldId, $eventQ1['data']['payload']['$id']); - $flatQueriesQ1 = []; - foreach ($eventQ1['data']['queries'] as $group) { - foreach ($group as $q) { - $flatQueriesQ1[] = $q; - } - } - $this->assertContains($queryStatusActive, $flatQueriesQ1); + $this->assertArrayHasKey('subscriptions', $eventQ1['data']); + $this->assertIsArray($eventQ1['data']['subscriptions']); + // clientQ1 has a query that matches, so subscriptions should not be empty + $this->assertNotEmpty($eventQ1['data']['subscriptions']); // clientQ2: should NOT receive event (status is active, not pending) try { @@ -1668,17 +1667,14 @@ class RealtimeCustomClientQueryTest extends Scope $this->assertTrue(true); } - // clientComplex: should receive event, queryKeys contains queryComplex + // clientComplex: should receive event, subscriptions should not be empty (query matched) $eventComplex = json_decode($clientComplex->receive(), true); $this->assertEquals('event', $eventComplex['type']); $this->assertEquals($docActiveGoldId, $eventComplex['data']['payload']['$id']); - $flatQueriesComplex = []; - foreach ($eventComplex['data']['queries'] as $group) { - foreach ($group as $q) { - $flatQueriesComplex[] = $q; - } - } - $this->assertContains($queryComplex, $flatQueriesComplex); + $this->assertArrayHasKey('subscriptions', $eventComplex['data']); + $this->assertIsArray($eventComplex['data']['subscriptions']); + // clientComplex has a query that matches, so subscriptions should not be empty + $this->assertNotEmpty($eventComplex['data']['subscriptions']); // 2) Create pending/silver document -> should match Q2 only, and be seen by all $docPendingSilverId = ID::unique(); @@ -1696,12 +1692,14 @@ class RealtimeCustomClientQueryTest extends Scope ], ]); - // clientAll: should receive event, queryKeys = [] + // clientAll: should receive event, subscriptions should not be empty (has select("*") subscription that matches) $eventAll2 = json_decode($clientAll->receive(), true); $this->assertEquals('event', $eventAll2['type']); $this->assertEquals($docPendingSilverId, $eventAll2['data']['payload']['$id']); - $this->assertArrayHasKey('queries', $eventAll2['data']); - $this->assertIsArray($eventAll2['data']['queries']); + $this->assertArrayHasKey('subscriptions', $eventAll2['data']); + $this->assertIsArray($eventAll2['data']['subscriptions']); + // clientAll has select("*") subscription that matches all events, so subscriptions should not be empty + $this->assertNotEmpty($eventAll2['data']['subscriptions']); // clientQ1: should NOT receive event (status is pending) try { @@ -1711,17 +1709,14 @@ class RealtimeCustomClientQueryTest extends Scope $this->assertTrue(true); } - // clientQ2: should receive event, queryKeys contains queryStatusPending + // clientQ2: should receive event, subscriptions should not be empty (query matched) $eventQ2 = json_decode($clientQ2->receive(), true); $this->assertEquals('event', $eventQ2['type']); $this->assertEquals($docPendingSilverId, $eventQ2['data']['payload']['$id']); - $flatQueriesQ2 = []; - foreach ($eventQ2['data']['queries'] as $group) { - foreach ($group as $q) { - $flatQueriesQ2[] = $q; - } - } - $this->assertContains($queryStatusPending, $flatQueriesQ2); + $this->assertArrayHasKey('subscriptions', $eventQ2['data']); + $this->assertIsArray($eventQ2['data']['subscriptions']); + // clientQ2 has a query that matches, so subscriptions should not be empty + $this->assertNotEmpty($eventQ2['data']['subscriptions']); // clientComplex: should NOT receive event (status is pending, category silver) try { @@ -1828,14 +1823,10 @@ class RealtimeCustomClientQueryTest extends Scope $eventQ1 = json_decode($clientQ1->receive(), true); $this->assertEquals('event', $eventQ1['type']); $this->assertEquals($docActiveId, $eventQ1['data']['payload']['$id']); - $this->assertArrayHasKey('queries', $eventQ1['data']); - $flatQ1 = []; - foreach ($eventQ1['data']['queries'] as $group) { - foreach ($group as $q) { - $flatQ1[] = $q; - } - } - $this->assertContains($queryStatusActive, $flatQ1); + $this->assertArrayHasKey('subscriptions', $eventQ1['data']); + $this->assertIsArray($eventQ1['data']['subscriptions']); + // clientQ1 has a query that matches, so subscriptions should not be empty + $this->assertNotEmpty($eventQ1['data']['subscriptions']); try { $clientQ2->receive(); @@ -1862,14 +1853,10 @@ class RealtimeCustomClientQueryTest extends Scope $eventQ2 = json_decode($clientQ2->receive(), true); $this->assertEquals('event', $eventQ2['type']); $this->assertEquals($docPendingId, $eventQ2['data']['payload']['$id']); - $this->assertArrayHasKey('queries', $eventQ2['data']); - $flatQ2 = []; - foreach ($eventQ2['data']['queries'] as $group) { - foreach ($group as $q) { - $flatQ2[] = $q; - } - } - $this->assertContains($queryStatusPending, $flatQ2); + $this->assertArrayHasKey('subscriptions', $eventQ2['data']); + $this->assertIsArray($eventQ2['data']['subscriptions']); + // clientQ2 has a query that matches, so subscriptions should not be empty + $this->assertNotEmpty($eventQ2['data']['subscriptions']); try { $clientQ1->receive(); From d18d4d8263c36478dc6c7f21e209c41283e09aec Mon Sep 17 00:00:00 2001 From: ArnabChatterjee20k Date: Tue, 3 Feb 2026 11:43:23 +0530 Subject: [PATCH 08/11] updated subscription process --- app/realtime.php | 58 ++++++---- src/Appwrite/Messaging/Adapter/Realtime.php | 107 ++++++++++++++++--- tests/e2e/Services/Realtime/RealtimeBase.php | 2 +- 3 files changed, 129 insertions(+), 38 deletions(-) diff --git a/app/realtime.php b/app/realtime.php index 44c934f8dd..b800e4c194 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -6,7 +6,6 @@ use Appwrite\Messaging\Adapter\Realtime; use Appwrite\Network\Validator\Origin; use Appwrite\PubSub\Adapter\Pool as PubSubPool; use Appwrite\Utopia\Database\Documents\User; -use Appwrite\Utopia\Database\RuntimeQuery; use Appwrite\Utopia\Request; use Appwrite\Utopia\Response; use Swoole\Coroutine; @@ -433,7 +432,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, ]; $subscribers = $realtime->getSubscribers($event); // [connectionId => [subId => queries]] - + // For test events, send to all connections with their matched subscription queries foreach ($subscribers as $connectionId => $matchedSubscriptions) { $data = $event['data']; @@ -484,29 +483,46 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $roles = $user->getRoles($database->getAuthorization()); $channels = $realtime->connections[$connection]['channels'] ?? []; - $subscriptionMapping = $realtime->connections[$connection]['subscriptions'] ?? []; $authorization = $realtime->connections[$connection]['authorization'] ?? null; + $oldSubscriptionMapping = $realtime->connections[$connection]['subscriptions'] ?? []; + + // Get subscription metadata from subscriptions tree before unsubscribing + $subscriptionMetadata = $realtime->getSubscriptionIds($connection); $realtime->unsubscribe($connection); - // Re-subscribe with the same subscription structure - // Note: We can't fully reconstruct the original subscription indices from stored data, - // so we'll create new subscriptions. In practice, permissions changes should preserve - // the subscription structure, but this is a limitation of the current design. - if (!empty($channels)) { - // Create a single subscription with select("*") for all channels - $subscriptionId = ID::unique(); - $realtime->subscribe( - $projectId, - $connection, - $subscriptionId, - $roles, - $channels, - [Query::select(['*'])] - ); - $realtime->connections[$connection]['subscriptions'] = [0 => $subscriptionId]; + // Re-subscribe with the original subscription structure and queries + $newSubscriptionMapping = []; + if (!empty($subscriptionMetadata)) { + // Restore each subscription with its original channels and queries + foreach ($subscriptionMetadata as $oldSubId => $metadata) { + $subscriptionId = ID::unique(); + + // Parse query strings back to Query objects for subscribe + $queries = Query::parseQueries($metadata['queries']); + + $realtime->subscribe( + $projectId, + $connection, + $subscriptionId, + $roles, + $metadata['channels'], + $queries + ); + + // Find the index of the old subscription ID in the mapping + $oldIndex = array_search($oldSubId, $oldSubscriptionMapping); + if ($oldIndex !== false) { + $newSubscriptionMapping[$oldIndex] = $subscriptionId; + } else { + // If not found in mapping, use a new index + $newSubscriptionMapping[] = $subscriptionId; + } + } } + $realtime->connections[$connection]['subscriptions'] = $newSubscriptionMapping; + // Restore authorization after subscribe if ($authorization !== null) { $realtime->connections[$connection]['authorization'] = $authorization; @@ -633,11 +649,11 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, // Reconstruct subscriptions from query params using helper method $channelNames = array_keys($channels); - + try { $subscriptionsByIndex = Realtime::constructSubscriptions( $channelNames, - fn($channel) => $request->getQuery($channel, null) + fn ($channel) => $request->getQuery($channel, null) ); } catch (QueryException $e) { throw new Exception(Exception::REALTIME_POLICY_VIOLATION, $e->getMessage()); diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index c3434c40cc..586996186d 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -94,11 +94,75 @@ class Realtime extends MessagingAdapter } // Update connection info - $this->connections[$identifier] = [ - 'projectId' => $projectId, - 'roles' => $roles, - 'channels' => $channels - ]; + if (!isset($this->connections[$identifier])) { + $this->connections[$identifier] = [ + 'projectId' => $projectId, + 'roles' => $roles, + 'channels' => $channels, + 'subscriptions' => [] + ]; + } else { + // Update existing connection info + $this->connections[$identifier]['projectId'] = $projectId; + $this->connections[$identifier]['roles'] = $roles; + $this->connections[$identifier]['channels'] = $channels; + // Initialize subscriptions array if not exists + if (!isset($this->connections[$identifier]['subscriptions'])) { + $this->connections[$identifier]['subscriptions'] = []; + } + } + + // Add subscription ID to connections array + if (!in_array($subscriptionId, $this->connections[$identifier]['subscriptions'], true)) { + $this->connections[$identifier]['subscriptions'][] = $subscriptionId; + } + } + + /** + * Get subscription metadata for a connection. + * Retrieves subscription data including channels and queries directly from the subscriptions tree. + * + * @param mixed $connection Connection ID + * @return array Array of [subscriptionId => ['channels' => string[], 'queries' => string[]]] + */ + public function getSubscriptionIds(mixed $connection): array + { + $projectId = $this->connections[$connection]['projectId'] ?? null; + $roles = $this->connections[$connection]['roles'] ?? []; + $channels = $this->connections[$connection]['channels'] ?? []; + + if (!$projectId || empty($roles) || empty($channels)) { + return []; + } + + $subscriptions = []; + + // Extract subscription data from subscriptions tree + foreach ($roles as $role) { + if (!isset($this->subscriptions[$projectId][$role])) { + continue; + } + + foreach ($channels as $channel) { + if (!isset($this->subscriptions[$projectId][$role][$channel][$connection])) { + continue; + } + + foreach ($this->subscriptions[$projectId][$role][$channel][$connection] as $subId => $queryStrings) { + if (!isset($subscriptions[$subId])) { + $subscriptions[$subId] = [ + 'channels' => [], + 'queries' => $queryStrings + ]; + } + if (!in_array($channel, $subscriptions[$subId]['channels'])) { + $subscriptions[$subId]['channels'][] = $channel; + } + } + } + } + + return $subscriptions; } /** @@ -131,7 +195,14 @@ class Realtime extends MessagingAdapter unset($this->subscriptions[$projectId]); } - unset($this->connections[$connection]); + // Remove subscriptions array from connection before unsetting + if (isset($this->connections[$connection]['subscriptions'])) { + unset($this->connections[$connection]['subscriptions']); + } + + if (isset($this->connections[$connection])) { + unset($this->connections[$connection]); + } } /** @@ -230,9 +301,7 @@ class Realtime extends MessagingAdapter */ $payload = $event['data']['payload'] ?? []; foreach ($this->subscriptions[$event['project']][$role][$channel] as $id => $subscriptions) { - if (!isset($receivers[$id])) { - $receivers[$id] = []; - } + $matchedSubscriptions = []; // Process each subscription (OR logic across subscriptions) foreach ($subscriptions as $subId => $queryStrings) { @@ -246,11 +315,17 @@ class Realtime extends MessagingAdapter // Check if this subscription matches (AND logic within subscription) if (!empty(RuntimeQuery::filter($parsedQueries, $payload))) { // This subscription matched - add subscription ID to matched subscriptions - if (!isset($receivers[$id][$subId])) { - $receivers[$id][$subId] = $queryStrings; - } + $matchedSubscriptions[$subId] = $queryStrings; } } + + // Only add connection to receivers if at least one subscription matched + if (!empty($matchedSubscriptions)) { + if (!isset($receivers[$id])) { + $receivers[$id] = []; + } + $receivers[$id] = array_merge($receivers[$id], $matchedSubscriptions); + } } break; } @@ -291,14 +366,14 @@ class Realtime extends MessagingAdapter /** * Constructs subscriptions from query parameters. - * + * * Reconstructs subscription structure from query params where subscription indices can span multiple channels. * Format: {channel}[subscriptionIndex][]=query1&{channel}[subscriptionIndex][]=query2 - * + * * Example: * - tests[0][]=select(*) → subscription 0: channels=["tests"] * - tests[1][]=equal(...) & prod[1][]=equal(...) → subscription 1: channels=["tests", "prod"] - * + * * @param array $channelNames Array of channel names * @param callable $getQueryParam Callable that takes a channel name and returns its query param value (null if not present) * @return array Array indexed by subscription index: [index => ['channels' => string[], 'queries' => Query[]]] @@ -349,7 +424,7 @@ class Realtime extends MessagingAdapter if (empty($subscriptionsByIndex[$subscriptionIndex]['queries'])) { // Ensure $subscription is an array (handle both array and string inputs) $queriesToParse = is_array($subscription) ? $subscription : [$subscription]; - + // Parse and validate the queries $parsedQueries = self::convertQueries($queriesToParse); // Store Query objects diff --git a/tests/e2e/Services/Realtime/RealtimeBase.php b/tests/e2e/Services/Realtime/RealtimeBase.php index 36bd5ef2bb..1b77c0ad4a 100644 --- a/tests/e2e/Services/Realtime/RealtimeBase.php +++ b/tests/e2e/Services/Realtime/RealtimeBase.php @@ -30,7 +30,7 @@ trait RealtimeBase * AND logic within the group; OR logic across multiple groups (if we ever add them). * * For now all E2E tests subscribe to a single channel, so we map queries to $channels[0]. - * + * * Slot-based format: channel[slot][]=query1&channel[slot][]=query2 * We need to manually build the query string to ensure the [] format is used. */ From a10fbc64c7467a69b96f012ed415f1c23e459551 Mon Sep 17 00:00:00 2001 From: ArnabChatterjee20k Date: Tue, 3 Feb 2026 13:46:05 +0530 Subject: [PATCH 09/11] updated subscription after permission change --- app/realtime.php | 85 +++---- src/Appwrite/Messaging/Adapter/Realtime.php | 43 +--- .../RealtimeCustomClientQueryTest.php | 219 +++++++++++++++++- 3 files changed, 253 insertions(+), 94 deletions(-) diff --git a/app/realtime.php b/app/realtime.php index b800e4c194..7171524233 100644 --- a/app/realtime.php +++ b/app/realtime.php @@ -482,47 +482,24 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats, $user = $database->getDocument('users', $userId); $roles = $user->getRoles($database->getAuthorization()); - $channels = $realtime->connections[$connection]['channels'] ?? []; $authorization = $realtime->connections[$connection]['authorization'] ?? null; - $oldSubscriptionMapping = $realtime->connections[$connection]['subscriptions'] ?? []; - // Get subscription metadata from subscriptions tree before unsubscribing - $subscriptionMetadata = $realtime->getSubscriptionIds($connection); + $subscriptionMetadata = $realtime->getSubscriptionMetadata($connection); $realtime->unsubscribe($connection); - // Re-subscribe with the original subscription structure and queries - $newSubscriptionMapping = []; - if (!empty($subscriptionMetadata)) { - // Restore each subscription with its original channels and queries - foreach ($subscriptionMetadata as $oldSubId => $metadata) { - $subscriptionId = ID::unique(); - - // Parse query strings back to Query objects for subscribe - $queries = Query::parseQueries($metadata['queries']); - - $realtime->subscribe( - $projectId, - $connection, - $subscriptionId, - $roles, - $metadata['channels'], - $queries - ); - - // Find the index of the old subscription ID in the mapping - $oldIndex = array_search($oldSubId, $oldSubscriptionMapping); - if ($oldIndex !== false) { - $newSubscriptionMapping[$oldIndex] = $subscriptionId; - } else { - // If not found in mapping, use a new index - $newSubscriptionMapping[] = $subscriptionId; - } - } + foreach ($subscriptionMetadata as $subscriptionId => $metadata) { + $queries = Query::parseQueries($metadata['queries'] ?? []); + $realtime->subscribe( + $projectId, + $connection, + $subscriptionId, + $roles, + $metadata['channels'] ?? [], + $queries + ); } - $realtime->connections[$connection]['subscriptions'] = $newSubscriptionMapping; - // Restore authorization after subscribe if ($authorization !== null) { $realtime->connections[$connection]['authorization'] = $authorization; @@ -662,10 +639,8 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, // Generate subscription IDs and subscribe $subscriptionMapping = []; foreach ($subscriptionsByIndex as $index => $subscription) { - // Generate unique subscription ID $subscriptionId = ID::unique(); - // Subscribe with all channels for this subscription $realtime->subscribe( $project->getId(), $connection, @@ -675,12 +650,10 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server, $subscription['queries'] // Query objects ); - // Store mapping: subscription index -> subscription ID $subscriptionMapping[$index] = $subscriptionId; } $realtime->connections[$connection]['authorization'] = $authorization; - $realtime->connections[$connection]['subscriptions'] = $subscriptionMapping; $user = empty($user->getId()) ? null : $response->output($user, Response::MODEL_ACCOUNT); @@ -821,26 +794,26 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re $channelNames = $realtime->connections[$connection]['channels'] ?? []; $channels = Realtime::convertChannels(array_flip($channelNames), $user->getId()); - // Preserve authorization and subscription mapping before subscribe overwrites the connection array $authorization = $realtime->connections[$connection]['authorization'] ?? null; - $subscriptionMapping = $realtime->connections[$connection]['subscriptions'] ?? []; + $projectId = $realtime->connections[$connection]['projectId'] ?? null; - // Re-subscribe with the same subscription structure - // Note: We can't fully reconstruct the original subscription indices from stored data, - // so we'll create new subscriptions. In practice, authentication should preserve - // the subscription structure, but this is a limitation of the current design. - if (!empty($channelNames)) { - // Create a single subscription with select("*") for all channels - $subscriptionId = ID::unique(); - $realtime->subscribe( - $realtime->connections[$connection]['projectId'], - $connection, - $subscriptionId, - $roles, - $channelNames, - [Query::select(['*'])] - ); - $realtime->connections[$connection]['subscriptions'] = [0 => $subscriptionId]; + $subscriptionMetadata = $realtime->getSubscriptionMetadata($connection); + + $realtime->unsubscribe($connection); + + if (!empty($projectId)) { + foreach ($subscriptionMetadata as $subscriptionId => $metadata) { + $queries = Query::parseQueries($metadata['queries'] ?? []); + + $realtime->subscribe( + $projectId, + $connection, + $subscriptionId, + $roles, + $metadata['channels'] ?? [], + $queries + ); + } } // Restore authorization after subscribe diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index 586996186d..3ec6a99e6e 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -94,28 +94,11 @@ class Realtime extends MessagingAdapter } // Update connection info - if (!isset($this->connections[$identifier])) { - $this->connections[$identifier] = [ - 'projectId' => $projectId, - 'roles' => $roles, - 'channels' => $channels, - 'subscriptions' => [] - ]; - } else { - // Update existing connection info - $this->connections[$identifier]['projectId'] = $projectId; - $this->connections[$identifier]['roles'] = $roles; - $this->connections[$identifier]['channels'] = $channels; - // Initialize subscriptions array if not exists - if (!isset($this->connections[$identifier]['subscriptions'])) { - $this->connections[$identifier]['subscriptions'] = []; - } - } - - // Add subscription ID to connections array - if (!in_array($subscriptionId, $this->connections[$identifier]['subscriptions'], true)) { - $this->connections[$identifier]['subscriptions'][] = $subscriptionId; - } + $this->connections[$identifier] = [ + 'projectId' => $projectId, + 'roles' => $roles, + 'channels' => $channels + ]; } /** @@ -125,7 +108,7 @@ class Realtime extends MessagingAdapter * @param mixed $connection Connection ID * @return array Array of [subscriptionId => ['channels' => string[], 'queries' => string[]]] */ - public function getSubscriptionIds(mixed $connection): array + public function getSubscriptionMetadata(mixed $connection): array { $projectId = $this->connections[$connection]['projectId'] ?? null; $roles = $this->connections[$connection]['roles'] ?? []; @@ -195,11 +178,6 @@ class Realtime extends MessagingAdapter unset($this->subscriptions[$projectId]); } - // Remove subscriptions array from connection before unsetting - if (isset($this->connections[$connection]['subscriptions'])) { - unset($this->connections[$connection]['subscriptions']); - } - if (isset($this->connections[$connection])) { unset($this->connections[$connection]); } @@ -311,7 +289,6 @@ class Realtime extends MessagingAdapter $parsed = Query::parseQueries([$queryString]); $parsedQueries = array_merge($parsedQueries, $parsed); } - // Check if this subscription matches (AND logic within subscription) if (!empty(RuntimeQuery::filter($parsedQueries, $payload))) { // This subscription matched - add subscription ID to matched subscriptions @@ -401,12 +378,10 @@ class Realtime extends MessagingAdapter continue; } - // Ensure it's an array if (!is_array($channelSubscriptions)) { $channelSubscriptions = [$channelSubscriptions]; } - // Process each subscription index for this channel foreach ($channelSubscriptions as $subscriptionIndex => $subscription) { if (!isset($subscriptionsByIndex[$subscriptionIndex])) { $subscriptionsByIndex[$subscriptionIndex] = [ @@ -415,19 +390,13 @@ class Realtime extends MessagingAdapter ]; } - // Add this channel to the subscription if (!in_array($channel, $subscriptionsByIndex[$subscriptionIndex]['channels'])) { $subscriptionsByIndex[$subscriptionIndex]['channels'][] = $channel; } - // Set queries for this subscription (queries are the same across all channels in a subscription) if (empty($subscriptionsByIndex[$subscriptionIndex]['queries'])) { - // Ensure $subscription is an array (handle both array and string inputs) $queriesToParse = is_array($subscription) ? $subscription : [$subscription]; - - // Parse and validate the queries $parsedQueries = self::convertQueries($queriesToParse); - // Store Query objects $subscriptionsByIndex[$subscriptionIndex]['queries'] = $parsedQueries; } } diff --git a/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php b/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php index 296644eabf..e93e955f1a 100644 --- a/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php +++ b/tests/e2e/Services/Realtime/RealtimeCustomClientQueryTest.php @@ -87,7 +87,6 @@ class RealtimeCustomClientQueryTest extends Scope // Should timeout - no event should be received try { $data = $client->receive(); - var_dump($data); $this->fail('Expected TimeoutException - event should be filtered'); } catch (TimeoutException $e) { $this->assertTrue(true); @@ -1868,4 +1867,222 @@ class RealtimeCustomClientQueryTest extends Scope $clientQ1->close(); $clientQ2->close(); } + + public function testSubscriptionPreservedAfterPermissionChange() + { + $user = $this->getUser(); + $session = $user['session'] ?? ''; + $projectId = $this->getProject()['$id']; + $userId = $user['$id'] ?? ''; + + // Setup database and collection + $database = $this->client->call(Client::METHOD_POST, '/databases', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'databaseId' => ID::unique(), + 'name' => 'Permission Change Test DB', + ]); + $databaseId = $database['body']['$id']; + + $collection = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'collectionId' => ID::unique(), + 'name' => 'Permission Change Collection', + 'permissions' => [ + Permission::create(Role::user($userId)), + Permission::read(Role::user($userId)), + ], + 'documentSecurity' => true, + ]); + $collectionId = $collection['body']['$id']; + + $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/attributes/string', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'x-appwrite-key' => $this->getProject()['apiKey'] + ]), [ + 'key' => 'status', + 'size' => 256, + 'required' => false, + ]); + + sleep(2); + + $targetDocumentId = ID::unique(); + + // Subscribe with query for specific document ID + $client = $this->getWebsocket(['documents'], [ + 'origin' => 'http://localhost', + 'cookie' => 'a_session_' . $projectId . '=' . $session, + ], null, [ + Query::equal('$id', [$targetDocumentId])->toString(), + ]); + + $response = json_decode($client->receive(), true); + $this->assertEquals('connected', $response['type']); + $this->assertArrayHasKey('subscriptions', $response['data']); + $this->assertIsArray($response['data']['subscriptions']); + + // Store the original subscription mapping (index => subscriptionId) + $originalSubscriptionMapping = $response['data']['subscriptions']; + $this->assertNotEmpty($originalSubscriptionMapping); + // Get the first subscription ID and its index + $originalIndex = array_key_first($originalSubscriptionMapping); + $originalSubscriptionId = $originalSubscriptionMapping[$originalIndex]; + + // Create document with matching ID - should receive event + $document = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + ], $this->getHeaders()), [ + 'documentId' => $targetDocumentId, + 'data' => [ + 'status' => 'active' + ], + 'permissions' => [ + Permission::read(Role::user($userId)), + Permission::update(Role::user($userId)), + ], + ]); + + $event = json_decode($client->receive(), true); + $this->assertEquals('event', $event['type']); + $this->assertEquals($targetDocumentId, $event['data']['payload']['$id']); + $this->assertArrayHasKey('subscriptions', $event['data']); + $this->assertContains($originalSubscriptionId, $event['data']['subscriptions']); + + // Trigger permission change by creating a team owned by a DIFFERENT user, + $teamOwnerEmail = uniqid() . 'owner@localhost.test'; + $teamOwnerPassword = 'password'; + + $teamOwner = $this->client->call(Client::METHOD_POST, '/account', [ + 'origin' => 'http://localhost', + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + ], [ + 'userId' => ID::unique(), + 'email' => $teamOwnerEmail, + 'password' => $teamOwnerPassword, + 'name' => 'Team Owner', + ]); + + $this->assertEquals(201, $teamOwner['headers']['status-code']); + + $teamOwnerSession = $this->client->call(Client::METHOD_POST, '/account/sessions/email', [ + 'origin' => 'http://localhost', + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + ], [ + 'email' => $teamOwnerEmail, + 'password' => $teamOwnerPassword, + ]); + + $teamOwnerSession = $teamOwnerSession['cookies']['a_session_' . $projectId] ?? ''; + + $team = $this->client->call(Client::METHOD_POST, '/teams', [ + 'origin' => 'http://localhost', + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'cookie' => 'a_session_' . $projectId . '=' . $teamOwnerSession, + ], [ + 'teamId' => ID::unique(), + 'name' => 'Test Team', + ]); + $teamId = $team['body']['$id']; + + $this->client->call(Client::METHOD_POST, '/teams/' . $teamId . '/memberships', [ + 'origin' => 'http://localhost', + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + 'x-appwrite-key' => $this->getProject()['apiKey'], + ], [ + 'email' => $user['email'], + 'roles' => ['member'], + 'url' => 'http://localhost', + ]); + + sleep(3); + + // Verify subscription is still working after permission change + $nonMatchingDocumentId = ID::unique(); + $document2 = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + ], $this->getHeaders()), [ + 'documentId' => $nonMatchingDocumentId, + 'data' => [ + 'status' => 'active' + ], + 'permissions' => [ + Permission::read(Role::user($userId)), + Permission::update(Role::user($userId)), + ], + ]); + + // This document doesn't match the query, so we shouldn't receive it + try { + $data = $client->receive(); + $this->fail('Expected TimeoutException - document does not match query after permission change'); + } catch (TimeoutException $e) { + $this->assertTrue(true); + } + + // Create a NEW document with a different ID - should NOT receive event + $targetDocumentId2 = ID::unique(); + $document3 = $this->client->call(Client::METHOD_POST, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents', array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + ], $this->getHeaders()), [ + 'documentId' => $targetDocumentId2, + 'data' => [ + 'status' => 'active' + ], + 'permissions' => [ + Permission::read(Role::user($userId)), + Permission::update(Role::user($userId)), + ], + ]); + + sleep(2); + + // This should NOT receive event because the query is for $targetDocumentId, not $targetDocumentId2 + // This verifies the query is preserved after permission change + try { + $data = $client->receive(); + $this->fail('Expected TimeoutException - new document does not match original query after permission change'); + } catch (TimeoutException $e) { + $this->assertTrue(true); + } + + // Create a document with the ORIGINAL matching ID - should receive event + $document4 = $this->client->call(Client::METHOD_PATCH, '/databases/' . $databaseId . '/collections/' . $collectionId . '/documents/' . $targetDocumentId, array_merge([ + 'content-type' => 'application/json', + 'x-appwrite-project' => $projectId, + ], $this->getHeaders()), [ + 'data' => [ + 'status' => 'updated-after-permission-change' + ], + ]); + + // Wait a bit for the event to be processed + sleep(3); + + // Verify the event is received with the preserved subscription + $event2 = json_decode($client->receive(), true); + $this->assertEquals('event', $event2['type']); + $this->assertEquals($targetDocumentId, $event2['data']['payload']['$id']); + $this->assertEquals('updated-after-permission-change', $event2['data']['payload']['status']); + $this->assertArrayHasKey('subscriptions', $event2['data']); + $this->assertIsArray($event2['data']['subscriptions']); + $this->assertNotEmpty($event2['data']['subscriptions']); + // Subscription ID should remain stable after permission change + $this->assertContains($originalSubscriptionId, $event2['data']['subscriptions']); + + $client->close(); + } } From 43196123589ce106080eb4005a5512779adbd81b Mon Sep 17 00:00:00 2001 From: ArnabChatterjee20k Date: Tue, 3 Feb 2026 14:16:35 +0530 Subject: [PATCH 10/11] updated unit tests --- src/Appwrite/Messaging/Adapter/Realtime.php | 2 -- .../unit/Messaging/MessagingChannelsTest.php | 23 +++++++++++++------ tests/unit/Messaging/MessagingGuestTest.php | 8 +++++-- tests/unit/Messaging/MessagingTest.php | 8 +++++-- 4 files changed, 28 insertions(+), 13 deletions(-) diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index 3ec6a99e6e..04c916fca4 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -283,7 +283,6 @@ class Realtime extends MessagingAdapter // Process each subscription (OR logic across subscriptions) foreach ($subscriptions as $subId => $queryStrings) { - // Parse all queries in this subscription $parsedQueries = []; foreach ($queryStrings as $queryString) { $parsed = Query::parseQueries([$queryString]); @@ -291,7 +290,6 @@ class Realtime extends MessagingAdapter } // Check if this subscription matches (AND logic within subscription) if (!empty(RuntimeQuery::filter($parsedQueries, $payload))) { - // This subscription matched - add subscription ID to matched subscriptions $matchedSubscriptions[$subId] = $queryStrings; } } diff --git a/tests/unit/Messaging/MessagingChannelsTest.php b/tests/unit/Messaging/MessagingChannelsTest.php index fc3fa802ad..0e925e22f2 100644 --- a/tests/unit/Messaging/MessagingChannelsTest.php +++ b/tests/unit/Messaging/MessagingChannelsTest.php @@ -81,11 +81,13 @@ class MessagingChannelsTest extends TestCase $roles = $user->getRoles($this->getAuthorization()); - $parsedChannels = Realtime::convertChannels([0 => $channel], $user->getId()); + // Normalize channels to the format Realtime::subscribe expects (plain channel names) + $parsedChannels = array_keys(Realtime::convertChannels([0 => $channel], $user->getId())); $this->realtime->subscribe( '1', $this->connectionsCount, + ID::unique(), $roles, $parsedChannels ); @@ -105,11 +107,13 @@ class MessagingChannelsTest extends TestCase $roles = $user->getRoles($this->getAuthorization()); - $parsedChannels = Realtime::convertChannels([0 => $channel], $user->getId()); + // Normalize channels to the format Realtime::subscribe expects (plain channel names) + $parsedChannels = array_keys(Realtime::convertChannels([0 => $channel], $user->getId())); $this->realtime->subscribe( '1', $this->connectionsCount, + ID::unique(), $roles, $parsedChannels ); @@ -183,7 +187,8 @@ class MessagingChannelsTest extends TestCase 'data' => [ 'channels' => [ 0 => $channel, - ] + ], + 'payload' => ['_match' => true], ] ]; @@ -220,7 +225,8 @@ class MessagingChannelsTest extends TestCase 'data' => [ 'channels' => [ 0 => $channel, - ] + ], + 'payload' => ['_match' => true], ] ]; @@ -255,7 +261,8 @@ class MessagingChannelsTest extends TestCase 'data' => [ 'channels' => [ 0 => $channel, - ] + ], + 'payload' => ['_match' => true], ] ]; @@ -290,7 +297,8 @@ class MessagingChannelsTest extends TestCase 'data' => [ 'channels' => [ 0 => $channel, - ] + ], + 'payload' => ['_match' => true], ] ]; @@ -323,7 +331,8 @@ class MessagingChannelsTest extends TestCase 'data' => [ 'channels' => [ 0 => $channel, - ] + ], + 'payload' => ['_match' => true], ] ]; diff --git a/tests/unit/Messaging/MessagingGuestTest.php b/tests/unit/Messaging/MessagingGuestTest.php index f6e2cb67b3..79ef0e0d33 100644 --- a/tests/unit/Messaging/MessagingGuestTest.php +++ b/tests/unit/Messaging/MessagingGuestTest.php @@ -16,8 +16,10 @@ class MessagingGuestTest extends TestCase $realtime->subscribe( '1', 1, + ID::unique(), [Role::guests()->toString()], - ['files' => 0, 'documents' => 0, 'documents.789' => 0, 'account.123' => 0] + // Pass plain channel names, Realtime::subscribe will normalize them + ['files', 'documents', 'documents.789', 'account.123'] ); $event = [ @@ -27,7 +29,9 @@ class MessagingGuestTest extends TestCase 'channels' => [ 0 => 'documents', 1 => 'documents', - ] + ], + // Non-empty payload so default select(\"*\") subscriptions match + 'payload' => ['_match' => true], ] ]; diff --git a/tests/unit/Messaging/MessagingTest.php b/tests/unit/Messaging/MessagingTest.php index dfcb7f2fff..96bba8b72d 100644 --- a/tests/unit/Messaging/MessagingTest.php +++ b/tests/unit/Messaging/MessagingTest.php @@ -26,6 +26,7 @@ class MessagingTest extends TestCase $realtime->subscribe( '1', 1, + ID::unique(), [ Role::user(ID::custom('123'))->toString(), Role::users()->toString(), @@ -35,7 +36,8 @@ class MessagingTest extends TestCase Role::team(ID::custom('def'))->toString(), Role::team(ID::custom('def'), 'guest')->toString(), ], - ['files' => 0, 'documents' => 0, 'documents.789' => 0, 'account.123' => 0] + // Pass plain channel names, Realtime::subscribe will normalize them + ['files', 'documents', 'documents.789', 'account.123'] ); $event = [ @@ -44,7 +46,9 @@ class MessagingTest extends TestCase 'data' => [ 'channels' => [ 0 => 'account.123', - ] + ], + // Non-empty payload so default select(\"*\") subscriptions match + 'payload' => ['_match' => true], ] ]; From 0528c0d69381e1137c5b9933982c735ebad8cb33 Mon Sep 17 00:00:00 2001 From: ArnabChatterjee20k Date: Tue, 3 Feb 2026 14:29:34 +0530 Subject: [PATCH 11/11] updated the case for select all and empty payload --- src/Appwrite/Messaging/Adapter/Realtime.php | 4 +++- tests/unit/Messaging/MessagingChannelsTest.php | 15 +++++---------- tests/unit/Messaging/MessagingGuestTest.php | 2 -- tests/unit/Messaging/MessagingTest.php | 4 +--- 4 files changed, 9 insertions(+), 16 deletions(-) diff --git a/src/Appwrite/Messaging/Adapter/Realtime.php b/src/Appwrite/Messaging/Adapter/Realtime.php index 04c916fca4..033297dd65 100644 --- a/src/Appwrite/Messaging/Adapter/Realtime.php +++ b/src/Appwrite/Messaging/Adapter/Realtime.php @@ -289,7 +289,9 @@ class Realtime extends MessagingAdapter $parsedQueries = array_merge($parsedQueries, $parsed); } // Check if this subscription matches (AND logic within subscription) - if (!empty(RuntimeQuery::filter($parsedQueries, $payload))) { + // Or if empty payload and select all as filter will return empty payload out of it even if it passed + $isEmptyPayloadAndSelectAll = RuntimeQuery::isSelectAll($parsedQueries[0]) && empty($payload); + if ($isEmptyPayloadAndSelectAll || !empty(RuntimeQuery::filter($parsedQueries, $payload))) { $matchedSubscriptions[$subId] = $queryStrings; } } diff --git a/tests/unit/Messaging/MessagingChannelsTest.php b/tests/unit/Messaging/MessagingChannelsTest.php index 0e925e22f2..598a47a901 100644 --- a/tests/unit/Messaging/MessagingChannelsTest.php +++ b/tests/unit/Messaging/MessagingChannelsTest.php @@ -187,8 +187,7 @@ class MessagingChannelsTest extends TestCase 'data' => [ 'channels' => [ 0 => $channel, - ], - 'payload' => ['_match' => true], + ] ] ]; @@ -225,8 +224,7 @@ class MessagingChannelsTest extends TestCase 'data' => [ 'channels' => [ 0 => $channel, - ], - 'payload' => ['_match' => true], + ] ] ]; @@ -261,8 +259,7 @@ class MessagingChannelsTest extends TestCase 'data' => [ 'channels' => [ 0 => $channel, - ], - 'payload' => ['_match' => true], + ] ] ]; @@ -297,8 +294,7 @@ class MessagingChannelsTest extends TestCase 'data' => [ 'channels' => [ 0 => $channel, - ], - 'payload' => ['_match' => true], + ] ] ]; @@ -331,8 +327,7 @@ class MessagingChannelsTest extends TestCase 'data' => [ 'channels' => [ 0 => $channel, - ], - 'payload' => ['_match' => true], + ] ] ]; diff --git a/tests/unit/Messaging/MessagingGuestTest.php b/tests/unit/Messaging/MessagingGuestTest.php index 79ef0e0d33..068f9de819 100644 --- a/tests/unit/Messaging/MessagingGuestTest.php +++ b/tests/unit/Messaging/MessagingGuestTest.php @@ -30,8 +30,6 @@ class MessagingGuestTest extends TestCase 0 => 'documents', 1 => 'documents', ], - // Non-empty payload so default select(\"*\") subscriptions match - 'payload' => ['_match' => true], ] ]; diff --git a/tests/unit/Messaging/MessagingTest.php b/tests/unit/Messaging/MessagingTest.php index 96bba8b72d..4b2474c760 100644 --- a/tests/unit/Messaging/MessagingTest.php +++ b/tests/unit/Messaging/MessagingTest.php @@ -46,9 +46,7 @@ class MessagingTest extends TestCase 'data' => [ 'channels' => [ 0 => 'account.123', - ], - // Non-empty payload so default select(\"*\") subscriptions match - 'payload' => ['_match' => true], + ] ] ];