appwrite/src/Appwrite/Platform/Workers/Messaging.php

274 lines
10 KiB
PHP
Raw Normal View History

2022-06-08 13:57:34 +00:00
<?php
2023-05-29 16:32:33 +00:00
namespace Appwrite\Platform\Workers;
use Appwrite\Extend\Exception;
2022-06-08 13:57:34 +00:00
use Utopia\CLI\Console;
use Utopia\Platform\Action;
use Utopia\Queue\Message;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Query;
use Utopia\Messaging\Adapters\SMS as SMSAdapter;
use Utopia\Messaging\Adapters\SMS\Mock;
use Utopia\Messaging\Adapters\SMS\Msg91;
use Utopia\Messaging\Adapters\SMS\Telesign;
use Utopia\Messaging\Adapters\SMS\TextMagic;
use Utopia\Messaging\Adapters\SMS\Twilio;
use Utopia\Messaging\Adapters\SMS\Vonage;
use Utopia\Messaging\Adapters\Push as PushAdapter;
use Utopia\Messaging\Adapters\Push\APNS;
use Utopia\Messaging\Adapters\Push\FCM;
use Utopia\Messaging\Adapters\Email as EmailAdapter;
use Utopia\Messaging\Adapters\Email\Mailgun;
use Utopia\Messaging\Adapters\Email\SendGrid;
use Utopia\Messaging\Messages\Email;
use Utopia\Messaging\Messages\Push;
use Utopia\Messaging\Messages\SMS;
2023-10-04 10:45:59 +00:00
use function Swoole\Coroutine\batch;
2023-05-29 16:32:33 +00:00
class Messaging extends Action
2022-06-08 13:57:34 +00:00
{
2023-05-29 16:32:33 +00:00
public static function getName(): string
2022-06-08 13:57:34 +00:00
{
2023-10-04 10:45:59 +00:00
return "messaging";
}
2023-06-02 03:54:34 +00:00
/**
* @throws Exception
*/
2023-05-29 16:32:33 +00:00
public function __construct()
{
2023-05-29 16:32:33 +00:00
$this
->desc('Messaging worker')
->inject('message')
->inject('dbForProject')
->callback(fn(Message $message, Database $dbForProject) => $this->action($message, $dbForProject));
2022-06-08 13:57:34 +00:00
}
2023-06-02 03:54:34 +00:00
/**
2023-10-01 17:39:26 +00:00
* @param Message $message
* @param Database $dbForProject
2023-10-01 17:39:26 +00:00
* @return void
2023-06-02 03:54:34 +00:00
* @throws Exception
*/
public function action(Message $message, Database $dbForProject): void
2022-06-08 13:57:34 +00:00
{
2023-05-29 16:32:33 +00:00
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
2023-10-01 17:39:26 +00:00
Console::error('Payload arg not found');
return;
2023-05-29 16:32:33 +00:00
}
$message = $dbForProject->getDocument('messages', $payload['messageId']);
2023-10-26 14:14:06 +00:00
$this->processMessage($dbForProject, $message);
}
2023-10-26 14:14:06 +00:00
private function processMessage(Database $dbForProject, Document $message): void
{
2023-10-30 18:07:57 +00:00
$topicsId = $message->getAttribute('topics', []);
$targetsId = $message->getAttribute('targets', []);
$usersId = $message->getAttribute('users', []);
/**
* @var Document[] $recipients
*/
$recipients = [];
2023-10-30 18:07:57 +00:00
if (\count($topicsId) > 0) {
$topics = $dbForProject->find('topics', [Query::equal('$id', $topicsId)]);
foreach ($topics as $topic) {
$recipients = \array_merge($recipients, $topic->getAttribute('targets'));
}
}
2023-10-30 18:07:57 +00:00
if (\count($usersId) > 0) {
$users = $dbForProject->find('users', [Query::equal('$id', $usersId)]);
foreach ($users as $user) {
$recipients = \array_merge($recipients, $user->getAttribute('targets'));
}
}
2023-10-30 18:07:57 +00:00
if (\count($targetsId) > 0) {
$targets = $dbForProject->find('targets', [Query::equal('$id', $targetsId)]);
$recipients = \array_merge($recipients, $targets);
}
2023-10-26 14:14:06 +00:00
/**
* @var array<string, array<string>> $identifiersByProviderId
*/
$identifiersByProviderId = [];
/**
* @var Document[] $providers
*/
2023-10-26 14:14:06 +00:00
$providers = [];
foreach ($recipients as $recipient) {
$providerId = $recipient->getAttribute('providerId');
if (!isset($identifiersByProviderId[$providerId])) {
$identifiersByProviderId[$providerId] = [];
2023-10-26 14:14:06 +00:00
}
$identifiersByProviderId[$providerId][] = $recipient->getAttribute('identifier');
2023-10-26 14:14:06 +00:00
}
/**
* @var array[] $results
*/
$results = batch(\array_map(function ($providerId) use ($identifiersByProviderId, $providers, $message, $dbForProject) {
return function () use ($providerId, $identifiersByProviderId, $providers, $message, $dbForProject) {
2023-11-02 11:13:24 +00:00
$provider = $dbForProject->getDocument('providers', $providerId);
$providers[] = $provider;
$identifiers = $identifiersByProviderId[$providerId];
2023-10-26 14:14:06 +00:00
$adapter = match ($provider->getAttribute('type')) {
'sms' => $this->sms($provider),
'push' => $this->push($provider),
'email' => $this->email($provider),
2023-10-04 10:45:59 +00:00
default => throw new Exception(Exception::PROVIDER_INCORRECT_TYPE)
};
2023-10-26 14:14:06 +00:00
$maxBatchSize = $adapter->getMaxMessagesPerRequest();
$batches = \array_chunk($identifiers, $maxBatchSize);
$batchIndex = 0;
$results = batch(\array_map(function ($batch) use ($message, $provider, $adapter, $batchIndex) {
return function () use ($batch, $message, $provider, $adapter, $batchIndex) {
2023-10-30 18:07:57 +00:00
$deliveredTotal = 0;
2023-10-26 14:14:06 +00:00
$deliveryErrors = [];
$messageData = clone $message;
$messageData->setAttribute('to', $batch);
$data = match ($provider->getAttribute('type')) {
'sms' => $this->buildSMSMessage($messageData, $provider),
'push' => $this->buildPushMessage($messageData),
'email' => $this->buildEmailMessage($messageData, $provider),
default => throw new Exception(Exception::PROVIDER_INCORRECT_TYPE)
};
try {
$adapter->send($data);
2023-10-30 18:07:57 +00:00
$deliveredTotal += \count($batch);
2023-10-26 14:14:06 +00:00
} catch (\Exception $e) {
$deliveryErrors[] = 'Failed sending to targets ' . $batchIndex + 1 . '-' . \count($batch) . ' with error: ' . $e->getMessage();
} finally {
$batchIndex++;
return [
2023-10-30 18:07:57 +00:00
'deliveredTotal' => $deliveredTotal,
2023-10-26 14:14:06 +00:00
'deliveryErrors' => $deliveryErrors,
];
}
};
}, $batches));
return $results;
2023-10-04 10:45:59 +00:00
};
}, \array_keys($identifiersByProviderId)));
2023-10-26 14:14:06 +00:00
$results = array_merge(...$results);
2023-10-04 10:45:59 +00:00
2023-10-30 18:07:57 +00:00
$deliveredTotal = 0;
2023-10-04 10:45:59 +00:00
$deliveryErrors = [];
foreach ($results as $result) {
2023-10-30 18:07:57 +00:00
$deliveredTotal += $result['deliveredTotal'];
2023-10-04 10:45:59 +00:00
$deliveryErrors = \array_merge($deliveryErrors, $result['deliveryErrors']);
}
2023-10-06 13:53:46 +00:00
$message->setAttribute('deliveryErrors', $deliveryErrors);
2023-10-06 13:53:46 +00:00
if (\count($message->getAttribute('deliveryErrors')) > 0) {
$message->setAttribute('status', 'failed');
} else {
2023-10-06 13:53:46 +00:00
$message->setAttribute('status', 'sent');
}
2023-10-30 18:07:57 +00:00
$message->removeAttribute('to');
foreach ($providers as $provider) {
$message->setAttribute('search', "{$message->getAttribute('search')} {$provider->getAttribute('name')} {$provider->getAttribute('provider')} {$provider->getAttribute('type')}");
}
2023-10-30 18:07:57 +00:00
$message->setAttribute('deliveredTotal', $deliveredTotal);
2023-10-06 13:53:46 +00:00
$message->setAttribute('deliveredAt', DateTime::now());
2023-10-20 11:32:13 +00:00
$dbForProject->updateDocument('messages', $message->getId(), $message);
2022-06-08 13:57:34 +00:00
}
2023-08-22 16:47:32 +00:00
public function shutdown(): void
2022-06-08 13:57:34 +00:00
{
}
2023-10-06 13:53:46 +00:00
private function sms(Document $provider): ?SMSAdapter
2023-10-04 10:45:59 +00:00
{
2023-10-06 13:53:46 +00:00
$credentials = $provider->getAttribute('credentials');
return match ($provider->getAttribute('provider')) {
2023-10-04 10:45:59 +00:00
'mock' => new Mock('username', 'password'),
'twilio' => new Twilio($credentials['accountSid'], $credentials['authToken']),
'text-magic' => new TextMagic($credentials['username'], $credentials['apiKey']),
'telesign' => new Telesign($credentials['username'], $credentials['password']),
'msg91' => new Msg91($credentials['senderId'], $credentials['authKey']),
'vonage' => new Vonage($credentials['apiKey'], $credentials['apiSecret']),
default => null
};
}
2023-10-06 13:53:46 +00:00
private function push(Document $provider): ?PushAdapter
2023-10-04 10:45:59 +00:00
{
2023-10-06 13:53:46 +00:00
$credentials = $provider->getAttribute('credentials');
return match ($provider->getAttribute('provider')) {
2023-10-04 10:45:59 +00:00
'apns' => new APNS(
$credentials['authKey'],
$credentials['authKeyId'],
$credentials['teamId'],
$credentials['bundleId'],
$credentials['endpoint']
),
'fcm' => new FCM($credentials['serverKey']),
default => null
};
}
2023-10-06 13:53:46 +00:00
private function email(Document $provider): ?EmailAdapter
2023-10-04 10:45:59 +00:00
{
2023-10-06 13:53:46 +00:00
$credentials = $provider->getAttribute('credentials');
return match ($provider->getAttribute('provider')) {
2023-10-04 10:45:59 +00:00
'mailgun' => new Mailgun($credentials['apiKey'], $credentials['domain'], $credentials['isEuRegion']),
'sendgrid' => new SendGrid($credentials['apiKey']),
default => null
};
}
private function buildEmailMessage(Document $message, Document $provider): Email
{
$from = $provider['options']['from'];
$to = $message['to'];
$subject = $message['data']['subject'];
$content = $message['data']['content'];
$html = $message['data']['html'];
2023-10-04 10:45:59 +00:00
return new Email($to, $subject, $content, $from, null, $html);
}
2023-10-04 10:45:59 +00:00
private function buildSMSMessage(Document $message, Document $provider): SMS
{
$to = $message['to'];
$content = $message['data']['content'];
$from = $provider['options']['from'];
return new SMS($to, $content, $from);
}
2023-10-04 10:45:59 +00:00
private function buildPushMessage(Document $message): Push
{
$to = $message['to'];
$title = $message['data']['title'];
$body = $message['data']['body'];
$data = $message['data']['data'];
$action = $message['data']['action'];
$sound = $message['data']['sound'];
$icon = $message['data']['icon'];
$color = $message['data']['color'];
$tag = $message['data']['tag'];
$badge = $message['data']['badge'];
2023-10-04 10:45:59 +00:00
return new Push($to, $title, $body, $data, $action, $sound, $icon, $color, $tag, $badge);
}
2022-06-08 13:57:34 +00:00
}