diff --git a/app/worker.php b/app/worker.php index 39f0695bb3..d0094222a7 100644 --- a/app/worker.php +++ b/app/worker.php @@ -44,13 +44,19 @@ use Utopia\Queue\Message; use Utopia\Queue\Publisher; use Utopia\Queue\Server; use Utopia\Registry\Registry; +use Utopia\Span\Exporter; +use Utopia\Span\Span; +use Utopia\Span\Storage; use Utopia\Storage\Device\Telemetry as TelemetryDevice; use Utopia\System\System; use Utopia\Telemetry\Adapter as Telemetry; use Utopia\Telemetry\Adapter\None as NoTelemetry; Runtime::enableCoroutine(); +Span::setStorage(new Storage\Coroutine()); +Span::addExporter(new Exporter\Stdout()); +global $register; Server::setResource('register', fn () => $register); Server::setResource('authorization', function () { diff --git a/src/Appwrite/Platform/Workers/Messaging.php b/src/Appwrite/Platform/Workers/Messaging.php index 7b604c3b19..d1001d5e50 100644 --- a/src/Appwrite/Platform/Workers/Messaging.php +++ b/src/Appwrite/Platform/Workers/Messaging.php @@ -5,7 +5,6 @@ namespace Appwrite\Platform\Workers; use Appwrite\Event\StatsUsage; use Appwrite\Messaging\Status as MessageStatus; use Swoole\Runtime; -use Utopia\CLI\Console; use Utopia\Config\Config; use Utopia\Database\Database; use Utopia\Database\DateTime; @@ -39,6 +38,7 @@ use Utopia\Messaging\Messages\SMS; use Utopia\Messaging\Priority; use Utopia\Platform\Action; use Utopia\Queue\Message; +use Utopia\Span\Span; use Utopia\Storage\Device; use Utopia\Storage\Device\Local; use Utopia\Storage\Storage; @@ -100,20 +100,31 @@ class Messaging extends Action $type = $payload['type'] ?? ''; - switch ($type) { - case MESSAGE_SEND_TYPE_INTERNAL: - $message = new Document($payload['message'] ?? []); - $recipients = $payload['recipients'] ?? []; + Span::init('messaging'); + Span::add('project', $project->getId()); + Span::add('type', $type); - $this->sendInternalSMSMessage($message, $project, $recipients, $log); - break; - case MESSAGE_SEND_TYPE_EXTERNAL: - $message = $dbForProject->getDocument('messages', $payload['messageId']); + try { + switch ($type) { + case MESSAGE_SEND_TYPE_INTERNAL: + $message = new Document($payload['message'] ?? []); + $recipients = $payload['recipients'] ?? []; - $this->sendExternalMessage($dbForProject, $message, $deviceForFiles, $project, $queueForStatsUsage); - break; - default: - throw new \Exception('Unknown message type: ' . $type); + $this->sendInternalSMSMessage($message, $project, $recipients, $log); + break; + case MESSAGE_SEND_TYPE_EXTERNAL: + $message = $dbForProject->getDocument('messages', $payload['messageId']); + + $this->sendExternalMessage($dbForProject, $message, $deviceForFiles, $project, $queueForStatsUsage); + break; + default: + throw new \Exception('Unknown message type: ' . $type); + } + } catch (\Throwable $e) { + Span::error($e); + throw $e; + } finally { + Span::current()->finish(); } } @@ -129,10 +140,11 @@ class Messaging extends Action $userIds = $message->getAttribute('users', []); $providerType = $message->getAttribute('providerType'); - Console::log(json_encode([ - 'project' => $project->getId(), - 'type' => $providerType, - ])); + Span::add('messageId', $message->getId()); + Span::add('providerType', $providerType); + Span::add('topicsCount', \count($topicIds)); + Span::add('usersCount', \count($userIds)); + Span::add('targetsCount', \count($targetIds)); /** * @var array $allTargets @@ -177,13 +189,31 @@ class Messaging extends Action \array_push($allTargets, ...$targets); } + Span::add('recipientsTotal', \count($allTargets)); + + // Extract country codes for SMS targets + if ($providerType === MESSAGE_TYPE_SMS && !empty($allTargets)) { + $countryCodes = []; + foreach ($allTargets as $target) { + $identifier = $target->getAttribute('identifier', ''); + $countryCode = $this->extractCountryCode($identifier); + if ($countryCode !== null) { + $countryCodes[$countryCode] = ($countryCodes[$countryCode] ?? 0) + 1; + } + } + if (!empty($countryCodes)) { + Span::add('countryCodes', \json_encode($countryCodes)); + } + } + if (empty($allTargets)) { $dbForProject->updateDocument('messages', $message->getId(), $message->setAttributes([ 'status' => MessageStatus::FAILED, 'deliveryErrors' => ['No valid recipients found.'] ])); - Console::warning('No valid recipients found.'); + Span::add('status', 'failed'); + Span::add('error', 'No valid recipients found.'); return; } @@ -198,10 +228,14 @@ class Messaging extends Action 'deliveryErrors' => ['No enabled provider found.'] ])); - Console::warning('No enabled provider found.'); + Span::add('status', 'failed'); + Span::add('error', 'No enabled provider found.'); return; } + Span::add('provider', $default->getAttribute('provider')); + Span::add('providerName', $default->getAttribute('name')); + /** * @var array> $identifiers */ @@ -343,10 +377,15 @@ class Messaging extends Action if (\count($message->getAttribute('deliveryErrors')) > 0) { $message->setAttribute('status', MessageStatus::FAILED); + Span::add('status', 'failed'); } else { $message->setAttribute('status', MessageStatus::SENT); + Span::add('status', 'sent'); } + Span::add('deliveredTotal', $deliveredTotal); + Span::add('errorsTotal', \count($deliveryErrors)); + $message->removeAttribute('to'); foreach ($providers as $provider) { @@ -392,12 +431,27 @@ class Messaging extends Action private function sendInternalSMSMessage(Document $message, Document $project, array $recipients, Log $log): void { + Span::add('recipientsCount', \count($recipients)); + + // Extract country codes from phone numbers + $countryCodes = []; + foreach ($recipients as $recipient) { + $countryCode = $this->extractCountryCode($recipient); + if ($countryCode !== null) { + $countryCodes[$countryCode] = ($countryCodes[$countryCode] ?? 0) + 1; + } + } + if (!empty($countryCodes)) { + Span::add('countryCodes', \json_encode($countryCodes)); + } + if ($this->adapter === null) { $this->adapter = $this->createInternalSMSAdapter(); } if ($this->adapter === null) { - Console::warning('Skipped SMS processing. SMS adapter is not set.'); + Span::add('status', 'skipped'); + Span::add('warning', 'SMS adapter is not set.'); return; } @@ -405,18 +459,17 @@ class Messaging extends Action throw new \Exception('Project not set in payload'); } - Console::log(json_encode([ - 'project' => $project->getId(), - 'type' => 'internal-sms' - ])); $denyList = System::getEnv('_APP_SMS_PROJECTS_DENY_LIST', ''); $denyList = explode(',', $denyList); if (\in_array($project->getId(), $denyList)) { - Console::error('Project is in the deny list. Skipping...'); + Span::add('status', 'denied'); + Span::add('error', 'Project is in the deny list.'); return; } $from = System::getEnv('_APP_SMS_FROM', ''); + Span::add('from', $from); + $sms = new SMS( $recipients, $message->getAttribute('data')['content'], @@ -425,7 +478,10 @@ class Messaging extends Action try { $result = $this->adapter->send($sms); + Span::add('status', 'sent'); + Span::add('deliveredTo', $result['deliveredTo'] ?? 0); } catch (\Throwable $th) { + Span::add('status', 'failed'); throw new \Exception('Failed sending to targets with error: ' . $th->getMessage()); } } @@ -699,7 +755,7 @@ class Messaging extends Action private function createInternalSMSAdapter(): ?SMSAdapter { if (empty(System::getEnv('_APP_SMS_PROVIDER')) || empty(System::getEnv('_APP_SMS_FROM'))) { - Console::warning('Skipped SMS processing. Missing "_APP_SMS_PROVIDER" or "_APP_SMS_FROM" environment variables.'); + Span::add('warning', 'Skipped SMS processing. Missing "_APP_SMS_PROVIDER" or "_APP_SMS_FROM" environment variables.'); return null; } @@ -745,13 +801,13 @@ class Messaging extends Action $provider = $this->createProviderFromDSN($localDSN); $adapter = $this->getSmsAdapter($provider); } catch (\Exception) { - Console::warning('Unable to create adapter: ' . $localDSN->getHost()); + Span::add('warning', 'Unable to create adapter: ' . $localDSN->getHost()); continue; } $callingCode = $localDSN->getParam('local', ''); if (empty($callingCode)) { - Console::warning('Unable to register adapter: ' . $localDSN->getHost() . '. Missing `local` parameter.'); + Span::add('warning', 'Unable to register adapter: ' . $localDSN->getHost() . '. Missing `local` parameter.'); continue; } @@ -822,4 +878,34 @@ class Messaging extends Action return $provider; } + + /** + * Extract country calling code from a phone number using known country codes. + */ + private function extractCountryCode(string $phoneNumber): ?string + { + if (!\str_starts_with($phoneNumber, '+')) { + return null; + } + + $number = \substr($phoneNumber, 1); + + if (empty($number)) { + return null; + } + + $phoneCodes = Config::getParam('locale-phones', []); + $codes = \array_unique(\array_values($phoneCodes)); + + // Sort by length descending to match longest codes first (e.g., 1868 before 1) + \usort($codes, fn ($a, $b) => \strlen($b) - \strlen($a)); + + foreach ($codes as $code) { + if (\str_starts_with($number, $code)) { + return $code; + } + } + + return null; + } }