mirror of
https://github.com/appwrite/appwrite
synced 2026-05-06 06:48:22 +00:00
commit
d45893d123
4 changed files with 78 additions and 64 deletions
|
|
@ -31,9 +31,9 @@ class Realtime extends MessagingAdapter
|
|||
* [ROLE_X] ->
|
||||
* [CHANNEL_NAME_X] ->
|
||||
* [CONNECTION_ID] ->
|
||||
* [SUB_ID] -> [query1, query2, ...] // Subscription with queries (AND logic)
|
||||
* [SUB_ID] -> ['strings' => [...], 'parsed' => [...]]
|
||||
*
|
||||
* Each subscription ID maps to an array of query strings.
|
||||
* Each subscription ID maps to query strings (for metadata) and pre-parsed Query objects (for filtering).
|
||||
* Within a subscription: AND logic (all queries must match)
|
||||
* Across subscriptions: OR logic (any subscription matching = send event)
|
||||
*/
|
||||
|
|
@ -64,18 +64,27 @@ class Realtime extends MessagingAdapter
|
|||
$this->subscriptions[$projectId] = [];
|
||||
}
|
||||
|
||||
// Convert Query objects to strings for this subscription
|
||||
// Convert Query objects to strings and store both for this subscription
|
||||
$queryStrings = [];
|
||||
$parsedQueries = [];
|
||||
if (empty($queryGroup)) {
|
||||
// No queries means "listen to all events" - use select("*")
|
||||
$queryStrings[] = Query::select(['*'])->toString();
|
||||
$selectAll = Query::select(['*']);
|
||||
$queryStrings[] = $selectAll->toString();
|
||||
$parsedQueries[] = $selectAll;
|
||||
} else {
|
||||
foreach ($queryGroup as $query) {
|
||||
/** @var Query $query */
|
||||
$queryStrings[] = $query->toString();
|
||||
$parsedQueries[] = $query;
|
||||
}
|
||||
}
|
||||
|
||||
$subscriptionData = [
|
||||
'strings' => $queryStrings,
|
||||
'parsed' => $parsedQueries,
|
||||
];
|
||||
|
||||
foreach ($roles as $role) {
|
||||
if (!isset($this->subscriptions[$projectId][$role])) {
|
||||
$this->subscriptions[$projectId][$role] = [];
|
||||
|
|
@ -88,8 +97,7 @@ class Realtime extends MessagingAdapter
|
|||
if (!isset($this->subscriptions[$projectId][$role][$channel][$identifier])) {
|
||||
$this->subscriptions[$projectId][$role][$channel][$identifier] = [];
|
||||
}
|
||||
// Store subscription under subscription ID
|
||||
$this->subscriptions[$projectId][$role][$channel][$identifier][$subscriptionId] = $queryStrings;
|
||||
$this->subscriptions[$projectId][$role][$channel][$identifier][$subscriptionId] = $subscriptionData;
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -131,14 +139,14 @@ class Realtime extends MessagingAdapter
|
|||
continue;
|
||||
}
|
||||
|
||||
foreach ($this->subscriptions[$projectId][$role][$channel][$connection] as $subId => $queryStrings) {
|
||||
foreach ($this->subscriptions[$projectId][$role][$channel][$connection] as $subId => $subscriptionData) {
|
||||
if (!isset($subscriptions[$subId])) {
|
||||
$subscriptions[$subId] = [
|
||||
'channels' => [],
|
||||
'queries' => $queryStrings
|
||||
'queries' => $subscriptionData['strings'] ?? []
|
||||
];
|
||||
}
|
||||
if (!in_array($channel, $subscriptions[$subId]['channels'])) {
|
||||
if (!\in_array($channel, $subscriptions[$subId]['channels'])) {
|
||||
$subscriptions[$subId]['channels'][] = $channel;
|
||||
}
|
||||
}
|
||||
|
|
@ -282,15 +290,14 @@ class Realtime extends MessagingAdapter
|
|||
$matchedSubscriptions = [];
|
||||
|
||||
// Process each subscription (OR logic across subscriptions)
|
||||
foreach ($subscriptions as $subId => $queryStrings) {
|
||||
$parsedQueries = [];
|
||||
foreach ($queryStrings as $queryString) {
|
||||
$parsed = Query::parseQueries([$queryString]);
|
||||
$parsedQueries = array_merge($parsedQueries, $parsed);
|
||||
}
|
||||
foreach ($subscriptions as $subId => $subscriptionData) {
|
||||
// Use pre-parsed queries instead of re-parsing on every event
|
||||
$parsedQueries = $subscriptionData['parsed'] ?? [];
|
||||
$queryStrings = $subscriptionData['strings'] ?? [];
|
||||
|
||||
// Check if this subscription matches (AND logic within subscription)
|
||||
// Or if empty payload and select all as filter will return empty payload out of it even if it passed
|
||||
$isEmptyPayloadAndSelectAll = RuntimeQuery::isSelectAll($parsedQueries[0]) && empty($payload);
|
||||
$isEmptyPayloadAndSelectAll = !empty($parsedQueries) && RuntimeQuery::isSelectAll($parsedQueries[0]) && empty($payload);
|
||||
if ($isEmptyPayloadAndSelectAll || !empty(RuntimeQuery::filter($parsedQueries, $payload))) {
|
||||
$matchedSubscriptions[$subId] = $queryStrings;
|
||||
}
|
||||
|
|
@ -301,7 +308,7 @@ class Realtime extends MessagingAdapter
|
|||
if (!isset($receivers[$id])) {
|
||||
$receivers[$id] = [];
|
||||
}
|
||||
$receivers[$id] = array_merge($receivers[$id], $matchedSubscriptions);
|
||||
$receivers[$id] += $matchedSubscriptions;
|
||||
}
|
||||
}
|
||||
break;
|
||||
|
|
@ -433,7 +440,7 @@ class Realtime extends MessagingAdapter
|
|||
}
|
||||
|
||||
if (in_array($method, [Query::TYPE_AND, Query::TYPE_OR], true)) {
|
||||
$stack = array_merge($stack, $query->getValues());
|
||||
\array_push($stack, ...$query->getValues());
|
||||
}
|
||||
}
|
||||
|
||||
|
|
|
|||
|
|
@ -293,8 +293,8 @@ abstract class Action extends DatabasesAction
|
|||
array &$collectionsCache,
|
||||
Authorization $authorization,
|
||||
?int &$operations = null,
|
||||
int $depth = 0,
|
||||
): bool {
|
||||
|
||||
if ($operations !== null && $document->isEmpty()) {
|
||||
return false;
|
||||
}
|
||||
|
|
@ -308,6 +308,11 @@ abstract class Action extends DatabasesAction
|
|||
$document->setAttribute('$databaseId', $database->getId());
|
||||
$document->setAttribute('$' . $this->getCollectionsEventsContext() . 'Id', $collectionId);
|
||||
|
||||
// Stop processing relationships if max depth reached
|
||||
if ($depth >= Database::RELATION_MAX_DEPTH) {
|
||||
return true;
|
||||
}
|
||||
|
||||
$relationships = $collectionsCache[$collectionId] ??= \array_filter(
|
||||
$collection->getAttribute('attributes', []),
|
||||
fn ($attr) => $attr->getAttribute('type') === Database::VAR_RELATIONSHIP
|
||||
|
|
@ -354,8 +359,9 @@ abstract class Action extends DatabasesAction
|
|||
document: $relation,
|
||||
dbForProject: $dbForProject,
|
||||
collectionsCache: $collectionsCache,
|
||||
authorization: $authorization,
|
||||
operations: $operations,
|
||||
authorization: $authorization
|
||||
depth: $depth + 1
|
||||
);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -538,13 +538,11 @@ class StatsUsage extends Action
|
|||
$this->statDocuments
|
||||
);
|
||||
Console::success('Usage logs pushed to Logs DB');
|
||||
|
||||
/**
|
||||
* todo: Do we need to unset $this->statDocuments?
|
||||
*/
|
||||
|
||||
} catch (Throwable $th) {
|
||||
Console::error($th->getMessage());
|
||||
} finally {
|
||||
// Clear statDocuments to prevent memory accumulation across batches
|
||||
$this->statDocuments = [];
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -104,48 +104,51 @@ class Webhooks extends Action
|
|||
$httpPass = $webhook->getAttribute('httpPass');
|
||||
$ch = \curl_init($webhook->getAttribute('url'));
|
||||
|
||||
\curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'POST');
|
||||
\curl_setopt($ch, CURLOPT_POSTFIELDS, $payload);
|
||||
\curl_setopt($ch, CURLOPT_HEADER, 0);
|
||||
\curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
|
||||
\curl_setopt($ch, CURLOPT_TIMEOUT, 15);
|
||||
\curl_setopt($ch, CURLOPT_MAXFILESIZE, self::MAX_FILE_SIZE);
|
||||
\curl_setopt($ch, CURLOPT_USERAGENT, \sprintf(
|
||||
APP_USERAGENT,
|
||||
System::getEnv('_APP_VERSION', 'UNKNOWN'),
|
||||
System::getEnv('_APP_EMAIL_SECURITY', System::getEnv('_APP_SYSTEM_SECURITY_EMAIL_ADDRESS', APP_EMAIL_SECURITY))
|
||||
));
|
||||
\curl_setopt(
|
||||
$ch,
|
||||
CURLOPT_HTTPHEADER,
|
||||
[
|
||||
'Content-Type: application/json',
|
||||
'Content-Length: ' . \strlen($payload),
|
||||
'X-' . APP_NAME . '-Webhook-Id: ' . $webhook->getId(),
|
||||
'X-' . APP_NAME . '-Webhook-Events: ' . implode(',', $events),
|
||||
'X-' . APP_NAME . '-Webhook-Name: ' . $webhook->getAttribute('name', ''),
|
||||
'X-' . APP_NAME . '-Webhook-User-Id: ' . $user->getId(),
|
||||
'X-' . APP_NAME . '-Webhook-Project-Id: ' . $project->getId(),
|
||||
'X-' . APP_NAME . '-Webhook-Signature: ' . $signature,
|
||||
]
|
||||
);
|
||||
curl_setopt($ch, CURLOPT_MAXREDIRS, 5);
|
||||
try {
|
||||
\curl_setopt($ch, CURLOPT_CUSTOMREQUEST, 'POST');
|
||||
\curl_setopt($ch, CURLOPT_POSTFIELDS, $payload);
|
||||
\curl_setopt($ch, CURLOPT_HEADER, 0);
|
||||
\curl_setopt($ch, CURLOPT_RETURNTRANSFER, 1);
|
||||
\curl_setopt($ch, CURLOPT_TIMEOUT, 15);
|
||||
\curl_setopt($ch, CURLOPT_MAXFILESIZE, self::MAX_FILE_SIZE);
|
||||
\curl_setopt($ch, CURLOPT_USERAGENT, \sprintf(
|
||||
APP_USERAGENT,
|
||||
System::getEnv('_APP_VERSION', 'UNKNOWN'),
|
||||
System::getEnv('_APP_EMAIL_SECURITY', System::getEnv('_APP_SYSTEM_SECURITY_EMAIL_ADDRESS', APP_EMAIL_SECURITY))
|
||||
));
|
||||
\curl_setopt(
|
||||
$ch,
|
||||
CURLOPT_HTTPHEADER,
|
||||
[
|
||||
'Content-Type: application/json',
|
||||
'Content-Length: ' . \strlen($payload),
|
||||
'X-' . APP_NAME . '-Webhook-Id: ' . $webhook->getId(),
|
||||
'X-' . APP_NAME . '-Webhook-Events: ' . implode(',', $events),
|
||||
'X-' . APP_NAME . '-Webhook-Name: ' . $webhook->getAttribute('name', ''),
|
||||
'X-' . APP_NAME . '-Webhook-User-Id: ' . $user->getId(),
|
||||
'X-' . APP_NAME . '-Webhook-Project-Id: ' . $project->getId(),
|
||||
'X-' . APP_NAME . '-Webhook-Signature: ' . $signature,
|
||||
]
|
||||
);
|
||||
\curl_setopt($ch, CURLOPT_MAXREDIRS, 5);
|
||||
|
||||
if (!$webhook->getAttribute('security', true)) {
|
||||
\curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);
|
||||
\curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
|
||||
if (!$webhook->getAttribute('security', true)) {
|
||||
\curl_setopt($ch, CURLOPT_SSL_VERIFYHOST, false);
|
||||
\curl_setopt($ch, CURLOPT_SSL_VERIFYPEER, false);
|
||||
}
|
||||
|
||||
if (!empty($httpUser) && !empty($httpPass)) {
|
||||
\curl_setopt($ch, CURLOPT_USERPWD, "$httpUser:$httpPass");
|
||||
\curl_setopt($ch, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
|
||||
}
|
||||
|
||||
$responseBody = \curl_exec($ch);
|
||||
$curlError = \curl_error($ch);
|
||||
$statusCode = \curl_getinfo($ch, CURLINFO_RESPONSE_CODE);
|
||||
} finally {
|
||||
\curl_close($ch);
|
||||
}
|
||||
|
||||
if (!empty($httpUser) && !empty($httpPass)) {
|
||||
\curl_setopt($ch, CURLOPT_USERPWD, "$httpUser:$httpPass");
|
||||
\curl_setopt($ch, CURLOPT_HTTPAUTH, CURLAUTH_BASIC);
|
||||
}
|
||||
|
||||
$responseBody = \curl_exec($ch);
|
||||
$curlError = \curl_error($ch);
|
||||
$statusCode = curl_getinfo($ch, CURLINFO_RESPONSE_CODE);
|
||||
\curl_close($ch);
|
||||
|
||||
if (!empty($curlError) || $statusCode >= 400) {
|
||||
$dbForPlatform->increaseDocumentAttribute('webhooks', $webhook->getId(), 'attempts', 1);
|
||||
$webhook = $dbForPlatform->getDocument('webhooks', $webhook->getId());
|
||||
|
|
|
|||
Loading…
Reference in a new issue