diff --git a/app/cli.php b/app/cli.php index 73908510d9..a6267fa341 100644 --- a/app/cli.php +++ b/app/cli.php @@ -2,10 +2,10 @@ require_once __DIR__ . '/init.php'; -use Appwrite\Event\Certificate; use Appwrite\Event\Delete; use Appwrite\Event\Event; use Appwrite\Event\Func; +use Appwrite\Event\Publisher\Certificate as CertificatePublisher; use Appwrite\Event\Publisher\StatsResources as StatsResourcesPublisher; use Appwrite\Event\Publisher\Usage as UsagePublisher; use Appwrite\Platform\Appwrite; @@ -253,6 +253,10 @@ $container->set('publisherForUsage', fn (Publisher $publisher) => new UsagePubli $publisher, new Queue(System::getEnv('_APP_STATS_USAGE_QUEUE_NAME', Event::STATS_USAGE_QUEUE_NAME)) ), ['publisher']); +$container->set('publisherForCertificates', fn (Publisher $publisher) => new CertificatePublisher( + $publisher, + new Queue(System::getEnv('_APP_CERTIFICATES_QUEUE_NAME', Event::CERTIFICATES_QUEUE_NAME)) +), ['publisher']); $container->set('publisherForStatsResources', fn (Publisher $publisher) => new StatsResourcesPublisher( $publisher, new Queue(System::getEnv('_APP_STATS_RESOURCES_QUEUE_NAME', Event::STATS_RESOURCES_QUEUE_NAME)) @@ -263,9 +267,6 @@ $container->set('queueForFunctions', function (Publisher $publisher) { $container->set('queueForDeletes', function (Publisher $publisher) { return new Delete($publisher); }, ['publisher']); -$container->set('queueForCertificates', function (Publisher $publisher) { - return new Certificate($publisher); -}, ['publisher']); $container->set('logError', function (Registry $register) { return function (Throwable $error, string $namespace, string $action) use ($register) { Console::error('[Error] Timestamp: ' . date('c', time())); diff --git a/app/controllers/general.php b/app/controllers/general.php index 542effc091..b4f4a5c1d1 100644 --- a/app/controllers/general.php +++ b/app/controllers/general.php @@ -7,9 +7,9 @@ use Ahc\Jwt\JWTException; use Appwrite\Auth\Key; use Appwrite\Bus\Events\ExecutionCompleted; use Appwrite\Bus\Events\RequestCompleted; -use Appwrite\Event\Certificate; use Appwrite\Event\Delete as DeleteEvent; use Appwrite\Event\Event; +use Appwrite\Event\Publisher\Certificate; use Appwrite\Extend\Exception as AppwriteException; use Appwrite\Network\Cors; use Appwrite\Platform\Appwrite; @@ -1014,11 +1014,11 @@ Http::init() ->inject('request') ->inject('console') ->inject('dbForPlatform') - ->inject('queueForCertificates') + ->inject('publisherForCertificates') ->inject('platform') ->inject('authorization') ->inject('certifiedDomains') - ->action(function (Request $request, Document $console, Database $dbForPlatform, Certificate $queueForCertificates, array $platform, Authorization $authorization, Table $certifiedDomains) { + ->action(function (Request $request, Document $console, Database $dbForPlatform, Certificate $publisherForCertificates, array $platform, Authorization $authorization, Table $certifiedDomains) { $hostname = $request->getHostname(); $platformHostnames = $platform['hostnames'] ?? []; @@ -1044,7 +1044,7 @@ Http::init() } // 4. Check/create rule (requires DB access) - $authorization->skip(function () use ($dbForPlatform, $domain, $console, $queueForCertificates, $certifiedDomains) { + $authorization->skip(function () use ($dbForPlatform, $domain, $console, $publisherForCertificates, $certifiedDomains) { try { // TODO: (@Meldiron) Remove after 1.7.x migration $isMd5 = System::getEnv('_APP_RULES_FORMAT') === 'md5'; @@ -1100,10 +1100,11 @@ Http::init() $dbForPlatform->createDocument('rules', $document); Console::info('Issuing a TLS certificate for the main domain (' . $domain->get() . ') in a few seconds...'); - $queueForCertificates - ->setDomain($document) - ->setSkipRenewCheck(true) - ->trigger(); + $publisherForCertificates->enqueue(new \Appwrite\Event\Message\Certificate( + project: $console, + domain: $document, + skipRenewCheck: true, + )); } catch (Duplicate $e) { Console::info('Certificate already exists'); } finally { diff --git a/app/controllers/shared/api.php b/app/controllers/shared/api.php index bd54a8300b..5567281e67 100644 --- a/app/controllers/shared/api.php +++ b/app/controllers/shared/api.php @@ -3,15 +3,17 @@ use Appwrite\Auth\Key; use Appwrite\Auth\MFA\Type\TOTP; use Appwrite\Bus\Events\RequestCompleted; -use Appwrite\Event\Audit; use Appwrite\Event\Build; +use Appwrite\Event\Context\Audit as AuditContext; use Appwrite\Event\Database as EventDatabase; use Appwrite\Event\Delete; use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Mail; +use Appwrite\Event\Message\Audit as AuditMessage; use Appwrite\Event\Message\Usage as UsageMessage; use Appwrite\Event\Messaging; +use Appwrite\Event\Publisher\Audit; use Appwrite\Event\Publisher\Usage as UsagePublisher; use Appwrite\Event\Realtime; use Appwrite\Event\Webhook; @@ -88,7 +90,7 @@ Http::init() ->inject('request') ->inject('dbForPlatform') ->inject('dbForProject') - ->inject('queueForAudits') + ->inject('auditContext') ->inject('project') ->inject('user') ->inject('session') @@ -97,7 +99,7 @@ Http::init() ->inject('team') ->inject('apiKey') ->inject('authorization') - ->action(function (Http $utopia, Request $request, Database $dbForPlatform, Database $dbForProject, Audit $queueForAudits, Document $project, User $user, ?Document $session, array $servers, string $mode, Document $team, ?Key $apiKey, Authorization $authorization) { + ->action(function (Http $utopia, Request $request, Database $dbForPlatform, Database $dbForProject, AuditContext $auditContext, Document $project, User $user, ?Document $session, array $servers, string $mode, Document $team, ?Key $apiKey, Authorization $authorization) { $route = $utopia->getRoute(); if ($route === null) { throw new AppwriteException(AppwriteException::GENERAL_ROUTE_NOT_FOUND); @@ -193,7 +195,7 @@ Http::init() 'name' => $apiKey->getName(), ]); - $queueForAudits->setUser($user); + $auditContext->user = $user; } // For standard keys, update last accessed time @@ -264,7 +266,7 @@ Http::init() API_KEY_ORGANIZATION => ACTIVITY_TYPE_KEY_ORGANIZATION, default => ACTIVITY_TYPE_KEY_PROJECT, }); - $queueForAudits->setUser($userClone); + $auditContext->user = $userClone; } // Apply permission @@ -486,7 +488,7 @@ Http::init() ->inject('user') ->inject('queueForEvents') ->inject('queueForMessaging') - ->inject('queueForAudits') + ->inject('auditContext') ->inject('queueForDeletes') ->inject('queueForDatabase') ->inject('queueForBuilds') @@ -503,7 +505,7 @@ Http::init() ->inject('telemetry') ->inject('platform') ->inject('authorization') - ->action(function (Http $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Context $usage, Func $queueForFunctions, Mail $queueForMails, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry, array $platform, Authorization $authorization) { + ->action(function (Http $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Messaging $queueForMessaging, AuditContext $auditContext, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Context $usage, Func $queueForFunctions, Mail $queueForMails, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry, array $platform, Authorization $authorization) { $response->setUser($user); $request->setUser($user); @@ -596,13 +598,12 @@ Http::init() ->setProject($project) ->setUser($user); - $queueForAudits - ->setMode($mode) - ->setUserAgent($request->getUserAgent('')) - ->setIP($request->getIP()) - ->setHostname($request->getHostname()) - ->setEvent($route->getLabel('audits.event', '')) - ->setProject($project); + $auditContext->mode = $mode; + $auditContext->userAgent = $request->getUserAgent(''); + $auditContext->ip = $request->getIP(); + $auditContext->hostname = $request->getHostname(); + $auditContext->event = $route->getLabel('audits.event', ''); + $auditContext->project = $project; /* If a session exists, use the user associated with the session */ if (! $user->isEmpty()) { @@ -611,7 +612,7 @@ Http::init() if (empty($user->getAttribute('type'))) { $userClone->setAttribute('type', $mode === APP_MODE_ADMIN ? ACTIVITY_TYPE_ADMIN : ACTIVITY_TYPE_USER); } - $queueForAudits->setUser($userClone); + $auditContext->user = $userClone; } /* Auto-set projects */ @@ -790,7 +791,8 @@ Http::shutdown() ->inject('project') ->inject('user') ->inject('queueForEvents') - ->inject('queueForAudits') + ->inject('auditContext') + ->inject('publisherForAudits') ->inject('usage') ->inject('publisherForUsage') ->inject('queueForDeletes') @@ -807,7 +809,7 @@ Http::shutdown() ->inject('bus') ->inject('apiKey') ->inject('mode') - ->action(function (Http $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Audit $queueForAudits, Context $usage, UsagePublisher $publisherForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, Authorization $authorization, callable $timelimit, EventProcessor $eventProcessor, Bus $bus, ?Key $apiKey, string $mode) use ($parseLabel) { + ->action(function (Http $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, AuditContext $auditContext, Audit $publisherForAudits, Context $usage, UsagePublisher $publisherForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, Authorization $authorization, callable $timelimit, EventProcessor $eventProcessor, Bus $bus, ?Key $apiKey, string $mode) use ($parseLabel) { $responsePayload = $response->getPayload(); @@ -902,7 +904,7 @@ Http::shutdown() if (! empty($pattern)) { $resource = $parseLabel($pattern, $responsePayload, $requestParams, $user); if (! empty($resource) && $resource !== $pattern) { - $queueForAudits->setResource($resource); + $auditContext->resource = $resource; } } @@ -912,8 +914,8 @@ Http::shutdown() if (empty($user->getAttribute('type'))) { $userClone->setAttribute('type', $mode === APP_MODE_ADMIN ? ACTIVITY_TYPE_ADMIN : ACTIVITY_TYPE_USER); } - $queueForAudits->setUser($userClone); - } elseif ($queueForAudits->getUser() === null || $queueForAudits->getUser()->isEmpty()) { + $auditContext->user = $userClone; + } elseif ($auditContext->user === null || $auditContext->user->isEmpty()) { /** * User in the request is empty, and no user was set for auditing previously. * This indicates: @@ -931,24 +933,21 @@ Http::shutdown() 'name' => 'Guest', ]); - $queueForAudits->setUser($user); + $auditContext->user = $user; } - if (! empty($queueForAudits->getResource()) && ! $queueForAudits->getUser()->isEmpty()) { + $auditUser = $auditContext->user; + if (! empty($auditContext->resource) && ! \is_null($auditUser) && ! $auditUser->isEmpty()) { /** * audits.payload is switched to default true * in order to auto audit payload for all endpoints */ $pattern = $route->getLabel('audits.payload', true); if (! empty($pattern)) { - $queueForAudits->setPayload($responsePayload); + $auditContext->payload = $responsePayload; } - foreach ($queueForEvents->getParams() as $key => $value) { - $queueForAudits->setParam($key, $value); - } - - $queueForAudits->trigger(); + $publisherForAudits->enqueue(AuditMessage::fromContext($auditContext)); } if (! empty($queueForDeletes->getType())) { diff --git a/app/init/resources.php b/app/init/resources.php index 32d6e0a45f..d1bb7584bf 100644 --- a/app/init/resources.php +++ b/app/init/resources.php @@ -1,8 +1,11 @@ set('publisherMessaging', function (Publisher $publisher) { $container->set('publisherWebhooks', function (Publisher $publisher) { return $publisher; }, ['publisher']); +$container->set('publisherForAudits', fn (Publisher $publisher) => new AuditPublisher( + $publisher, + new Queue(System::getEnv('_APP_AUDITS_QUEUE_NAME', Event::AUDITS_QUEUE_NAME)) +), ['publisher']); +$container->set('publisherForCertificates', fn (Publisher $publisher) => new CertificatePublisher( + $publisher, + new Queue(System::getEnv('_APP_CERTIFICATES_QUEUE_NAME', Event::CERTIFICATES_QUEUE_NAME)) +), ['publisher']); +$container->set('publisherForScreenshots', fn (Publisher $publisher) => new ScreenshotPublisher( + $publisher, + new Queue(System::getEnv('_APP_SCREENSHOTS_QUEUE_NAME', Event::SCREENSHOTS_QUEUE_NAME)) +), ['publisher']); $container->set('publisherForUsage', fn (Publisher $publisher) => new UsagePublisher( $publisher, new Queue(System::getEnv('_APP_STATS_USAGE_QUEUE_NAME', Event::STATS_USAGE_QUEUE_NAME)) diff --git a/app/init/resources/request.php b/app/init/resources/request.php index 63e58e92f7..3f6196c460 100644 --- a/app/init/resources/request.php +++ b/app/init/resources/request.php @@ -4,9 +4,8 @@ use Ahc\Jwt\JWT; use Ahc\Jwt\JWTException; use Appwrite\Auth\Key; use Appwrite\Databases\TransactionState; -use Appwrite\Event\Audit as AuditEvent; use Appwrite\Event\Build; -use Appwrite\Event\Certificate; +use Appwrite\Event\Context\Audit as AuditContext; use Appwrite\Event\Database as EventDatabase; use Appwrite\Event\Delete; use Appwrite\Event\Event; @@ -14,7 +13,6 @@ use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; use Appwrite\Event\Realtime; -use Appwrite\Event\Screenshot; use Appwrite\Event\Webhook; use Appwrite\Extend\Exception; use Appwrite\Functions\EventProcessor; @@ -128,9 +126,6 @@ return function (Container $container): void { $container->set('queueForBuilds', function (Publisher $publisher) { return new Build($publisher); }, ['publisher']); - $container->set('queueForScreenshots', function (Publisher $publisher) { - return new Screenshot($publisher); - }, ['publisher']); $container->set('queueForDatabase', function (Publisher $publisher) { return new EventDatabase($publisher); }, ['publisher']); @@ -149,18 +144,13 @@ return function (Container $container): void { $container->set('usage', function () { return new UsageContext(); }, []); - $container->set('queueForAudits', function (Publisher $publisher) { - return new AuditEvent($publisher); - }, ['publisher']); + $container->set('auditContext', fn () => new AuditContext(), []); $container->set('queueForFunctions', function (Publisher $publisher) { return new Func($publisher); }, ['publisher']); $container->set('eventProcessor', function () { return new EventProcessor(); }, []); - $container->set('queueForCertificates', function (Publisher $publisher) { - return new Certificate($publisher); - }, ['publisher']); $container->set('dbForPlatform', function (Group $pools, Cache $cache, Authorization $authorization) { $adapter = new DatabasePool($pools->get('console')); $database = new Database($adapter, $cache); diff --git a/app/init/worker/message.php b/app/init/worker/message.php index f893c84858..c505d4cb3a 100644 --- a/app/init/worker/message.php +++ b/app/init/worker/message.php @@ -1,8 +1,6 @@ set('queueForScreenshots', function (Publisher $publisher) { - return new Screenshot($publisher); - }, ['publisher']); - $container->set('queueForDeletes', function (Publisher $publisher) { return new Delete($publisher); }, ['publisher']); @@ -323,10 +316,6 @@ return function (Container $container): void { return new Event($publisher); }, ['publisher']); - $container->set('queueForAudits', function (Publisher $publisher) { - return new Audit($publisher); - }, ['publisher']); - $container->set('queueForWebhooks', function (Publisher $publisher) { return new Webhook($publisher); }, ['publisher']); @@ -339,10 +328,6 @@ return function (Container $container): void { return new Realtime(); }, []); - $container->set('queueForCertificates', function (Publisher $publisher) { - return new Certificate($publisher); - }, ['publisher']); - $container->set('deviceForSites', function (Document $project, Telemetry $telemetry) { return new TelemetryDevice($telemetry, getDevice(APP_STORAGE_SITES . '/app-' . $project->getId())); }, ['project', 'telemetry']); diff --git a/src/Appwrite/Event/Context/Audit.php b/src/Appwrite/Event/Context/Audit.php new file mode 100644 index 0000000000..1d41890476 --- /dev/null +++ b/src/Appwrite/Event/Context/Audit.php @@ -0,0 +1,34 @@ +project === null + && $this->user === null + && $this->mode === '' + && $this->userAgent === '' + && $this->ip === '' + && $this->hostname === '' + && $this->event === '' + && $this->resource === '' + && $this->payload === []; + } +} diff --git a/src/Appwrite/Event/Message/Audit.php b/src/Appwrite/Event/Message/Audit.php new file mode 100644 index 0000000000..ae5831c3b9 --- /dev/null +++ b/src/Appwrite/Event/Message/Audit.php @@ -0,0 +1,71 @@ + [ + '$id' => $this->project->getId(), + '$sequence' => $this->project->getSequence(), + 'database' => $this->project->getAttribute('database', ''), + ], + 'user' => $this->user->getArrayCopy(), + 'payload' => $this->payload, + 'resource' => $this->resource, + 'mode' => $this->mode, + 'ip' => $this->ip, + 'userAgent' => $this->userAgent, + 'event' => $this->event, + 'hostname' => $this->hostname, + ]; + } + + public static function fromArray(array $data): static + { + return new self( + event: $data['event'] ?? '', + payload: $data['payload'] ?? [], + project: new Document($data['project'] ?? []), + user: new Document($data['user'] ?? []), + resource: $data['resource'] ?? '', + mode: $data['mode'] ?? '', + ip: $data['ip'] ?? '', + userAgent: $data['userAgent'] ?? '', + hostname: $data['hostname'] ?? '', + ); + } + + public static function fromContext(AuditContext $context): static + { + return new self( + event: $context->event, + payload: $context->payload, + project: $context->project ?? new Document(), + user: $context->user ?? new Document(), + resource: $context->resource, + mode: $context->mode, + ip: $context->ip, + userAgent: $context->userAgent, + hostname: $context->hostname, + ); + } +} diff --git a/src/Appwrite/Event/Message/Certificate.php b/src/Appwrite/Event/Message/Certificate.php new file mode 100644 index 0000000000..a189bb8187 --- /dev/null +++ b/src/Appwrite/Event/Message/Certificate.php @@ -0,0 +1,43 @@ + [ + '$id' => $this->project->getId(), + '$sequence' => $this->project->getSequence(), + 'database' => $this->project->getAttribute('database', ''), + ], + 'domain' => $this->domain->getArrayCopy(), + 'skipRenewCheck' => $this->skipRenewCheck, + 'validationDomain' => $this->validationDomain, + 'action' => $this->action, + ]; + } + + public static function fromArray(array $data): static + { + return new self( + project: new Document($data['project'] ?? []), + domain: new Document($data['domain'] ?? []), + skipRenewCheck: $data['skipRenewCheck'] ?? false, + validationDomain: $data['validationDomain'] ?? null, + action: $data['action'] ?? \Appwrite\Event\Certificate::ACTION_GENERATION, + ); + } +} diff --git a/src/Appwrite/Event/Message/Screenshot.php b/src/Appwrite/Event/Message/Screenshot.php new file mode 100644 index 0000000000..a06cdfbfc0 --- /dev/null +++ b/src/Appwrite/Event/Message/Screenshot.php @@ -0,0 +1,34 @@ + [ + '$id' => $this->project->getId(), + '$sequence' => $this->project->getSequence(), + 'database' => $this->project->getAttribute('database', ''), + ], + 'deploymentId' => $this->deploymentId, + ]; + } + + public static function fromArray(array $data): static + { + return new self( + project: new Document($data['project'] ?? []), + deploymentId: $data['deploymentId'] ?? '', + ); + } +} diff --git a/src/Appwrite/Event/Publisher/Audit.php b/src/Appwrite/Event/Publisher/Audit.php new file mode 100644 index 0000000000..daa9a01fce --- /dev/null +++ b/src/Appwrite/Event/Publisher/Audit.php @@ -0,0 +1,35 @@ +publish($this->queue, $message); + } catch (\Throwable $th) { + Console::error('[Audit] Failed to publish audit message: ' . $th->getMessage()); + + return false; + } + } + + public function getSize(bool $failed = false): int + { + return $this->getQueueSize($this->queue, $failed); + } +} diff --git a/src/Appwrite/Event/Publisher/Certificate.php b/src/Appwrite/Event/Publisher/Certificate.php new file mode 100644 index 0000000000..472fb0d701 --- /dev/null +++ b/src/Appwrite/Event/Publisher/Certificate.php @@ -0,0 +1,27 @@ +publish($this->queue, $message); + } + + public function getSize(bool $failed = false): int + { + return $this->getQueueSize($this->queue, $failed); + } +} diff --git a/src/Appwrite/Event/Publisher/Screenshot.php b/src/Appwrite/Event/Publisher/Screenshot.php new file mode 100644 index 0000000000..2a0fa1e0f8 --- /dev/null +++ b/src/Appwrite/Event/Publisher/Screenshot.php @@ -0,0 +1,27 @@ +publish($this->queue, $message); + } + + public function getSize(bool $failed = false): int + { + return $this->getQueueSize($this->queue, $failed); + } +} diff --git a/src/Appwrite/Platform/Modules/Functions/Workers/Builds.php b/src/Appwrite/Platform/Modules/Functions/Workers/Builds.php index c6c4a0b38c..0071b03d2d 100644 --- a/src/Appwrite/Platform/Modules/Functions/Workers/Builds.php +++ b/src/Appwrite/Platform/Modules/Functions/Workers/Builds.php @@ -6,9 +6,9 @@ use Ahc\Jwt\JWT; use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Message\Usage as UsageMessage; +use Appwrite\Event\Publisher\Screenshot; use Appwrite\Event\Publisher\Usage as UsagePublisher; use Appwrite\Event\Realtime; -use Appwrite\Event\Screenshot; use Appwrite\Event\Webhook; use Appwrite\Filter\BranchDomain as BranchDomainFilter; use Appwrite\Usage\Context; @@ -58,7 +58,7 @@ class Builds extends Action ->inject('project') ->inject('dbForPlatform') ->inject('queueForEvents') - ->inject('queueForScreenshots') + ->inject('publisherForScreenshots') ->inject('queueForWebhooks') ->inject('queueForFunctions') ->inject('queueForRealtime') @@ -84,7 +84,7 @@ class Builds extends Action Document $project, Database $dbForPlatform, Event $queueForEvents, - Screenshot $queueForScreenshots, + Screenshot $publisherForScreenshots, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, @@ -126,7 +126,7 @@ class Builds extends Action $deviceForFunctions, $deviceForSites, $deviceForFiles, - $queueForScreenshots, + $publisherForScreenshots, $queueForWebhooks, $queueForFunctions, $queueForRealtime, @@ -161,7 +161,7 @@ class Builds extends Action Device $deviceForFunctions, Device $deviceForSites, Device $deviceForFiles, - Screenshot $queueForScreenshots, + Screenshot $publisherForScreenshots, Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, @@ -1120,10 +1120,10 @@ class Builds extends Action /** Screenshot site */ if ($resource->getCollection() === 'sites') { - $queueForScreenshots - ->setDeploymentId($deployment->getId()) - ->setProject($project) - ->trigger(); + $publisherForScreenshots->enqueue(new \Appwrite\Event\Message\Screenshot( + project: $project, + deploymentId: $deployment->getId(), + )); Console::log('Site screenshot queued'); } diff --git a/src/Appwrite/Platform/Modules/Functions/Workers/Screenshots.php b/src/Appwrite/Platform/Modules/Functions/Workers/Screenshots.php index 065fe477eb..423bf0bd41 100644 --- a/src/Appwrite/Platform/Modules/Functions/Workers/Screenshots.php +++ b/src/Appwrite/Platform/Modules/Functions/Workers/Screenshots.php @@ -3,6 +3,7 @@ namespace Appwrite\Platform\Modules\Functions\Workers; use Ahc\Jwt\JWT; +use Appwrite\Event\Message\Screenshot; use Appwrite\Event\Realtime; use Appwrite\Permission; use Appwrite\Role; @@ -62,9 +63,11 @@ class Screenshots extends Action throw new \Exception('Missing payload'); } + $screenshotMessage = Screenshot::fromArray($payload); + Console::log('Site screenshot started'); - $deploymentId = $payload['deploymentId'] ?? null; + $deploymentId = $screenshotMessage->deploymentId; $deployment = $dbForProject->getDocument('deployments', $deploymentId); if ($deployment->isEmpty()) { diff --git a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Audits/Get.php b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Audits/Get.php index e01e89641d..76c34a0a2a 100644 --- a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Audits/Get.php +++ b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Audits/Get.php @@ -2,7 +2,7 @@ namespace Appwrite\Platform\Modules\Health\Http\Health\Queue\Audits; -use Appwrite\Event\Audit; +use Appwrite\Event\Publisher\Audit; use Appwrite\Platform\Modules\Health\Http\Health\Queue\Base; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -42,16 +42,16 @@ class Get extends Base contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('queueForAudits') + ->inject('publisherForAudits') ->inject('response') ->callback($this->action(...)); } - public function action(int|string $threshold, Audit $queueForAudits, Response $response): void + public function action(int|string $threshold, Audit $publisherForAudits, Response $response): void { $threshold = (int) $threshold; - $size = $queueForAudits->getSize(); + $size = $publisherForAudits->getSize(); $this->assertQueueThreshold($size, $threshold); diff --git a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Certificates/Get.php b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Certificates/Get.php index 6724f25094..82c45db172 100644 --- a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Certificates/Get.php +++ b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Certificates/Get.php @@ -2,7 +2,7 @@ namespace Appwrite\Platform\Modules\Health\Http\Health\Queue\Certificates; -use Appwrite\Event\Certificate; +use Appwrite\Event\Publisher\Certificate; use Appwrite\Platform\Modules\Health\Http\Health\Queue\Base; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -42,16 +42,16 @@ class Get extends Base contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('queueForCertificates') + ->inject('publisherForCertificates') ->inject('response') ->callback($this->action(...)); } - public function action(int|string $threshold, Certificate $queueForCertificates, Response $response): void + public function action(int|string $threshold, Certificate $publisherForCertificates, Response $response): void { $threshold = (int) $threshold; - $size = $queueForCertificates->getSize(); + $size = $publisherForCertificates->getSize(); $this->assertQueueThreshold($size, $threshold); diff --git a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Failed/Get.php b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Failed/Get.php index 1f7cc0bf33..6d77cc6e16 100644 --- a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Failed/Get.php +++ b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Failed/Get.php @@ -2,19 +2,19 @@ namespace Appwrite\Platform\Modules\Health\Http\Health\Queue\Failed; -use Appwrite\Event\Audit; use Appwrite\Event\Build; -use Appwrite\Event\Certificate; use Appwrite\Event\Database; use Appwrite\Event\Delete; use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Mail; use Appwrite\Event\Messaging; +use Appwrite\Event\Publisher\Audit; +use Appwrite\Event\Publisher\Certificate; use Appwrite\Event\Publisher\Migration as MigrationPublisher; +use Appwrite\Event\Publisher\Screenshot; use Appwrite\Event\Publisher\StatsResources as StatsResourcesPublisher; use Appwrite\Event\Publisher\Usage as UsagePublisher; -use Appwrite\Event\Screenshot; use Appwrite\Event\Webhook; use Appwrite\Platform\Modules\Health\Http\Health\Queue\Base; use Appwrite\SDK\AuthType; @@ -75,17 +75,17 @@ class Get extends Base ->inject('response') ->inject('queueForDatabase') ->inject('queueForDeletes') - ->inject('queueForAudits') + ->inject('publisherForAudits') ->inject('queueForMails') ->inject('queueForFunctions') ->inject('publisherForStatsResources') ->inject('publisherForUsage') ->inject('queueForWebhooks') - ->inject('queueForCertificates') + ->inject('publisherForCertificates') ->inject('queueForBuilds') ->inject('queueForMessaging') ->inject('publisherForMigrations') - ->inject('queueForScreenshots') + ->inject('publisherForScreenshots') ->callback($this->action(...)); } @@ -95,32 +95,32 @@ class Get extends Base Response $response, Database $queueForDatabase, Delete $queueForDeletes, - Audit $queueForAudits, + Audit $publisherForAudits, Mail $queueForMails, Func $queueForFunctions, StatsResourcesPublisher $publisherForStatsResources, UsagePublisher $publisherForUsage, Webhook $queueForWebhooks, - Certificate $queueForCertificates, + Certificate $publisherForCertificates, Build $queueForBuilds, Messaging $queueForMessaging, MigrationPublisher $publisherForMigrations, - Screenshot $queueForScreenshots, + Screenshot $publisherForScreenshots, ): void { $threshold = (int) $threshold; $queue = match ($name) { System::getEnv('_APP_DATABASE_QUEUE_NAME', Event::DATABASE_QUEUE_NAME) => $queueForDatabase, System::getEnv('_APP_DELETE_QUEUE_NAME', Event::DELETE_QUEUE_NAME) => $queueForDeletes, - System::getEnv('_APP_AUDITS_QUEUE_NAME', Event::AUDITS_QUEUE_NAME) => $queueForAudits, + System::getEnv('_APP_AUDITS_QUEUE_NAME', Event::AUDITS_QUEUE_NAME) => $publisherForAudits, System::getEnv('_APP_MAILS_QUEUE_NAME', Event::MAILS_QUEUE_NAME) => $queueForMails, System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME) => $queueForFunctions, System::getEnv('_APP_STATS_RESOURCES_QUEUE_NAME', Event::STATS_RESOURCES_QUEUE_NAME) => $publisherForStatsResources, System::getEnv('_APP_STATS_USAGE_QUEUE_NAME', Event::STATS_USAGE_QUEUE_NAME) => $publisherForUsage, System::getEnv('_APP_WEBHOOK_QUEUE_NAME', Event::WEBHOOK_QUEUE_NAME) => $queueForWebhooks, - System::getEnv('_APP_CERTIFICATES_QUEUE_NAME', Event::CERTIFICATES_QUEUE_NAME) => $queueForCertificates, + System::getEnv('_APP_CERTIFICATES_QUEUE_NAME', Event::CERTIFICATES_QUEUE_NAME) => $publisherForCertificates, System::getEnv('_APP_BUILDS_QUEUE_NAME', Event::BUILDS_QUEUE_NAME) => $queueForBuilds, - System::getEnv('_APP_SCREENSHOTS_QUEUE_NAME', Event::SCREENSHOTS_QUEUE_NAME) => $queueForScreenshots, + System::getEnv('_APP_SCREENSHOTS_QUEUE_NAME', Event::SCREENSHOTS_QUEUE_NAME) => $publisherForScreenshots, System::getEnv('_APP_MESSAGING_QUEUE_NAME', Event::MESSAGING_QUEUE_NAME) => $queueForMessaging, System::getEnv('_APP_MIGRATIONS_QUEUE_NAME', Event::MIGRATIONS_QUEUE_NAME) => $publisherForMigrations, }; diff --git a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Logs/Get.php b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Logs/Get.php index dd05aebc39..0a655662de 100644 --- a/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Logs/Get.php +++ b/src/Appwrite/Platform/Modules/Health/Http/Health/Queue/Logs/Get.php @@ -2,7 +2,7 @@ namespace Appwrite\Platform\Modules\Health\Http\Health\Queue\Logs; -use Appwrite\Event\Audit; +use Appwrite\Event\Publisher\Audit; use Appwrite\Platform\Modules\Health\Http\Health\Queue\Base; use Appwrite\SDK\AuthType; use Appwrite\SDK\ContentType; @@ -42,16 +42,16 @@ class Get extends Base contentType: ContentType::JSON )) ->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true) - ->inject('queueForAudits') + ->inject('publisherForAudits') ->inject('response') ->callback($this->action(...)); } - public function action(int|string $threshold, Audit $queueForAudits, Response $response): void + public function action(int|string $threshold, Audit $publisherForAudits, Response $response): void { $threshold = (int) $threshold; - $size = $queueForAudits->getSize(); + $size = $publisherForAudits->getSize(); $this->assertQueueThreshold($size, $threshold); diff --git a/src/Appwrite/Platform/Modules/Proxy/Http/Rules/API/Create.php b/src/Appwrite/Platform/Modules/Proxy/Http/Rules/API/Create.php index bfa62ef920..a6a3e44194 100644 --- a/src/Appwrite/Platform/Modules/Proxy/Http/Rules/API/Create.php +++ b/src/Appwrite/Platform/Modules/Proxy/Http/Rules/API/Create.php @@ -2,8 +2,8 @@ namespace Appwrite\Platform\Modules\Proxy\Http\Rules\API; -use Appwrite\Event\Certificate; use Appwrite\Event\Event; +use Appwrite\Event\Publisher\Certificate; use Appwrite\Extend\Exception; use Appwrite\Platform\Modules\Proxy\Action; use Appwrite\SDK\AuthType; @@ -62,7 +62,7 @@ class Create extends Action ->param('domain', null, new ValidatorDomain(), 'Domain name.') ->inject('response') ->inject('project') - ->inject('queueForCertificates') + ->inject('publisherForCertificates') ->inject('queueForEvents') ->inject('dbForPlatform') ->inject('platform') @@ -70,7 +70,7 @@ class Create extends Action ->callback($this->action(...)); } - public function action(string $domain, Response $response, Document $project, Certificate $queueForCertificates, Event $queueForEvents, Database $dbForPlatform, array $platform, Log $log) + public function action(string $domain, Response $response, Document $project, Certificate $publisherForCertificates, Event $queueForEvents, Database $dbForPlatform, array $platform, Log $log) { $this->validateDomainRestrictions($domain, $platform); @@ -114,13 +114,14 @@ class Create extends Action } if ($rule->getAttribute('status', '') === RULE_STATUS_CERTIFICATE_GENERATING) { - $queueForCertificates - ->setDomain(new Document([ + $publisherForCertificates->enqueue(new \Appwrite\Event\Message\Certificate( + project: $project, + domain: new Document([ 'domain' => $rule->getAttribute('domain'), 'domainType' => $rule->getAttribute('deploymentResourceType', $rule->getAttribute('type')), - ])) - ->setAction(Certificate::ACTION_GENERATION) - ->trigger(); + ]), + action: \Appwrite\Event\Certificate::ACTION_GENERATION, + )); } $queueForEvents->setParam('ruleId', $rule->getId()); diff --git a/src/Appwrite/Platform/Modules/Proxy/Http/Rules/Function/Create.php b/src/Appwrite/Platform/Modules/Proxy/Http/Rules/Function/Create.php index a61ce80c4b..4a8bd4897e 100644 --- a/src/Appwrite/Platform/Modules/Proxy/Http/Rules/Function/Create.php +++ b/src/Appwrite/Platform/Modules/Proxy/Http/Rules/Function/Create.php @@ -2,8 +2,8 @@ namespace Appwrite\Platform\Modules\Proxy\Http\Rules\Function; -use Appwrite\Event\Certificate; use Appwrite\Event\Event; +use Appwrite\Event\Publisher\Certificate; use Appwrite\Extend\Exception; use Appwrite\Platform\Modules\Proxy\Action; use Appwrite\SDK\AuthType; @@ -66,7 +66,7 @@ class Create extends Action ->param('branch', '', new Text(255, 0), 'Name of VCS branch to deploy changes automatically', true) ->inject('response') ->inject('project') - ->inject('queueForCertificates') + ->inject('publisherForCertificates') ->inject('queueForEvents') ->inject('dbForPlatform') ->inject('dbForProject') @@ -75,7 +75,7 @@ class Create extends Action ->callback($this->action(...)); } - public function action(string $domain, string $functionId, string $branch, Response $response, Document $project, Certificate $queueForCertificates, Event $queueForEvents, Database $dbForPlatform, Database $dbForProject, array $platform, Log $log) + public function action(string $domain, string $functionId, string $branch, Response $response, Document $project, Certificate $publisherForCertificates, Event $queueForEvents, Database $dbForPlatform, Database $dbForProject, array $platform, Log $log) { $this->validateDomainRestrictions($domain, $platform); @@ -132,13 +132,14 @@ class Create extends Action } if ($rule->getAttribute('status', '') === RULE_STATUS_CERTIFICATE_GENERATING) { - $queueForCertificates - ->setDomain(new Document([ + $publisherForCertificates->enqueue(new \Appwrite\Event\Message\Certificate( + project: $project, + domain: new Document([ 'domain' => $rule->getAttribute('domain'), 'domainType' => $rule->getAttribute('deploymentResourceType', $rule->getAttribute('type')), - ])) - ->setAction(Certificate::ACTION_GENERATION) - ->trigger(); + ]), + action: \Appwrite\Event\Certificate::ACTION_GENERATION, + )); } $queueForEvents->setParam('ruleId', $rule->getId()); diff --git a/src/Appwrite/Platform/Modules/Proxy/Http/Rules/Redirect/Create.php b/src/Appwrite/Platform/Modules/Proxy/Http/Rules/Redirect/Create.php index 95c29f48e8..8a265ba5bb 100644 --- a/src/Appwrite/Platform/Modules/Proxy/Http/Rules/Redirect/Create.php +++ b/src/Appwrite/Platform/Modules/Proxy/Http/Rules/Redirect/Create.php @@ -2,8 +2,8 @@ namespace Appwrite\Platform\Modules\Proxy\Http\Rules\Redirect; -use Appwrite\Event\Certificate; use Appwrite\Event\Event; +use Appwrite\Event\Publisher\Certificate; use Appwrite\Extend\Exception; use Appwrite\Platform\Modules\Proxy\Action; use Appwrite\SDK\AuthType; @@ -69,7 +69,7 @@ class Create extends Action ->param('resourceType', '', new WhiteList(['site', 'function']), 'Type of parent resource.') ->inject('response') ->inject('project') - ->inject('queueForCertificates') + ->inject('publisherForCertificates') ->inject('queueForEvents') ->inject('dbForPlatform') ->inject('dbForProject') @@ -78,7 +78,7 @@ class Create extends Action ->callback($this->action(...)); } - public function action(string $domain, string $url, int $statusCode, string $resourceId, string $resourceType, Response $response, Document $project, Certificate $queueForCertificates, Event $queueForEvents, Database $dbForPlatform, Database $dbForProject, array $platform, Log $log) + public function action(string $domain, string $url, int $statusCode, string $resourceId, string $resourceType, Response $response, Document $project, Certificate $publisherForCertificates, Event $queueForEvents, Database $dbForPlatform, Database $dbForProject, array $platform, Log $log) { $this->validateDomainRestrictions($domain, $platform); @@ -136,13 +136,14 @@ class Create extends Action } if ($rule->getAttribute('status', '') === RULE_STATUS_CERTIFICATE_GENERATING) { - $queueForCertificates - ->setDomain(new Document([ + $publisherForCertificates->enqueue(new \Appwrite\Event\Message\Certificate( + project: $project, + domain: new Document([ 'domain' => $rule->getAttribute('domain'), 'domainType' => $rule->getAttribute('deploymentResourceType', $rule->getAttribute('type')), - ])) - ->setAction(Certificate::ACTION_GENERATION) - ->trigger(); + ]), + action: \Appwrite\Event\Certificate::ACTION_GENERATION, + )); } $queueForEvents->setParam('ruleId', $rule->getId()); diff --git a/src/Appwrite/Platform/Modules/Proxy/Http/Rules/Site/Create.php b/src/Appwrite/Platform/Modules/Proxy/Http/Rules/Site/Create.php index ba99cefb42..a9dfa93a49 100644 --- a/src/Appwrite/Platform/Modules/Proxy/Http/Rules/Site/Create.php +++ b/src/Appwrite/Platform/Modules/Proxy/Http/Rules/Site/Create.php @@ -2,8 +2,8 @@ namespace Appwrite\Platform\Modules\Proxy\Http\Rules\Site; -use Appwrite\Event\Certificate; use Appwrite\Event\Event; +use Appwrite\Event\Publisher\Certificate; use Appwrite\Extend\Exception; use Appwrite\Platform\Modules\Proxy\Action; use Appwrite\SDK\AuthType; @@ -66,7 +66,7 @@ class Create extends Action ->param('branch', '', new Text(255, 0), 'Name of VCS branch to deploy changes automatically', true) ->inject('response') ->inject('project') - ->inject('queueForCertificates') + ->inject('publisherForCertificates') ->inject('queueForEvents') ->inject('dbForPlatform') ->inject('dbForProject') @@ -75,7 +75,7 @@ class Create extends Action ->callback($this->action(...)); } - public function action(string $domain, string $siteId, ?string $branch, Response $response, Document $project, Certificate $queueForCertificates, Event $queueForEvents, Database $dbForPlatform, Database $dbForProject, array $platform, Log $log) + public function action(string $domain, string $siteId, ?string $branch, Response $response, Document $project, Certificate $publisherForCertificates, Event $queueForEvents, Database $dbForPlatform, Database $dbForProject, array $platform, Log $log) { $this->validateDomainRestrictions($domain, $platform); @@ -132,13 +132,14 @@ class Create extends Action } if ($rule->getAttribute('status', '') === RULE_STATUS_CERTIFICATE_GENERATING) { - $queueForCertificates - ->setDomain(new Document([ + $publisherForCertificates->enqueue(new \Appwrite\Event\Message\Certificate( + project: $project, + domain: new Document([ 'domain' => $rule->getAttribute('domain'), 'domainType' => $rule->getAttribute('deploymentResourceType', $rule->getAttribute('type')), - ])) - ->setAction(Certificate::ACTION_GENERATION) - ->trigger(); + ]), + action: \Appwrite\Event\Certificate::ACTION_GENERATION, + )); } $queueForEvents->setParam('ruleId', $rule->getId()); diff --git a/src/Appwrite/Platform/Modules/Proxy/Http/Rules/Verification/Update.php b/src/Appwrite/Platform/Modules/Proxy/Http/Rules/Verification/Update.php index 8a0d341132..9e81f6ff18 100644 --- a/src/Appwrite/Platform/Modules/Proxy/Http/Rules/Verification/Update.php +++ b/src/Appwrite/Platform/Modules/Proxy/Http/Rules/Verification/Update.php @@ -2,8 +2,8 @@ namespace Appwrite\Platform\Modules\Proxy\Http\Rules\Verification; -use Appwrite\Event\Certificate; use Appwrite\Event\Event; +use Appwrite\Event\Publisher\Certificate; use Appwrite\Extend\Exception; use Appwrite\Platform\Modules\Proxy\Action; use Appwrite\SDK\AuthType; @@ -56,7 +56,7 @@ class Update extends Action )) ->param('ruleId', '', fn (Database $dbForProject) => new UID($dbForProject->getAdapter()->getMaxUIDLength()), 'Rule ID.', false, ['dbForProject']) ->inject('response') - ->inject('queueForCertificates') + ->inject('publisherForCertificates') ->inject('queueForEvents') ->inject('project') ->inject('dbForPlatform') @@ -67,7 +67,7 @@ class Update extends Action public function action( string $ruleId, Response $response, - Certificate $queueForCertificates, + Certificate $publisherForCertificates, Event $queueForEvents, Document $project, Database $dbForPlatform, @@ -110,12 +110,13 @@ class Update extends Action } // Issue a TLS certificate when DNS verification is successful - $queueForCertificates - ->setDomain(new Document([ + $publisherForCertificates->enqueue(new \Appwrite\Event\Message\Certificate( + project: $project, + domain: new Document([ 'domain' => $rule->getAttribute('domain'), 'domainType' => $rule->getAttribute('deploymentResourceType', $rule->getAttribute('type')), - ])) - ->trigger(); + ]), + )); if (!empty($certificate)) { $rule->setAttribute('renewAt', $certificate->getAttribute('renewDate', '')); diff --git a/src/Appwrite/Platform/Tasks/Interval.php b/src/Appwrite/Platform/Tasks/Interval.php index a7d16e0a52..f5502a5986 100644 --- a/src/Appwrite/Platform/Tasks/Interval.php +++ b/src/Appwrite/Platform/Tasks/Interval.php @@ -2,7 +2,7 @@ namespace Appwrite\Platform\Tasks; -use Appwrite\Event\Certificate; +use Appwrite\Event\Publisher\Certificate; use DateTime; use Swoole\Coroutine\Channel; use Swoole\Process; @@ -29,16 +29,16 @@ class Interval extends Action ->desc('Schedules tasks on regular intervals by publishing them to our queues') ->inject('dbForPlatform') ->inject('getProjectDB') - ->inject('queueForCertificates') + ->inject('publisherForCertificates') ->callback($this->action(...)); } - public function action(Database $dbForPlatform, callable $getProjectDB, Certificate $queueForCertificates): void + public function action(Database $dbForPlatform, callable $getProjectDB, Certificate $publisherForCertificates): void { Console::title('Interval V1'); Console::success(APP_NAME . ' interval process v1 has started'); - $timers = $this->runTasks($dbForPlatform, $getProjectDB, $queueForCertificates); + $timers = $this->runTasks($dbForPlatform, $getProjectDB, $publisherForCertificates); $chan = new Channel(1); Process::signal(SIGTERM, function () use ($chan) { @@ -52,16 +52,16 @@ class Interval extends Action } } - public function runTasks(Database $dbForPlatform, callable $getProjectDB, Certificate $queueForCertificates): array + public function runTasks(Database $dbForPlatform, callable $getProjectDB, Certificate $publisherForCertificates): array { $timers = []; $tasks = $this->getTasks(); foreach ($tasks as $task) { - $timers[] = Timer::tick($task['interval'], function () use ($task, $dbForPlatform, $getProjectDB, $queueForCertificates) { + $timers[] = Timer::tick($task['interval'], function () use ($task, $dbForPlatform, $getProjectDB, $publisherForCertificates) { $taskName = $task['name']; Span::init("interval.{$taskName}"); try { - $task['callback']($dbForPlatform, $getProjectDB, $queueForCertificates); + $task['callback']($dbForPlatform, $getProjectDB, $publisherForCertificates); } catch (\Exception $e) { Span::error($e); } finally { @@ -80,15 +80,15 @@ class Interval extends Action return [ [ 'name' => 'domainVerification', - "callback" => function (Database $dbForPlatform, callable $getProjectDB, Certificate $queueForCertificates) { - $this->verifyDomain($dbForPlatform, $queueForCertificates); + "callback" => function (Database $dbForPlatform, callable $getProjectDB, Certificate $publisherForCertificates) { + $this->verifyDomain($dbForPlatform, $publisherForCertificates); }, 'interval' => $intervalDomainVerification * 1000, ] ]; } - private function verifyDomain(Database $dbForPlatform, Certificate $queueForCertificates): void + private function verifyDomain(Database $dbForPlatform, Certificate $publisherForCertificates): void { $time = DatabaseDateTime::now(); $fromTime = new DateTime('-3 days'); // Max 3 days old @@ -115,13 +115,17 @@ class Interval extends Action foreach ($rules as $rule) { try { - $queueForCertificates - ->setDomain(new Document([ + $publisherForCertificates->enqueue(new \Appwrite\Event\Message\Certificate( + project: new Document([ + '$id' => $rule->getAttribute('projectId', ''), + '$sequence' => $rule->getAttribute('projectInternalId', 0), + ]), + domain: new Document([ 'domain' => $rule->getAttribute('domain'), 'domainType' => $rule->getAttribute('deploymentResourceType', $rule->getAttribute('type')), - ])) - ->setAction(Certificate::ACTION_DOMAIN_VERIFICATION) - ->trigger(); + ]), + action: \Appwrite\Event\Certificate::ACTION_DOMAIN_VERIFICATION, + )); $processed++; } catch (\Throwable $th) { $failed++; diff --git a/src/Appwrite/Platform/Tasks/Maintenance.php b/src/Appwrite/Platform/Tasks/Maintenance.php index c821435786..fe803f1292 100644 --- a/src/Appwrite/Platform/Tasks/Maintenance.php +++ b/src/Appwrite/Platform/Tasks/Maintenance.php @@ -2,8 +2,8 @@ namespace Appwrite\Platform\Tasks; -use Appwrite\Event\Certificate; use Appwrite\Event\Delete; +use Appwrite\Event\Publisher\Certificate; use DateInterval; use DateTime; use Utopia\Console; @@ -29,12 +29,12 @@ class Maintenance extends Action ->param('type', 'loop', new WhiteList(['loop', 'trigger']), 'How to run task. "loop" is meant for container entrypoint, and "trigger" for manual execution.') ->inject('dbForPlatform') ->inject('console') - ->inject('queueForCertificates') + ->inject('publisherForCertificates') ->inject('queueForDeletes') ->callback($this->action(...)); } - public function action(string $type, Database $dbForPlatform, Document $console, Certificate $queueForCertificates, Delete $queueForDeletes): void + public function action(string $type, Database $dbForPlatform, Document $console, Certificate $publisherForCertificates, Delete $queueForDeletes): void { Console::title('Maintenance V1'); Console::success(APP_NAME . ' maintenance process v1 has started'); @@ -59,7 +59,7 @@ class Maintenance extends Action $delay = $next->getTimestamp() - $now->getTimestamp(); } - $action = function () use ($interval, $cacheRetention, $schedulesDeletionRetention, $usageStatsRetentionHourly, $dbForPlatform, $console, $queueForDeletes, $queueForCertificates) { + $action = function () use ($interval, $cacheRetention, $schedulesDeletionRetention, $usageStatsRetentionHourly, $dbForPlatform, $console, $queueForDeletes, $publisherForCertificates) { $time = DatabaseDateTime::now(); Console::info("[{$time}] Notifying workers with maintenance tasks every {$interval} seconds"); @@ -92,7 +92,7 @@ class Maintenance extends Action ->trigger(); $this->notifyDeleteConnections($queueForDeletes); - $this->renewCertificates($dbForPlatform, $queueForCertificates); + $this->renewCertificates($dbForPlatform, $publisherForCertificates); $this->notifyDeleteCache($cacheRetention, $queueForDeletes); $this->notifyDeleteSchedules($schedulesDeletionRetention, $queueForDeletes); $this->notifyDeleteCSVExports($queueForDeletes); @@ -124,7 +124,7 @@ class Maintenance extends Action ->trigger(); } - private function renewCertificates(Database $dbForPlatform, Certificate $queueForCertificate): void + private function renewCertificates(Database $dbForPlatform, Certificate $publisherForCertificate): void { $time = DatabaseDateTime::now(); @@ -158,13 +158,17 @@ class Maintenance extends Action continue; } - $queueForCertificate - ->setDomain(new Document([ + $publisherForCertificate->enqueue(new \Appwrite\Event\Message\Certificate( + project: new Document([ + '$id' => $rule->getAttribute('projectId', ''), + '$sequence' => $rule->getAttribute('projectInternalId', 0), + ]), + domain: new Document([ 'domain' => $rule->getAttribute('domain'), 'domainType' => $rule->getAttribute('deploymentResourceType', $rule->getAttribute('type')), - ])) - ->setAction(Certificate::ACTION_GENERATION) - ->trigger(); + ]), + action: \Appwrite\Event\Certificate::ACTION_GENERATION, + )); } } diff --git a/src/Appwrite/Platform/Tasks/SSL.php b/src/Appwrite/Platform/Tasks/SSL.php index ef8283f168..cb33836a99 100644 --- a/src/Appwrite/Platform/Tasks/SSL.php +++ b/src/Appwrite/Platform/Tasks/SSL.php @@ -2,7 +2,7 @@ namespace Appwrite\Platform\Tasks; -use Appwrite\Event\Certificate; +use Appwrite\Event\Publisher\Certificate; use Utopia\Console; use Utopia\Database\Database; use Utopia\Database\Document; @@ -29,11 +29,11 @@ class SSL extends Action ->param('skip-check', 'true', new Boolean(true), 'If DNS and renew check should be skipped. Defaults to true, and when true, all jobs will result in certificate generation attempt.', true) ->inject('console') ->inject('dbForPlatform') - ->inject('queueForCertificates') + ->inject('publisherForCertificates') ->callback($this->action(...)); } - public function action(string $domain, bool|string $skipCheck, Document $console, Database $dbForPlatform, Certificate $queueForCertificates): void + public function action(string $domain, bool|string $skipCheck, Document $console, Database $dbForPlatform, Certificate $publisherForCertificates): void { $domain = new Domain(!empty($domain) ? $domain : ''); if (!$domain->isKnown() || $domain->isTest()) { @@ -98,12 +98,13 @@ class SSL extends Action Console::info('Updated existing rule ' . $rule->getId() . ' for domain: ' . $domain->get()); } - $queueForCertificates - ->setDomain(new Document([ - 'domain' => $domain->get() - ])) - ->setSkipRenewCheck($skipCheck) - ->trigger(); + $publisherForCertificates->enqueue(new \Appwrite\Event\Message\Certificate( + project: $console, + domain: new Document([ + 'domain' => $domain->get(), + ]), + skipRenewCheck: $skipCheck, + )); Console::success('Scheduled a job to issue a TLS certificate for domain: ' . $domain->get()); } diff --git a/src/Appwrite/Platform/Workers/Audits.php b/src/Appwrite/Platform/Workers/Audits.php index 6bcc85bc36..e5a7950945 100644 --- a/src/Appwrite/Platform/Workers/Audits.php +++ b/src/Appwrite/Platform/Workers/Audits.php @@ -2,6 +2,7 @@ namespace Appwrite\Platform\Workers; +use Appwrite\Event\Message\Audit; use Exception; use Throwable; use Utopia\Console; @@ -40,7 +41,6 @@ class Audits extends Action $this ->desc('Audits worker') ->inject('message') - ->inject('project') ->inject('getAudit') ->callback($this->action(...)); @@ -50,14 +50,13 @@ class Audits extends Action /** * @param Message $message - * @param Document $project * @param callable(Document): \Utopia\Audit\Audit $getAudit * @return Commit|NoCommit * @throws Throwable * @throws \Utopia\Database\Exception * @throws Structure */ - public function action(Message $message, Document $project, callable $getAudit): Commit|NoCommit + public function action(Message $message, callable $getAudit): Commit|NoCommit { $payload = $message->getPayload() ?? []; @@ -65,19 +64,21 @@ class Audits extends Action throw new Exception('Missing payload'); } + $auditMessage = Audit::fromArray($payload); + Console::info('Aggregating audit logs'); - $event = $payload['event'] ?? ''; + $event = $auditMessage->event; $auditPayload = ''; - if ($project->getId() === 'console') { - $auditPayload = $payload['payload'] ?? ''; + if ($auditMessage->project->getId() === 'console') { + $auditPayload = $auditMessage->payload; } - $mode = $payload['mode'] ?? ''; - $resource = $payload['resource'] ?? ''; - $userAgent = $payload['userAgent'] ?? ''; - $ip = $payload['ip'] ?? ''; - $user = new Document($payload['user'] ?? []); + $mode = $auditMessage->mode; + $resource = $auditMessage->resource; + $userAgent = $auditMessage->userAgent; + $ip = $auditMessage->ip; + $user = $auditMessage->user; $impersonatorUserId = $user->getAttribute('impersonatorUserId'); $actorUserId = $impersonatorUserId ?: $user->getId(); @@ -126,14 +127,14 @@ class Audits extends Action ]; } - if (isset($this->logs[$project->getSequence()])) { - $this->logs[$project->getSequence()]['logs'][] = $eventData; + if (isset($this->logs[$auditMessage->project->getSequence()])) { + $this->logs[$auditMessage->project->getSequence()]['logs'][] = $eventData; } else { - $this->logs[$project->getSequence()] = [ + $this->logs[$auditMessage->project->getSequence()] = [ 'project' => new Document([ - '$id' => $project->getId(), - '$sequence' => $project->getSequence(), - 'database' => $project->getAttribute('database'), + '$id' => $auditMessage->project->getId(), + '$sequence' => $auditMessage->project->getSequence(), + 'database' => $auditMessage->project->getAttribute('database'), ]), 'logs' => [$eventData] ]; diff --git a/src/Appwrite/Platform/Workers/Certificates.php b/src/Appwrite/Platform/Workers/Certificates.php index 73509819a9..34234971d9 100644 --- a/src/Appwrite/Platform/Workers/Certificates.php +++ b/src/Appwrite/Platform/Workers/Certificates.php @@ -3,10 +3,10 @@ namespace Appwrite\Platform\Workers; use Appwrite\Certificates\Adapter as CertificatesAdapter; -use Appwrite\Event\Certificate; use Appwrite\Event\Event; use Appwrite\Event\Func; use Appwrite\Event\Mail; +use Appwrite\Event\Publisher\Certificate; use Appwrite\Event\Realtime; use Appwrite\Event\Webhook; use Appwrite\Extend\Exception as AppwriteException; @@ -55,7 +55,7 @@ class Certificates extends Action ->inject('queueForWebhooks') ->inject('queueForFunctions') ->inject('queueForRealtime') - ->inject('queueForCertificates') + ->inject('publisherForCertificates') ->inject('log') ->inject('certificates') ->inject('plan') @@ -71,7 +71,7 @@ class Certificates extends Action * @param Webhook $queueForWebhooks * @param Func $queueForFunctions * @param Realtime $queueForRealtime - * @param Certificate $queueForCertificates + * @param Certificate $publisherForCertificates * @param Log $log * @param CertificatesAdapter $certificates * @param array $plan @@ -88,7 +88,7 @@ class Certificates extends Action Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, - Certificate $queueForCertificates, + Certificate $publisherForCertificates, Log $log, CertificatesAdapter $certificates, array $plan, @@ -100,21 +100,22 @@ class Certificates extends Action throw new Exception('Missing payload'); } - $document = new Document($payload['domain'] ?? []); + $certificateMessage = \Appwrite\Event\Message\Certificate::fromArray($payload); + $document = $certificateMessage->domain; $domain = new Domain($document->getAttribute('domain', '')); $domainType = $document->getAttribute('domainType'); - $skipRenewCheck = $payload['skipRenewCheck'] ?? false; - $validationDomain = $payload['validationDomain'] ?? null; - $action = $payload['action'] ?? Certificate::ACTION_GENERATION; + $skipRenewCheck = $certificateMessage->skipRenewCheck; + $validationDomain = $certificateMessage->validationDomain; + $action = $certificateMessage->action; $log->addTag('domain', $domain->get()); switch ($action) { - case Certificate::ACTION_DOMAIN_VERIFICATION: - $this->handleDomainVerificationAction($domain, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $queueForCertificates, $log, $authorization, $validationDomain); + case \Appwrite\Event\Certificate::ACTION_DOMAIN_VERIFICATION: + $this->handleDomainVerificationAction($domain, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $publisherForCertificates, $log, $authorization, $validationDomain); break; - case Certificate::ACTION_GENERATION: + case \Appwrite\Event\Certificate::ACTION_GENERATION: $this->handleCertificateGenerationAction($domain, $domainType, $dbForPlatform, $queueForMails, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $log, $certificates, $authorization, $skipRenewCheck, $plan, $validationDomain); break; @@ -130,7 +131,7 @@ class Certificates extends Action * @param Webhook $queueForWebhooks * @param Func $queueForFunctions * @param Realtime $queueForRealtime - * @param Certificate $queueForCertificates + * @param Certificate $publisherForCertificates * @param Log $log * @param ValidatorAuthorization $authorization * @param string|null $validationDomain @@ -146,7 +147,7 @@ class Certificates extends Action Webhook $queueForWebhooks, Func $queueForFunctions, Realtime $queueForRealtime, - Certificate $queueForCertificates, + Certificate $publisherForCertificates, Log $log, ValidatorAuthorization $authorization, ?string $validationDomain = null @@ -188,13 +189,17 @@ class Certificates extends Action // Issue a TLS certificate when domain is verified if ($rule->getAttribute('status', '') === RULE_STATUS_CERTIFICATE_GENERATING) { - $queueForCertificates - ->setDomain(new Document([ + $publisherForCertificates->enqueue(new \Appwrite\Event\Message\Certificate( + project: new Document([ + '$id' => $rule->getAttribute('projectId', ''), + '$sequence' => $rule->getAttribute('projectInternalId', 0), + ]), + domain: new Document([ 'domain' => $rule->getAttribute('domain'), 'domainType' => $rule->getAttribute('deploymentResourceType', $rule->getAttribute('type')), - ])) - ->setAction(Certificate::ACTION_GENERATION) - ->trigger(); + ]), + action: \Appwrite\Event\Certificate::ACTION_GENERATION, + )); Console::success('Certificate generation triggered successfully.'); }