Log batches per project

This commit is contained in:
Jake Barnby 2025-02-25 15:50:10 +13:00
parent 392b33ef26
commit 08f4ed2b50
No known key found for this signature in database
GPG key ID: C437A8CC85B96E9C

View file

@ -46,8 +46,9 @@ class Audits extends Action
$this
->desc('Audits worker')
->inject('message')
->inject('dbForProject')
->callback(fn ($message, $dbForProject) => $this->action($message, $dbForProject));
->inject('getProjectDB')
->inject('project')
->callback([$this, 'action']);
$this->lastTriggeredTime = time();
}
@ -55,14 +56,15 @@ class Audits extends Action
/**
* @param Message $message
* @param Database $dbForProject
* @param callable $getProjectDB
* @param Document $project
* @return void
* @throws Throwable
* @throws \Utopia\Database\Exception
* @throws Authorization
* @throws Structure
*/
public function action(Message $message, Database $dbForProject): void
public function action(Message $message, callable $getProjectDB, Document $project): void
{
$payload = $message->getPayload() ?? [];
@ -103,28 +105,43 @@ class Audits extends Action
'timestamp' => DateTime::formatTz(DateTime::now())
];
$this->logs[] = $eventData;
if (isset($this->logs[$project->getInternalId()])) {
$this->logs[$project->getInternalId()]['logs'][] = $eventData;
} else {
$this->logs[$project->getInternalId()] = [
'project' => new Document([
'$id' => $project->getId(),
'$internalId' => $project->getInternalId(),
'database' => $project->getAttribute('database'),
]),
'logs' => [$eventData]
];
}
// Check if we should process the batch by checking both for the batch size and the elapsed time
$batchSize = $this->getBatchSize();
$shouldProcessBatch = count($this->logs) >= $batchSize;
if (!$shouldProcessBatch && count($this->logs) > 0) {
$shouldProcessBatch = (time() - $this->lastTriggeredTime) >= self::BATCH_AGGREGATION_INTERVAL;
$shouldProcessBatch = \count($this->logs) >= $batchSize;
if (!$shouldProcessBatch && \count($this->logs) > 0) {
$shouldProcessBatch = (\time() - $this->lastTriggeredTime) >= self::BATCH_AGGREGATION_INTERVAL;
}
if ($shouldProcessBatch) {
Console::log('Processing batch with ' . count($this->logs) . ' events');
$audit = new Audit($dbForProject);
foreach ($this->logs as $projectLogs) {
$dbForProject = $getProjectDB($projectLogs['project']);
try {
$audit->logBatch($this->logs);
Console::success('Audit logs processed successfully');
} catch (Throwable $e) {
Console::error('Error processing audit logs: ' . $e->getMessage());
} finally {
// Clear the pending events after successful batch processing
$this->logs = [];
$this->lastTriggeredTime = time();
Console::log('Processing batch with ' . count($projectLogs['logs']) . ' events');
$audit = new Audit($dbForProject);
try {
$audit->logBatch($projectLogs['logs']);
Console::success('Audit logs processed successfully');
} catch (Throwable $e) {
Console::error('Error processing audit logs: ' . $e->getMessage());
} finally {
// Clear the pending events after successful batch processing
$this->logs = [];
$this->lastTriggeredTime = time();
}
}
}
}