Merge pull request #11851 from appwrite/chore-migrate-audits-certificates-screenshots-to-publishers

This commit is contained in:
Chirag Aggarwal 2026-04-15 15:13:18 +05:30 committed by GitHub
commit 49c93c635d
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
29 changed files with 495 additions and 210 deletions

View file

@ -2,10 +2,10 @@
require_once __DIR__ . '/init.php';
use Appwrite\Event\Certificate;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Publisher\Certificate as CertificatePublisher;
use Appwrite\Event\Publisher\StatsResources as StatsResourcesPublisher;
use Appwrite\Event\Publisher\Usage as UsagePublisher;
use Appwrite\Platform\Appwrite;
@ -253,6 +253,10 @@ $container->set('publisherForUsage', fn (Publisher $publisher) => new UsagePubli
$publisher,
new Queue(System::getEnv('_APP_STATS_USAGE_QUEUE_NAME', Event::STATS_USAGE_QUEUE_NAME))
), ['publisher']);
$container->set('publisherForCertificates', fn (Publisher $publisher) => new CertificatePublisher(
$publisher,
new Queue(System::getEnv('_APP_CERTIFICATES_QUEUE_NAME', Event::CERTIFICATES_QUEUE_NAME))
), ['publisher']);
$container->set('publisherForStatsResources', fn (Publisher $publisher) => new StatsResourcesPublisher(
$publisher,
new Queue(System::getEnv('_APP_STATS_RESOURCES_QUEUE_NAME', Event::STATS_RESOURCES_QUEUE_NAME))
@ -263,9 +267,6 @@ $container->set('queueForFunctions', function (Publisher $publisher) {
$container->set('queueForDeletes', function (Publisher $publisher) {
return new Delete($publisher);
}, ['publisher']);
$container->set('queueForCertificates', function (Publisher $publisher) {
return new Certificate($publisher);
}, ['publisher']);
$container->set('logError', function (Registry $register) {
return function (Throwable $error, string $namespace, string $action) use ($register) {
Console::error('[Error] Timestamp: ' . date('c', time()));

View file

@ -7,9 +7,9 @@ use Ahc\Jwt\JWTException;
use Appwrite\Auth\Key;
use Appwrite\Bus\Events\ExecutionCompleted;
use Appwrite\Bus\Events\RequestCompleted;
use Appwrite\Event\Certificate;
use Appwrite\Event\Delete as DeleteEvent;
use Appwrite\Event\Event;
use Appwrite\Event\Publisher\Certificate;
use Appwrite\Extend\Exception as AppwriteException;
use Appwrite\Network\Cors;
use Appwrite\Platform\Appwrite;
@ -1014,11 +1014,11 @@ Http::init()
->inject('request')
->inject('console')
->inject('dbForPlatform')
->inject('queueForCertificates')
->inject('publisherForCertificates')
->inject('platform')
->inject('authorization')
->inject('certifiedDomains')
->action(function (Request $request, Document $console, Database $dbForPlatform, Certificate $queueForCertificates, array $platform, Authorization $authorization, Table $certifiedDomains) {
->action(function (Request $request, Document $console, Database $dbForPlatform, Certificate $publisherForCertificates, array $platform, Authorization $authorization, Table $certifiedDomains) {
$hostname = $request->getHostname();
$platformHostnames = $platform['hostnames'] ?? [];
@ -1044,7 +1044,7 @@ Http::init()
}
// 4. Check/create rule (requires DB access)
$authorization->skip(function () use ($dbForPlatform, $domain, $console, $queueForCertificates, $certifiedDomains) {
$authorization->skip(function () use ($dbForPlatform, $domain, $console, $publisherForCertificates, $certifiedDomains) {
try {
// TODO: (@Meldiron) Remove after 1.7.x migration
$isMd5 = System::getEnv('_APP_RULES_FORMAT') === 'md5';
@ -1100,10 +1100,11 @@ Http::init()
$dbForPlatform->createDocument('rules', $document);
Console::info('Issuing a TLS certificate for the main domain (' . $domain->get() . ') in a few seconds...');
$queueForCertificates
->setDomain($document)
->setSkipRenewCheck(true)
->trigger();
$publisherForCertificates->enqueue(new \Appwrite\Event\Message\Certificate(
project: $console,
domain: $document,
skipRenewCheck: true,
));
} catch (Duplicate $e) {
Console::info('Certificate already exists');
} finally {

View file

@ -3,15 +3,17 @@
use Appwrite\Auth\Key;
use Appwrite\Auth\MFA\Type\TOTP;
use Appwrite\Bus\Events\RequestCompleted;
use Appwrite\Event\Audit;
use Appwrite\Event\Build;
use Appwrite\Event\Context\Audit as AuditContext;
use Appwrite\Event\Database as EventDatabase;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Event\Message\Audit as AuditMessage;
use Appwrite\Event\Message\Usage as UsageMessage;
use Appwrite\Event\Messaging;
use Appwrite\Event\Publisher\Audit;
use Appwrite\Event\Publisher\Usage as UsagePublisher;
use Appwrite\Event\Realtime;
use Appwrite\Event\Webhook;
@ -88,7 +90,7 @@ Http::init()
->inject('request')
->inject('dbForPlatform')
->inject('dbForProject')
->inject('queueForAudits')
->inject('auditContext')
->inject('project')
->inject('user')
->inject('session')
@ -97,7 +99,7 @@ Http::init()
->inject('team')
->inject('apiKey')
->inject('authorization')
->action(function (Http $utopia, Request $request, Database $dbForPlatform, Database $dbForProject, Audit $queueForAudits, Document $project, User $user, ?Document $session, array $servers, string $mode, Document $team, ?Key $apiKey, Authorization $authorization) {
->action(function (Http $utopia, Request $request, Database $dbForPlatform, Database $dbForProject, AuditContext $auditContext, Document $project, User $user, ?Document $session, array $servers, string $mode, Document $team, ?Key $apiKey, Authorization $authorization) {
$route = $utopia->getRoute();
if ($route === null) {
throw new AppwriteException(AppwriteException::GENERAL_ROUTE_NOT_FOUND);
@ -193,7 +195,7 @@ Http::init()
'name' => $apiKey->getName(),
]);
$queueForAudits->setUser($user);
$auditContext->user = $user;
}
// For standard keys, update last accessed time
@ -264,7 +266,7 @@ Http::init()
API_KEY_ORGANIZATION => ACTIVITY_TYPE_KEY_ORGANIZATION,
default => ACTIVITY_TYPE_KEY_PROJECT,
});
$queueForAudits->setUser($userClone);
$auditContext->user = $userClone;
}
// Apply permission
@ -486,7 +488,7 @@ Http::init()
->inject('user')
->inject('queueForEvents')
->inject('queueForMessaging')
->inject('queueForAudits')
->inject('auditContext')
->inject('queueForDeletes')
->inject('queueForDatabase')
->inject('queueForBuilds')
@ -503,7 +505,7 @@ Http::init()
->inject('telemetry')
->inject('platform')
->inject('authorization')
->action(function (Http $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Context $usage, Func $queueForFunctions, Mail $queueForMails, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry, array $platform, Authorization $authorization) {
->action(function (Http $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Messaging $queueForMessaging, AuditContext $auditContext, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Context $usage, Func $queueForFunctions, Mail $queueForMails, Database $dbForProject, callable $timelimit, Document $resourceToken, string $mode, ?Key $apiKey, array $plan, Document $devKey, Telemetry $telemetry, array $platform, Authorization $authorization) {
$response->setUser($user);
$request->setUser($user);
@ -596,13 +598,12 @@ Http::init()
->setProject($project)
->setUser($user);
$queueForAudits
->setMode($mode)
->setUserAgent($request->getUserAgent(''))
->setIP($request->getIP())
->setHostname($request->getHostname())
->setEvent($route->getLabel('audits.event', ''))
->setProject($project);
$auditContext->mode = $mode;
$auditContext->userAgent = $request->getUserAgent('');
$auditContext->ip = $request->getIP();
$auditContext->hostname = $request->getHostname();
$auditContext->event = $route->getLabel('audits.event', '');
$auditContext->project = $project;
/* If a session exists, use the user associated with the session */
if (! $user->isEmpty()) {
@ -611,7 +612,7 @@ Http::init()
if (empty($user->getAttribute('type'))) {
$userClone->setAttribute('type', $mode === APP_MODE_ADMIN ? ACTIVITY_TYPE_ADMIN : ACTIVITY_TYPE_USER);
}
$queueForAudits->setUser($userClone);
$auditContext->user = $userClone;
}
/* Auto-set projects */
@ -790,7 +791,8 @@ Http::shutdown()
->inject('project')
->inject('user')
->inject('queueForEvents')
->inject('queueForAudits')
->inject('auditContext')
->inject('publisherForAudits')
->inject('usage')
->inject('publisherForUsage')
->inject('queueForDeletes')
@ -807,7 +809,7 @@ Http::shutdown()
->inject('bus')
->inject('apiKey')
->inject('mode')
->action(function (Http $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, Audit $queueForAudits, Context $usage, UsagePublisher $publisherForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, Authorization $authorization, callable $timelimit, EventProcessor $eventProcessor, Bus $bus, ?Key $apiKey, string $mode) use ($parseLabel) {
->action(function (Http $utopia, Request $request, Response $response, Document $project, User $user, Event $queueForEvents, AuditContext $auditContext, Audit $publisherForAudits, Context $usage, UsagePublisher $publisherForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Func $queueForFunctions, Event $queueForWebhooks, Realtime $queueForRealtime, Database $dbForProject, Authorization $authorization, callable $timelimit, EventProcessor $eventProcessor, Bus $bus, ?Key $apiKey, string $mode) use ($parseLabel) {
$responsePayload = $response->getPayload();
@ -902,7 +904,7 @@ Http::shutdown()
if (! empty($pattern)) {
$resource = $parseLabel($pattern, $responsePayload, $requestParams, $user);
if (! empty($resource) && $resource !== $pattern) {
$queueForAudits->setResource($resource);
$auditContext->resource = $resource;
}
}
@ -912,8 +914,8 @@ Http::shutdown()
if (empty($user->getAttribute('type'))) {
$userClone->setAttribute('type', $mode === APP_MODE_ADMIN ? ACTIVITY_TYPE_ADMIN : ACTIVITY_TYPE_USER);
}
$queueForAudits->setUser($userClone);
} elseif ($queueForAudits->getUser() === null || $queueForAudits->getUser()->isEmpty()) {
$auditContext->user = $userClone;
} elseif ($auditContext->user === null || $auditContext->user->isEmpty()) {
/**
* User in the request is empty, and no user was set for auditing previously.
* This indicates:
@ -931,24 +933,21 @@ Http::shutdown()
'name' => 'Guest',
]);
$queueForAudits->setUser($user);
$auditContext->user = $user;
}
if (! empty($queueForAudits->getResource()) && ! $queueForAudits->getUser()->isEmpty()) {
$auditUser = $auditContext->user;
if (! empty($auditContext->resource) && ! \is_null($auditUser) && ! $auditUser->isEmpty()) {
/**
* audits.payload is switched to default true
* in order to auto audit payload for all endpoints
*/
$pattern = $route->getLabel('audits.payload', true);
if (! empty($pattern)) {
$queueForAudits->setPayload($responsePayload);
$auditContext->payload = $responsePayload;
}
foreach ($queueForEvents->getParams() as $key => $value) {
$queueForAudits->setParam($key, $value);
}
$queueForAudits->trigger();
$publisherForAudits->enqueue(AuditMessage::fromContext($auditContext));
}
if (! empty($queueForDeletes->getType())) {

View file

@ -1,8 +1,11 @@
<?php
use Appwrite\Event\Event;
use Appwrite\Event\Publisher\Audit as AuditPublisher;
use Appwrite\Event\Publisher\Certificate as CertificatePublisher;
use Appwrite\Event\Publisher\Execution as ExecutionPublisher;
use Appwrite\Event\Publisher\Migration as MigrationPublisher;
use Appwrite\Event\Publisher\Screenshot as ScreenshotPublisher;
use Appwrite\Event\Publisher\StatsResources as StatsResourcesPublisher;
use Appwrite\Event\Publisher\Usage as UsagePublisher;
use Appwrite\Utopia\Database\Documents\User;
@ -81,6 +84,18 @@ $container->set('publisherMessaging', function (Publisher $publisher) {
$container->set('publisherWebhooks', function (Publisher $publisher) {
return $publisher;
}, ['publisher']);
$container->set('publisherForAudits', fn (Publisher $publisher) => new AuditPublisher(
$publisher,
new Queue(System::getEnv('_APP_AUDITS_QUEUE_NAME', Event::AUDITS_QUEUE_NAME))
), ['publisher']);
$container->set('publisherForCertificates', fn (Publisher $publisher) => new CertificatePublisher(
$publisher,
new Queue(System::getEnv('_APP_CERTIFICATES_QUEUE_NAME', Event::CERTIFICATES_QUEUE_NAME))
), ['publisher']);
$container->set('publisherForScreenshots', fn (Publisher $publisher) => new ScreenshotPublisher(
$publisher,
new Queue(System::getEnv('_APP_SCREENSHOTS_QUEUE_NAME', Event::SCREENSHOTS_QUEUE_NAME))
), ['publisher']);
$container->set('publisherForUsage', fn (Publisher $publisher) => new UsagePublisher(
$publisher,
new Queue(System::getEnv('_APP_STATS_USAGE_QUEUE_NAME', Event::STATS_USAGE_QUEUE_NAME))

View file

@ -4,9 +4,8 @@ use Ahc\Jwt\JWT;
use Ahc\Jwt\JWTException;
use Appwrite\Auth\Key;
use Appwrite\Databases\TransactionState;
use Appwrite\Event\Audit as AuditEvent;
use Appwrite\Event\Build;
use Appwrite\Event\Certificate;
use Appwrite\Event\Context\Audit as AuditContext;
use Appwrite\Event\Database as EventDatabase;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
@ -14,7 +13,6 @@ use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Event\Messaging;
use Appwrite\Event\Realtime;
use Appwrite\Event\Screenshot;
use Appwrite\Event\Webhook;
use Appwrite\Extend\Exception;
use Appwrite\Functions\EventProcessor;
@ -128,9 +126,6 @@ return function (Container $container): void {
$container->set('queueForBuilds', function (Publisher $publisher) {
return new Build($publisher);
}, ['publisher']);
$container->set('queueForScreenshots', function (Publisher $publisher) {
return new Screenshot($publisher);
}, ['publisher']);
$container->set('queueForDatabase', function (Publisher $publisher) {
return new EventDatabase($publisher);
}, ['publisher']);
@ -149,18 +144,13 @@ return function (Container $container): void {
$container->set('usage', function () {
return new UsageContext();
}, []);
$container->set('queueForAudits', function (Publisher $publisher) {
return new AuditEvent($publisher);
}, ['publisher']);
$container->set('auditContext', fn () => new AuditContext(), []);
$container->set('queueForFunctions', function (Publisher $publisher) {
return new Func($publisher);
}, ['publisher']);
$container->set('eventProcessor', function () {
return new EventProcessor();
}, []);
$container->set('queueForCertificates', function (Publisher $publisher) {
return new Certificate($publisher);
}, ['publisher']);
$container->set('dbForPlatform', function (Group $pools, Cache $cache, Authorization $authorization) {
$adapter = new DatabasePool($pools->get('console'));
$database = new Database($adapter, $cache);

View file

@ -1,8 +1,6 @@
<?php
use Appwrite\Event\Audit;
use Appwrite\Event\Build;
use Appwrite\Event\Certificate;
use Appwrite\Event\Database as EventDatabase;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
@ -10,7 +8,6 @@ use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Event\Messaging;
use Appwrite\Event\Realtime;
use Appwrite\Event\Screenshot;
use Appwrite\Event\Webhook;
use Appwrite\Usage\Context;
use Appwrite\Utopia\Database\Documents\User;
@ -311,10 +308,6 @@ return function (Container $container): void {
return new Build($publisher);
}, ['publisher']);
$container->set('queueForScreenshots', function (Publisher $publisher) {
return new Screenshot($publisher);
}, ['publisher']);
$container->set('queueForDeletes', function (Publisher $publisher) {
return new Delete($publisher);
}, ['publisher']);
@ -323,10 +316,6 @@ return function (Container $container): void {
return new Event($publisher);
}, ['publisher']);
$container->set('queueForAudits', function (Publisher $publisher) {
return new Audit($publisher);
}, ['publisher']);
$container->set('queueForWebhooks', function (Publisher $publisher) {
return new Webhook($publisher);
}, ['publisher']);
@ -339,10 +328,6 @@ return function (Container $container): void {
return new Realtime();
}, []);
$container->set('queueForCertificates', function (Publisher $publisher) {
return new Certificate($publisher);
}, ['publisher']);
$container->set('deviceForSites', function (Document $project, Telemetry $telemetry) {
return new TelemetryDevice($telemetry, getDevice(APP_STORAGE_SITES . '/app-' . $project->getId()));
}, ['project', 'telemetry']);

View file

@ -0,0 +1,34 @@
<?php
namespace Appwrite\Event\Context;
use Utopia\Database\Document;
class Audit
{
public function __construct(
public ?Document $project = null,
public ?Document $user = null,
public string $mode = '',
public string $userAgent = '',
public string $ip = '',
public string $hostname = '',
public string $event = '',
public string $resource = '',
public array $payload = [],
) {
}
public function isEmpty(): bool
{
return $this->project === null
&& $this->user === null
&& $this->mode === ''
&& $this->userAgent === ''
&& $this->ip === ''
&& $this->hostname === ''
&& $this->event === ''
&& $this->resource === ''
&& $this->payload === [];
}
}

View file

@ -0,0 +1,71 @@
<?php
namespace Appwrite\Event\Message;
use Appwrite\Event\Context\Audit as AuditContext;
use Utopia\Database\Document;
final class Audit extends Base
{
public function __construct(
public readonly string $event,
public readonly array $payload,
public readonly Document $project = new Document(),
public readonly Document $user = new Document(),
public readonly string $resource = '',
public readonly string $mode = '',
public readonly string $ip = '',
public readonly string $userAgent = '',
public readonly string $hostname = '',
) {
}
public function toArray(): array
{
return [
'project' => [
'$id' => $this->project->getId(),
'$sequence' => $this->project->getSequence(),
'database' => $this->project->getAttribute('database', ''),
],
'user' => $this->user->getArrayCopy(),
'payload' => $this->payload,
'resource' => $this->resource,
'mode' => $this->mode,
'ip' => $this->ip,
'userAgent' => $this->userAgent,
'event' => $this->event,
'hostname' => $this->hostname,
];
}
public static function fromArray(array $data): static
{
return new self(
event: $data['event'] ?? '',
payload: $data['payload'] ?? [],
project: new Document($data['project'] ?? []),
user: new Document($data['user'] ?? []),
resource: $data['resource'] ?? '',
mode: $data['mode'] ?? '',
ip: $data['ip'] ?? '',
userAgent: $data['userAgent'] ?? '',
hostname: $data['hostname'] ?? '',
);
}
public static function fromContext(AuditContext $context): static
{
return new self(
event: $context->event,
payload: $context->payload,
project: $context->project ?? new Document(),
user: $context->user ?? new Document(),
resource: $context->resource,
mode: $context->mode,
ip: $context->ip,
userAgent: $context->userAgent,
hostname: $context->hostname,
);
}
}

View file

@ -0,0 +1,43 @@
<?php
namespace Appwrite\Event\Message;
use Utopia\Database\Document;
final class Certificate extends Base
{
public function __construct(
public readonly Document $project,
public readonly Document $domain,
public readonly bool $skipRenewCheck = false,
public readonly ?string $validationDomain = null,
public readonly string $action = \Appwrite\Event\Certificate::ACTION_GENERATION,
) {
}
public function toArray(): array
{
return [
'project' => [
'$id' => $this->project->getId(),
'$sequence' => $this->project->getSequence(),
'database' => $this->project->getAttribute('database', ''),
],
'domain' => $this->domain->getArrayCopy(),
'skipRenewCheck' => $this->skipRenewCheck,
'validationDomain' => $this->validationDomain,
'action' => $this->action,
];
}
public static function fromArray(array $data): static
{
return new self(
project: new Document($data['project'] ?? []),
domain: new Document($data['domain'] ?? []),
skipRenewCheck: $data['skipRenewCheck'] ?? false,
validationDomain: $data['validationDomain'] ?? null,
action: $data['action'] ?? \Appwrite\Event\Certificate::ACTION_GENERATION,
);
}
}

View file

@ -0,0 +1,34 @@
<?php
namespace Appwrite\Event\Message;
use Utopia\Database\Document;
final class Screenshot extends Base
{
public function __construct(
public readonly Document $project,
public readonly string $deploymentId,
) {
}
public function toArray(): array
{
return [
'project' => [
'$id' => $this->project->getId(),
'$sequence' => $this->project->getSequence(),
'database' => $this->project->getAttribute('database', ''),
],
'deploymentId' => $this->deploymentId,
];
}
public static function fromArray(array $data): static
{
return new self(
project: new Document($data['project'] ?? []),
deploymentId: $data['deploymentId'] ?? '',
);
}
}

View file

@ -0,0 +1,35 @@
<?php
namespace Appwrite\Event\Publisher;
use Appwrite\Event\Message\Audit as AuditMessage;
use Utopia\Console;
use Utopia\Queue\Publisher;
use Utopia\Queue\Queue;
readonly class Audit extends Base
{
public function __construct(
Publisher $publisher,
protected Queue $queue
) {
parent::__construct($publisher);
}
public function enqueue(AuditMessage $message): string|bool
{
// Audit delivery is best-effort and should never fail the request lifecycle.
try {
return $this->publish($this->queue, $message);
} catch (\Throwable $th) {
Console::error('[Audit] Failed to publish audit message: ' . $th->getMessage());
return false;
}
}
public function getSize(bool $failed = false): int
{
return $this->getQueueSize($this->queue, $failed);
}
}

View file

@ -0,0 +1,27 @@
<?php
namespace Appwrite\Event\Publisher;
use Appwrite\Event\Message\Certificate as CertificateMessage;
use Utopia\Queue\Publisher;
use Utopia\Queue\Queue;
readonly class Certificate extends Base
{
public function __construct(
Publisher $publisher,
protected Queue $queue
) {
parent::__construct($publisher);
}
public function enqueue(CertificateMessage $message): string|bool
{
return $this->publish($this->queue, $message);
}
public function getSize(bool $failed = false): int
{
return $this->getQueueSize($this->queue, $failed);
}
}

View file

@ -0,0 +1,27 @@
<?php
namespace Appwrite\Event\Publisher;
use Appwrite\Event\Message\Screenshot as ScreenshotMessage;
use Utopia\Queue\Publisher;
use Utopia\Queue\Queue;
readonly class Screenshot extends Base
{
public function __construct(
Publisher $publisher,
protected Queue $queue
) {
parent::__construct($publisher);
}
public function enqueue(ScreenshotMessage $message): string|bool
{
return $this->publish($this->queue, $message);
}
public function getSize(bool $failed = false): int
{
return $this->getQueueSize($this->queue, $failed);
}
}

View file

@ -6,9 +6,9 @@ use Ahc\Jwt\JWT;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Message\Usage as UsageMessage;
use Appwrite\Event\Publisher\Screenshot;
use Appwrite\Event\Publisher\Usage as UsagePublisher;
use Appwrite\Event\Realtime;
use Appwrite\Event\Screenshot;
use Appwrite\Event\Webhook;
use Appwrite\Filter\BranchDomain as BranchDomainFilter;
use Appwrite\Usage\Context;
@ -58,7 +58,7 @@ class Builds extends Action
->inject('project')
->inject('dbForPlatform')
->inject('queueForEvents')
->inject('queueForScreenshots')
->inject('publisherForScreenshots')
->inject('queueForWebhooks')
->inject('queueForFunctions')
->inject('queueForRealtime')
@ -84,7 +84,7 @@ class Builds extends Action
Document $project,
Database $dbForPlatform,
Event $queueForEvents,
Screenshot $queueForScreenshots,
Screenshot $publisherForScreenshots,
Webhook $queueForWebhooks,
Func $queueForFunctions,
Realtime $queueForRealtime,
@ -126,7 +126,7 @@ class Builds extends Action
$deviceForFunctions,
$deviceForSites,
$deviceForFiles,
$queueForScreenshots,
$publisherForScreenshots,
$queueForWebhooks,
$queueForFunctions,
$queueForRealtime,
@ -161,7 +161,7 @@ class Builds extends Action
Device $deviceForFunctions,
Device $deviceForSites,
Device $deviceForFiles,
Screenshot $queueForScreenshots,
Screenshot $publisherForScreenshots,
Webhook $queueForWebhooks,
Func $queueForFunctions,
Realtime $queueForRealtime,
@ -1120,10 +1120,10 @@ class Builds extends Action
/** Screenshot site */
if ($resource->getCollection() === 'sites') {
$queueForScreenshots
->setDeploymentId($deployment->getId())
->setProject($project)
->trigger();
$publisherForScreenshots->enqueue(new \Appwrite\Event\Message\Screenshot(
project: $project,
deploymentId: $deployment->getId(),
));
Console::log('Site screenshot queued');
}

View file

@ -3,6 +3,7 @@
namespace Appwrite\Platform\Modules\Functions\Workers;
use Ahc\Jwt\JWT;
use Appwrite\Event\Message\Screenshot;
use Appwrite\Event\Realtime;
use Appwrite\Permission;
use Appwrite\Role;
@ -62,9 +63,11 @@ class Screenshots extends Action
throw new \Exception('Missing payload');
}
$screenshotMessage = Screenshot::fromArray($payload);
Console::log('Site screenshot started');
$deploymentId = $payload['deploymentId'] ?? null;
$deploymentId = $screenshotMessage->deploymentId;
$deployment = $dbForProject->getDocument('deployments', $deploymentId);
if ($deployment->isEmpty()) {

View file

@ -2,7 +2,7 @@
namespace Appwrite\Platform\Modules\Health\Http\Health\Queue\Audits;
use Appwrite\Event\Audit;
use Appwrite\Event\Publisher\Audit;
use Appwrite\Platform\Modules\Health\Http\Health\Queue\Base;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
@ -42,16 +42,16 @@ class Get extends Base
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('queueForAudits')
->inject('publisherForAudits')
->inject('response')
->callback($this->action(...));
}
public function action(int|string $threshold, Audit $queueForAudits, Response $response): void
public function action(int|string $threshold, Audit $publisherForAudits, Response $response): void
{
$threshold = (int) $threshold;
$size = $queueForAudits->getSize();
$size = $publisherForAudits->getSize();
$this->assertQueueThreshold($size, $threshold);

View file

@ -2,7 +2,7 @@
namespace Appwrite\Platform\Modules\Health\Http\Health\Queue\Certificates;
use Appwrite\Event\Certificate;
use Appwrite\Event\Publisher\Certificate;
use Appwrite\Platform\Modules\Health\Http\Health\Queue\Base;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
@ -42,16 +42,16 @@ class Get extends Base
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('queueForCertificates')
->inject('publisherForCertificates')
->inject('response')
->callback($this->action(...));
}
public function action(int|string $threshold, Certificate $queueForCertificates, Response $response): void
public function action(int|string $threshold, Certificate $publisherForCertificates, Response $response): void
{
$threshold = (int) $threshold;
$size = $queueForCertificates->getSize();
$size = $publisherForCertificates->getSize();
$this->assertQueueThreshold($size, $threshold);

View file

@ -2,19 +2,19 @@
namespace Appwrite\Platform\Modules\Health\Http\Health\Queue\Failed;
use Appwrite\Event\Audit;
use Appwrite\Event\Build;
use Appwrite\Event\Certificate;
use Appwrite\Event\Database;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Event\Messaging;
use Appwrite\Event\Publisher\Audit;
use Appwrite\Event\Publisher\Certificate;
use Appwrite\Event\Publisher\Migration as MigrationPublisher;
use Appwrite\Event\Publisher\Screenshot;
use Appwrite\Event\Publisher\StatsResources as StatsResourcesPublisher;
use Appwrite\Event\Publisher\Usage as UsagePublisher;
use Appwrite\Event\Screenshot;
use Appwrite\Event\Webhook;
use Appwrite\Platform\Modules\Health\Http\Health\Queue\Base;
use Appwrite\SDK\AuthType;
@ -75,17 +75,17 @@ class Get extends Base
->inject('response')
->inject('queueForDatabase')
->inject('queueForDeletes')
->inject('queueForAudits')
->inject('publisherForAudits')
->inject('queueForMails')
->inject('queueForFunctions')
->inject('publisherForStatsResources')
->inject('publisherForUsage')
->inject('queueForWebhooks')
->inject('queueForCertificates')
->inject('publisherForCertificates')
->inject('queueForBuilds')
->inject('queueForMessaging')
->inject('publisherForMigrations')
->inject('queueForScreenshots')
->inject('publisherForScreenshots')
->callback($this->action(...));
}
@ -95,32 +95,32 @@ class Get extends Base
Response $response,
Database $queueForDatabase,
Delete $queueForDeletes,
Audit $queueForAudits,
Audit $publisherForAudits,
Mail $queueForMails,
Func $queueForFunctions,
StatsResourcesPublisher $publisherForStatsResources,
UsagePublisher $publisherForUsage,
Webhook $queueForWebhooks,
Certificate $queueForCertificates,
Certificate $publisherForCertificates,
Build $queueForBuilds,
Messaging $queueForMessaging,
MigrationPublisher $publisherForMigrations,
Screenshot $queueForScreenshots,
Screenshot $publisherForScreenshots,
): void {
$threshold = (int) $threshold;
$queue = match ($name) {
System::getEnv('_APP_DATABASE_QUEUE_NAME', Event::DATABASE_QUEUE_NAME) => $queueForDatabase,
System::getEnv('_APP_DELETE_QUEUE_NAME', Event::DELETE_QUEUE_NAME) => $queueForDeletes,
System::getEnv('_APP_AUDITS_QUEUE_NAME', Event::AUDITS_QUEUE_NAME) => $queueForAudits,
System::getEnv('_APP_AUDITS_QUEUE_NAME', Event::AUDITS_QUEUE_NAME) => $publisherForAudits,
System::getEnv('_APP_MAILS_QUEUE_NAME', Event::MAILS_QUEUE_NAME) => $queueForMails,
System::getEnv('_APP_FUNCTIONS_QUEUE_NAME', Event::FUNCTIONS_QUEUE_NAME) => $queueForFunctions,
System::getEnv('_APP_STATS_RESOURCES_QUEUE_NAME', Event::STATS_RESOURCES_QUEUE_NAME) => $publisherForStatsResources,
System::getEnv('_APP_STATS_USAGE_QUEUE_NAME', Event::STATS_USAGE_QUEUE_NAME) => $publisherForUsage,
System::getEnv('_APP_WEBHOOK_QUEUE_NAME', Event::WEBHOOK_QUEUE_NAME) => $queueForWebhooks,
System::getEnv('_APP_CERTIFICATES_QUEUE_NAME', Event::CERTIFICATES_QUEUE_NAME) => $queueForCertificates,
System::getEnv('_APP_CERTIFICATES_QUEUE_NAME', Event::CERTIFICATES_QUEUE_NAME) => $publisherForCertificates,
System::getEnv('_APP_BUILDS_QUEUE_NAME', Event::BUILDS_QUEUE_NAME) => $queueForBuilds,
System::getEnv('_APP_SCREENSHOTS_QUEUE_NAME', Event::SCREENSHOTS_QUEUE_NAME) => $queueForScreenshots,
System::getEnv('_APP_SCREENSHOTS_QUEUE_NAME', Event::SCREENSHOTS_QUEUE_NAME) => $publisherForScreenshots,
System::getEnv('_APP_MESSAGING_QUEUE_NAME', Event::MESSAGING_QUEUE_NAME) => $queueForMessaging,
System::getEnv('_APP_MIGRATIONS_QUEUE_NAME', Event::MIGRATIONS_QUEUE_NAME) => $publisherForMigrations,
};

View file

@ -2,7 +2,7 @@
namespace Appwrite\Platform\Modules\Health\Http\Health\Queue\Logs;
use Appwrite\Event\Audit;
use Appwrite\Event\Publisher\Audit;
use Appwrite\Platform\Modules\Health\Http\Health\Queue\Base;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
@ -42,16 +42,16 @@ class Get extends Base
contentType: ContentType::JSON
))
->param('threshold', 5000, new Integer(true), 'Queue size threshold. When hit (equal or higher), endpoint returns server error. Default value is 5000.', true)
->inject('queueForAudits')
->inject('publisherForAudits')
->inject('response')
->callback($this->action(...));
}
public function action(int|string $threshold, Audit $queueForAudits, Response $response): void
public function action(int|string $threshold, Audit $publisherForAudits, Response $response): void
{
$threshold = (int) $threshold;
$size = $queueForAudits->getSize();
$size = $publisherForAudits->getSize();
$this->assertQueueThreshold($size, $threshold);

View file

@ -2,8 +2,8 @@
namespace Appwrite\Platform\Modules\Proxy\Http\Rules\API;
use Appwrite\Event\Certificate;
use Appwrite\Event\Event;
use Appwrite\Event\Publisher\Certificate;
use Appwrite\Extend\Exception;
use Appwrite\Platform\Modules\Proxy\Action;
use Appwrite\SDK\AuthType;
@ -62,7 +62,7 @@ class Create extends Action
->param('domain', null, new ValidatorDomain(), 'Domain name.')
->inject('response')
->inject('project')
->inject('queueForCertificates')
->inject('publisherForCertificates')
->inject('queueForEvents')
->inject('dbForPlatform')
->inject('platform')
@ -70,7 +70,7 @@ class Create extends Action
->callback($this->action(...));
}
public function action(string $domain, Response $response, Document $project, Certificate $queueForCertificates, Event $queueForEvents, Database $dbForPlatform, array $platform, Log $log)
public function action(string $domain, Response $response, Document $project, Certificate $publisherForCertificates, Event $queueForEvents, Database $dbForPlatform, array $platform, Log $log)
{
$this->validateDomainRestrictions($domain, $platform);
@ -114,13 +114,14 @@ class Create extends Action
}
if ($rule->getAttribute('status', '') === RULE_STATUS_CERTIFICATE_GENERATING) {
$queueForCertificates
->setDomain(new Document([
$publisherForCertificates->enqueue(new \Appwrite\Event\Message\Certificate(
project: $project,
domain: new Document([
'domain' => $rule->getAttribute('domain'),
'domainType' => $rule->getAttribute('deploymentResourceType', $rule->getAttribute('type')),
]))
->setAction(Certificate::ACTION_GENERATION)
->trigger();
]),
action: \Appwrite\Event\Certificate::ACTION_GENERATION,
));
}
$queueForEvents->setParam('ruleId', $rule->getId());

View file

@ -2,8 +2,8 @@
namespace Appwrite\Platform\Modules\Proxy\Http\Rules\Function;
use Appwrite\Event\Certificate;
use Appwrite\Event\Event;
use Appwrite\Event\Publisher\Certificate;
use Appwrite\Extend\Exception;
use Appwrite\Platform\Modules\Proxy\Action;
use Appwrite\SDK\AuthType;
@ -66,7 +66,7 @@ class Create extends Action
->param('branch', '', new Text(255, 0), 'Name of VCS branch to deploy changes automatically', true)
->inject('response')
->inject('project')
->inject('queueForCertificates')
->inject('publisherForCertificates')
->inject('queueForEvents')
->inject('dbForPlatform')
->inject('dbForProject')
@ -75,7 +75,7 @@ class Create extends Action
->callback($this->action(...));
}
public function action(string $domain, string $functionId, string $branch, Response $response, Document $project, Certificate $queueForCertificates, Event $queueForEvents, Database $dbForPlatform, Database $dbForProject, array $platform, Log $log)
public function action(string $domain, string $functionId, string $branch, Response $response, Document $project, Certificate $publisherForCertificates, Event $queueForEvents, Database $dbForPlatform, Database $dbForProject, array $platform, Log $log)
{
$this->validateDomainRestrictions($domain, $platform);
@ -132,13 +132,14 @@ class Create extends Action
}
if ($rule->getAttribute('status', '') === RULE_STATUS_CERTIFICATE_GENERATING) {
$queueForCertificates
->setDomain(new Document([
$publisherForCertificates->enqueue(new \Appwrite\Event\Message\Certificate(
project: $project,
domain: new Document([
'domain' => $rule->getAttribute('domain'),
'domainType' => $rule->getAttribute('deploymentResourceType', $rule->getAttribute('type')),
]))
->setAction(Certificate::ACTION_GENERATION)
->trigger();
]),
action: \Appwrite\Event\Certificate::ACTION_GENERATION,
));
}
$queueForEvents->setParam('ruleId', $rule->getId());

View file

@ -2,8 +2,8 @@
namespace Appwrite\Platform\Modules\Proxy\Http\Rules\Redirect;
use Appwrite\Event\Certificate;
use Appwrite\Event\Event;
use Appwrite\Event\Publisher\Certificate;
use Appwrite\Extend\Exception;
use Appwrite\Platform\Modules\Proxy\Action;
use Appwrite\SDK\AuthType;
@ -69,7 +69,7 @@ class Create extends Action
->param('resourceType', '', new WhiteList(['site', 'function']), 'Type of parent resource.')
->inject('response')
->inject('project')
->inject('queueForCertificates')
->inject('publisherForCertificates')
->inject('queueForEvents')
->inject('dbForPlatform')
->inject('dbForProject')
@ -78,7 +78,7 @@ class Create extends Action
->callback($this->action(...));
}
public function action(string $domain, string $url, int $statusCode, string $resourceId, string $resourceType, Response $response, Document $project, Certificate $queueForCertificates, Event $queueForEvents, Database $dbForPlatform, Database $dbForProject, array $platform, Log $log)
public function action(string $domain, string $url, int $statusCode, string $resourceId, string $resourceType, Response $response, Document $project, Certificate $publisherForCertificates, Event $queueForEvents, Database $dbForPlatform, Database $dbForProject, array $platform, Log $log)
{
$this->validateDomainRestrictions($domain, $platform);
@ -136,13 +136,14 @@ class Create extends Action
}
if ($rule->getAttribute('status', '') === RULE_STATUS_CERTIFICATE_GENERATING) {
$queueForCertificates
->setDomain(new Document([
$publisherForCertificates->enqueue(new \Appwrite\Event\Message\Certificate(
project: $project,
domain: new Document([
'domain' => $rule->getAttribute('domain'),
'domainType' => $rule->getAttribute('deploymentResourceType', $rule->getAttribute('type')),
]))
->setAction(Certificate::ACTION_GENERATION)
->trigger();
]),
action: \Appwrite\Event\Certificate::ACTION_GENERATION,
));
}
$queueForEvents->setParam('ruleId', $rule->getId());

View file

@ -2,8 +2,8 @@
namespace Appwrite\Platform\Modules\Proxy\Http\Rules\Site;
use Appwrite\Event\Certificate;
use Appwrite\Event\Event;
use Appwrite\Event\Publisher\Certificate;
use Appwrite\Extend\Exception;
use Appwrite\Platform\Modules\Proxy\Action;
use Appwrite\SDK\AuthType;
@ -66,7 +66,7 @@ class Create extends Action
->param('branch', '', new Text(255, 0), 'Name of VCS branch to deploy changes automatically', true)
->inject('response')
->inject('project')
->inject('queueForCertificates')
->inject('publisherForCertificates')
->inject('queueForEvents')
->inject('dbForPlatform')
->inject('dbForProject')
@ -75,7 +75,7 @@ class Create extends Action
->callback($this->action(...));
}
public function action(string $domain, string $siteId, ?string $branch, Response $response, Document $project, Certificate $queueForCertificates, Event $queueForEvents, Database $dbForPlatform, Database $dbForProject, array $platform, Log $log)
public function action(string $domain, string $siteId, ?string $branch, Response $response, Document $project, Certificate $publisherForCertificates, Event $queueForEvents, Database $dbForPlatform, Database $dbForProject, array $platform, Log $log)
{
$this->validateDomainRestrictions($domain, $platform);
@ -132,13 +132,14 @@ class Create extends Action
}
if ($rule->getAttribute('status', '') === RULE_STATUS_CERTIFICATE_GENERATING) {
$queueForCertificates
->setDomain(new Document([
$publisherForCertificates->enqueue(new \Appwrite\Event\Message\Certificate(
project: $project,
domain: new Document([
'domain' => $rule->getAttribute('domain'),
'domainType' => $rule->getAttribute('deploymentResourceType', $rule->getAttribute('type')),
]))
->setAction(Certificate::ACTION_GENERATION)
->trigger();
]),
action: \Appwrite\Event\Certificate::ACTION_GENERATION,
));
}
$queueForEvents->setParam('ruleId', $rule->getId());

View file

@ -2,8 +2,8 @@
namespace Appwrite\Platform\Modules\Proxy\Http\Rules\Verification;
use Appwrite\Event\Certificate;
use Appwrite\Event\Event;
use Appwrite\Event\Publisher\Certificate;
use Appwrite\Extend\Exception;
use Appwrite\Platform\Modules\Proxy\Action;
use Appwrite\SDK\AuthType;
@ -56,7 +56,7 @@ class Update extends Action
))
->param('ruleId', '', fn (Database $dbForProject) => new UID($dbForProject->getAdapter()->getMaxUIDLength()), 'Rule ID.', false, ['dbForProject'])
->inject('response')
->inject('queueForCertificates')
->inject('publisherForCertificates')
->inject('queueForEvents')
->inject('project')
->inject('dbForPlatform')
@ -67,7 +67,7 @@ class Update extends Action
public function action(
string $ruleId,
Response $response,
Certificate $queueForCertificates,
Certificate $publisherForCertificates,
Event $queueForEvents,
Document $project,
Database $dbForPlatform,
@ -110,12 +110,13 @@ class Update extends Action
}
// Issue a TLS certificate when DNS verification is successful
$queueForCertificates
->setDomain(new Document([
$publisherForCertificates->enqueue(new \Appwrite\Event\Message\Certificate(
project: $project,
domain: new Document([
'domain' => $rule->getAttribute('domain'),
'domainType' => $rule->getAttribute('deploymentResourceType', $rule->getAttribute('type')),
]))
->trigger();
]),
));
if (!empty($certificate)) {
$rule->setAttribute('renewAt', $certificate->getAttribute('renewDate', ''));

View file

@ -2,7 +2,7 @@
namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Certificate;
use Appwrite\Event\Publisher\Certificate;
use DateTime;
use Swoole\Coroutine\Channel;
use Swoole\Process;
@ -29,16 +29,16 @@ class Interval extends Action
->desc('Schedules tasks on regular intervals by publishing them to our queues')
->inject('dbForPlatform')
->inject('getProjectDB')
->inject('queueForCertificates')
->inject('publisherForCertificates')
->callback($this->action(...));
}
public function action(Database $dbForPlatform, callable $getProjectDB, Certificate $queueForCertificates): void
public function action(Database $dbForPlatform, callable $getProjectDB, Certificate $publisherForCertificates): void
{
Console::title('Interval V1');
Console::success(APP_NAME . ' interval process v1 has started');
$timers = $this->runTasks($dbForPlatform, $getProjectDB, $queueForCertificates);
$timers = $this->runTasks($dbForPlatform, $getProjectDB, $publisherForCertificates);
$chan = new Channel(1);
Process::signal(SIGTERM, function () use ($chan) {
@ -52,16 +52,16 @@ class Interval extends Action
}
}
public function runTasks(Database $dbForPlatform, callable $getProjectDB, Certificate $queueForCertificates): array
public function runTasks(Database $dbForPlatform, callable $getProjectDB, Certificate $publisherForCertificates): array
{
$timers = [];
$tasks = $this->getTasks();
foreach ($tasks as $task) {
$timers[] = Timer::tick($task['interval'], function () use ($task, $dbForPlatform, $getProjectDB, $queueForCertificates) {
$timers[] = Timer::tick($task['interval'], function () use ($task, $dbForPlatform, $getProjectDB, $publisherForCertificates) {
$taskName = $task['name'];
Span::init("interval.{$taskName}");
try {
$task['callback']($dbForPlatform, $getProjectDB, $queueForCertificates);
$task['callback']($dbForPlatform, $getProjectDB, $publisherForCertificates);
} catch (\Exception $e) {
Span::error($e);
} finally {
@ -80,15 +80,15 @@ class Interval extends Action
return [
[
'name' => 'domainVerification',
"callback" => function (Database $dbForPlatform, callable $getProjectDB, Certificate $queueForCertificates) {
$this->verifyDomain($dbForPlatform, $queueForCertificates);
"callback" => function (Database $dbForPlatform, callable $getProjectDB, Certificate $publisherForCertificates) {
$this->verifyDomain($dbForPlatform, $publisherForCertificates);
},
'interval' => $intervalDomainVerification * 1000,
]
];
}
private function verifyDomain(Database $dbForPlatform, Certificate $queueForCertificates): void
private function verifyDomain(Database $dbForPlatform, Certificate $publisherForCertificates): void
{
$time = DatabaseDateTime::now();
$fromTime = new DateTime('-3 days'); // Max 3 days old
@ -115,13 +115,17 @@ class Interval extends Action
foreach ($rules as $rule) {
try {
$queueForCertificates
->setDomain(new Document([
$publisherForCertificates->enqueue(new \Appwrite\Event\Message\Certificate(
project: new Document([
'$id' => $rule->getAttribute('projectId', ''),
'$sequence' => $rule->getAttribute('projectInternalId', 0),
]),
domain: new Document([
'domain' => $rule->getAttribute('domain'),
'domainType' => $rule->getAttribute('deploymentResourceType', $rule->getAttribute('type')),
]))
->setAction(Certificate::ACTION_DOMAIN_VERIFICATION)
->trigger();
]),
action: \Appwrite\Event\Certificate::ACTION_DOMAIN_VERIFICATION,
));
$processed++;
} catch (\Throwable $th) {
$failed++;

View file

@ -2,8 +2,8 @@
namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Certificate;
use Appwrite\Event\Delete;
use Appwrite\Event\Publisher\Certificate;
use DateInterval;
use DateTime;
use Utopia\Console;
@ -29,12 +29,12 @@ class Maintenance extends Action
->param('type', 'loop', new WhiteList(['loop', 'trigger']), 'How to run task. "loop" is meant for container entrypoint, and "trigger" for manual execution.')
->inject('dbForPlatform')
->inject('console')
->inject('queueForCertificates')
->inject('publisherForCertificates')
->inject('queueForDeletes')
->callback($this->action(...));
}
public function action(string $type, Database $dbForPlatform, Document $console, Certificate $queueForCertificates, Delete $queueForDeletes): void
public function action(string $type, Database $dbForPlatform, Document $console, Certificate $publisherForCertificates, Delete $queueForDeletes): void
{
Console::title('Maintenance V1');
Console::success(APP_NAME . ' maintenance process v1 has started');
@ -59,7 +59,7 @@ class Maintenance extends Action
$delay = $next->getTimestamp() - $now->getTimestamp();
}
$action = function () use ($interval, $cacheRetention, $schedulesDeletionRetention, $usageStatsRetentionHourly, $dbForPlatform, $console, $queueForDeletes, $queueForCertificates) {
$action = function () use ($interval, $cacheRetention, $schedulesDeletionRetention, $usageStatsRetentionHourly, $dbForPlatform, $console, $queueForDeletes, $publisherForCertificates) {
$time = DatabaseDateTime::now();
Console::info("[{$time}] Notifying workers with maintenance tasks every {$interval} seconds");
@ -92,7 +92,7 @@ class Maintenance extends Action
->trigger();
$this->notifyDeleteConnections($queueForDeletes);
$this->renewCertificates($dbForPlatform, $queueForCertificates);
$this->renewCertificates($dbForPlatform, $publisherForCertificates);
$this->notifyDeleteCache($cacheRetention, $queueForDeletes);
$this->notifyDeleteSchedules($schedulesDeletionRetention, $queueForDeletes);
$this->notifyDeleteCSVExports($queueForDeletes);
@ -124,7 +124,7 @@ class Maintenance extends Action
->trigger();
}
private function renewCertificates(Database $dbForPlatform, Certificate $queueForCertificate): void
private function renewCertificates(Database $dbForPlatform, Certificate $publisherForCertificate): void
{
$time = DatabaseDateTime::now();
@ -158,13 +158,17 @@ class Maintenance extends Action
continue;
}
$queueForCertificate
->setDomain(new Document([
$publisherForCertificate->enqueue(new \Appwrite\Event\Message\Certificate(
project: new Document([
'$id' => $rule->getAttribute('projectId', ''),
'$sequence' => $rule->getAttribute('projectInternalId', 0),
]),
domain: new Document([
'domain' => $rule->getAttribute('domain'),
'domainType' => $rule->getAttribute('deploymentResourceType', $rule->getAttribute('type')),
]))
->setAction(Certificate::ACTION_GENERATION)
->trigger();
]),
action: \Appwrite\Event\Certificate::ACTION_GENERATION,
));
}
}

View file

@ -2,7 +2,7 @@
namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Certificate;
use Appwrite\Event\Publisher\Certificate;
use Utopia\Console;
use Utopia\Database\Database;
use Utopia\Database\Document;
@ -29,11 +29,11 @@ class SSL extends Action
->param('skip-check', 'true', new Boolean(true), 'If DNS and renew check should be skipped. Defaults to true, and when true, all jobs will result in certificate generation attempt.', true)
->inject('console')
->inject('dbForPlatform')
->inject('queueForCertificates')
->inject('publisherForCertificates')
->callback($this->action(...));
}
public function action(string $domain, bool|string $skipCheck, Document $console, Database $dbForPlatform, Certificate $queueForCertificates): void
public function action(string $domain, bool|string $skipCheck, Document $console, Database $dbForPlatform, Certificate $publisherForCertificates): void
{
$domain = new Domain(!empty($domain) ? $domain : '');
if (!$domain->isKnown() || $domain->isTest()) {
@ -98,12 +98,13 @@ class SSL extends Action
Console::info('Updated existing rule ' . $rule->getId() . ' for domain: ' . $domain->get());
}
$queueForCertificates
->setDomain(new Document([
'domain' => $domain->get()
]))
->setSkipRenewCheck($skipCheck)
->trigger();
$publisherForCertificates->enqueue(new \Appwrite\Event\Message\Certificate(
project: $console,
domain: new Document([
'domain' => $domain->get(),
]),
skipRenewCheck: $skipCheck,
));
Console::success('Scheduled a job to issue a TLS certificate for domain: ' . $domain->get());
}

View file

@ -2,6 +2,7 @@
namespace Appwrite\Platform\Workers;
use Appwrite\Event\Message\Audit;
use Exception;
use Throwable;
use Utopia\Console;
@ -40,7 +41,6 @@ class Audits extends Action
$this
->desc('Audits worker')
->inject('message')
->inject('project')
->inject('getAudit')
->callback($this->action(...));
@ -50,14 +50,13 @@ class Audits extends Action
/**
* @param Message $message
* @param Document $project
* @param callable(Document): \Utopia\Audit\Audit $getAudit
* @return Commit|NoCommit
* @throws Throwable
* @throws \Utopia\Database\Exception
* @throws Structure
*/
public function action(Message $message, Document $project, callable $getAudit): Commit|NoCommit
public function action(Message $message, callable $getAudit): Commit|NoCommit
{
$payload = $message->getPayload() ?? [];
@ -65,19 +64,21 @@ class Audits extends Action
throw new Exception('Missing payload');
}
$auditMessage = Audit::fromArray($payload);
Console::info('Aggregating audit logs');
$event = $payload['event'] ?? '';
$event = $auditMessage->event;
$auditPayload = '';
if ($project->getId() === 'console') {
$auditPayload = $payload['payload'] ?? '';
if ($auditMessage->project->getId() === 'console') {
$auditPayload = $auditMessage->payload;
}
$mode = $payload['mode'] ?? '';
$resource = $payload['resource'] ?? '';
$userAgent = $payload['userAgent'] ?? '';
$ip = $payload['ip'] ?? '';
$user = new Document($payload['user'] ?? []);
$mode = $auditMessage->mode;
$resource = $auditMessage->resource;
$userAgent = $auditMessage->userAgent;
$ip = $auditMessage->ip;
$user = $auditMessage->user;
$impersonatorUserId = $user->getAttribute('impersonatorUserId');
$actorUserId = $impersonatorUserId ?: $user->getId();
@ -126,14 +127,14 @@ class Audits extends Action
];
}
if (isset($this->logs[$project->getSequence()])) {
$this->logs[$project->getSequence()]['logs'][] = $eventData;
if (isset($this->logs[$auditMessage->project->getSequence()])) {
$this->logs[$auditMessage->project->getSequence()]['logs'][] = $eventData;
} else {
$this->logs[$project->getSequence()] = [
$this->logs[$auditMessage->project->getSequence()] = [
'project' => new Document([
'$id' => $project->getId(),
'$sequence' => $project->getSequence(),
'database' => $project->getAttribute('database'),
'$id' => $auditMessage->project->getId(),
'$sequence' => $auditMessage->project->getSequence(),
'database' => $auditMessage->project->getAttribute('database'),
]),
'logs' => [$eventData]
];

View file

@ -3,10 +3,10 @@
namespace Appwrite\Platform\Workers;
use Appwrite\Certificates\Adapter as CertificatesAdapter;
use Appwrite\Event\Certificate;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Event\Publisher\Certificate;
use Appwrite\Event\Realtime;
use Appwrite\Event\Webhook;
use Appwrite\Extend\Exception as AppwriteException;
@ -55,7 +55,7 @@ class Certificates extends Action
->inject('queueForWebhooks')
->inject('queueForFunctions')
->inject('queueForRealtime')
->inject('queueForCertificates')
->inject('publisherForCertificates')
->inject('log')
->inject('certificates')
->inject('plan')
@ -71,7 +71,7 @@ class Certificates extends Action
* @param Webhook $queueForWebhooks
* @param Func $queueForFunctions
* @param Realtime $queueForRealtime
* @param Certificate $queueForCertificates
* @param Certificate $publisherForCertificates
* @param Log $log
* @param CertificatesAdapter $certificates
* @param array $plan
@ -88,7 +88,7 @@ class Certificates extends Action
Webhook $queueForWebhooks,
Func $queueForFunctions,
Realtime $queueForRealtime,
Certificate $queueForCertificates,
Certificate $publisherForCertificates,
Log $log,
CertificatesAdapter $certificates,
array $plan,
@ -100,21 +100,22 @@ class Certificates extends Action
throw new Exception('Missing payload');
}
$document = new Document($payload['domain'] ?? []);
$certificateMessage = \Appwrite\Event\Message\Certificate::fromArray($payload);
$document = $certificateMessage->domain;
$domain = new Domain($document->getAttribute('domain', ''));
$domainType = $document->getAttribute('domainType');
$skipRenewCheck = $payload['skipRenewCheck'] ?? false;
$validationDomain = $payload['validationDomain'] ?? null;
$action = $payload['action'] ?? Certificate::ACTION_GENERATION;
$skipRenewCheck = $certificateMessage->skipRenewCheck;
$validationDomain = $certificateMessage->validationDomain;
$action = $certificateMessage->action;
$log->addTag('domain', $domain->get());
switch ($action) {
case Certificate::ACTION_DOMAIN_VERIFICATION:
$this->handleDomainVerificationAction($domain, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $queueForCertificates, $log, $authorization, $validationDomain);
case \Appwrite\Event\Certificate::ACTION_DOMAIN_VERIFICATION:
$this->handleDomainVerificationAction($domain, $dbForPlatform, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $publisherForCertificates, $log, $authorization, $validationDomain);
break;
case Certificate::ACTION_GENERATION:
case \Appwrite\Event\Certificate::ACTION_GENERATION:
$this->handleCertificateGenerationAction($domain, $domainType, $dbForPlatform, $queueForMails, $queueForEvents, $queueForWebhooks, $queueForFunctions, $queueForRealtime, $log, $certificates, $authorization, $skipRenewCheck, $plan, $validationDomain);
break;
@ -130,7 +131,7 @@ class Certificates extends Action
* @param Webhook $queueForWebhooks
* @param Func $queueForFunctions
* @param Realtime $queueForRealtime
* @param Certificate $queueForCertificates
* @param Certificate $publisherForCertificates
* @param Log $log
* @param ValidatorAuthorization $authorization
* @param string|null $validationDomain
@ -146,7 +147,7 @@ class Certificates extends Action
Webhook $queueForWebhooks,
Func $queueForFunctions,
Realtime $queueForRealtime,
Certificate $queueForCertificates,
Certificate $publisherForCertificates,
Log $log,
ValidatorAuthorization $authorization,
?string $validationDomain = null
@ -188,13 +189,17 @@ class Certificates extends Action
// Issue a TLS certificate when domain is verified
if ($rule->getAttribute('status', '') === RULE_STATUS_CERTIFICATE_GENERATING) {
$queueForCertificates
->setDomain(new Document([
$publisherForCertificates->enqueue(new \Appwrite\Event\Message\Certificate(
project: new Document([
'$id' => $rule->getAttribute('projectId', ''),
'$sequence' => $rule->getAttribute('projectInternalId', 0),
]),
domain: new Document([
'domain' => $rule->getAttribute('domain'),
'domainType' => $rule->getAttribute('deploymentResourceType', $rule->getAttribute('type')),
]))
->setAction(Certificate::ACTION_GENERATION)
->trigger();
]),
action: \Appwrite\Event\Certificate::ACTION_GENERATION,
));
Console::success('Certificate generation triggered successfully.');
}