From bbce53cda5d93d464d7b01d5071bacdc8edd0726 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Wed, 21 Feb 2024 01:06:35 +1300 Subject: [PATCH] Auto trigger messaging events --- app/controllers/api/account.php | 12 ++-- app/controllers/api/messaging.php | 24 +++---- app/controllers/api/teams.php | 4 +- app/controllers/shared/api.php | 12 ++-- app/init.php | 8 +++ src/Appwrite/Event/Messaging.php | 25 +++++++- .../Platform/Tasks/ScheduleMessages.php | 1 + src/Appwrite/Platform/Workers/Messaging.php | 63 +++++++++---------- 8 files changed, 89 insertions(+), 60 deletions(-) diff --git a/app/controllers/api/account.php b/app/controllers/api/account.php index dd009864a7..11facf04a7 100644 --- a/app/controllers/api/account.php +++ b/app/controllers/api/account.php @@ -1773,10 +1773,10 @@ App::post('/v1/account/tokens/phone') ]); $queueForMessaging + ->setType(MESSAGE_SEND_TYPE_INTERNAL) ->setMessage($messageDoc) ->setRecipients([$phone]) - ->setProviderType(MESSAGE_TYPE_SMS) - ->trigger(); + ->setProviderType(MESSAGE_TYPE_SMS); $queueForEvents->setPayload( $response->output( @@ -3314,10 +3314,10 @@ App::post('/v1/account/verification/phone') ]); $queueForMessaging + ->setType(MESSAGE_SEND_TYPE_INTERNAL) ->setMessage($messageDoc) ->setRecipients([$user->getAttribute('phone')]) - ->setProviderType(MESSAGE_TYPE_SMS) - ->trigger(); + ->setProviderType(MESSAGE_TYPE_SMS); $queueForEvents ->setParam('userId', $user->getId()) @@ -3677,14 +3677,14 @@ App::post('/v1/account/mfa/challenge') } $queueForMessaging + ->setType(MESSAGE_SEND_TYPE_INTERNAL) ->setMessage(new Document([ '$id' => $challenge->getId(), 'data' => [ 'content' => $code, ], ])) - ->setRecipients([$user->getAttribute('phone')]) - ->trigger(); + ->setRecipients([$user->getAttribute('phone')]); break; case 'email': if (empty(App::getEnv('_APP_SMTP_HOST'))) { diff --git a/app/controllers/api/messaging.php b/app/controllers/api/messaging.php index 8e6c73f3bc..d8af44f7fd 100644 --- a/app/controllers/api/messaging.php +++ b/app/controllers/api/messaging.php @@ -2635,8 +2635,8 @@ App::post('/v1/messaging/messages/email') switch ($status) { case MessageStatus::PROCESSING: $queueForMessaging - ->setMessageId($message->getId()) - ->trigger(); + ->setType(MESSAGE_SEND_TYPE_EXTERNAL) + ->setMessageId($message->getId()); break; case MessageStatus::SCHEDULED: $schedule = $dbForConsole->createDocument('schedules', new Document([ @@ -2744,8 +2744,8 @@ App::post('/v1/messaging/messages/sms') switch ($status) { case MessageStatus::PROCESSING: $queueForMessaging - ->setMessageId($message->getId()) - ->trigger(); + ->setType(MESSAGE_SEND_TYPE_EXTERNAL) + ->setMessageId($message->getId()); break; case MessageStatus::SCHEDULED: $schedule = $dbForConsole->createDocument('schedules', new Document([ @@ -2870,8 +2870,8 @@ App::post('/v1/messaging/messages/push') switch ($status) { case MessageStatus::PROCESSING: $queueForMessaging - ->setMessageId($message->getId()) - ->trigger(); + ->setType(MESSAGE_SEND_TYPE_EXTERNAL) + ->setMessageId($message->getId()); break; case MessageStatus::SCHEDULED: $schedule = $dbForConsole->createDocument('schedules', new Document([ @@ -3263,8 +3263,8 @@ App::patch('/v1/messaging/messages/email/:messageId') if ($status === MessageStatus::PROCESSING) { $queueForMessaging - ->setMessageId($message->getId()) - ->trigger(); + ->setType(MESSAGE_SEND_TYPE_EXTERNAL) + ->setMessageId($message->getId()); } $queueForEvents @@ -3382,8 +3382,8 @@ App::patch('/v1/messaging/messages/sms/:messageId') if ($status === MessageStatus::PROCESSING) { $queueForMessaging - ->setMessageId($message->getId()) - ->trigger(); + ->setType(MESSAGE_SEND_TYPE_EXTERNAL) + ->setMessageId($message->getId()); } $queueForEvents @@ -3541,8 +3541,8 @@ App::patch('/v1/messaging/messages/push/:messageId') if ($status === MessageStatus::PROCESSING) { $queueForMessaging - ->setMessageId($message->getId()) - ->trigger(); + ->setType(MESSAGE_SEND_TYPE_EXTERNAL) + ->setMessageId($message->getId()); } $queueForEvents diff --git a/app/controllers/api/teams.php b/app/controllers/api/teams.php index 775fc27bb7..d5db918cd7 100644 --- a/app/controllers/api/teams.php +++ b/app/controllers/api/teams.php @@ -658,10 +658,10 @@ App::post('/v1/teams/:teamId/memberships') ]); $queueForMessaging + ->setType(MESSAGE_SEND_TYPE_INTERNAL) ->setMessage($messageDoc) ->setRecipients([$phone]) - ->setProviderType('SMS') - ->trigger(); + ->setProviderType('SMS'); } } diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index 7becb3c522..7f8041396f 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -384,9 +384,6 @@ App::init() ->setProject($project) ->setUser($user); - $queueForMessaging - ->setProject($project); - $queueForAudits ->setMode($mode) ->setUserAgent($request->getUserAgent('')) @@ -395,10 +392,10 @@ App::init() ->setProject($project) ->setUser($user); - $queueForDeletes->setProject($project); $queueForDatabase->setProject($project); $queueForBuilds->setProject($project); + $queueForMessaging->setProject($project); $dbForProject ->on(Database::EVENT_DOCUMENT_CREATE, 'calculate-usage', fn ($event, $document) => $databaseListener($event, $document, $project, $queueForUsage, $dbForProject)) @@ -517,11 +514,12 @@ App::shutdown() ->inject('queueForDeletes') ->inject('queueForDatabase') ->inject('queueForBuilds') + ->inject('queueForMessaging') ->inject('dbForProject') ->inject('queueForFunctions') ->inject('mode') ->inject('dbForConsole') - ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Database $dbForProject, Func $queueForFunctions, string $mode, Database $dbForConsole) use ($parseLabel) { + ->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Database $dbForProject, Func $queueForFunctions, string $mode, Database $dbForConsole) use ($parseLabel) { $responsePayload = $response->getPayload(); @@ -626,6 +624,10 @@ App::shutdown() $queueForBuilds->trigger(); } + if (!empty($queueForMessaging->getType())) { + $queueForBuilds->trigger(); + } + /** * Cache label */ diff --git a/app/init.php b/app/init.php index 4e75b5198b..8935fc7265 100644 --- a/app/init.php +++ b/app/init.php @@ -142,9 +142,11 @@ const APP_SOCIAL_DEV = 'https://dev.to/appwrite'; const APP_SOCIAL_STACKSHARE = 'https://stackshare.io/appwrite'; const APP_SOCIAL_YOUTUBE = 'https://www.youtube.com/c/appwrite?sub_confirmation=1'; const APP_HOSTNAME_INTERNAL = 'appwrite'; + // Database Reconnect const DATABASE_RECONNECT_SLEEP = 2; const DATABASE_RECONNECT_MAX_ATTEMPTS = 10; + // Database Worker Types const DATABASE_TYPE_CREATE_ATTRIBUTE = 'createAttribute'; const DATABASE_TYPE_CREATE_INDEX = 'createIndex'; @@ -152,9 +154,11 @@ const DATABASE_TYPE_DELETE_ATTRIBUTE = 'deleteAttribute'; const DATABASE_TYPE_DELETE_INDEX = 'deleteIndex'; const DATABASE_TYPE_DELETE_COLLECTION = 'deleteCollection'; const DATABASE_TYPE_DELETE_DATABASE = 'deleteDatabase'; + // Build Worker Types const BUILD_TYPE_DEPLOYMENT = 'deployment'; const BUILD_TYPE_RETRY = 'retry'; + // Deletion Types const DELETE_TYPE_DATABASES = 'databases'; const DELETE_TYPE_DOCUMENT = 'document'; @@ -180,6 +184,10 @@ const DELETE_TYPE_TOPIC = 'topic'; const DELETE_TYPE_TARGET = 'target'; const DELETE_TYPE_EXPIRED_TARGETS = 'invalid_targets'; const DELETE_TYPE_SESSION_TARGETS = 'session_targets'; + +// Message types +const MESSAGE_SEND_TYPE_INTERNAL = 'internal'; +const MESSAGE_SEND_TYPE_EXTERNAL = 'external'; // Mail Types const MAIL_TYPE_VERIFICATION = 'verification'; const MAIL_TYPE_MAGIC_SESSION = 'magicSession'; diff --git a/src/Appwrite/Event/Messaging.php b/src/Appwrite/Event/Messaging.php index 9201799355..b39af7e4fe 100644 --- a/src/Appwrite/Event/Messaging.php +++ b/src/Appwrite/Event/Messaging.php @@ -8,13 +8,13 @@ use Utopia\Queue\Client; class Messaging extends Event { + protected string $type = ''; protected ?string $messageId = null; protected ?Document $message = null; protected ?array $recipients = null; protected ?string $scheduledAt = null; protected ?string $providerType = null; - public function __construct(protected Connection $connection) { parent::__construct($connection); @@ -24,6 +24,29 @@ class Messaging extends Event ->setClass(Event::MESSAGING_CLASS_NAME); } + /** + * Sets type for the build event. + * + * @param string $type Can be `MESSAGE_TYPE_INTERNAL` or `MESSAGE_TYPE_EXTERNAL`. + * @return self + */ + public function setType(string $type): self + { + $this->type = $type; + + return $this; + } + + /** + * Returns set type for the function event. + * + * @return string + */ + public function getType(): string + { + return $this->type; + } + /** * Sets recipient for the messaging event. * diff --git a/src/Appwrite/Platform/Tasks/ScheduleMessages.php b/src/Appwrite/Platform/Tasks/ScheduleMessages.php index cc641b434a..bc9b6d37d2 100644 --- a/src/Appwrite/Platform/Tasks/ScheduleMessages.php +++ b/src/Appwrite/Platform/Tasks/ScheduleMessages.php @@ -50,6 +50,7 @@ class ScheduleMessages extends ScheduleBase $queueForMessaging = new Messaging($connection); $queueForMessaging + ->setType(MESSAGE_SEND_TYPE_EXTERNAL) ->setMessageId($schedule['resourceId']) ->setProject($schedule['project']) ->trigger(); diff --git a/src/Appwrite/Platform/Workers/Messaging.php b/src/Appwrite/Platform/Workers/Messaging.php index 083eae4e0a..2b954e4a9e 100644 --- a/src/Appwrite/Platform/Workers/Messaging.php +++ b/src/Appwrite/Platform/Workers/Messaging.php @@ -44,7 +44,7 @@ class Messaging extends Action } /** - * @throws Exception + * @throws \Exception */ public function __construct() { @@ -63,7 +63,7 @@ class Messaging extends Action * @param Database $dbForProject * @param Usage $queueForUsage * @return void - * @throws Exception + * @throws \Exception */ public function action(Message $message, Log $log, Database $dbForProject, Usage $queueForUsage): void { @@ -73,28 +73,27 @@ class Messaging extends Action throw new Exception('Missing payload'); } + $type = $payload['type'] ?? ''; + $project = new Document($payload['project'] ?? []); - if ( - !\is_null($payload['message']) - && !\is_null($payload['recipients']) - && $payload['providerType'] === MESSAGE_TYPE_SMS - ) { - // Message was triggered internally - $this->processInternalSMSMessage( - new Document($payload['message']), - new Document($payload['project'] ?? []), - $payload['recipients'], - $queueForUsage, - $log, - ); - } else { - $message = $dbForProject->getDocument('messages', $payload['messageId']); + switch ($type) { + case MESSAGE_SEND_TYPE_INTERNAL: + $message = new Document($payload['message'] ?? []); + $recipients = $payload['recipients'] ?? []; - $this->processMessage($dbForProject, $message); + $this->sendInternalSMSMessage($message, $project, $recipients, $queueForUsage, $log); + break; + case MESSAGE_SEND_TYPE_EXTERNAL: + $message = $dbForProject->getDocument('messages', $payload['messageId']); + + $this->sendExternalMessage($dbForProject, $message); + break; + default: + throw new Exception('Unknown message type: ' . $type); } } - private function processMessage(Database $dbForProject, Document $message): void + private function sendExternalMessage(Database $dbForProject, Document $message): void { $topicIds = $message->getAttribute('topics', []); $targetIds = $message->getAttribute('targets', []); @@ -216,9 +215,9 @@ class Messaging extends Action $identifiers = $identifiers[$providerId]; $adapter = match ($provider->getAttribute('type')) { - MESSAGE_TYPE_SMS => $this->sms($provider), - MESSAGE_TYPE_PUSH => $this->push($provider), - MESSAGE_TYPE_EMAIL => $this->email($provider), + MESSAGE_TYPE_SMS => $this->getSmsAdapter($provider), + MESSAGE_TYPE_PUSH => $this->getPushAdapter($provider), + MESSAGE_TYPE_EMAIL => $this->getEmailAdapter($provider), default => throw new Exception(Exception::PROVIDER_INCORRECT_TYPE) }; @@ -234,7 +233,7 @@ class Messaging extends Action $messageData->setAttribute('to', $batch); $data = match ($provider->getAttribute('type')) { - MESSAGE_TYPE_SMS => $this->buildSMSMessage($messageData, $provider), + MESSAGE_TYPE_SMS => $this->buildSmsMessage($messageData, $provider), MESSAGE_TYPE_PUSH => $this->buildPushMessage($messageData), MESSAGE_TYPE_EMAIL => $this->buildEmailMessage($dbForProject, $messageData, $provider), default => throw new Exception(Exception::PROVIDER_INCORRECT_TYPE) @@ -312,7 +311,7 @@ class Messaging extends Action $dbForProject->updateDocument('messages', $message->getId(), $message); } - private function processInternalSMSMessage(Document $message, Document $project, array $recipients, Usage $queueForUsage, Log $log): void + private function sendInternalSMSMessage(Document $message, Document $project, array $recipients, Usage $queueForUsage, Log $log): void { if (empty(App::getEnv('_APP_SMS_PROVIDER')) || empty(App::getEnv('_APP_SMS_FROM'))) { throw new \Exception('Skipped SMS processing. Missing "_APP_SMS_PROVIDER" or "_APP_SMS_FROM" environment variables.'); @@ -375,7 +374,7 @@ class Messaging extends Action ] ]); - $adapter = $this->sms($provider); + $adapter = $this->getSmsAdapter($provider); $maxBatchSize = $adapter->getMaxMessagesPerRequest(); $batches = \array_chunk($recipients, $maxBatchSize); @@ -385,7 +384,7 @@ class Messaging extends Action return function () use ($batch, $message, $provider, $adapter, $batchIndex, $project, $queueForUsage) { $message->setAttribute('to', $batch); - $data = $this->buildSMSMessage($message, $provider); + $data = $this->buildSmsMessage($message, $provider); try { $adapter->send($data); @@ -401,11 +400,7 @@ class Messaging extends Action }, $batches)); } - public function shutdown(): void - { - } - - private function sms(Document $provider): ?SMSAdapter + private function getSmsAdapter(Document $provider): ?SMSAdapter { $credentials = $provider->getAttribute('credentials'); @@ -420,7 +415,7 @@ class Messaging extends Action }; } - private function push(Document $provider): ?PushAdapter + private function getPushAdapter(Document $provider): ?PushAdapter { $credentials = $provider->getAttribute('credentials'); @@ -437,7 +432,7 @@ class Messaging extends Action }; } - private function email(Document $provider): ?EmailAdapter + private function getEmailAdapter(Document $provider): ?EmailAdapter { $credentials = $provider->getAttribute('credentials', []); $options = $provider->getAttribute('options', []); @@ -503,7 +498,7 @@ class Messaging extends Action return new Email($to, $subject, $content, $fromName, $fromEmail, $replyToName, $replyToEmail, $cc, $bcc, null, $html); } - private function buildSMSMessage(Document $message, Document $provider): SMS + private function buildSmsMessage(Document $message, Document $provider): SMS { $to = $message['to']; $content = $message['data']['content'];