mirror of
https://github.com/appwrite/appwrite
synced 2026-05-23 08:58:35 +00:00
commit
7f4b5bc8d5
1 changed files with 61 additions and 60 deletions
|
|
@ -86,7 +86,7 @@ class Messaging extends Action
|
|||
$payload = $message->getPayload() ?? [];
|
||||
|
||||
if (empty($payload)) {
|
||||
throw new Exception('Missing payload');
|
||||
throw new \Exception('Missing payload');
|
||||
}
|
||||
|
||||
$type = $payload['type'] ?? '';
|
||||
|
|
@ -105,7 +105,7 @@ class Messaging extends Action
|
|||
$this->sendExternalMessage($dbForProject, $message, $deviceForFiles, $deviceForLocalFiles,);
|
||||
break;
|
||||
default:
|
||||
throw new Exception('Unknown message type: ' . $type);
|
||||
throw new \Exception('Unknown message type: ' . $type);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -118,11 +118,12 @@ class Messaging extends Action
|
|||
$topicIds = $message->getAttribute('topics', []);
|
||||
$targetIds = $message->getAttribute('targets', []);
|
||||
$userIds = $message->getAttribute('users', []);
|
||||
$providerType = $message->getAttribute('providerType');
|
||||
|
||||
/**
|
||||
* @var array<Document> $recipients
|
||||
* @var array<Document> $allTargets
|
||||
*/
|
||||
$recipients = [];
|
||||
$allTargets = [];
|
||||
|
||||
if (\count($topicIds) > 0) {
|
||||
$topics = $dbForProject->find('topics', [
|
||||
|
|
@ -130,9 +131,11 @@ class Messaging extends Action
|
|||
Query::limit(\count($topicIds)),
|
||||
]);
|
||||
foreach ($topics as $topic) {
|
||||
$targets = \array_filter($topic->getAttribute('targets'), fn(Document $target) =>
|
||||
$target->getAttribute('providerType') === $message->getAttribute('providerType'));
|
||||
$recipients = \array_merge($recipients, $targets);
|
||||
$targets = \array_filter($topic->getAttribute('targets'), function (Document $target) use ($providerType) {
|
||||
return $target->getAttribute('providerType') === $providerType;
|
||||
});
|
||||
|
||||
\array_push($allTargets, ...$targets);
|
||||
}
|
||||
}
|
||||
|
||||
|
|
@ -142,23 +145,25 @@ class Messaging extends Action
|
|||
Query::limit(\count($userIds)),
|
||||
]);
|
||||
foreach ($users as $user) {
|
||||
$targets = \array_filter($user->getAttribute('targets'), fn(Document $target) =>
|
||||
$target->getAttribute('providerType') === $message->getAttribute('providerType'));
|
||||
$recipients = \array_merge($recipients, $targets);
|
||||
$targets = \array_filter($user->getAttribute('targets'), function (Document $target) use ($providerType) {
|
||||
return $target->getAttribute('providerType') === $providerType;
|
||||
});
|
||||
|
||||
\array_push($allTargets, ...$targets);
|
||||
}
|
||||
}
|
||||
|
||||
if (\count($targetIds) > 0) {
|
||||
$targets = $dbForProject->find('targets', [
|
||||
Query::equal('$id', $targetIds),
|
||||
Query::equal('providerType', [$providerType]),
|
||||
Query::limit(\count($targetIds)),
|
||||
]);
|
||||
$targets = \array_filter($targets, fn(Document $target) =>
|
||||
$target->getAttribute('providerType') === $message->getAttribute('providerType'));
|
||||
$recipients = \array_merge($recipients, $targets);
|
||||
|
||||
\array_push($allTargets, ...$targets);
|
||||
}
|
||||
|
||||
if (empty($recipients)) {
|
||||
if (empty($allTargets)) {
|
||||
$dbForProject->updateDocument('messages', $message->getId(), $message->setAttributes([
|
||||
'status' => MessageStatus::FAILED,
|
||||
'deliveryErrors' => ['No valid recipients found.']
|
||||
|
|
@ -168,85 +173,82 @@ class Messaging extends Action
|
|||
return;
|
||||
}
|
||||
|
||||
$fallback = $dbForProject->findOne('providers', [
|
||||
$default = $dbForProject->findOne('providers', [
|
||||
Query::equal('enabled', [true]),
|
||||
Query::equal('type', [$recipients[0]->getAttribute('providerType')]),
|
||||
Query::equal('type', [$providerType]),
|
||||
]);
|
||||
|
||||
if ($fallback === false || $fallback->isEmpty()) {
|
||||
if ($default === false || $default->isEmpty()) {
|
||||
$dbForProject->updateDocument('messages', $message->getId(), $message->setAttributes([
|
||||
'status' => MessageStatus::FAILED,
|
||||
'deliveryErrors' => ['No fallback provider found.']
|
||||
'deliveryErrors' => ['No enabled provider found.']
|
||||
]));
|
||||
|
||||
Console::warning('No fallback provider found.');
|
||||
Console::warning('No enabled provider found.');
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* @var array<string, array<string>> $identifiers
|
||||
* @var array<string, array<string, null>> $identifiers
|
||||
*/
|
||||
$identifiers = [];
|
||||
|
||||
/**
|
||||
* @var Document[] $providers
|
||||
* @var array<Document> $providers
|
||||
*/
|
||||
$providers = [
|
||||
$fallback->getId() => $fallback
|
||||
$default->getId() => $default
|
||||
];
|
||||
|
||||
foreach ($recipients as $recipient) {
|
||||
$providerId = $recipient->getAttribute('providerId');
|
||||
foreach ($allTargets as $target) {
|
||||
$providerId = $target->getAttribute('providerId');
|
||||
|
||||
if (
|
||||
!$providerId
|
||||
&& $fallback instanceof Document
|
||||
&& !$fallback->isEmpty()
|
||||
&& $fallback->getAttribute('enabled')
|
||||
) {
|
||||
$providerId = $fallback->getId();
|
||||
if (!$providerId) {
|
||||
$providerId = $default->getId();
|
||||
}
|
||||
|
||||
if ($providerId) {
|
||||
if (!\array_key_exists($providerId, $identifiers)) {
|
||||
$identifiers[$providerId] = [];
|
||||
}
|
||||
$identifiers[$providerId][] = $recipient->getAttribute('identifier');
|
||||
// Use null as value to avoid duplicate keys
|
||||
$identifiers[$providerId][$target->getAttribute('identifier')] = null;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* @var array<array> $results
|
||||
*/
|
||||
$results = batch(\array_map(function ($providerId) use ($identifiers, $providers, $fallback, $message, $dbForProject, $deviceForFiles, $deviceForLocalFiles) {
|
||||
return function () use ($providerId, $identifiers, $providers, $fallback, $message, $dbForProject, $deviceForFiles, $deviceForLocalFiles) {
|
||||
$results = batch(\array_map(function ($providerId) use ($identifiers, &$providers, $default, $message, $dbForProject, $deviceForFiles, $deviceForLocalFiles) {
|
||||
return function () use ($providerId, $identifiers, &$providers, $default, $message, $dbForProject, $deviceForFiles, $deviceForLocalFiles) {
|
||||
if (\array_key_exists($providerId, $providers)) {
|
||||
$provider = $providers[$providerId];
|
||||
} else {
|
||||
$provider = $dbForProject->getDocument('providers', $providerId);
|
||||
|
||||
if ($provider->isEmpty() || !$provider->getAttribute('enabled')) {
|
||||
$provider = $fallback;
|
||||
$provider = $default;
|
||||
} else {
|
||||
$providers[$providerId] = $provider;
|
||||
}
|
||||
}
|
||||
|
||||
$identifiers = $identifiers[$providerId];
|
||||
$identifiersForProvider = $identifiers[$providerId];
|
||||
|
||||
$adapter = match ($provider->getAttribute('type')) {
|
||||
MESSAGE_TYPE_SMS => $this->getSmsAdapter($provider),
|
||||
MESSAGE_TYPE_PUSH => $this->getPushAdapter($provider),
|
||||
MESSAGE_TYPE_EMAIL => $this->getEmailAdapter($provider),
|
||||
default => throw new Exception(Exception::PROVIDER_INCORRECT_TYPE)
|
||||
default => throw new \Exception('Provider with the requested ID is of the incorrect type')
|
||||
};
|
||||
|
||||
$maxBatchSize = $adapter->getMaxMessagesPerRequest();
|
||||
$batches = \array_chunk($identifiers, $maxBatchSize);
|
||||
$batchIndex = 0;
|
||||
$batches = \array_chunk(
|
||||
\array_keys($identifiersForProvider),
|
||||
$adapter->getMaxMessagesPerRequest()
|
||||
);
|
||||
|
||||
return batch(\array_map(function ($batch) use ($message, $provider, $adapter, &$batchIndex, $dbForProject, $deviceForFiles, $deviceForLocalFiles) {
|
||||
return function () use ($batch, $message, $provider, $adapter, &$batchIndex, $dbForProject, $deviceForFiles, $deviceForLocalFiles) {
|
||||
return batch(\array_map(function ($batch) use ($message, $provider, $adapter, $dbForProject, $deviceForFiles, $deviceForLocalFiles) {
|
||||
return function () use ($batch, $message, $provider, $adapter, $dbForProject, $deviceForFiles, $deviceForLocalFiles) {
|
||||
$deliveredTotal = 0;
|
||||
$deliveryErrors = [];
|
||||
$messageData = clone $message;
|
||||
|
|
@ -256,7 +258,7 @@ class Messaging extends Action
|
|||
MESSAGE_TYPE_SMS => $this->buildSmsMessage($messageData, $provider),
|
||||
MESSAGE_TYPE_PUSH => $this->buildPushMessage($messageData),
|
||||
MESSAGE_TYPE_EMAIL => $this->buildEmailMessage($dbForProject, $messageData, $provider, $deviceForFiles, $deviceForLocalFiles),
|
||||
default => throw new Exception(Exception::PROVIDER_INCORRECT_TYPE)
|
||||
default => throw new \Exception('Provider with the requested ID is of the incorrect type')
|
||||
};
|
||||
|
||||
try {
|
||||
|
|
@ -283,10 +285,8 @@ class Messaging extends Action
|
|||
}
|
||||
}
|
||||
} catch (\Throwable $e) {
|
||||
$deliveryErrors[] = 'Failed sending to targets ' . $batchIndex + 1 . ' of ' . \count($batch) . ' with error: ' . $e->getMessage();
|
||||
$deliveryErrors[] = 'Failed sending to targets with error: ' . $e->getMessage();
|
||||
} finally {
|
||||
$batchIndex++;
|
||||
|
||||
return [
|
||||
'deliveredTotal' => $deliveredTotal,
|
||||
'deliveryErrors' => $deliveryErrors,
|
||||
|
|
@ -297,7 +297,7 @@ class Messaging extends Action
|
|||
};
|
||||
}, \array_keys($identifiers)));
|
||||
|
||||
$results = array_merge(...$results);
|
||||
$results = \array_merge(...$results);
|
||||
|
||||
$deliveredTotal = 0;
|
||||
$deliveryErrors = [];
|
||||
|
|
@ -330,7 +330,7 @@ class Messaging extends Action
|
|||
|
||||
$dbForProject->updateDocument('messages', $message->getId(), $message);
|
||||
|
||||
// Delete any attachments that were downloaded to the local cache
|
||||
// Delete any attachments that were downloaded to local storage
|
||||
if ($provider->getAttribute('type') === MESSAGE_TYPE_EMAIL) {
|
||||
if ($deviceForFiles->getType() === Storage::DEVICE_LOCAL) {
|
||||
return;
|
||||
|
|
@ -345,12 +345,12 @@ class Messaging extends Action
|
|||
|
||||
$bucket = $dbForProject->getDocument('buckets', $bucketId);
|
||||
if ($bucket->isEmpty()) {
|
||||
throw new Exception(Exception::STORAGE_BUCKET_NOT_FOUND);
|
||||
throw new \Exception('Storage bucket with the requested ID could not be found');
|
||||
}
|
||||
|
||||
$file = $dbForProject->getDocument('bucket_' . $bucket->getInternalId(), $fileId);
|
||||
if ($file->isEmpty()) {
|
||||
throw new Exception(Exception::STORAGE_FILE_NOT_FOUND);
|
||||
throw new \Exception('Storage file with the requested ID could not be found');
|
||||
}
|
||||
|
||||
$path = $file->getAttribute('path', '');
|
||||
|
|
@ -369,7 +369,7 @@ class Messaging extends Action
|
|||
}
|
||||
|
||||
if ($project->isEmpty()) {
|
||||
throw new Exception('Project not set in payload');
|
||||
throw new \Exception('Project not set in payload');
|
||||
}
|
||||
|
||||
Console::log('Project: ' . $project->getId());
|
||||
|
|
@ -427,12 +427,13 @@ class Messaging extends Action
|
|||
|
||||
$adapter = $this->getSmsAdapter($provider);
|
||||
|
||||
$maxBatchSize = $adapter->getMaxMessagesPerRequest();
|
||||
$batches = \array_chunk($recipients, $maxBatchSize);
|
||||
$batchIndex = 0;
|
||||
$batches = \array_chunk(
|
||||
$recipients,
|
||||
$adapter->getMaxMessagesPerRequest()
|
||||
);
|
||||
|
||||
batch(\array_map(function ($batch) use ($message, $provider, $adapter, $batchIndex, $project, $queueForUsage) {
|
||||
return function () use ($batch, $message, $provider, $adapter, $batchIndex, $project, $queueForUsage) {
|
||||
batch(\array_map(function ($batch) use ($message, $provider, $adapter, $project, $queueForUsage) {
|
||||
return function () use ($batch, $message, $provider, $adapter, $project, $queueForUsage) {
|
||||
$message->setAttribute('to', $batch);
|
||||
|
||||
$data = $this->buildSmsMessage($message, $provider);
|
||||
|
|
@ -445,7 +446,7 @@ class Messaging extends Action
|
|||
->addMetric(METRIC_MESSAGES, 1)
|
||||
->trigger();
|
||||
} catch (\Throwable $e) {
|
||||
throw new Exception('Failed sending to targets ' . $batchIndex + 1 . '-' . \count($batch) . ' with error: ' . $e->getMessage(), 500);
|
||||
throw new \Exception('Failed sending to targets with error: ' . $e->getMessage());
|
||||
}
|
||||
};
|
||||
}, $batches));
|
||||
|
|
@ -556,19 +557,19 @@ class Messaging extends Action
|
|||
|
||||
$bucket = $dbForProject->getDocument('buckets', $bucketId);
|
||||
if ($bucket->isEmpty()) {
|
||||
throw new Exception(Exception::STORAGE_BUCKET_NOT_FOUND);
|
||||
throw new \Exception('Storage bucket with the requested ID could not be found');
|
||||
}
|
||||
|
||||
$file = $dbForProject->getDocument('bucket_' . $bucket->getInternalId(), $fileId);
|
||||
if ($file->isEmpty()) {
|
||||
throw new Exception(Exception::STORAGE_FILE_NOT_FOUND);
|
||||
throw new \Exception('Storage file with the requested ID could not be found');
|
||||
}
|
||||
|
||||
$mimes = Config::getParam('storage-mimes');
|
||||
$path = $file->getAttribute('path', '');
|
||||
|
||||
if (!$deviceForFiles->exists($path)) {
|
||||
throw new Exception(Exception::STORAGE_FILE_NOT_FOUND, 'File not found in ' . $path);
|
||||
throw new \Exception('File not found in ' . $path);
|
||||
}
|
||||
|
||||
$contentType = 'text/plain';
|
||||
|
|
|
|||
Loading…
Reference in a new issue