From 08f4ed2b50c96b827196a32c9de6fe8460afd68f Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Tue, 25 Feb 2025 15:50:10 +1300 Subject: [PATCH] Log batches per project --- src/Appwrite/Platform/Workers/Audits.php | 55 ++++++++++++++++-------- 1 file changed, 36 insertions(+), 19 deletions(-) diff --git a/src/Appwrite/Platform/Workers/Audits.php b/src/Appwrite/Platform/Workers/Audits.php index f1ae46eea7..f645463aed 100644 --- a/src/Appwrite/Platform/Workers/Audits.php +++ b/src/Appwrite/Platform/Workers/Audits.php @@ -46,8 +46,9 @@ class Audits extends Action $this ->desc('Audits worker') ->inject('message') - ->inject('dbForProject') - ->callback(fn ($message, $dbForProject) => $this->action($message, $dbForProject)); + ->inject('getProjectDB') + ->inject('project') + ->callback([$this, 'action']); $this->lastTriggeredTime = time(); } @@ -55,14 +56,15 @@ class Audits extends Action /** * @param Message $message - * @param Database $dbForProject + * @param callable $getProjectDB + * @param Document $project * @return void * @throws Throwable * @throws \Utopia\Database\Exception * @throws Authorization * @throws Structure */ - public function action(Message $message, Database $dbForProject): void + public function action(Message $message, callable $getProjectDB, Document $project): void { $payload = $message->getPayload() ?? []; @@ -103,28 +105,43 @@ class Audits extends Action 'timestamp' => DateTime::formatTz(DateTime::now()) ]; - $this->logs[] = $eventData; + if (isset($this->logs[$project->getInternalId()])) { + $this->logs[$project->getInternalId()]['logs'][] = $eventData; + } else { + $this->logs[$project->getInternalId()] = [ + 'project' => new Document([ + '$id' => $project->getId(), + '$internalId' => $project->getInternalId(), + 'database' => $project->getAttribute('database'), + ]), + 'logs' => [$eventData] + ]; + } // Check if we should process the batch by checking both for the batch size and the elapsed time $batchSize = $this->getBatchSize(); - $shouldProcessBatch = count($this->logs) >= $batchSize; - if (!$shouldProcessBatch && count($this->logs) > 0) { - $shouldProcessBatch = (time() - $this->lastTriggeredTime) >= self::BATCH_AGGREGATION_INTERVAL; + $shouldProcessBatch = \count($this->logs) >= $batchSize; + if (!$shouldProcessBatch && \count($this->logs) > 0) { + $shouldProcessBatch = (\time() - $this->lastTriggeredTime) >= self::BATCH_AGGREGATION_INTERVAL; } if ($shouldProcessBatch) { - Console::log('Processing batch with ' . count($this->logs) . ' events'); - $audit = new Audit($dbForProject); + foreach ($this->logs as $projectLogs) { + $dbForProject = $getProjectDB($projectLogs['project']); - try { - $audit->logBatch($this->logs); - Console::success('Audit logs processed successfully'); - } catch (Throwable $e) { - Console::error('Error processing audit logs: ' . $e->getMessage()); - } finally { - // Clear the pending events after successful batch processing - $this->logs = []; - $this->lastTriggeredTime = time(); + Console::log('Processing batch with ' . count($projectLogs['logs']) . ' events'); + $audit = new Audit($dbForProject); + + try { + $audit->logBatch($projectLogs['logs']); + Console::success('Audit logs processed successfully'); + } catch (Throwable $e) { + Console::error('Error processing audit logs: ' . $e->getMessage()); + } finally { + // Clear the pending events after successful batch processing + $this->logs = []; + $this->lastTriggeredTime = time(); + } } } }