Merge pull request #9680 from appwrite/non-critical-events

feat: allow non-critical events to ignore exceptions when enqueuing the event
This commit is contained in:
Christy Jacob 2025-04-23 19:18:19 +04:00 committed by GitHub
commit a11cbc3799
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
8 changed files with 54 additions and 28 deletions

View file

@ -12,6 +12,8 @@ class Audit extends Event
protected string $ip = '';
protected string $hostname = '';
protected bool $critical = false;
public function __construct(protected Publisher $publisher)
{
parent::__construct($publisher);

View file

@ -57,6 +57,9 @@ class Event
protected ?string $userId = null;
protected bool $paused = false;
/** @var bool Non-critical events will not throw an exception when enqueuing of the event fails. */
protected bool $critical = true;
/**
* @param Publisher $publisher
* @return void
@ -351,7 +354,14 @@ class Event
// Merge the base payload with any trimmed values
$payload = array_merge($this->preparePayload(), $this->trimPayload());
return $this->publisher->enqueue($queue, $payload);
try {
return $this->publisher->enqueue($queue, $payload);
} catch (\Throwable $th) {
if ($this->critical) {
throw $th;
}
return false;
}
}
/**

View file

@ -2,15 +2,22 @@
namespace Appwrite\Event;
use Appwrite\Messaging\Adapter;
use Appwrite\Messaging\Adapter\Realtime as RealtimeAdapter;
use Utopia\Database\Document;
use Utopia\Database\Exception;
class Realtime extends Event
{
protected array $subscribers = [];
private Adapter $realtime;
protected bool $critical = false;
public function __construct()
{
$this->realtime = new Adapter\Realtime();
}
/**
@ -57,7 +64,7 @@ class Realtime extends Event
* Execute Event.
*
* @return string|bool
* @throws InvalidArgumentException
* @throws Exception
*/
public function trigger(): string|bool
{
@ -87,7 +94,7 @@ class Realtime extends Event
: [$target['projectId'] ?? $this->getProject()->getId()];
foreach ($projectIds as $projectId) {
RealtimeAdapter::send(
$this->realtime->send(
projectId: $projectId,
payload: $this->getRealtimePayload(),
events: $allEvents,

View file

@ -6,6 +6,8 @@ use Utopia\Queue\Publisher;
class StatsResources extends Event
{
protected bool $critical = false;
public function __construct(protected Publisher $publisher)
{
parent::__construct($publisher);

View file

@ -11,6 +11,8 @@ class StatsUsage extends Event
protected array $reduce = [];
protected array $disabled = [];
protected bool $critical = false;
public function __construct(protected Publisher $publisher)
{
parent::__construct($publisher);

View file

@ -6,5 +6,5 @@ abstract class Adapter
{
abstract public function subscribe(string $projectId, mixed $identifier, array $roles, array $channels): void;
abstract public function unsubscribe(mixed $identifier): void;
abstract public static function send(string $projectId, array $payload, array $events, array $channels, array $roles, array $options): void;
abstract public function send(string $projectId, array $payload, array $events, array $channels, array $roles, array $options): void;
}

View file

@ -7,6 +7,7 @@ use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Helpers\ID;
use Utopia\Database\Helpers\Role;
use Utopia\Pools\Pool;
class Realtime extends Adapter
{
@ -35,6 +36,14 @@ class Realtime extends Adapter
*/
public array $subscriptions = [];
private Pool $pubsubPool;
public function __construct()
{
global $register;
$this->pubsubPool = $register->get('pools')->get('pubsub');
}
/**
* Adds a subscription.
*
@ -129,7 +138,7 @@ class Realtime extends Adapter
* @param array $options
* @return void
*/
public static function send(string $projectId, array $payload, array $events, array $channels, array $roles, array $options = []): void
public function send(string $projectId, array $payload, array $events, array $channels, array $roles, array $options = []): void
{
if (empty($channels) || empty($roles) || empty($projectId)) {
return;
@ -138,26 +147,20 @@ class Realtime extends Adapter
$permissionsChanged = array_key_exists('permissionsChanged', $options) && $options['permissionsChanged'];
$userId = array_key_exists('userId', $options) ? $options['userId'] : null;
global $register;
$pubsub = $register->get('pools')->get('pubsub')->pop();
try {
/** @var \Appwrite\PubSub\Adapter $redis */
$redis = $pubsub->getResource();
$redis->publish('realtime', json_encode([
'project' => $projectId,
'roles' => $roles,
'permissionsChanged' => $permissionsChanged,
'userId' => $userId,
'data' => [
'events' => $events,
'channels' => $channels,
'timestamp' => DateTime::formatTz(DateTime::now()),
'payload' => $payload
]
]));
} finally {
$pubsub->reclaim();
}
$message = [
'project' => $projectId,
'roles' => $roles,
'permissionsChanged' => $permissionsChanged,
'userId' => $userId,
'data' => [
'events' => $events,
'channels' => $channels,
'timestamp' => DateTime::formatTz(DateTime::now()),
'payload' => $payload
]
];
$this->pubsubPool->use(fn (\Appwrite\PubSub\Adapter $pubsub) => $pubsub->publish('realtime', json_encode($message)));
}
/**

View file

@ -16,7 +16,7 @@ class Method
* Initialise a new SDK method
*
* @param string $namespace
* @param string|null $group
* @param ?string $group
* @param string $name
* @param string $description
* @param array<AuthType> $auth
@ -34,7 +34,7 @@ class Method
*/
public function __construct(
protected string $namespace,
protected string|null $group,
protected ?string $group,
protected string $name,
protected string $description,
protected array $auth,
@ -128,7 +128,7 @@ class Method
return $this->namespace;
}
public function getGroup(): string|null
public function getGroup(): ?string
{
return $this->group;
}