From 17dc8e3f8ad82afc911e492b0c4be9218f0305a3 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Thu, 27 Mar 2025 21:37:10 +1300 Subject: [PATCH] Revert "chore(audits): return queue pre-fetch results" --- src/Appwrite/Platform/Workers/Audits.php | 49 +++++++++++------------- 1 file changed, 23 insertions(+), 26 deletions(-) diff --git a/src/Appwrite/Platform/Workers/Audits.php b/src/Appwrite/Platform/Workers/Audits.php index 962bc622d1..ed5ff8010a 100644 --- a/src/Appwrite/Platform/Workers/Audits.php +++ b/src/Appwrite/Platform/Workers/Audits.php @@ -12,12 +12,12 @@ use Utopia\Database\Exception\Authorization; use Utopia\Database\Exception\Structure; use Utopia\Platform\Action; use Utopia\Queue\Message; -use Utopia\Queue\Result\Commit; -use Utopia\Queue\Result\NoCommit; use Utopia\System\System; class Audits extends Action { + protected const BATCH_SIZE_DEVELOPMENT = 1; // smaller batch size for development + protected const BATCH_SIZE_PRODUCTION = 5_000; protected const BATCH_AGGREGATION_INTERVAL = 60; // in seconds private int $lastTriggeredTime = 0; @@ -27,7 +27,9 @@ class Audits extends Action protected function getBatchSize(): int { - return intval(System::getEnv('_APP_QUEUE_PREFETCH_COUNT', 1)); + return System::getEnv('_APP_ENV', 'development') === 'development' + ? self::BATCH_SIZE_DEVELOPMENT + : self::BATCH_SIZE_PRODUCTION; } public static function getName(): string @@ -55,13 +57,13 @@ class Audits extends Action * @param Message $message * @param callable $getProjectDB * @param Document $project - * @return Commit|NoCommit + * @return void * @throws Throwable * @throws \Utopia\Database\Exception * @throws Authorization * @throws Structure */ - public function action(Message $message, callable $getProjectDB, Document $project): Commit|NoCommit + public function action(Message $message, callable $getProjectDB, Document $project): void { $payload = $message->getPayload() ?? []; @@ -117,34 +119,29 @@ class Audits extends Action // Check if we should process the batch by checking both for the batch size and the elapsed time $batchSize = $this->getBatchSize(); - $logCount = array_reduce($this->logs, fn (int $current, $logs) => $current + count($logs), 0); - $shouldProcessBatch = $logCount >= $batchSize; - if (!$shouldProcessBatch && $logCount > 0) { + $shouldProcessBatch = \count($this->logs) >= $batchSize; + if (!$shouldProcessBatch && \count($this->logs) > 0) { $shouldProcessBatch = (\time() - $this->lastTriggeredTime) >= self::BATCH_AGGREGATION_INTERVAL; } - if (!$shouldProcessBatch) { - return new NoCommit(); - } + if ($shouldProcessBatch) { + try { + foreach ($this->logs as $internalId => $projectLogs) { + $dbForProject = $getProjectDB($projectLogs['project']); - try { - foreach ($this->logs as $internalId => $projectLogs) { - $dbForProject = $getProjectDB($projectLogs['project']); + Console::log('Processing batch with ' . count($projectLogs['logs']) . ' events'); + $audit = new Audit($dbForProject); - Console::log('Processing batch with ' . count($projectLogs['logs']) . ' events'); - $audit = new Audit($dbForProject); + $audit->logBatch($projectLogs['logs']); + Console::success('Audit logs processed successfully'); - $audit->logBatch($projectLogs['logs']); - Console::success('Audit logs processed successfully'); - - unset($this->logs[$internalId]); + unset($this->logs[$internalId]); + } + } catch (Throwable $e) { + Console::error('Error processing audit logs: ' . $e->getMessage()); + } finally { + $this->lastTriggeredTime = time(); } - return new Commit(); - } catch (Throwable $e) { - Console::error('Error processing audit logs: ' . $e->getMessage()); - return new NoCommit(); - } finally { - $this->lastTriggeredTime = time(); } } }