diff --git a/src/Appwrite/Platform/Workers/Messaging.php b/src/Appwrite/Platform/Workers/Messaging.php index aacd49f94a..ab3a9a41b1 100644 --- a/src/Appwrite/Platform/Workers/Messaging.php +++ b/src/Appwrite/Platform/Workers/Messaging.php @@ -86,7 +86,7 @@ class Messaging extends Action $payload = $message->getPayload() ?? []; if (empty($payload)) { - throw new Exception('Missing payload'); + throw new \Exception('Missing payload'); } $type = $payload['type'] ?? ''; @@ -105,7 +105,7 @@ class Messaging extends Action $this->sendExternalMessage($dbForProject, $message, $deviceForFiles, $deviceForLocalFiles,); break; default: - throw new Exception('Unknown message type: ' . $type); + throw new \Exception('Unknown message type: ' . $type); } } @@ -118,11 +118,12 @@ class Messaging extends Action $topicIds = $message->getAttribute('topics', []); $targetIds = $message->getAttribute('targets', []); $userIds = $message->getAttribute('users', []); + $providerType = $message->getAttribute('providerType'); /** - * @var array $recipients + * @var array $allTargets */ - $recipients = []; + $allTargets = []; if (\count($topicIds) > 0) { $topics = $dbForProject->find('topics', [ @@ -130,9 +131,11 @@ class Messaging extends Action Query::limit(\count($topicIds)), ]); foreach ($topics as $topic) { - $targets = \array_filter($topic->getAttribute('targets'), fn(Document $target) => - $target->getAttribute('providerType') === $message->getAttribute('providerType')); - $recipients = \array_merge($recipients, $targets); + $targets = \array_filter($topic->getAttribute('targets'), function (Document $target) use ($providerType) { + return $target->getAttribute('providerType') === $providerType; + }); + + \array_push($allTargets, ...$targets); } } @@ -142,23 +145,25 @@ class Messaging extends Action Query::limit(\count($userIds)), ]); foreach ($users as $user) { - $targets = \array_filter($user->getAttribute('targets'), fn(Document $target) => - $target->getAttribute('providerType') === $message->getAttribute('providerType')); - $recipients = \array_merge($recipients, $targets); + $targets = \array_filter($user->getAttribute('targets'), function (Document $target) use ($providerType) { + return $target->getAttribute('providerType') === $providerType; + }); + + \array_push($allTargets, ...$targets); } } if (\count($targetIds) > 0) { $targets = $dbForProject->find('targets', [ Query::equal('$id', $targetIds), + Query::equal('providerType', [$providerType]), Query::limit(\count($targetIds)), ]); - $targets = \array_filter($targets, fn(Document $target) => - $target->getAttribute('providerType') === $message->getAttribute('providerType')); - $recipients = \array_merge($recipients, $targets); + + \array_push($allTargets, ...$targets); } - if (empty($recipients)) { + if (empty($allTargets)) { $dbForProject->updateDocument('messages', $message->getId(), $message->setAttributes([ 'status' => MessageStatus::FAILED, 'deliveryErrors' => ['No valid recipients found.'] @@ -168,85 +173,82 @@ class Messaging extends Action return; } - $fallback = $dbForProject->findOne('providers', [ + $default = $dbForProject->findOne('providers', [ Query::equal('enabled', [true]), - Query::equal('type', [$recipients[0]->getAttribute('providerType')]), + Query::equal('type', [$providerType]), ]); - if ($fallback === false || $fallback->isEmpty()) { + if ($default === false || $default->isEmpty()) { $dbForProject->updateDocument('messages', $message->getId(), $message->setAttributes([ 'status' => MessageStatus::FAILED, - 'deliveryErrors' => ['No fallback provider found.'] + 'deliveryErrors' => ['No enabled provider found.'] ])); - Console::warning('No fallback provider found.'); + Console::warning('No enabled provider found.'); return; } /** - * @var array> $identifiers + * @var array> $identifiers */ $identifiers = []; /** - * @var Document[] $providers + * @var array $providers */ $providers = [ - $fallback->getId() => $fallback + $default->getId() => $default ]; - foreach ($recipients as $recipient) { - $providerId = $recipient->getAttribute('providerId'); + foreach ($allTargets as $target) { + $providerId = $target->getAttribute('providerId'); - if ( - !$providerId - && $fallback instanceof Document - && !$fallback->isEmpty() - && $fallback->getAttribute('enabled') - ) { - $providerId = $fallback->getId(); + if (!$providerId) { + $providerId = $default->getId(); } if ($providerId) { if (!\array_key_exists($providerId, $identifiers)) { $identifiers[$providerId] = []; } - $identifiers[$providerId][] = $recipient->getAttribute('identifier'); + // Use null as value to avoid duplicate keys + $identifiers[$providerId][$target->getAttribute('identifier')] = null; } } /** * @var array $results */ - $results = batch(\array_map(function ($providerId) use ($identifiers, $providers, $fallback, $message, $dbForProject, $deviceForFiles, $deviceForLocalFiles) { - return function () use ($providerId, $identifiers, $providers, $fallback, $message, $dbForProject, $deviceForFiles, $deviceForLocalFiles) { + $results = batch(\array_map(function ($providerId) use ($identifiers, &$providers, $default, $message, $dbForProject, $deviceForFiles, $deviceForLocalFiles) { + return function () use ($providerId, $identifiers, &$providers, $default, $message, $dbForProject, $deviceForFiles, $deviceForLocalFiles) { if (\array_key_exists($providerId, $providers)) { $provider = $providers[$providerId]; } else { $provider = $dbForProject->getDocument('providers', $providerId); if ($provider->isEmpty() || !$provider->getAttribute('enabled')) { - $provider = $fallback; + $provider = $default; } else { $providers[$providerId] = $provider; } } - $identifiers = $identifiers[$providerId]; + $identifiersForProvider = $identifiers[$providerId]; $adapter = match ($provider->getAttribute('type')) { MESSAGE_TYPE_SMS => $this->getSmsAdapter($provider), MESSAGE_TYPE_PUSH => $this->getPushAdapter($provider), MESSAGE_TYPE_EMAIL => $this->getEmailAdapter($provider), - default => throw new Exception(Exception::PROVIDER_INCORRECT_TYPE) + default => throw new \Exception('Provider with the requested ID is of the incorrect type') }; - $maxBatchSize = $adapter->getMaxMessagesPerRequest(); - $batches = \array_chunk($identifiers, $maxBatchSize); - $batchIndex = 0; + $batches = \array_chunk( + \array_keys($identifiersForProvider), + $adapter->getMaxMessagesPerRequest() + ); - return batch(\array_map(function ($batch) use ($message, $provider, $adapter, &$batchIndex, $dbForProject, $deviceForFiles, $deviceForLocalFiles) { - return function () use ($batch, $message, $provider, $adapter, &$batchIndex, $dbForProject, $deviceForFiles, $deviceForLocalFiles) { + return batch(\array_map(function ($batch) use ($message, $provider, $adapter, $dbForProject, $deviceForFiles, $deviceForLocalFiles) { + return function () use ($batch, $message, $provider, $adapter, $dbForProject, $deviceForFiles, $deviceForLocalFiles) { $deliveredTotal = 0; $deliveryErrors = []; $messageData = clone $message; @@ -256,7 +258,7 @@ class Messaging extends Action MESSAGE_TYPE_SMS => $this->buildSmsMessage($messageData, $provider), MESSAGE_TYPE_PUSH => $this->buildPushMessage($messageData), MESSAGE_TYPE_EMAIL => $this->buildEmailMessage($dbForProject, $messageData, $provider, $deviceForFiles, $deviceForLocalFiles), - default => throw new Exception(Exception::PROVIDER_INCORRECT_TYPE) + default => throw new \Exception('Provider with the requested ID is of the incorrect type') }; try { @@ -283,10 +285,8 @@ class Messaging extends Action } } } catch (\Throwable $e) { - $deliveryErrors[] = 'Failed sending to targets ' . $batchIndex + 1 . ' of ' . \count($batch) . ' with error: ' . $e->getMessage(); + $deliveryErrors[] = 'Failed sending to targets with error: ' . $e->getMessage(); } finally { - $batchIndex++; - return [ 'deliveredTotal' => $deliveredTotal, 'deliveryErrors' => $deliveryErrors, @@ -297,7 +297,7 @@ class Messaging extends Action }; }, \array_keys($identifiers))); - $results = array_merge(...$results); + $results = \array_merge(...$results); $deliveredTotal = 0; $deliveryErrors = []; @@ -330,7 +330,7 @@ class Messaging extends Action $dbForProject->updateDocument('messages', $message->getId(), $message); - // Delete any attachments that were downloaded to the local cache + // Delete any attachments that were downloaded to local storage if ($provider->getAttribute('type') === MESSAGE_TYPE_EMAIL) { if ($deviceForFiles->getType() === Storage::DEVICE_LOCAL) { return; @@ -345,12 +345,12 @@ class Messaging extends Action $bucket = $dbForProject->getDocument('buckets', $bucketId); if ($bucket->isEmpty()) { - throw new Exception(Exception::STORAGE_BUCKET_NOT_FOUND); + throw new \Exception('Storage bucket with the requested ID could not be found'); } $file = $dbForProject->getDocument('bucket_' . $bucket->getInternalId(), $fileId); if ($file->isEmpty()) { - throw new Exception(Exception::STORAGE_FILE_NOT_FOUND); + throw new \Exception('Storage file with the requested ID could not be found'); } $path = $file->getAttribute('path', ''); @@ -369,7 +369,7 @@ class Messaging extends Action } if ($project->isEmpty()) { - throw new Exception('Project not set in payload'); + throw new \Exception('Project not set in payload'); } Console::log('Project: ' . $project->getId()); @@ -427,12 +427,13 @@ class Messaging extends Action $adapter = $this->getSmsAdapter($provider); - $maxBatchSize = $adapter->getMaxMessagesPerRequest(); - $batches = \array_chunk($recipients, $maxBatchSize); - $batchIndex = 0; + $batches = \array_chunk( + $recipients, + $adapter->getMaxMessagesPerRequest() + ); - batch(\array_map(function ($batch) use ($message, $provider, $adapter, $batchIndex, $project, $queueForUsage) { - return function () use ($batch, $message, $provider, $adapter, $batchIndex, $project, $queueForUsage) { + batch(\array_map(function ($batch) use ($message, $provider, $adapter, $project, $queueForUsage) { + return function () use ($batch, $message, $provider, $adapter, $project, $queueForUsage) { $message->setAttribute('to', $batch); $data = $this->buildSmsMessage($message, $provider); @@ -445,7 +446,7 @@ class Messaging extends Action ->addMetric(METRIC_MESSAGES, 1) ->trigger(); } catch (\Throwable $e) { - throw new Exception('Failed sending to targets ' . $batchIndex + 1 . '-' . \count($batch) . ' with error: ' . $e->getMessage(), 500); + throw new \Exception('Failed sending to targets with error: ' . $e->getMessage()); } }; }, $batches)); @@ -556,19 +557,19 @@ class Messaging extends Action $bucket = $dbForProject->getDocument('buckets', $bucketId); if ($bucket->isEmpty()) { - throw new Exception(Exception::STORAGE_BUCKET_NOT_FOUND); + throw new \Exception('Storage bucket with the requested ID could not be found'); } $file = $dbForProject->getDocument('bucket_' . $bucket->getInternalId(), $fileId); if ($file->isEmpty()) { - throw new Exception(Exception::STORAGE_FILE_NOT_FOUND); + throw new \Exception('Storage file with the requested ID could not be found'); } $mimes = Config::getParam('storage-mimes'); $path = $file->getAttribute('path', ''); if (!$deviceForFiles->exists($path)) { - throw new Exception(Exception::STORAGE_FILE_NOT_FOUND, 'File not found in ' . $path); + throw new \Exception('File not found in ' . $path); } $contentType = 'text/plain';