From fc14144a0b7e0483466886163a430608cacf3201 Mon Sep 17 00:00:00 2001 From: Fabian Gruber Date: Tue, 18 Mar 2025 12:29:32 +0100 Subject: [PATCH] chore(audits): return queue pre-fetch results --- src/Appwrite/Platform/Workers/Audits.php | 57 ++++++++++++------------ 1 file changed, 29 insertions(+), 28 deletions(-) diff --git a/src/Appwrite/Platform/Workers/Audits.php b/src/Appwrite/Platform/Workers/Audits.php index 76309145b8..ce8dbedd73 100644 --- a/src/Appwrite/Platform/Workers/Audits.php +++ b/src/Appwrite/Platform/Workers/Audits.php @@ -12,13 +12,13 @@ use Utopia\Database\Exception\Authorization; use Utopia\Database\Exception\Structure; use Utopia\Platform\Action; use Utopia\Queue\Message; +use Utopia\Queue\Result\Commit; +use Utopia\Queue\Result\NoCommit; use Utopia\System\System; class Audits extends Action { - protected const BATCH_SIZE_DEVELOPMENT = 1; // smaller batch size for development - protected const BATCH_SIZE_PRODUCTION = 5_000; - protected const BATCH_AGGREGATION_INTERVAL = 60; // in seconds + protected const int BATCH_AGGREGATION_INTERVAL = 60; // in seconds private int $lastTriggeredTime = 0; @@ -27,9 +27,7 @@ class Audits extends Action protected function getBatchSize(): int { - return System::getEnv('_APP_ENV', 'development') === 'development' - ? self::BATCH_SIZE_DEVELOPMENT - : self::BATCH_SIZE_PRODUCTION; + return intval(System::getEnv('_APP_QUEUE_PREFETCH_COUNT', 1)); } public static function getName(): string @@ -57,13 +55,13 @@ class Audits extends Action * @param Message $message * @param callable $getProjectDB * @param Document $project - * @return void + * @return Commit|NoCommit * @throws Throwable * @throws \Utopia\Database\Exception * @throws Authorization * @throws Structure */ - public function action(Message $message, callable $getProjectDB, Document $project): void + public function action(Message $message, callable $getProjectDB, Document $project): Commit|NoCommit { $payload = $message->getPayload() ?? []; @@ -123,29 +121,32 @@ class Audits extends Action // Check if we should process the batch by checking both for the batch size and the elapsed time $batchSize = $this->getBatchSize(); - $shouldProcessBatch = \count($this->logs) >= $batchSize; - if (!$shouldProcessBatch && \count($this->logs) > 0) { + $logCount = array_reduce($this->logs, fn (int $current, $logs) => $current + count($logs['logs']), 0); + $shouldProcessBatch = $logCount >= $batchSize; + if (!$shouldProcessBatch && $logCount > 0) { $shouldProcessBatch = (\time() - $this->lastTriggeredTime) >= self::BATCH_AGGREGATION_INTERVAL; } - if ($shouldProcessBatch) { - try { - foreach ($this->logs as $internalId => $projectLogs) { - $dbForProject = $getProjectDB($projectLogs['project']); - - Console::log('Processing batch with ' . count($projectLogs['logs']) . ' events'); - $audit = new Audit($dbForProject); - - $audit->logBatch($projectLogs['logs']); - Console::success('Audit logs processed successfully'); - - unset($this->logs[$internalId]); - } - } catch (Throwable $e) { - Console::error('Error processing audit logs: ' . $e->getMessage()); - } finally { - $this->lastTriggeredTime = time(); - } + if (!$shouldProcessBatch) { + return new NoCommit(); } + + try { + foreach ($this->logs as $internalId => $projectLogs) { + $dbForProject = $getProjectDB($projectLogs['project']); + + Console::log('Processing batch with ' . count($projectLogs['logs']) . ' events'); + $audit = new Audit($dbForProject); + + $audit->logBatch($projectLogs['logs']); + Console::success('Audit logs processed successfully'); + + unset($this->logs[$internalId]); + } + } catch (Throwable $e) { + Console::error('Error processing audit logs: ' . $e->getMessage()); + } + $this->lastTriggeredTime = time(); + return new Commit(); } }