feat: optimise events payloads

This commit is contained in:
Christy Jacob 2025-01-16 11:35:22 +05:30
parent 2f83bec071
commit de6fced412
11 changed files with 50 additions and 62 deletions

32
composer.lock generated
View file

@ -1430,16 +1430,16 @@
},
{
"name": "open-telemetry/gen-otlp-protobuf",
"version": "1.2.1",
"version": "1.5.0",
"source": {
"type": "git",
"url": "https://github.com/opentelemetry-php/gen-otlp-protobuf.git",
"reference": "66c3b98e998a726691c92e6405a82e6e7b8b169d"
"reference": "585bafddd4ae6565de154610b10a787a455c9ba0"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/opentelemetry-php/gen-otlp-protobuf/zipball/66c3b98e998a726691c92e6405a82e6e7b8b169d",
"reference": "66c3b98e998a726691c92e6405a82e6e7b8b169d",
"url": "https://api.github.com/repos/opentelemetry-php/gen-otlp-protobuf/zipball/585bafddd4ae6565de154610b10a787a455c9ba0",
"reference": "585bafddd4ae6565de154610b10a787a455c9ba0",
"shasum": ""
},
"require": {
@ -1489,7 +1489,7 @@
"issues": "https://github.com/open-telemetry/opentelemetry-php/issues",
"source": "https://github.com/open-telemetry/opentelemetry-php"
},
"time": "2024-10-30T11:49:49+00:00"
"time": "2025-01-15T23:07:07+00:00"
},
{
"name": "open-telemetry/sdk",
@ -4095,16 +4095,16 @@
},
{
"name": "utopia-php/platform",
"version": "0.7.1",
"version": "0.7.2",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/platform.git",
"reference": "3433a0f1a54988f2a59c735f507745cb2c24638a"
"reference": "6f9243848f1c6466f6509fd01c7e18306a6d8caf"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/platform/zipball/3433a0f1a54988f2a59c735f507745cb2c24638a",
"reference": "3433a0f1a54988f2a59c735f507745cb2c24638a",
"url": "https://api.github.com/repos/utopia-php/platform/zipball/6f9243848f1c6466f6509fd01c7e18306a6d8caf",
"reference": "6f9243848f1c6466f6509fd01c7e18306a6d8caf",
"shasum": ""
},
"require": {
@ -4139,9 +4139,9 @@
],
"support": {
"issues": "https://github.com/utopia-php/platform/issues",
"source": "https://github.com/utopia-php/platform/tree/0.7.1"
"source": "https://github.com/utopia-php/platform/tree/0.7.2"
},
"time": "2024-10-22T10:27:49+00:00"
"time": "2025-01-15T05:56:26+00:00"
},
{
"name": "utopia-php/pools",
@ -5126,16 +5126,16 @@
},
{
"name": "laravel/pint",
"version": "v1.19.0",
"version": "v1.20.0",
"source": {
"type": "git",
"url": "https://github.com/laravel/pint.git",
"reference": "8169513746e1bac70c85d6ea1524d9225d4886f0"
"reference": "53072e8ea22213a7ed168a8a15b96fbb8b82d44b"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/laravel/pint/zipball/8169513746e1bac70c85d6ea1524d9225d4886f0",
"reference": "8169513746e1bac70c85d6ea1524d9225d4886f0",
"url": "https://api.github.com/repos/laravel/pint/zipball/53072e8ea22213a7ed168a8a15b96fbb8b82d44b",
"reference": "53072e8ea22213a7ed168a8a15b96fbb8b82d44b",
"shasum": ""
},
"require": {
@ -5188,7 +5188,7 @@
"issues": "https://github.com/laravel/pint/issues",
"source": "https://github.com/laravel/pint"
},
"time": "2024-12-30T16:20:10+00:00"
"time": "2025-01-14T16:20:53+00:00"
},
{
"name": "matthiasmullie/minify",

View file

@ -136,7 +136,10 @@ class Event
*/
public function setProject(Document $project): self
{
$this->project = $project;
$this->project = new Document([
'$id' => $project->getId(),
'$internalId' => $project->getInternalId()
]);
return $this;
}

View file

@ -20,11 +20,7 @@ class Webhook extends Event
{
/** Filter out context and trim project to keep the payload small */
$this->context = [];
$this->project = new Document([
'$id' => $this->project->getId(),
'$internalId' => $this->project->getInternalId(),
]);
return parent::trigger();
}
}

View file

@ -46,6 +46,7 @@ class Builds extends Action
$this
->desc('Builds worker')
->inject('message')
->inject('project')
->inject('dbForPlatform')
->inject('queueForEvents')
->inject('queueForFunctions')
@ -54,11 +55,12 @@ class Builds extends Action
->inject('dbForProject')
->inject('deviceForFunctions')
->inject('log')
->callback(fn ($message, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Usage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log) => $this->action($message, $dbForPlatform, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $deviceForFunctions, $log));
->callback(fn ($message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Usage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log) => $this->action($message, $project, $dbForPlatform, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $deviceForFunctions, $log));
}
/**
* @param Message $message
* @param Document $project
* @param Database $dbForPlatform
* @param Event $queueForEvents
* @param Func $queueForFunctions
@ -70,7 +72,7 @@ class Builds extends Action
* @return void
* @throws \Utopia\Database\Exception
*/
public function action(Message $message, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log): void
public function action(Message $message, Document $project, Database $dbForPlatform, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log): void
{
$payload = $message->getPayload() ?? [];
@ -79,7 +81,6 @@ class Builds extends Action
}
$type = $payload['type'] ?? '';
$project = new Document($payload['project'] ?? []);
$resource = new Document($payload['resource'] ?? []);
$deployment = new Document($payload['deployment'] ?? []);
$template = new Document($payload['template'] ?? []);

View file

@ -34,21 +34,23 @@ class Databases extends Action
$this
->desc('Databases worker')
->inject('message')
->inject('project')
->inject('dbForPlatform')
->inject('dbForProject')
->inject('log')
->callback(fn (Message $message, Database $dbForPlatform, Database $dbForProject, Log $log) => $this->action($message, $dbForPlatform, $dbForProject, $log));
->callback(fn (Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log) => $this->action($message, $project, $dbForPlatform, $dbForProject, $log));
}
/**
* @param Message $message
* @param Document $project
* @param Database $dbForPlatform
* @param Database $dbForProject
* @param Log $log
* @return void
* @throws \Exception
*/
public function action(Message $message, Database $dbForPlatform, Database $dbForProject, Log $log): void
public function action(Message $message, Document $project, Database $dbForPlatform, Database $dbForProject, Log $log): void
{
$payload = $message->getPayload() ?? [];
@ -57,7 +59,6 @@ class Databases extends Action
}
$type = $payload['type'];
$project = new Document($payload['project']);
$collection = new Document($payload['collection'] ?? []);
$document = new Document($payload['document'] ?? []);
$database = new Document($payload['database'] ?? []);

View file

@ -43,6 +43,7 @@ class Deletes extends Action
$this
->desc('Deletes worker')
->inject('message')
->inject('project')
->inject('dbForPlatform')
->inject('getProjectDB')
->inject('timelimit')
@ -55,8 +56,8 @@ class Deletes extends Action
->inject('auditRetention')
->inject('log')
->callback(
fn ($message, $dbForPlatform, callable $getProjectDB, callable $timelimit, Device $deviceForFiles, Device $deviceForFunctions, Device $deviceForBuilds, Device $deviceForCache, CertificatesAdapter $certificates, string $executionRetention, string $auditRetention, Log $log) =>
$this->action($message, $dbForPlatform, $getProjectDB, $timelimit, $deviceForFiles, $deviceForFunctions, $deviceForBuilds, $deviceForCache, $certificates, $executionRetention, $auditRetention, $log)
fn ($message, Document $project, Database $dbForPlatform, callable $getProjectDB, callable $timelimit, Device $deviceForFiles, Device $deviceForFunctions, Device $deviceForBuilds, Device $deviceForCache, CertificatesAdapter $certificates, string $executionRetention, string $auditRetention, Log $log) =>
$this->action($message, $project, $dbForPlatform, $getProjectDB, $timelimit, $deviceForFiles, $deviceForFunctions, $deviceForBuilds, $deviceForCache, $certificates, $executionRetention, $auditRetention, $log)
);
}
@ -64,7 +65,7 @@ class Deletes extends Action
* @throws Exception
* @throws Throwable
*/
public function action(Message $message, Database $dbForPlatform, callable $getProjectDB, callable $timelimit, Device $deviceForFiles, Device $deviceForFunctions, Device $deviceForBuilds, Device $deviceForCache, CertificatesAdapter $certificates, string $executionRetention, string $auditRetention, Log $log): void
public function action(Message $message, Document $project, Database $dbForPlatform, callable $getProjectDB, callable $timelimit, Device $deviceForFiles, Device $deviceForFunctions, Device $deviceForBuilds, Device $deviceForCache, CertificatesAdapter $certificates, string $executionRetention, string $auditRetention, Log $log): void
{
$payload = $message->getPayload() ?? [];
@ -78,7 +79,6 @@ class Deletes extends Action
$resource = $payload['resource'] ?? null;
$resourceType = $payload['resourceType'] ?? null;
$document = new Document($payload['document'] ?? []);
$project = new Document($payload['project'] ?? []);
$log->addTag('projectId', $project->getId());
$log->addTag('type', $type);

View file

@ -58,15 +58,17 @@ class Messaging extends Action
$this
->desc('Messaging worker')
->inject('message')
->inject('project')
->inject('log')
->inject('dbForProject')
->inject('deviceForFiles')
->inject('queueForUsage')
->callback(fn (Message $message, Log $log, Database $dbForProject, Device $deviceForFiles, Usage $queueForUsage) => $this->action($message, $log, $dbForProject, $deviceForFiles, $queueForUsage));
->callback(fn (Message $message, Document $project, Log $log, Database $dbForProject, Device $deviceForFiles, Usage $queueForUsage) => $this->action($message, $project, $log, $dbForProject, $deviceForFiles, $queueForUsage));
}
/**
* @param Message $message
* @param Document $project
* @param Log $log
* @param Database $dbForProject
* @param Device $deviceForFiles
@ -76,6 +78,7 @@ class Messaging extends Action
*/
public function action(
Message $message,
Document $project,
Log $log,
Database $dbForProject,
Device $deviceForFiles,
@ -89,7 +92,6 @@ class Messaging extends Action
}
$type = $payload['type'] ?? '';
$project = new Document($payload['project'] ?? []);
switch ($type) {
case MESSAGE_SEND_TYPE_INTERNAL:

View file

@ -51,16 +51,17 @@ class Migrations extends Action
$this
->desc('Migrations worker')
->inject('message')
->inject('project')
->inject('dbForProject')
->inject('dbForPlatform')
->inject('logError')
->callback(fn (Message $message, Database $dbForProject, Database $dbForPlatform, callable $logError) => $this->action($message, $dbForProject, $dbForPlatform, $logError));
->callback(fn (Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError) => $this->action($message, $project, $dbForProject, $dbForPlatform, $logError));
}
/**
* @throws Exception
*/
public function action(Message $message, Database $dbForProject, Database $dbForPlatform, callable $logError): void
public function action(Message $message, Document $project, Database $dbForProject, Database $dbForPlatform, callable $logError): void
{
$payload = $message->getPayload() ?? [];
@ -69,7 +70,6 @@ class Migrations extends Action
}
$events = $payload['events'] ?? [];
$project = new Document($payload['project'] ?? []);
$migration = new Document($payload['migration'] ?? []);
if ($project->getId() === 'console') {

View file

@ -34,10 +34,11 @@ class Usage extends Action
$this
->desc('Usage worker')
->inject('message')
->inject('project')
->inject('getProjectDB')
->inject('queueForUsageDump')
->callback(function (Message $message, callable $getProjectDB, UsageDump $queueForUsageDump) {
$this->action($message, $getProjectDB, $queueForUsageDump);
->callback(function (Message $message, Document $project, callable $getProjectDB, UsageDump $queueForUsageDump) {
$this->action($message, $project, $getProjectDB, $queueForUsageDump);
});
$this->aggregationInterval = (int) System::getEnv('_APP_USAGE_AGGREGATION_INTERVAL', '20');
@ -46,21 +47,20 @@ class Usage extends Action
/**
* @param Message $message
* @param Document $project
* @param callable $getProjectDB
* @param UsageDump $queueForUsageDump
* @return void
* @throws \Utopia\Database\Exception
* @throws Exception
*/
public function action(Message $message, callable $getProjectDB, UsageDump $queueForUsageDump): void
public function action(Message $message, Document $project, callable $getProjectDB, UsageDump $queueForUsageDump): void
{
$payload = $message->getPayload() ?? [];
if (empty($payload)) {
throw new Exception('Missing payload');
}
$document = $payload['project'] ?? [];
$project = new Document($document);
if (empty($project->getAttribute('database'))) {
var_dump($payload);

View file

@ -59,26 +59,11 @@ class UsageDump extends Action
foreach ($payload['stats'] ?? [] as $stats) {
//$project = new Document($stats['project'] ?? []);
/**
* Start temp bug fallback
*/
$document = $stats['project'] ?? [];
if (!empty($document['$uid'])) {
$document['$id'] = $document['$uid'];
}
$project = new Document($document);
if (empty($project->getAttribute('database'))) {
continue;
}
$project = new Document($stats['project'] ?? []);
/**
* End temp bug fallback
*/
$numberOfKeys = !empty($stats['keys']) ? count($stats['keys']) : 0;
$receivedAt = $stats['receivedAt'] ?? 'NONE';
if ($numberOfKeys === 0) {

View file

@ -32,22 +32,24 @@ class Webhooks extends Action
$this
->desc('Webhooks worker')
->inject('message')
->inject('project')
->inject('dbForPlatform')
->inject('queueForMails')
->inject('queueForUsage')
->inject('log')
->callback(fn (Message $message, Database $dbForPlatform, Mail $queueForMails, Usage $queueForUsage, Log $log) => $this->action($message, $dbForPlatform, $queueForMails, $queueForUsage, $log));
->callback(fn (Message $message, Document $project, Database $dbForPlatform, Mail $queueForMails, Usage $queueForUsage, Log $log) => $this->action($message, $project, $dbForPlatform, $queueForMails, $queueForUsage, $log));
}
/**
* @param Message $message
* @param Document $project
* @param Database $dbForPlatform
* @param Mail $queueForMails
* @param Log $log
* @return void
* @throws Exception
*/
public function action(Message $message, Database $dbForPlatform, Mail $queueForMails, Usage $queueForUsage, Log $log): void
public function action(Message $message, Document $project, Database $dbForPlatform, Mail $queueForMails, Usage $queueForUsage, Log $log): void
{
$this->errors = [];
$payload = $message->getPayload() ?? [];
@ -60,8 +62,6 @@ class Webhooks extends Action
$webhookPayload = json_encode($payload['payload']);
$user = new Document($payload['user'] ?? []);
$project = new Document($payload['project']);
$project = $dbForPlatform->getDocument('projects', $project->getId());
$log->addTag('projectId', $project->getId());
foreach ($project->getAttribute('webhooks', []) as $webhook) {