diff --git a/src/Appwrite/Platform/Workers/Audits.php b/src/Appwrite/Platform/Workers/Audits.php index ed5ff8010a..962bc622d1 100644 --- a/src/Appwrite/Platform/Workers/Audits.php +++ b/src/Appwrite/Platform/Workers/Audits.php @@ -12,12 +12,12 @@ use Utopia\Database\Exception\Authorization; use Utopia\Database\Exception\Structure; use Utopia\Platform\Action; use Utopia\Queue\Message; +use Utopia\Queue\Result\Commit; +use Utopia\Queue\Result\NoCommit; use Utopia\System\System; class Audits extends Action { - protected const BATCH_SIZE_DEVELOPMENT = 1; // smaller batch size for development - protected const BATCH_SIZE_PRODUCTION = 5_000; protected const BATCH_AGGREGATION_INTERVAL = 60; // in seconds private int $lastTriggeredTime = 0; @@ -27,9 +27,7 @@ class Audits extends Action protected function getBatchSize(): int { - return System::getEnv('_APP_ENV', 'development') === 'development' - ? self::BATCH_SIZE_DEVELOPMENT - : self::BATCH_SIZE_PRODUCTION; + return intval(System::getEnv('_APP_QUEUE_PREFETCH_COUNT', 1)); } public static function getName(): string @@ -57,13 +55,13 @@ class Audits extends Action * @param Message $message * @param callable $getProjectDB * @param Document $project - * @return void + * @return Commit|NoCommit * @throws Throwable * @throws \Utopia\Database\Exception * @throws Authorization * @throws Structure */ - public function action(Message $message, callable $getProjectDB, Document $project): void + public function action(Message $message, callable $getProjectDB, Document $project): Commit|NoCommit { $payload = $message->getPayload() ?? []; @@ -119,29 +117,34 @@ class Audits extends Action // Check if we should process the batch by checking both for the batch size and the elapsed time $batchSize = $this->getBatchSize(); - $shouldProcessBatch = \count($this->logs) >= $batchSize; - if (!$shouldProcessBatch && \count($this->logs) > 0) { + $logCount = array_reduce($this->logs, fn (int $current, $logs) => $current + count($logs), 0); + $shouldProcessBatch = $logCount >= $batchSize; + if (!$shouldProcessBatch && $logCount > 0) { $shouldProcessBatch = (\time() - $this->lastTriggeredTime) >= self::BATCH_AGGREGATION_INTERVAL; } - if ($shouldProcessBatch) { - try { - foreach ($this->logs as $internalId => $projectLogs) { - $dbForProject = $getProjectDB($projectLogs['project']); + if (!$shouldProcessBatch) { + return new NoCommit(); + } - Console::log('Processing batch with ' . count($projectLogs['logs']) . ' events'); - $audit = new Audit($dbForProject); + try { + foreach ($this->logs as $internalId => $projectLogs) { + $dbForProject = $getProjectDB($projectLogs['project']); - $audit->logBatch($projectLogs['logs']); - Console::success('Audit logs processed successfully'); + Console::log('Processing batch with ' . count($projectLogs['logs']) . ' events'); + $audit = new Audit($dbForProject); - unset($this->logs[$internalId]); - } - } catch (Throwable $e) { - Console::error('Error processing audit logs: ' . $e->getMessage()); - } finally { - $this->lastTriggeredTime = time(); + $audit->logBatch($projectLogs['logs']); + Console::success('Audit logs processed successfully'); + + unset($this->logs[$internalId]); } + return new Commit(); + } catch (Throwable $e) { + Console::error('Error processing audit logs: ' . $e->getMessage()); + return new NoCommit(); + } finally { + $this->lastTriggeredTime = time(); } } }