Merge pull request #9731 from appwrite/PLA-2683

chore(audits): return queue pre-fetch results
This commit is contained in:
Christy Jacob 2025-06-04 13:44:36 +04:00 committed by GitHub
commit 3a1cd22be5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -12,13 +12,13 @@ use Utopia\Database\Exception\Authorization;
use Utopia\Database\Exception\Structure; use Utopia\Database\Exception\Structure;
use Utopia\Platform\Action; use Utopia\Platform\Action;
use Utopia\Queue\Message; use Utopia\Queue\Message;
use Utopia\Queue\Result\Commit;
use Utopia\Queue\Result\NoCommit;
use Utopia\System\System; use Utopia\System\System;
class Audits extends Action class Audits extends Action
{ {
protected const BATCH_SIZE_DEVELOPMENT = 1; // smaller batch size for development protected const int BATCH_AGGREGATION_INTERVAL = 60; // in seconds
protected const BATCH_SIZE_PRODUCTION = 5_000;
protected const BATCH_AGGREGATION_INTERVAL = 60; // in seconds
private int $lastTriggeredTime = 0; private int $lastTriggeredTime = 0;
@ -27,9 +27,7 @@ class Audits extends Action
protected function getBatchSize(): int protected function getBatchSize(): int
{ {
return System::getEnv('_APP_ENV', 'development') === 'development' return intval(System::getEnv('_APP_QUEUE_PREFETCH_COUNT', 1));
? self::BATCH_SIZE_DEVELOPMENT
: self::BATCH_SIZE_PRODUCTION;
} }
public static function getName(): string public static function getName(): string
@ -57,13 +55,13 @@ class Audits extends Action
* @param Message $message * @param Message $message
* @param callable $getProjectDB * @param callable $getProjectDB
* @param Document $project * @param Document $project
* @return void * @return Commit|NoCommit
* @throws Throwable * @throws Throwable
* @throws \Utopia\Database\Exception * @throws \Utopia\Database\Exception
* @throws Authorization * @throws Authorization
* @throws Structure * @throws Structure
*/ */
public function action(Message $message, callable $getProjectDB, Document $project): void public function action(Message $message, callable $getProjectDB, Document $project): Commit|NoCommit
{ {
$payload = $message->getPayload() ?? []; $payload = $message->getPayload() ?? [];
@ -123,29 +121,32 @@ class Audits extends Action
// Check if we should process the batch by checking both for the batch size and the elapsed time // Check if we should process the batch by checking both for the batch size and the elapsed time
$batchSize = $this->getBatchSize(); $batchSize = $this->getBatchSize();
$shouldProcessBatch = \count($this->logs) >= $batchSize; $logCount = array_reduce($this->logs, fn (int $current, $logs) => $current + count($logs['logs']), 0);
if (!$shouldProcessBatch && \count($this->logs) > 0) { $shouldProcessBatch = $logCount >= $batchSize;
if (!$shouldProcessBatch && $logCount > 0) {
$shouldProcessBatch = (\time() - $this->lastTriggeredTime) >= self::BATCH_AGGREGATION_INTERVAL; $shouldProcessBatch = (\time() - $this->lastTriggeredTime) >= self::BATCH_AGGREGATION_INTERVAL;
} }
if ($shouldProcessBatch) { if (!$shouldProcessBatch) {
try { return new NoCommit();
foreach ($this->logs as $internalId => $projectLogs) {
$dbForProject = $getProjectDB($projectLogs['project']);
Console::log('Processing batch with ' . count($projectLogs['logs']) . ' events');
$audit = new Audit($dbForProject);
$audit->logBatch($projectLogs['logs']);
Console::success('Audit logs processed successfully');
unset($this->logs[$internalId]);
}
} catch (Throwable $e) {
Console::error('Error processing audit logs: ' . $e->getMessage());
} finally {
$this->lastTriggeredTime = time();
}
} }
try {
foreach ($this->logs as $internalId => $projectLogs) {
$dbForProject = $getProjectDB($projectLogs['project']);
Console::log('Processing batch with ' . count($projectLogs['logs']) . ' events');
$audit = new Audit($dbForProject);
$audit->logBatch($projectLogs['logs']);
Console::success('Audit logs processed successfully');
unset($this->logs[$internalId]);
}
} catch (Throwable $e) {
Console::error('Error processing audit logs: ' . $e->getMessage());
}
$this->lastTriggeredTime = time();
return new Commit();
} }
} }