desc('Audits worker') ->inject('message') ->inject('dbForProject') ->callback(fn ($message, $dbForProject) => $this->action($message, $dbForProject)); $this->lastTriggeredTime = time(); } /** * @param Message $message * @param Database $dbForProject * @return void * @throws Throwable * @throws \Utopia\Database\Exception * @throws Authorization * @throws Structure */ public function action(Message $message, Database $dbForProject): void { $payload = $message->getPayload() ?? []; if (empty($payload)) { throw new Exception('Missing payload'); } Console::info('Aggregating audit logs'); $event = $payload['event'] ?? ''; $auditPayload = $payload['payload'] ?? ''; $mode = $payload['mode'] ?? ''; $resource = $payload['resource'] ?? ''; $userAgent = $payload['userAgent'] ?? ''; $ip = $payload['ip'] ?? ''; $user = new Document($payload['user'] ?? []); $userName = $user->getAttribute('name', ''); $userEmail = $user->getAttribute('email', ''); $userType = $user->getAttribute('type', Auth::ACTIVITY_TYPE_USER); // Create event data $eventData = [ 'userId' => $user->getInternalId(), 'event' => $event, 'resource' => $resource, 'userAgent' => $userAgent, 'ip' => $ip, 'location' => '', 'data' => [ 'userId' => $user->getId(), 'userName' => $userName, 'userEmail' => $userEmail, 'userType' => $userType, 'mode' => $mode, 'data' => $auditPayload, ], 'timestamp' => DateTime::formatTz(DateTime::now()) ]; $this->logs[] = $eventData; // Check if we should process the batch by checking both for the batch size and the elapsed time $batchSize = $this->getBatchSize(); $shouldProcessBatch = count($this->logs) >= $batchSize; if (!$shouldProcessBatch && count($this->logs) > 0) { $shouldProcessBatch = (time() - $this->lastTriggeredTime) >= self::BATCH_AGGREGATION_INTERVAL; } if ($shouldProcessBatch) { Console::log('Processing batch with ' . count($this->logs) . ' events'); $audit = new Audit($dbForProject); try { $audit->logBatch($this->logs); Console::success('Audit logs processed successfully'); } catch (Throwable $e) { Console::error('Error processing audit logs: ' . $e->getMessage()); } finally { // Clear the pending events after successful batch processing $this->logs = []; $this->lastTriggeredTime = time(); } } } }