Merge pull request #11228 from appwrite/feat-span-messaging-worker

This commit is contained in:
Chirag Aggarwal 2026-02-03 14:15:07 +05:30 committed by GitHub
commit b7df5cfcc1
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 120 additions and 28 deletions

View file

@ -44,13 +44,19 @@ use Utopia\Queue\Message;
use Utopia\Queue\Publisher;
use Utopia\Queue\Server;
use Utopia\Registry\Registry;
use Utopia\Span\Exporter;
use Utopia\Span\Span;
use Utopia\Span\Storage;
use Utopia\Storage\Device\Telemetry as TelemetryDevice;
use Utopia\System\System;
use Utopia\Telemetry\Adapter as Telemetry;
use Utopia\Telemetry\Adapter\None as NoTelemetry;
Runtime::enableCoroutine();
Span::setStorage(new Storage\Coroutine());
Span::addExporter(new Exporter\Stdout());
global $register;
Server::setResource('register', fn () => $register);
Server::setResource('authorization', function () {

View file

@ -5,7 +5,6 @@ namespace Appwrite\Platform\Workers;
use Appwrite\Event\StatsUsage;
use Appwrite\Messaging\Status as MessageStatus;
use Swoole\Runtime;
use Utopia\CLI\Console;
use Utopia\Config\Config;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
@ -39,6 +38,7 @@ use Utopia\Messaging\Messages\SMS;
use Utopia\Messaging\Priority;
use Utopia\Platform\Action;
use Utopia\Queue\Message;
use Utopia\Span\Span;
use Utopia\Storage\Device;
use Utopia\Storage\Device\Local;
use Utopia\Storage\Storage;
@ -100,20 +100,31 @@ class Messaging extends Action
$type = $payload['type'] ?? '';
switch ($type) {
case MESSAGE_SEND_TYPE_INTERNAL:
$message = new Document($payload['message'] ?? []);
$recipients = $payload['recipients'] ?? [];
Span::init('messaging');
Span::add('project', $project->getId());
Span::add('type', $type);
$this->sendInternalSMSMessage($message, $project, $recipients, $log);
break;
case MESSAGE_SEND_TYPE_EXTERNAL:
$message = $dbForProject->getDocument('messages', $payload['messageId']);
try {
switch ($type) {
case MESSAGE_SEND_TYPE_INTERNAL:
$message = new Document($payload['message'] ?? []);
$recipients = $payload['recipients'] ?? [];
$this->sendExternalMessage($dbForProject, $message, $deviceForFiles, $project, $queueForStatsUsage);
break;
default:
throw new \Exception('Unknown message type: ' . $type);
$this->sendInternalSMSMessage($message, $project, $recipients, $log);
break;
case MESSAGE_SEND_TYPE_EXTERNAL:
$message = $dbForProject->getDocument('messages', $payload['messageId']);
$this->sendExternalMessage($dbForProject, $message, $deviceForFiles, $project, $queueForStatsUsage);
break;
default:
throw new \Exception('Unknown message type: ' . $type);
}
} catch (\Throwable $e) {
Span::error($e);
throw $e;
} finally {
Span::current()->finish();
}
}
@ -129,10 +140,11 @@ class Messaging extends Action
$userIds = $message->getAttribute('users', []);
$providerType = $message->getAttribute('providerType');
Console::log(json_encode([
'project' => $project->getId(),
'type' => $providerType,
]));
Span::add('messageId', $message->getId());
Span::add('providerType', $providerType);
Span::add('topicsCount', \count($topicIds));
Span::add('usersCount', \count($userIds));
Span::add('targetsCount', \count($targetIds));
/**
* @var array<Document> $allTargets
@ -177,13 +189,31 @@ class Messaging extends Action
\array_push($allTargets, ...$targets);
}
Span::add('recipientsTotal', \count($allTargets));
// Extract country codes for SMS targets
if ($providerType === MESSAGE_TYPE_SMS && !empty($allTargets)) {
$countryCodes = [];
foreach ($allTargets as $target) {
$identifier = $target->getAttribute('identifier', '');
$countryCode = $this->extractCountryCode($identifier);
if ($countryCode !== null) {
$countryCodes[$countryCode] = ($countryCodes[$countryCode] ?? 0) + 1;
}
}
if (!empty($countryCodes)) {
Span::add('countryCodes', \json_encode($countryCodes));
}
}
if (empty($allTargets)) {
$dbForProject->updateDocument('messages', $message->getId(), $message->setAttributes([
'status' => MessageStatus::FAILED,
'deliveryErrors' => ['No valid recipients found.']
]));
Console::warning('No valid recipients found.');
Span::add('status', 'failed');
Span::add('error', 'No valid recipients found.');
return;
}
@ -198,10 +228,14 @@ class Messaging extends Action
'deliveryErrors' => ['No enabled provider found.']
]));
Console::warning('No enabled provider found.');
Span::add('status', 'failed');
Span::add('error', 'No enabled provider found.');
return;
}
Span::add('provider', $default->getAttribute('provider'));
Span::add('providerName', $default->getAttribute('name'));
/**
* @var array<string, array<string, null>> $identifiers
*/
@ -343,10 +377,15 @@ class Messaging extends Action
if (\count($message->getAttribute('deliveryErrors')) > 0) {
$message->setAttribute('status', MessageStatus::FAILED);
Span::add('status', 'failed');
} else {
$message->setAttribute('status', MessageStatus::SENT);
Span::add('status', 'sent');
}
Span::add('deliveredTotal', $deliveredTotal);
Span::add('errorsTotal', \count($deliveryErrors));
$message->removeAttribute('to');
foreach ($providers as $provider) {
@ -392,12 +431,27 @@ class Messaging extends Action
private function sendInternalSMSMessage(Document $message, Document $project, array $recipients, Log $log): void
{
Span::add('recipientsCount', \count($recipients));
// Extract country codes from phone numbers
$countryCodes = [];
foreach ($recipients as $recipient) {
$countryCode = $this->extractCountryCode($recipient);
if ($countryCode !== null) {
$countryCodes[$countryCode] = ($countryCodes[$countryCode] ?? 0) + 1;
}
}
if (!empty($countryCodes)) {
Span::add('countryCodes', \json_encode($countryCodes));
}
if ($this->adapter === null) {
$this->adapter = $this->createInternalSMSAdapter();
}
if ($this->adapter === null) {
Console::warning('Skipped SMS processing. SMS adapter is not set.');
Span::add('status', 'skipped');
Span::add('warning', 'SMS adapter is not set.');
return;
}
@ -405,18 +459,17 @@ class Messaging extends Action
throw new \Exception('Project not set in payload');
}
Console::log(json_encode([
'project' => $project->getId(),
'type' => 'internal-sms'
]));
$denyList = System::getEnv('_APP_SMS_PROJECTS_DENY_LIST', '');
$denyList = explode(',', $denyList);
if (\in_array($project->getId(), $denyList)) {
Console::error('Project is in the deny list. Skipping...');
Span::add('status', 'denied');
Span::add('error', 'Project is in the deny list.');
return;
}
$from = System::getEnv('_APP_SMS_FROM', '');
Span::add('from', $from);
$sms = new SMS(
$recipients,
$message->getAttribute('data')['content'],
@ -425,7 +478,10 @@ class Messaging extends Action
try {
$result = $this->adapter->send($sms);
Span::add('status', 'sent');
Span::add('deliveredTo', $result['deliveredTo'] ?? 0);
} catch (\Throwable $th) {
Span::add('status', 'failed');
throw new \Exception('Failed sending to targets with error: ' . $th->getMessage());
}
}
@ -699,7 +755,7 @@ class Messaging extends Action
private function createInternalSMSAdapter(): ?SMSAdapter
{
if (empty(System::getEnv('_APP_SMS_PROVIDER')) || empty(System::getEnv('_APP_SMS_FROM'))) {
Console::warning('Skipped SMS processing. Missing "_APP_SMS_PROVIDER" or "_APP_SMS_FROM" environment variables.');
Span::add('warning', 'Skipped SMS processing. Missing "_APP_SMS_PROVIDER" or "_APP_SMS_FROM" environment variables.');
return null;
}
@ -745,13 +801,13 @@ class Messaging extends Action
$provider = $this->createProviderFromDSN($localDSN);
$adapter = $this->getSmsAdapter($provider);
} catch (\Exception) {
Console::warning('Unable to create adapter: ' . $localDSN->getHost());
Span::add('warning', 'Unable to create adapter: ' . $localDSN->getHost());
continue;
}
$callingCode = $localDSN->getParam('local', '');
if (empty($callingCode)) {
Console::warning('Unable to register adapter: ' . $localDSN->getHost() . '. Missing `local` parameter.');
Span::add('warning', 'Unable to register adapter: ' . $localDSN->getHost() . '. Missing `local` parameter.');
continue;
}
@ -822,4 +878,34 @@ class Messaging extends Action
return $provider;
}
/**
* Extract country calling code from a phone number using known country codes.
*/
private function extractCountryCode(string $phoneNumber): ?string
{
if (!\str_starts_with($phoneNumber, '+')) {
return null;
}
$number = \substr($phoneNumber, 1);
if (empty($number)) {
return null;
}
$phoneCodes = Config::getParam('locale-phones', []);
$codes = \array_unique(\array_values($phoneCodes));
// Sort by length descending to match longest codes first (e.g., 1868 before 1)
\usort($codes, fn ($a, $b) => \strlen($b) - \strlen($a));
foreach ($codes as $code) {
if (\str_starts_with($number, $code)) {
return $code;
}
}
return null;
}
}