mirror of
https://github.com/appwrite/appwrite
synced 2026-04-21 13:37:16 +00:00
Merge remote-tracking branch 'origin/1.9.x' into feat-docker-geo-18x
This commit is contained in:
commit
2c5303111e
5 changed files with 647 additions and 13 deletions
|
|
@ -1070,9 +1070,6 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re
|
|||
$realtime->subscribe($projectId, $connection, $subscriptionId, $roles, $channels, $queries);
|
||||
}
|
||||
|
||||
// subscribe() overwrites the connection entry; restore auth so later onMessage uses the same context.
|
||||
$realtime->connections[$connection]['authorization'] = $authorization;
|
||||
|
||||
$responsePayload = json_encode([
|
||||
'type' => 'response',
|
||||
'data' => [
|
||||
|
|
@ -1102,6 +1099,58 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re
|
|||
|
||||
break;
|
||||
|
||||
case 'unsubscribe':
|
||||
if (!\is_array($message['data']) || !\array_is_list($message['data'])) {
|
||||
throw new Exception(Exception::REALTIME_MESSAGE_FORMAT_INVALID, 'Payload is not valid.');
|
||||
}
|
||||
|
||||
// Validate every payload before executing any removal so an invalid entry
|
||||
// later in the batch does not leave earlier entries half-applied on the server.
|
||||
$validatedIds = [];
|
||||
foreach ($message['data'] as $payload) {
|
||||
if (
|
||||
!\is_array($payload)
|
||||
|| !\array_key_exists('subscriptionId', $payload)
|
||||
|| !\is_string($payload['subscriptionId'])
|
||||
|| $payload['subscriptionId'] === ''
|
||||
) {
|
||||
throw new Exception(Exception::REALTIME_MESSAGE_FORMAT_INVALID, 'Each unsubscribe payload must include a non-empty subscriptionId.');
|
||||
}
|
||||
$validatedIds[] = $payload['subscriptionId'];
|
||||
}
|
||||
|
||||
$unsubscribeResults = [];
|
||||
foreach ($validatedIds as $subscriptionId) {
|
||||
$wasRemoved = $realtime->unsubscribeSubscription($connection, $subscriptionId);
|
||||
$unsubscribeResults[] = [
|
||||
'subscriptionId' => $subscriptionId,
|
||||
'removed' => $wasRemoved,
|
||||
];
|
||||
}
|
||||
|
||||
$unsubscribeResponsePayload = json_encode([
|
||||
'type' => 'response',
|
||||
'data' => [
|
||||
'to' => 'unsubscribe',
|
||||
'success' => true,
|
||||
'subscriptions' => $unsubscribeResults,
|
||||
],
|
||||
]);
|
||||
|
||||
$server->send([$connection], $unsubscribeResponsePayload);
|
||||
|
||||
if ($project !== null && !$project->isEmpty()) {
|
||||
$unsubscribeOutboundBytes = \strlen($unsubscribeResponsePayload);
|
||||
|
||||
if ($unsubscribeOutboundBytes > 0) {
|
||||
triggerStats([
|
||||
METRIC_REALTIME_OUTBOUND => $unsubscribeOutboundBytes,
|
||||
], $project->getId());
|
||||
}
|
||||
}
|
||||
|
||||
break;
|
||||
|
||||
default:
|
||||
throw new Exception(Exception::REALTIME_MESSAGE_FORMAT_INVALID, 'Message type is not valid.');
|
||||
}
|
||||
|
|
|
|||
|
|
@ -114,14 +114,24 @@ class Realtime extends MessagingAdapter
|
|||
}
|
||||
}
|
||||
|
||||
// Keep userId from onOpen/authentication when provided.
|
||||
// Fallback to existing stored value for subsequent subscribe upserts.
|
||||
$this->connections[$identifier] = [
|
||||
// Union channels/roles across all subscriptions on the connection; overwriting would
|
||||
// leave getSubscriptionMetadata and full unsubscribe operating on stale state.
|
||||
$existing = $this->connections[$identifier] ?? [];
|
||||
$existingChannels = $existing['channels'] ?? [];
|
||||
$existingRoles = $existing['roles'] ?? [];
|
||||
|
||||
$entry = [
|
||||
'projectId' => $projectId,
|
||||
'roles' => $roles,
|
||||
'userId' => $userId ?? ($this->connections[$identifier]['userId'] ?? ''),
|
||||
'channels' => $channels
|
||||
'roles' => \array_values(\array_unique(\array_merge($existingRoles, $roles))),
|
||||
'userId' => $userId ?? ($existing['userId'] ?? ''),
|
||||
'channels' => \array_values(\array_unique(\array_merge($existingChannels, $channels))),
|
||||
];
|
||||
|
||||
if (\array_key_exists('authorization', $existing)) {
|
||||
$entry['authorization'] = $existing['authorization'];
|
||||
}
|
||||
|
||||
$this->connections[$identifier] = $entry;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -206,6 +216,87 @@ class Realtime extends MessagingAdapter
|
|||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes a single subscription from a connection, keeping the connection alive so
|
||||
* the client can resubscribe. Idempotent — returns true only when something was removed.
|
||||
*
|
||||
* @param mixed $connection
|
||||
* @param string $subscriptionId
|
||||
* @return bool
|
||||
*/
|
||||
public function unsubscribeSubscription(mixed $connection, string $subscriptionId): bool
|
||||
{
|
||||
$projectId = $this->connections[$connection]['projectId'] ?? '';
|
||||
if ($projectId === '' || !isset($this->subscriptions[$projectId])) {
|
||||
return false;
|
||||
}
|
||||
|
||||
$removed = false;
|
||||
|
||||
foreach ($this->subscriptions[$projectId] as $role => $byChannel) {
|
||||
foreach ($byChannel as $channel => $byConnection) {
|
||||
if (!isset($byConnection[$connection][$subscriptionId])) {
|
||||
continue;
|
||||
}
|
||||
|
||||
unset($this->subscriptions[$projectId][$role][$channel][$connection][$subscriptionId]);
|
||||
$removed = true;
|
||||
|
||||
if (empty($this->subscriptions[$projectId][$role][$channel][$connection])) {
|
||||
unset($this->subscriptions[$projectId][$role][$channel][$connection]);
|
||||
}
|
||||
if (empty($this->subscriptions[$projectId][$role][$channel])) {
|
||||
unset($this->subscriptions[$projectId][$role][$channel]);
|
||||
}
|
||||
}
|
||||
if (empty($this->subscriptions[$projectId][$role])) {
|
||||
unset($this->subscriptions[$projectId][$role]);
|
||||
}
|
||||
}
|
||||
|
||||
if (empty($this->subscriptions[$projectId])) {
|
||||
unset($this->subscriptions[$projectId]);
|
||||
}
|
||||
|
||||
if ($removed) {
|
||||
$this->recomputeConnectionState($connection);
|
||||
}
|
||||
|
||||
return $removed;
|
||||
}
|
||||
|
||||
/**
|
||||
* Recomputes the cached channels on the connection entry from the subscriptions tree.
|
||||
* Called after per-subscription removal so stale channel entries do not linger for later reads.
|
||||
*
|
||||
* Roles are deliberately NOT recomputed here. They represent the connection's authorization
|
||||
* context (set at onOpen, replaced on `authentication` / permission-change) and must survive
|
||||
* per-subscription removal — otherwise a client that unsubscribes every subscription and then
|
||||
* resubscribes would subscribe with an empty roles array and silently receive nothing.
|
||||
*
|
||||
* @param mixed $connection
|
||||
* @return void
|
||||
*/
|
||||
private function recomputeConnectionState(mixed $connection): void
|
||||
{
|
||||
if (!isset($this->connections[$connection])) {
|
||||
return;
|
||||
}
|
||||
|
||||
$projectId = $this->connections[$connection]['projectId'] ?? '';
|
||||
$channels = [];
|
||||
|
||||
foreach ($this->subscriptions[$projectId] ?? [] as $byChannel) {
|
||||
foreach ($byChannel as $channel => $byConnection) {
|
||||
if (isset($byConnection[$connection])) {
|
||||
$channels[$channel] = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
$this->connections[$connection]['channels'] = \array_keys($channels);
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if Channel has a subscriber.
|
||||
* @param string $projectId
|
||||
|
|
|
|||
|
|
@ -195,9 +195,25 @@ class Migrations extends Action
|
|||
$migrationOptions = $migration->getAttribute('options');
|
||||
/** @var Database|null $projectDB */
|
||||
$projectDB = null;
|
||||
if ($credentials['projectId']) {
|
||||
$useAppwriteApiSource = false;
|
||||
if ($source === SourceAppwrite::getName() && empty($credentials['projectId'])) {
|
||||
throw new \Exception('Source projectId is required for Appwrite migrations');
|
||||
}
|
||||
|
||||
if (! empty($credentials['projectId'])) {
|
||||
$this->sourceProject = $this->dbForPlatform->getDocument('projects', $credentials['projectId']);
|
||||
$projectDB = call_user_func($this->getProjectDB, $this->sourceProject);
|
||||
if ($this->sourceProject->isEmpty()) {
|
||||
throw new \Exception('Source project not found for provided projectId');
|
||||
}
|
||||
|
||||
$sourceRegion = $this->sourceProject->getAttribute('region', 'default');
|
||||
$destinationRegion = $this->project->getAttribute('region', 'default');
|
||||
$useAppwriteApiSource = $source === SourceAppwrite::getName()
|
||||
&& $destination === DestinationAppwrite::getName()
|
||||
&& $sourceRegion !== $destinationRegion;
|
||||
if (! $useAppwriteApiSource) {
|
||||
$projectDB = call_user_func($this->getProjectDB, $this->sourceProject);
|
||||
}
|
||||
}
|
||||
$getDatabasesDB = fn (Document $database): Database =>
|
||||
$this->getDatabasesDBForProject($database);
|
||||
|
|
@ -233,7 +249,7 @@ class Migrations extends Action
|
|||
$credentials['endpoint'],
|
||||
$credentials['apiKey'],
|
||||
$getDatabasesDB,
|
||||
SourceAppwrite::SOURCE_DATABASE,
|
||||
$useAppwriteApiSource ? SourceAppwrite::SOURCE_API : SourceAppwrite::SOURCE_DATABASE,
|
||||
$projectDB,
|
||||
$queries
|
||||
),
|
||||
|
|
@ -578,9 +594,10 @@ class Migrations extends Action
|
|||
|
||||
protected function getDatabasesDBForProject(Document $database)
|
||||
{
|
||||
if ($this->sourceProject) {
|
||||
if (isset($this->sourceProject) && ! $this->sourceProject->isEmpty()) {
|
||||
return ($this->getDatabasesDB)($database, $this->sourceProject);
|
||||
}
|
||||
|
||||
return ($this->getDatabasesDB)($database);
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -164,6 +164,20 @@ class RealtimeCustomClientQueryTestWithMessage extends Scope
|
|||
return $response;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param array<int, array<string, mixed>> $payloadEntries
|
||||
* @return array<string, mixed>
|
||||
*/
|
||||
private function sendUnsubscribeMessage(WebSocketClient $client, array $payloadEntries): array
|
||||
{
|
||||
$client->send(\json_encode([
|
||||
'type' => 'unsubscribe',
|
||||
'data' => $payloadEntries,
|
||||
]));
|
||||
|
||||
return \json_decode($client->receive(), true);
|
||||
}
|
||||
|
||||
/**
|
||||
* subscriptionId: update with id from connected, create by omitting id, explicit new id,
|
||||
* duplicate id in one bulk (last wins), mixed bulk, idempotent repeat, empty queries → select-all.
|
||||
|
|
@ -293,6 +307,282 @@ class RealtimeCustomClientQueryTestWithMessage extends Scope
|
|||
$client->close();
|
||||
}
|
||||
|
||||
/**
|
||||
* Update a subscription's queries/channels by reusing its subscriptionId.
|
||||
* Verifies the update takes effect on live event filtering (not just the response echo),
|
||||
* sibling subscriptions are untouched, unknown ids upsert as new, empty queries fall
|
||||
* back to select-all, and a removed id can be recreated by subscribing again.
|
||||
*/
|
||||
public function testUpdateSubscriptionAndEdgeCases(): void
|
||||
{
|
||||
$user = $this->getUser();
|
||||
$userId = $user['$id'] ?? '';
|
||||
$session = $user['session'] ?? '';
|
||||
$projectId = $this->getProject()['$id'];
|
||||
$headers = [
|
||||
'origin' => 'http://localhost',
|
||||
'cookie' => 'a_session_' . $projectId . '=' . $session,
|
||||
];
|
||||
|
||||
$queryString = \http_build_query(['project' => $projectId]);
|
||||
$client = new WebSocketClient(
|
||||
'ws://appwrite.test/v1/realtime?' . $queryString,
|
||||
[
|
||||
'headers' => $headers,
|
||||
'timeout' => 10,
|
||||
]
|
||||
);
|
||||
$connected = \json_decode($client->receive(), true);
|
||||
$this->assertEquals('connected', $connected['type'] ?? null);
|
||||
|
||||
$triggerAccountEvent = function () use ($projectId, $session): void {
|
||||
$this->client->call(Client::METHOD_PATCH, '/account/name', \array_merge([
|
||||
'origin' => 'http://localhost',
|
||||
'content-type' => 'application/json',
|
||||
'x-appwrite-project' => $projectId,
|
||||
'cookie' => 'a_session_' . $projectId . '=' . $session,
|
||||
]), ['name' => 'Update Sub Test ' . \uniqid()]);
|
||||
};
|
||||
|
||||
// subA matches current user, subB never matches
|
||||
$created = $this->sendSubscribeMessage($client, [
|
||||
[
|
||||
'channels' => ['account'],
|
||||
'queries' => [Query::equal('$id', [$userId])->toString()],
|
||||
],
|
||||
[
|
||||
'channels' => ['account'],
|
||||
'queries' => [Query::equal('$id', ['no-match-initial'])->toString()],
|
||||
],
|
||||
]);
|
||||
$subA = $created['data']['subscriptions'][0]['subscriptionId'];
|
||||
$subB = $created['data']['subscriptions'][1]['subscriptionId'];
|
||||
$this->assertNotSame($subA, $subB);
|
||||
|
||||
$triggerAccountEvent();
|
||||
$event = \json_decode($client->receive(), true);
|
||||
$this->assertEquals('event', $event['type']);
|
||||
$this->assertSame([$subA], $event['data']['subscriptions']);
|
||||
|
||||
// Swap: A -> non-matching, B -> matching. Same ids returned, server-side filter swaps.
|
||||
$swap = $this->sendSubscribeMessage($client, [
|
||||
[
|
||||
'subscriptionId' => $subA,
|
||||
'channels' => ['account'],
|
||||
'queries' => [Query::equal('$id', ['no-match-swapped'])->toString()],
|
||||
],
|
||||
[
|
||||
'subscriptionId' => $subB,
|
||||
'channels' => ['account'],
|
||||
'queries' => [Query::equal('$id', [$userId])->toString()],
|
||||
],
|
||||
]);
|
||||
$this->assertSame($subA, $swap['data']['subscriptions'][0]['subscriptionId']);
|
||||
$this->assertSame($subB, $swap['data']['subscriptions'][1]['subscriptionId']);
|
||||
|
||||
$triggerAccountEvent();
|
||||
$event = \json_decode($client->receive(), true);
|
||||
$this->assertEquals('event', $event['type']);
|
||||
$this->assertSame([$subB], $event['data']['subscriptions']);
|
||||
|
||||
// Sibling isolation: updating only subA must leave subB's matching filter intact.
|
||||
$isolation = $this->sendSubscribeMessage($client, [[
|
||||
'subscriptionId' => $subA,
|
||||
'channels' => ['account'],
|
||||
'queries' => [Query::equal('$id', [$userId])->toString()],
|
||||
]]);
|
||||
$this->assertSame($subA, $isolation['data']['subscriptions'][0]['subscriptionId']);
|
||||
|
||||
$triggerAccountEvent();
|
||||
$event = \json_decode($client->receive(), true);
|
||||
$this->assertEquals('event', $event['type']);
|
||||
$this->assertEqualsCanonicalizing([$subA, $subB], $event['data']['subscriptions']);
|
||||
|
||||
// Empty queries on update -> select-all; subA still matches every event on the channel.
|
||||
$empty = $this->sendSubscribeMessage($client, [[
|
||||
'subscriptionId' => $subA,
|
||||
'channels' => ['account'],
|
||||
'queries' => [],
|
||||
]]);
|
||||
$this->assertSame($subA, $empty['data']['subscriptions'][0]['subscriptionId']);
|
||||
|
||||
$triggerAccountEvent();
|
||||
$event = \json_decode($client->receive(), true);
|
||||
$this->assertEquals('event', $event['type']);
|
||||
$this->assertEqualsCanonicalizing([$subA, $subB], $event['data']['subscriptions']);
|
||||
|
||||
// Unknown subscriptionId upserts as a new subscription.
|
||||
$ghostId = ID::unique();
|
||||
$ghost = $this->sendSubscribeMessage($client, [[
|
||||
'subscriptionId' => $ghostId,
|
||||
'channels' => ['account'],
|
||||
'queries' => [Query::equal('$id', [$userId])->toString()],
|
||||
]]);
|
||||
$this->assertSame($ghostId, $ghost['data']['subscriptions'][0]['subscriptionId']);
|
||||
$this->assertNotSame($subA, $ghostId);
|
||||
$this->assertNotSame($subB, $ghostId);
|
||||
|
||||
$triggerAccountEvent();
|
||||
$event = \json_decode($client->receive(), true);
|
||||
$this->assertEquals('event', $event['type']);
|
||||
$this->assertEqualsCanonicalizing([$subA, $subB, $ghostId], $event['data']['subscriptions']);
|
||||
|
||||
// Update after unsubscribe: subscribing with the removed id recreates it.
|
||||
$unsub = $this->sendUnsubscribeMessage($client, [['subscriptionId' => $subA]]);
|
||||
$this->assertTrue($unsub['data']['subscriptions'][0]['removed']);
|
||||
|
||||
$triggerAccountEvent();
|
||||
$event = \json_decode($client->receive(), true);
|
||||
$this->assertEquals('event', $event['type']);
|
||||
$this->assertEqualsCanonicalizing([$subB, $ghostId], $event['data']['subscriptions']);
|
||||
|
||||
$recreated = $this->sendSubscribeMessage($client, [[
|
||||
'subscriptionId' => $subA,
|
||||
'channels' => ['account'],
|
||||
'queries' => [Query::equal('$id', [$userId])->toString()],
|
||||
]]);
|
||||
$this->assertSame($subA, $recreated['data']['subscriptions'][0]['subscriptionId']);
|
||||
|
||||
$triggerAccountEvent();
|
||||
$event = \json_decode($client->receive(), true);
|
||||
$this->assertEquals('event', $event['type']);
|
||||
$this->assertEqualsCanonicalizing([$subA, $subB, $ghostId], $event['data']['subscriptions']);
|
||||
|
||||
$client->close();
|
||||
}
|
||||
|
||||
public function testUnsubscribeRemovesOnlyMatchingSubscription(): void
|
||||
{
|
||||
$user = $this->getUser();
|
||||
$userId = $user['$id'] ?? '';
|
||||
$session = $user['session'] ?? '';
|
||||
$projectId = $this->getProject()['$id'];
|
||||
$headers = [
|
||||
'origin' => 'http://localhost',
|
||||
'cookie' => 'a_session_' . $projectId . '=' . $session,
|
||||
];
|
||||
|
||||
$queryString = \http_build_query(['project' => $projectId]);
|
||||
$client = new WebSocketClient(
|
||||
'ws://appwrite.test/v1/realtime?' . $queryString,
|
||||
[
|
||||
'headers' => $headers,
|
||||
'timeout' => 10,
|
||||
]
|
||||
);
|
||||
|
||||
$connected = \json_decode($client->receive(), true);
|
||||
$this->assertEquals('connected', $connected['type'] ?? null);
|
||||
|
||||
// Two subscriptions on the `account` channel, both matching the current user
|
||||
$r1 = $this->sendSubscribeMessage($client, [[
|
||||
'channels' => ['account'],
|
||||
'queries' => [Query::equal('$id', [$userId])->toString()],
|
||||
]]);
|
||||
$subA = $r1['data']['subscriptions'][0]['subscriptionId'];
|
||||
|
||||
$r2 = $this->sendSubscribeMessage($client, [[
|
||||
'channels' => ['account'],
|
||||
'queries' => [Query::select(['*'])->toString()],
|
||||
]]);
|
||||
$subB = $r2['data']['subscriptions'][0]['subscriptionId'];
|
||||
|
||||
$this->assertNotSame($subA, $subB);
|
||||
|
||||
// Trigger an event -- both subscriptions should match
|
||||
$name = 'Unsubscribe Test ' . \uniqid();
|
||||
$this->client->call(Client::METHOD_PATCH, '/account/name', \array_merge([
|
||||
'origin' => 'http://localhost',
|
||||
'content-type' => 'application/json',
|
||||
'x-appwrite-project' => $projectId,
|
||||
'cookie' => 'a_session_' . $projectId . '=' . $session,
|
||||
]), ['name' => $name]);
|
||||
|
||||
$event = \json_decode($client->receive(), true);
|
||||
$this->assertEquals('event', $event['type']);
|
||||
$this->assertEqualsCanonicalizing([$subA, $subB], $event['data']['subscriptions']);
|
||||
|
||||
// Unsubscribe subA only
|
||||
$unsubA = $this->sendUnsubscribeMessage($client, [['subscriptionId' => $subA]]);
|
||||
$this->assertEquals('response', $unsubA['type']);
|
||||
$this->assertEquals('unsubscribe', $unsubA['data']['to']);
|
||||
$this->assertTrue($unsubA['data']['success']);
|
||||
$this->assertCount(1, $unsubA['data']['subscriptions']);
|
||||
$this->assertSame($subA, $unsubA['data']['subscriptions'][0]['subscriptionId']);
|
||||
$this->assertTrue($unsubA['data']['subscriptions'][0]['removed']);
|
||||
|
||||
// Trigger another event -- only subB should match now
|
||||
$name = 'Unsubscribe Test ' . \uniqid();
|
||||
$this->client->call(Client::METHOD_PATCH, '/account/name', \array_merge([
|
||||
'origin' => 'http://localhost',
|
||||
'content-type' => 'application/json',
|
||||
'x-appwrite-project' => $projectId,
|
||||
'cookie' => 'a_session_' . $projectId . '=' . $session,
|
||||
]), ['name' => $name]);
|
||||
|
||||
$event = \json_decode($client->receive(), true);
|
||||
$this->assertEquals('event', $event['type']);
|
||||
$this->assertSame([$subB], $event['data']['subscriptions']);
|
||||
|
||||
// Idempotent: unsubscribing subA again reports removed=false
|
||||
$unsubAgain = $this->sendUnsubscribeMessage($client, [['subscriptionId' => $subA]]);
|
||||
$this->assertTrue($unsubAgain['data']['success']);
|
||||
$this->assertFalse($unsubAgain['data']['subscriptions'][0]['removed']);
|
||||
|
||||
// Connection is still alive -- ping still works
|
||||
$client->send(\json_encode(['type' => 'ping']));
|
||||
$pong = \json_decode($client->receive(), true);
|
||||
$this->assertEquals('pong', $pong['type']);
|
||||
|
||||
// Invalid payloads are rejected
|
||||
$errNonString = $this->sendUnsubscribeMessage($client, [['subscriptionId' => 123]]);
|
||||
$this->assertEquals('error', $errNonString['type']);
|
||||
$this->assertStringContainsString('subscriptionId', $errNonString['data']['message']);
|
||||
|
||||
$errEmpty = $this->sendUnsubscribeMessage($client, [['subscriptionId' => '']]);
|
||||
$this->assertEquals('error', $errEmpty['type']);
|
||||
|
||||
$errMissing = $this->sendUnsubscribeMessage($client, [['channels' => ['foo']]]);
|
||||
$this->assertEquals('error', $errMissing['type']);
|
||||
|
||||
$errNonList = $this->sendUnsubscribeMessage($client, ['subscriptionId' => $subB]);
|
||||
$this->assertEquals('error', $errNonList['type']);
|
||||
|
||||
// A batch with a valid id followed by an invalid one must be rejected atomically:
|
||||
// the valid id must remain subscribed, not be quietly removed before validation fails.
|
||||
$partial = $this->sendUnsubscribeMessage($client, [
|
||||
['subscriptionId' => $subB],
|
||||
['subscriptionId' => 999],
|
||||
]);
|
||||
$this->assertEquals('error', $partial['type']);
|
||||
|
||||
$name = 'Partial Rejection Test ' . \uniqid();
|
||||
$this->client->call(Client::METHOD_PATCH, '/account/name', \array_merge([
|
||||
'origin' => 'http://localhost',
|
||||
'content-type' => 'application/json',
|
||||
'x-appwrite-project' => $projectId,
|
||||
'cookie' => 'a_session_' . $projectId . '=' . $session,
|
||||
]), ['name' => $name]);
|
||||
|
||||
$event = \json_decode($client->receive(), true);
|
||||
$this->assertEquals('event', $event['type']);
|
||||
$this->assertSame([$subB], $event['data']['subscriptions']);
|
||||
|
||||
// Bulk unsubscribe: remaining subB plus a never-existed id -- response mirrors input order
|
||||
$bulk = $this->sendUnsubscribeMessage($client, [
|
||||
['subscriptionId' => $subB],
|
||||
['subscriptionId' => 'does-not-exist'],
|
||||
]);
|
||||
$this->assertTrue($bulk['data']['success']);
|
||||
$this->assertCount(2, $bulk['data']['subscriptions']);
|
||||
$this->assertSame($subB, $bulk['data']['subscriptions'][0]['subscriptionId']);
|
||||
$this->assertTrue($bulk['data']['subscriptions'][0]['removed']);
|
||||
$this->assertSame('does-not-exist', $bulk['data']['subscriptions'][1]['subscriptionId']);
|
||||
$this->assertFalse($bulk['data']['subscriptions'][1]['removed']);
|
||||
|
||||
$client->close();
|
||||
}
|
||||
|
||||
public function testInvalidQueryShouldNotSubscribe(): void
|
||||
{
|
||||
$user = $this->getUser();
|
||||
|
|
|
|||
|
|
@ -147,6 +147,193 @@ class MessagingTest extends TestCase
|
|||
$this->assertEmpty($realtime->subscriptions);
|
||||
}
|
||||
|
||||
public function testSubscribeUnionsChannelsAndRoles(): void
|
||||
{
|
||||
$realtime = new Realtime();
|
||||
|
||||
$realtime->subscribe(
|
||||
'1',
|
||||
1,
|
||||
'sub-a',
|
||||
[Role::user(ID::custom('123'))->toString()],
|
||||
['documents'],
|
||||
);
|
||||
|
||||
$realtime->subscribe(
|
||||
'1',
|
||||
1,
|
||||
'sub-b',
|
||||
[Role::users()->toString()],
|
||||
['files'],
|
||||
);
|
||||
|
||||
$connection = $realtime->connections[1];
|
||||
|
||||
$this->assertContains('documents', $connection['channels']);
|
||||
$this->assertContains('files', $connection['channels']);
|
||||
$this->assertContains(Role::user(ID::custom('123'))->toString(), $connection['roles']);
|
||||
$this->assertContains(Role::users()->toString(), $connection['roles']);
|
||||
$this->assertCount(2, $connection['channels']);
|
||||
$this->assertCount(2, $connection['roles']);
|
||||
}
|
||||
|
||||
public function testUnsubscribeSubscriptionRemovesOnlyOneSubscription(): void
|
||||
{
|
||||
$realtime = new Realtime();
|
||||
|
||||
$realtime->subscribe(
|
||||
'1',
|
||||
1,
|
||||
'sub-a',
|
||||
[Role::user(ID::custom('123'))->toString()],
|
||||
['documents'],
|
||||
);
|
||||
|
||||
$realtime->subscribe(
|
||||
'1',
|
||||
1,
|
||||
'sub-b',
|
||||
[Role::users()->toString()],
|
||||
['files'],
|
||||
);
|
||||
|
||||
$removed = $realtime->unsubscribeSubscription(1, 'sub-a');
|
||||
|
||||
$this->assertTrue($removed);
|
||||
$this->assertArrayHasKey(1, $realtime->connections);
|
||||
|
||||
// sub-a is fully cleaned from the tree
|
||||
$this->assertArrayNotHasKey(
|
||||
Role::user(ID::custom('123'))->toString(),
|
||||
$realtime->subscriptions['1']
|
||||
);
|
||||
|
||||
// sub-b still delivers
|
||||
$event = [
|
||||
'project' => '1',
|
||||
'roles' => [Role::users()->toString()],
|
||||
'data' => [
|
||||
'channels' => ['files'],
|
||||
],
|
||||
];
|
||||
$receivers = array_keys($realtime->getSubscribers($event));
|
||||
$this->assertEquals([1], $receivers);
|
||||
|
||||
// Channels recomputed: sub-a's channel is gone
|
||||
$this->assertSame(['files'], $realtime->connections[1]['channels']);
|
||||
|
||||
// Roles are connection-level auth context — union of both subscribe calls preserved
|
||||
$this->assertContains(Role::user(ID::custom('123'))->toString(), $realtime->connections[1]['roles']);
|
||||
$this->assertContains(Role::users()->toString(), $realtime->connections[1]['roles']);
|
||||
}
|
||||
|
||||
public function testUnsubscribeSubscriptionIsIdempotent(): void
|
||||
{
|
||||
$realtime = new Realtime();
|
||||
|
||||
$realtime->subscribe(
|
||||
'1',
|
||||
1,
|
||||
'sub-a',
|
||||
[Role::users()->toString()],
|
||||
['documents'],
|
||||
);
|
||||
|
||||
$this->assertFalse($realtime->unsubscribeSubscription(1, 'does-not-exist'));
|
||||
$this->assertFalse($realtime->unsubscribeSubscription(99, 'sub-a'));
|
||||
|
||||
// Original sub is untouched
|
||||
$event = [
|
||||
'project' => '1',
|
||||
'roles' => [Role::users()->toString()],
|
||||
'data' => [
|
||||
'channels' => ['documents'],
|
||||
],
|
||||
];
|
||||
$this->assertEquals([1], array_keys($realtime->getSubscribers($event)));
|
||||
}
|
||||
|
||||
public function testUnsubscribeSubscriptionKeepsConnectionWhenLastSubRemoved(): void
|
||||
{
|
||||
$realtime = new Realtime();
|
||||
|
||||
$realtime->subscribe(
|
||||
'1',
|
||||
1,
|
||||
'sub-a',
|
||||
[Role::users()->toString()],
|
||||
['documents'],
|
||||
);
|
||||
|
||||
$this->assertTrue($realtime->unsubscribeSubscription(1, 'sub-a'));
|
||||
|
||||
$this->assertArrayHasKey(1, $realtime->connections);
|
||||
$this->assertSame([], $realtime->connections[1]['channels']);
|
||||
// Roles preserved so a later resubscribe on the same connection still has auth context
|
||||
$this->assertSame([Role::users()->toString()], $realtime->connections[1]['roles']);
|
||||
$this->assertArrayNotHasKey('1', $realtime->subscriptions);
|
||||
}
|
||||
|
||||
public function testResubscribeAfterUnsubscribingLastSubDelivers(): void
|
||||
{
|
||||
$realtime = new Realtime();
|
||||
|
||||
$realtime->subscribe(
|
||||
'1',
|
||||
1,
|
||||
'sub-a',
|
||||
[Role::users()->toString()],
|
||||
['documents'],
|
||||
);
|
||||
|
||||
$this->assertTrue($realtime->unsubscribeSubscription(1, 'sub-a'));
|
||||
|
||||
// Simulate the message-based subscribe path reading stored roles
|
||||
$storedRoles = $realtime->connections[1]['roles'];
|
||||
$this->assertNotEmpty($storedRoles, 'connection roles must survive per-subscription removal');
|
||||
|
||||
$realtime->subscribe('1', 1, 'sub-b', $storedRoles, ['files']);
|
||||
|
||||
$event = [
|
||||
'project' => '1',
|
||||
'roles' => [Role::users()->toString()],
|
||||
'data' => [
|
||||
'channels' => ['files'],
|
||||
],
|
||||
];
|
||||
$this->assertEquals([1], array_keys($realtime->getSubscribers($event)));
|
||||
}
|
||||
|
||||
public function testSubscribeAfterOnOpenEmptySentinelPreservesUnion(): void
|
||||
{
|
||||
$realtime = new Realtime();
|
||||
|
||||
// Mirrors the onOpen empty-channels path: subscribe with '' id, empty channels
|
||||
$realtime->subscribe(
|
||||
'1',
|
||||
1,
|
||||
'',
|
||||
[Role::users()->toString()],
|
||||
[],
|
||||
[],
|
||||
'user-123',
|
||||
);
|
||||
|
||||
// Now a real subscription comes in via the subscribe message type
|
||||
$realtime->subscribe(
|
||||
'1',
|
||||
1,
|
||||
'sub-a',
|
||||
[Role::user(ID::custom('user-123'))->toString()],
|
||||
['documents'],
|
||||
);
|
||||
|
||||
$this->assertSame('user-123', $realtime->connections[1]['userId']);
|
||||
$this->assertContains('documents', $realtime->connections[1]['channels']);
|
||||
$this->assertContains(Role::users()->toString(), $realtime->connections[1]['roles']);
|
||||
$this->assertContains(Role::user(ID::custom('user-123'))->toString(), $realtime->connections[1]['roles']);
|
||||
}
|
||||
|
||||
public function testConvertChannelsGuest(): void
|
||||
{
|
||||
$user = new Document([
|
||||
|
|
|
|||
Loading…
Reference in a new issue