paused = $paused; return $this; } /** * Get paused state for this event. */ public function getPaused(): bool { return $this->paused; } /** * Set queue used for this event. * * @param string $queue * @return static */ public function setQueue(string $queue): static { $this->queue = $queue; return $this; } /** * Get queue used for this event. * * @return string */ public function getQueue(): string { return $this->queue; } /** * Set TTL (time-to-live) for jobs in this queue. * * @param int $ttl TTL in seconds * @return static */ public function setTTL(int $ttl): static { $this->ttl = $ttl; return $this; } /** * Get TTL (time-to-live) for jobs in this queue. * * @return int */ public function getTTL(): int { return $this->ttl; } /** * Set event name used for this event. * @param string $event * @return static */ public function setEvent(string $event): static { $this->event = $event; return $this; } /** * Get event name used for this event. * * @return string */ public function getEvent(): string { return $this->event; } /** * Set project for this event. * * @param Document $project * @return static */ public function setProject(Document $project): static { $this->project = $project; return $this; } /** * Get project for this event. * * @return ?Document */ public function getProject(): ?Document { return $this->project; } /** * Set platform for this event. * * @param array $platform * @return static */ public function setPlatform(array $platform): static { $this->platform = $platform; return $this; } /** * Get platform for this event. * * @return array */ public function getPlatform(): array { return $this->platform; } /** * Set user for this event. * * @param Document $user * @return static */ public function setUser(Document $user): static { $this->user = $user; return $this; } /** * Set user ID for this event. * * @return static */ public function setUserId(string $userId): static { $this->userId = $userId; return $this; } /** * Get user responsible for triggering this event. * * @return ?Document */ public function getUser(): ?Document { return $this->user; } /** * Get user responsible for triggering this event. */ public function getUserId(): ?string { return $this->userId; } /** * Set payload for this event. * * @param array $payload * @param array $sensitive * @return static */ public function setPayload(array $payload, array $sensitive = []): static { $this->payload = $payload; foreach ($sensitive as $key) { $this->sensitive[$key] = true; } return $this; } /** * Get payload for this event. * * @return array */ public function getPayload(): array { return $this->payload; } /** * Set context for this event. * * @param string $key * @param Document $context * @return static */ public function setContext(string $key, Document $context): self { $this->context[$key] = $context; return $this; } /** * Get context for this event. * * @param string $key * * @return null|Document */ public function getContext(string $key): ?Document { return $this->context[$key] ?? null; } /** * Set class used for this event. * @param string $class * @return static */ public function setClass(string $class): self { $this->class = $class; return $this; } /** * Get class used for this event. * * @return string */ public function getClass(): string { return $this->class; } /** * Set param of event. * * @param string $key * @param mixed $value * @return self */ public function setParam(string $key, mixed $value): self { $this->params[$key] = $value; return $this; } /** * Get param of event. * * @param string $key * @return mixed */ public function getParam(string $key): mixed { return $this->params[$key] ?? null; } /** * Get all params of the event. * * @return array */ public function getParams(): array { return $this->params; } /** * Get trimmed values for sensitive/large payload fields. * Override this method in child classes to add more fields to trim. * * @return array */ protected function trimPayload(): array { $trimmed = []; if ($this->project) { $trimmed['project'] = new Document([ '$id' => $this->project->getId(), '$sequence' => $this->project->getSequence(), 'database' => $this->project->getAttribute('database') ]); } return $trimmed; } /** * Execute Event. * * @return string|bool * @throws InvalidArgumentException */ public function trigger(): string|bool { if ($this->paused) { return false; } /** The getter is required since events like Databases need to override the queue name depending on the project */ $queue = new Queue($this->getQueue(), 'utopia-queue', $this->getTTL()); // Merge the base payload with any trimmed values $payload = array_merge($this->preparePayload(), $this->trimPayload()); try { return $this->publisher->enqueue($queue, $payload); } catch (\Throwable $th) { if ($this->critical) { throw $th; } return false; } } /** * Prepare payload for queue. Can be overridden by child classes to customize payload. * * @return array */ protected function preparePayload(): array { return [ 'project' => $this->project, 'user' => $this->user, 'userId' => $this->userId, 'payload' => $this->payload, 'context' => $this->context, 'events' => Event::generateEvents($this->getEvent(), $this->getParams()) ]; } /** * Resets event. * * @return self */ public function reset(): self { $this->params = []; $this->sensitive = []; $this->event = ''; $this->payload = []; return $this; } /** * Parses event pattern and returns the parts in their respective section. * * @param string $pattern * @return array */ public static function parseEventPattern(string $pattern): array { $parts = \explode('.', $pattern); $count = \count($parts); /** * Identify all sections of the pattern. */ $type = $parts[0] ?? false; $resource = $parts[1] ?? false; $hasSubResource = $count > 3 && \str_starts_with($parts[3], '['); $hasSubSubResource = $count > 5 && \str_starts_with($parts[5], '[') && $hasSubResource; if ($hasSubResource) { $subType = $parts[2]; $subResource = $parts[3]; } if ($hasSubSubResource) { $subSubType = $parts[4]; $subSubResource = $parts[5]; if ($count == 8) { $attribute = $parts[7]; } } if ($hasSubResource && !$hasSubSubResource) { if ($count === 6) { $attribute = $parts[5]; } } if (!$hasSubResource) { if ($count === 4) { $attribute = $parts[3]; } } $subType ??= false; $subResource ??= false; $subSubType ??= false; $subSubResource ??= false; $attribute ??= false; $action = match (true) { !$hasSubResource && $count > 2 => $parts[2], $hasSubSubResource => $parts[6] ?? false, $hasSubResource && $count > 4 => $parts[4], default => false }; return [ 'type' => $type, 'resource' => $resource, 'subType' => $subType, 'subResource' => $subResource, 'subSubType' => $subSubType, 'subSubResource' => $subSubResource, 'action' => $action, 'attribute' => $attribute, ]; } /** * Generates all possible events from a pattern. * * @param string $pattern * @param array $params * @param ?Document $database * @param ?Document $database * @return array * @throws \InvalidArgumentException */ public static function generateEvents(string $pattern, array $params = [], ?Document $database = null): array { // $params = \array_filter($params, fn($param) => !\is_array($param)); $paramKeys = \array_keys($params); $paramValues = \array_values($params); $patterns = []; $parsed = self::parseEventPattern($pattern); // to switch the resource types from databases to the required prefix // eg; all databases events get fired with databases. prefix which mainly depicts legacy type // so a projection from databases to the actual prefix(documentsdb, vectorsdb,etc) if ((str_contains($pattern, 'databases.') && $database && $database->getAttribute('type') !== 'legacy')) { $parsed = self::getDatabaseTypeEvents($database, $parsed); } $type = $parsed['type']; $resource = $parsed['resource']; $subType = $parsed['subType']; $subResource = $parsed['subResource']; $subSubType = $parsed['subSubType']; $subSubResource = $parsed['subSubResource']; $action = $parsed['action']; $attribute = $parsed['attribute']; if ($resource && !\in_array(\trim($resource, "\[\]"), $paramKeys)) { throw new InvalidArgumentException("{$resource} is missing from the params."); } if ($subResource && !\in_array(\trim($subResource, "\[\]"), $paramKeys)) { throw new InvalidArgumentException("{$subResource} is missing from the params."); } if ($subSubResource && !\in_array(\trim($subSubResource, "\[\]"), $paramKeys)) { throw new InvalidArgumentException("{$subSubResource} is missing from the params."); } /** * Create all possible patterns including placeholders. */ if ($action) { if ($subSubResource) { if ($attribute) { $patterns[] = \implode('.', [$type, $resource, $subType, $subResource, $subSubType, $subSubResource, $action, $attribute]); } $patterns[] = \implode('.', [$type, $resource, $subType, $subResource, $subSubType, $subSubResource, $action]); $patterns[] = \implode('.', [$type, $resource, $subType, $subResource, $subSubType, $subSubResource]); } elseif ($subResource) { if ($attribute) { $patterns[] = \implode('.', [$type, $resource, $subType, $subResource, $action, $attribute]); } $patterns[] = \implode('.', [$type, $resource, $subType, $subResource, $action]); $patterns[] = \implode('.', [$type, $resource, $subType, $subResource]); } else { $patterns[] = \implode('.', [$type, $resource, $action]); } if ($attribute) { $patterns[] = \implode('.', [$type, $resource, $action, $attribute]); } } if ($subSubResource) { $patterns[] = \implode('.', [$type, $resource, $subType, $subResource, $subSubType, $subSubResource]); } if ($subResource) { $patterns[] = \implode('.', [$type, $resource, $subType, $subResource]); } $patterns[] = \implode('.', [$type, $resource]); /** * Removes all duplicates. */ $patterns = \array_unique($patterns); /** * Set all possible values of the patterns and replace placeholders. */ $events = []; foreach ($patterns as $eventPattern) { $events[] = \str_replace($paramKeys, $paramValues, $eventPattern); $events[] = \str_replace($paramKeys, '*', $eventPattern); foreach ($paramKeys as $key) { foreach ($paramKeys as $current) { if ($subSubResource) { foreach ($paramKeys as $subCurrent) { if ($subCurrent === $current || $subCurrent === $key) { continue; } $filtered1 = \array_filter($paramKeys, fn (string $k) => $k === $subCurrent); $events[] = \str_replace($paramKeys, $paramValues, \str_replace($filtered1, '*', $eventPattern)); $filtered2 = \array_filter($paramKeys, fn (string $k) => $k === $current); $events[] = \str_replace($paramKeys, $paramValues, \str_replace($filtered2, '*', \str_replace($filtered1, '*', $eventPattern))); $events[] = \str_replace($paramKeys, $paramValues, \str_replace($filtered2, '*', $eventPattern)); } } else { if ($current === $key) { continue; } $filtered = \array_filter($paramKeys, fn (string $k) => $k === $current); $events[] = \str_replace($paramKeys, $paramValues, \str_replace($filtered, '*', $eventPattern)); } } } } /** * Remove [] from the events. */ $events = \array_map(fn (string $event) => \str_replace(['[', ']'], '', $event), $events); $events = \array_unique($events); /** * Force a non-assoc array. */ $eventValues = \array_values($events); $databaseType = $database?->getAttribute('type', 'legacy'); if ($database !== null && !\in_array($databaseType, ['legacy', 'tablesdb'], true)) { return $eventValues; } return Event::mirrorCollectionEvents($pattern, $eventValues[0], $eventValues); } /** * Generate a function event from a base event * * @param Event $event * */ public function from(Event $event): static { $this->project = $event->getProject(); $this->user = $event->getUser(); $this->payload = $event->getPayload(); $this->sensitive = $event->sensitive; $this->event = $event->getEvent(); $this->params = $event->getParams(); $this->context = $event->context; return $this; } /** * Adds table/collection counterpart events for backward compatibility. * * Example: * * `databases.*.collections.*.documents.*.update` →\ * `[databases.*.collections.*.documents.*.update, databases.*.tables.*.rows.*.update]` * * `databases.*.tables.*.rows.*.update` →\ * `[databases.*.tables.*.rows.*.update, databases.*.collections.*.documents.*.update]` */ private static function mirrorCollectionEvents(string $pattern, string $firstEvent, array $events): array { $collectionsToTablesMap = [ 'documents' => 'rows', 'collections' => 'tables', 'attributes' => 'columns', ]; $tablesToCollectionsMap = [ 'rows' => 'documents', 'tables' => 'collections', 'columns' => 'attributes', ]; $databasesEventMap = [ 'tablesdb' => 'databases', 'tables' => 'collections', 'rows' => 'documents', 'columns' => 'attributes' ]; if ( ( str_contains($pattern, 'databases.') && ( str_contains($firstEvent, 'collections') || str_contains($firstEvent, 'tables') ) ) || ( str_contains($firstEvent, 'tablesdb.') ) ) { $pairedEvents = []; foreach ($events as $event) { $pairedEvents[] = $event; // tablesdb needs databases event with tables and collections if (str_contains($event, 'tablesdb')) { $databasesSideEvent = self::replaceEventSegments($event, $databasesEventMap); $pairedEvents[] = $databasesSideEvent; $tableSideEvent = self::replaceEventSegments($databasesSideEvent, $collectionsToTablesMap); $pairedEvents[] = $tableSideEvent; } elseif (str_contains($event, 'collections')) { $tableSideEvent = self::replaceEventSegments($event, $collectionsToTablesMap); $pairedEvents[] = $tableSideEvent; } elseif (str_contains($event, 'tables')) { $collectionSideEvent = self::replaceEventSegments($event, $tablesToCollectionsMap); $pairedEvents[] = $collectionSideEvent; } } $events = $pairedEvents; } // mirrored events can have duplicates in case of smaller events // array unique can turns list to hasmap in case duplicates present // so forcing array value will turn this to array list always return array_values(array_unique($events)); } /** * Replace only exact event path segments, never partial substrings. */ private static function replaceEventSegments(string $event, array $map): string { $parts = \explode('.', $event); $parts = \array_map( fn (string $part) => $map[$part] ?? $part, $parts ); return \implode('.', $parts); } /** * Maps event terminology based on database type */ private static function getDatabaseTypeEvents(Document $database, array $event): array { $eventMap = []; switch ($database->getAttribute('type')) { case 'tablesdb': $eventMap = [ 'databases' => 'tablesdb', 'documents' => 'rows', 'collections' => 'tables', 'attributes' => 'columns', ]; break; case 'documentsdb': case 'vectorsdb': // sending the type itself(eg: documentsdb, vectorsdb) $eventMap = [ 'databases' => $database->getAttribute('type') ]; break; } foreach ($event as $eventKey => $eventValue) { if (isset($eventMap[$eventValue])) { $event[$eventKey] = $eventMap[$eventValue]; } } return $event; } /** * Returns the size of the queue. * * @param bool $failed Whether to include failed events in the count. * @return int The size of the queue. */ public function getSize(bool $failed = false): int { $queue = new Queue($this->getQueue()); return $this->publisher->getQueueSize($queue, $failed); } }