mirror of
https://github.com/appwrite/appwrite
synced 2026-05-22 16:38:32 +00:00
Clean up state helper
This commit is contained in:
parent
522a4d2a62
commit
71b2164154
2 changed files with 423 additions and 178 deletions
|
|
@ -8,6 +8,11 @@ use Utopia\Database\Query;
|
|||
|
||||
/**
|
||||
* Service for managing transaction state and providing transaction-aware document operations
|
||||
*
|
||||
* This class provides methods to:
|
||||
* - Query documents with transaction awareness (getDocument, listDocuments, countDocuments)
|
||||
* - Apply bulk operations to transaction state for cross-operation visibility
|
||||
* - Replay transaction operations to build current state
|
||||
*/
|
||||
class TransactionState
|
||||
{
|
||||
|
|
@ -18,161 +23,15 @@ class TransactionState
|
|||
$this->dbForProject = $dbForProject;
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply projection (select) semantics from queries to a document
|
||||
*/
|
||||
private function applyProjection(Document $doc, array $queries): Document
|
||||
{
|
||||
if (empty($queries)) {
|
||||
return $doc;
|
||||
}
|
||||
|
||||
// Extract selections from queries
|
||||
$selections = [];
|
||||
foreach ($queries as $query) {
|
||||
if ($query->getMethod() === Query::TYPE_SELECT) {
|
||||
$values = $query->getValues();
|
||||
foreach ($values as $value) {
|
||||
// Skip relationship selections (containing '.')
|
||||
if (!\str_contains($value, '.')) {
|
||||
$selections[] = $value;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If no selections or wildcard present, return document as-is
|
||||
if (empty($selections) || \in_array('*', $selections)) {
|
||||
return $doc;
|
||||
}
|
||||
|
||||
// Create a new document with only selected attributes
|
||||
$projected = new Document();
|
||||
|
||||
// Always preserve internal attributes
|
||||
$projected->setAttribute('$id', $doc->getId());
|
||||
$projected->setAttribute('$collection', $doc->getCollection());
|
||||
$projected->setAttribute('$createdAt', $doc->getCreatedAt());
|
||||
$projected->setAttribute('$updatedAt', $doc->getUpdatedAt());
|
||||
if ($doc->offsetExists('$permissions')) {
|
||||
$projected->setAttribute('$permissions', $doc->getPermissions());
|
||||
}
|
||||
|
||||
// Add selected attributes
|
||||
foreach ($selections as $attribute) {
|
||||
if ($doc->offsetExists($attribute)) {
|
||||
$projected->setAttribute($attribute, $doc->getAttribute($attribute));
|
||||
}
|
||||
}
|
||||
|
||||
return $projected;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current state of a transaction by replaying its operations
|
||||
*/
|
||||
private function getTransactionState(string $transactionId): array
|
||||
{
|
||||
$transaction = $this->dbForProject->getDocument('transactions', $transactionId);
|
||||
if ($transaction->isEmpty() || $transaction->getAttribute('status') !== 'pending') {
|
||||
return [];
|
||||
}
|
||||
|
||||
// Fetch operations ordered by sequence to replay in exact order
|
||||
$operations = $this->dbForProject->find('transactionLogs', [
|
||||
Query::equal('transactionInternalId', [$transaction->getSequence()]),
|
||||
]);
|
||||
|
||||
$state = [];
|
||||
|
||||
foreach ($operations as $operation) {
|
||||
$databaseInternalId = $operation['databaseInternalId'];
|
||||
$collectionInternalId = $operation['collectionInternalId'];
|
||||
$collectionId = "database_{$databaseInternalId}_collection_{$collectionInternalId}";
|
||||
$documentId = $operation['documentId'];
|
||||
$action = $operation['action'];
|
||||
$data = $operation['data'];
|
||||
|
||||
if ($data instanceof Document) {
|
||||
$data = $data->getArrayCopy();
|
||||
}
|
||||
|
||||
switch ($action) {
|
||||
case 'create':
|
||||
$docId = $documentId ?? ($data['$id'] ?? null);
|
||||
if ($docId) {
|
||||
$state[$collectionId][$docId] = [
|
||||
'action' => 'create',
|
||||
'document' => new Document($data),
|
||||
'exists' => true
|
||||
];
|
||||
}
|
||||
break;
|
||||
case 'update':
|
||||
if (isset($state[$collectionId][$documentId])) {
|
||||
// Update existing document in transaction state
|
||||
$existingDocument = $state[$collectionId][$documentId]['document'];
|
||||
foreach ($data as $key => $value) {
|
||||
if ($key !== '$id') {
|
||||
$existingDocument->setAttribute($key, $value);
|
||||
}
|
||||
}
|
||||
// Only set action to 'update' if it's not already 'create' or 'upsert'
|
||||
$currentAction = $state[$collectionId][$documentId]['action'];
|
||||
if ($currentAction !== 'create' && $currentAction !== 'upsert') {
|
||||
$state[$collectionId][$documentId]['action'] = 'update';
|
||||
}
|
||||
} else {
|
||||
// Document doesn't exist in transaction state, will be merged with committed version
|
||||
$state[$collectionId][$documentId] = [
|
||||
'action' => 'update',
|
||||
'document' => new Document($data),
|
||||
'exists' => true
|
||||
];
|
||||
}
|
||||
break;
|
||||
|
||||
case 'upsert':
|
||||
$docId = $documentId ?? ($data['$id'] ?? null);
|
||||
if (!$docId) {
|
||||
break;
|
||||
}
|
||||
$state[$collectionId][$docId] = [
|
||||
'action' => 'upsert',
|
||||
'document' => new Document($data),
|
||||
'exists' => true
|
||||
];
|
||||
break;
|
||||
|
||||
case 'delete':
|
||||
$state[$collectionId][$documentId] = [
|
||||
'action' => 'delete',
|
||||
'exists' => false
|
||||
];
|
||||
break;
|
||||
|
||||
case 'bulkCreate':
|
||||
if (is_array($data)) {
|
||||
foreach ($data as $doc) {
|
||||
if ($doc instanceof Document) {
|
||||
$doc = $doc->getArrayCopy();
|
||||
}
|
||||
$state[$collectionId][$doc['$id']] = [
|
||||
'action' => 'create',
|
||||
'document' => new Document($doc),
|
||||
'exists' => true
|
||||
];
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return $state;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a document with transaction-aware logic
|
||||
*
|
||||
* @param string $collectionId Collection ID
|
||||
* @param string $documentId Document ID
|
||||
* @param string|null $transactionId Optional transaction ID
|
||||
* @param array $queries Optional query filters
|
||||
* @return Document
|
||||
*/
|
||||
public function getDocument(
|
||||
string $collectionId,
|
||||
|
|
@ -187,7 +46,6 @@ class TransactionState
|
|||
|
||||
$state = $this->getTransactionState($transactionId);
|
||||
|
||||
|
||||
// Check if document exists in transaction state
|
||||
if (isset($state[$collectionId][$documentId])) {
|
||||
$docState = $state[$collectionId][$documentId];
|
||||
|
|
@ -212,8 +70,7 @@ class TransactionState
|
|||
$committedDoc->setAttribute($key, $value);
|
||||
}
|
||||
}
|
||||
// committedDoc already has projection applied by dbForProject->getDocument()
|
||||
// But we need to reapply in case transaction added new fields
|
||||
// Reapply projection in case transaction added new fields
|
||||
return $this->applyProjection($committedDoc, $queries);
|
||||
} elseif ($docState['action'] === 'upsert') {
|
||||
// Upsert created a new document since committed doc doesn't exist
|
||||
|
|
@ -228,6 +85,11 @@ class TransactionState
|
|||
|
||||
/**
|
||||
* List documents with transaction-aware logic
|
||||
*
|
||||
* @param string $collectionId Collection ID
|
||||
* @param string|null $transactionId Optional transaction ID
|
||||
* @param array $queries Optional query filters
|
||||
* @return array Array of Document objects
|
||||
*/
|
||||
public function listDocuments(
|
||||
string $collectionId,
|
||||
|
|
@ -280,6 +142,11 @@ class TransactionState
|
|||
|
||||
/**
|
||||
* Count documents with transaction-aware logic
|
||||
*
|
||||
* @param string $collectionId Collection ID
|
||||
* @param string|null $transactionId Optional transaction ID
|
||||
* @param array $queries Optional query filters
|
||||
* @return int Document count
|
||||
*/
|
||||
public function countDocuments(
|
||||
string $collectionId,
|
||||
|
|
@ -302,7 +169,6 @@ class TransactionState
|
|||
}
|
||||
|
||||
// Build a set of committed document IDs that match the query
|
||||
// We need to find which documents match the filters
|
||||
$committedDocs = $this->dbForProject->find($collectionId, $queries);
|
||||
$committedDocIds = [];
|
||||
foreach ($committedDocs as $doc) {
|
||||
|
|
@ -320,9 +186,6 @@ class TransactionState
|
|||
}
|
||||
} elseif ($docState['action'] === 'create') {
|
||||
// Document was created in transaction
|
||||
// We need to check if it would match the query filters
|
||||
// For now, we'll conservatively add it if no filters are present
|
||||
// or apply basic filter matching
|
||||
if ($this->documentMatchesFilters($docState['document'], $queries)) {
|
||||
$adjustedCount++; // New document that matches
|
||||
}
|
||||
|
|
@ -348,7 +211,285 @@ class TransactionState
|
|||
}
|
||||
|
||||
/**
|
||||
* Check if a document matches filter queries (simplified implementation)
|
||||
* Check if a document exists with transaction-aware logic
|
||||
*
|
||||
* @param string $collectionId Collection ID
|
||||
* @param string $documentId Document ID
|
||||
* @param string|null $transactionId Optional transaction ID
|
||||
* @return bool True if document exists
|
||||
*/
|
||||
public function documentExists(
|
||||
string $collectionId,
|
||||
string $documentId,
|
||||
?string $transactionId = null
|
||||
): bool {
|
||||
$doc = $this->getDocument($collectionId, $documentId, $transactionId);
|
||||
return !$doc->isEmpty();
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply bulk update to documents in transaction state that match queries
|
||||
*
|
||||
* This allows bulk operations within a transaction to see each other's changes.
|
||||
*
|
||||
* @param string $collectionId Collection ID
|
||||
* @param Document $updateData Document with update values
|
||||
* @param array $queries Query filters to match documents
|
||||
* @param array &$state Transaction state (passed by reference)
|
||||
* @return void
|
||||
*/
|
||||
public function applyBulkUpdateToState(
|
||||
string $collectionId,
|
||||
Document $updateData,
|
||||
array $queries,
|
||||
array &$state
|
||||
): void {
|
||||
if (!isset($state[$collectionId])) {
|
||||
return;
|
||||
}
|
||||
|
||||
foreach ($state[$collectionId] as $docId => $doc) {
|
||||
if ($this->documentMatchesFilters($doc, $queries)) {
|
||||
// Apply the update to the state document
|
||||
foreach ($updateData->getArrayCopy() as $key => $value) {
|
||||
if ($key !== '$id') {
|
||||
$doc->setAttribute($key, $value);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply bulk delete to documents in transaction state that match queries
|
||||
*
|
||||
* This allows bulk operations within a transaction to see each other's changes.
|
||||
*
|
||||
* @param string $collectionId Collection ID
|
||||
* @param array $queries Query filters to match documents
|
||||
* @param array &$state Transaction state (passed by reference)
|
||||
* @return void
|
||||
*/
|
||||
public function applyBulkDeleteToState(
|
||||
string $collectionId,
|
||||
array $queries,
|
||||
array &$state
|
||||
): void {
|
||||
if (!isset($state[$collectionId])) {
|
||||
return;
|
||||
}
|
||||
|
||||
foreach ($state[$collectionId] as $docId => $doc) {
|
||||
if ($this->documentMatchesFilters($doc, $queries)) {
|
||||
unset($state[$collectionId][$docId]);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply bulk upsert to documents in transaction state
|
||||
*
|
||||
* This allows bulk operations within a transaction to see each other's changes.
|
||||
*
|
||||
* @param string $collectionId Collection ID
|
||||
* @param array $documents Array of Document objects to upsert
|
||||
* @param array &$state Transaction state (passed by reference)
|
||||
* @return void
|
||||
*/
|
||||
public function applyBulkUpsertToState(
|
||||
string $collectionId,
|
||||
array $documents,
|
||||
array &$state
|
||||
): void {
|
||||
foreach ($documents as $doc) {
|
||||
if (!($doc instanceof Document)) {
|
||||
continue;
|
||||
}
|
||||
|
||||
$docId = $doc->getId();
|
||||
if (!$docId) {
|
||||
continue;
|
||||
}
|
||||
|
||||
// If document exists in state, update it; otherwise it will be handled by DB upsert
|
||||
if (isset($state[$collectionId][$docId])) {
|
||||
// Apply updates to existing state document
|
||||
foreach ($doc->getArrayCopy() as $key => $value) {
|
||||
if ($key !== '$id') {
|
||||
$state[$collectionId][$docId]->setAttribute($key, $value);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the current state of a transaction by replaying its operations
|
||||
*
|
||||
* @param string $transactionId Transaction ID
|
||||
* @return array State array with structure: [collectionId => [docId => ['action' => ..., 'document' => ..., 'exists' => ...]]]
|
||||
*/
|
||||
private function getTransactionState(string $transactionId): array
|
||||
{
|
||||
$transaction = $this->dbForProject->getDocument('transactions', $transactionId);
|
||||
if ($transaction->isEmpty() || $transaction->getAttribute('status') !== 'pending') {
|
||||
return [];
|
||||
}
|
||||
|
||||
// Fetch operations ordered by sequence to replay in exact order
|
||||
$operations = $this->dbForProject->find('transactionLogs', [
|
||||
Query::equal('transactionInternalId', [$transaction->getSequence()]),
|
||||
]);
|
||||
|
||||
$state = [];
|
||||
|
||||
foreach ($operations as $operation) {
|
||||
$databaseInternalId = $operation['databaseInternalId'];
|
||||
$collectionInternalId = $operation['collectionInternalId'];
|
||||
$collectionId = "database_{$databaseInternalId}_collection_{$collectionInternalId}";
|
||||
$documentId = $operation['documentId'];
|
||||
$action = $operation['action'];
|
||||
$data = $operation['data'];
|
||||
|
||||
if ($data instanceof Document) {
|
||||
$data = $data->getArrayCopy();
|
||||
}
|
||||
|
||||
switch ($action) {
|
||||
case 'create':
|
||||
$docId = $documentId ?? ($data['$id'] ?? null);
|
||||
if ($docId) {
|
||||
$state[$collectionId][$docId] = [
|
||||
'action' => 'create',
|
||||
'document' => new Document($data),
|
||||
'exists' => true
|
||||
];
|
||||
}
|
||||
break;
|
||||
|
||||
case 'update':
|
||||
if (isset($state[$collectionId][$documentId])) {
|
||||
// Update existing document in transaction state
|
||||
$existingDocument = $state[$collectionId][$documentId]['document'];
|
||||
foreach ($data as $key => $value) {
|
||||
if ($key !== '$id') {
|
||||
$existingDocument->setAttribute($key, $value);
|
||||
}
|
||||
}
|
||||
// Only set action to 'update' if it's not already 'create' or 'upsert'
|
||||
$currentAction = $state[$collectionId][$documentId]['action'];
|
||||
if ($currentAction !== 'create' && $currentAction !== 'upsert') {
|
||||
$state[$collectionId][$documentId]['action'] = 'update';
|
||||
}
|
||||
} else {
|
||||
// Document doesn't exist in transaction state, will be merged with committed version
|
||||
$state[$collectionId][$documentId] = [
|
||||
'action' => 'update',
|
||||
'document' => new Document($data),
|
||||
'exists' => true
|
||||
];
|
||||
}
|
||||
break;
|
||||
|
||||
case 'upsert':
|
||||
$docId = $documentId ?? ($data['$id'] ?? null);
|
||||
if (!$docId) {
|
||||
break;
|
||||
}
|
||||
$state[$collectionId][$docId] = [
|
||||
'action' => 'upsert',
|
||||
'document' => new Document($data),
|
||||
'exists' => true
|
||||
];
|
||||
break;
|
||||
|
||||
case 'delete':
|
||||
$state[$collectionId][$documentId] = [
|
||||
'action' => 'delete',
|
||||
'exists' => false
|
||||
];
|
||||
break;
|
||||
|
||||
case 'bulkCreate':
|
||||
if (\is_array($data)) {
|
||||
foreach ($data as $doc) {
|
||||
if ($doc instanceof Document) {
|
||||
$doc = $doc->getArrayCopy();
|
||||
}
|
||||
$state[$collectionId][$doc['$id']] = [
|
||||
'action' => 'create',
|
||||
'document' => new Document($doc),
|
||||
'exists' => true
|
||||
];
|
||||
}
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
return $state;
|
||||
}
|
||||
|
||||
/**
|
||||
* Apply projection (select) semantics from queries to a document
|
||||
*
|
||||
* @param Document $doc Document to apply projection to
|
||||
* @param array $queries Query array that may contain select queries
|
||||
* @return Document Projected document
|
||||
*/
|
||||
private function applyProjection(Document $doc, array $queries): Document
|
||||
{
|
||||
if (empty($queries)) {
|
||||
return $doc;
|
||||
}
|
||||
|
||||
// Extract selections from queries
|
||||
$selections = [];
|
||||
foreach ($queries as $query) {
|
||||
if ($query->getMethod() === Query::TYPE_SELECT) {
|
||||
$values = $query->getValues();
|
||||
foreach ($values as $value) {
|
||||
// Skip relationship selections (containing '.')
|
||||
if (!\str_contains($value, '.')) {
|
||||
$selections[] = $value;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// If no selections or wildcard present, return document as-is
|
||||
if (empty($selections) || \in_array('*', $selections)) {
|
||||
return $doc;
|
||||
}
|
||||
|
||||
// Create a new document with only selected attributes
|
||||
$projected = new Document();
|
||||
|
||||
// Always preserve internal attributes
|
||||
$projected->setAttribute('$id', $doc->getId());
|
||||
$projected->setAttribute('$collection', $doc->getCollection());
|
||||
$projected->setAttribute('$createdAt', $doc->getCreatedAt());
|
||||
$projected->setAttribute('$updatedAt', $doc->getUpdatedAt());
|
||||
if ($doc->offsetExists('$permissions')) {
|
||||
$projected->setAttribute('$permissions', $doc->getPermissions());
|
||||
}
|
||||
|
||||
// Add selected attributes
|
||||
foreach ($selections as $attribute) {
|
||||
if ($doc->offsetExists($attribute)) {
|
||||
$projected->setAttribute($attribute, $doc->getAttribute($attribute));
|
||||
}
|
||||
}
|
||||
|
||||
return $projected;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a document matches filter queries
|
||||
*
|
||||
* @param Document $doc Document to check
|
||||
* @param array $queries Query filters
|
||||
* @return bool True if document matches all filters
|
||||
*/
|
||||
private function documentMatchesFilters(Document $doc, array $queries): bool
|
||||
{
|
||||
|
|
@ -387,11 +528,13 @@ class TransactionState
|
|||
return false;
|
||||
}
|
||||
break;
|
||||
|
||||
case Query::TYPE_NOT_EQUAL:
|
||||
if (\in_array($docValue, $values)) {
|
||||
return false;
|
||||
}
|
||||
break;
|
||||
|
||||
case Query::TYPE_CONTAINS:
|
||||
$matches = false;
|
||||
foreach ($values as $value) {
|
||||
|
|
@ -404,6 +547,7 @@ class TransactionState
|
|||
return false;
|
||||
}
|
||||
break;
|
||||
|
||||
case Query::TYPE_STARTS_WITH:
|
||||
$matches = false;
|
||||
foreach ($values as $value) {
|
||||
|
|
@ -416,6 +560,7 @@ class TransactionState
|
|||
return false;
|
||||
}
|
||||
break;
|
||||
|
||||
case Query::TYPE_ENDS_WITH:
|
||||
$matches = false;
|
||||
foreach ($values as $value) {
|
||||
|
|
@ -428,36 +573,43 @@ class TransactionState
|
|||
return false;
|
||||
}
|
||||
break;
|
||||
|
||||
case Query::TYPE_GREATER_THAN:
|
||||
if (!($docValue > $values[0])) {
|
||||
return false;
|
||||
}
|
||||
break;
|
||||
|
||||
case Query::TYPE_GREATER_THAN_EQUAL:
|
||||
if (!($docValue >= $values[0])) {
|
||||
return false;
|
||||
}
|
||||
break;
|
||||
|
||||
case Query::TYPE_LESSER_THAN:
|
||||
if (!($docValue < $values[0])) {
|
||||
return false;
|
||||
}
|
||||
break;
|
||||
|
||||
case Query::TYPE_LESSER_THAN_EQUAL:
|
||||
if (!($docValue <= $values[0])) {
|
||||
return false;
|
||||
}
|
||||
break;
|
||||
|
||||
case Query::TYPE_IS_NULL:
|
||||
if (!\is_null($docValue)) {
|
||||
return false;
|
||||
}
|
||||
break;
|
||||
|
||||
case Query::TYPE_IS_NOT_NULL:
|
||||
if (\is_null($docValue)) {
|
||||
return false;
|
||||
}
|
||||
break;
|
||||
|
||||
case Query::TYPE_BETWEEN:
|
||||
if (!($docValue >= $values[0] && $docValue <= $values[1])) {
|
||||
return false;
|
||||
|
|
@ -468,16 +620,4 @@ class TransactionState
|
|||
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if a document exists with transaction-aware logic
|
||||
*/
|
||||
public function documentExists(
|
||||
string $collectionId,
|
||||
string $documentId,
|
||||
?string $transactionId = null
|
||||
): bool {
|
||||
$doc = $this->getDocument($collectionId, $documentId, $transactionId);
|
||||
return !$doc->isEmpty();
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -2,6 +2,7 @@
|
|||
|
||||
namespace Appwrite\Platform\Modules\Databases\Http\Databases\Transactions;
|
||||
|
||||
use Appwrite\Databases\TransactionState;
|
||||
use Appwrite\Event\Delete;
|
||||
use Appwrite\Event\Event;
|
||||
use Appwrite\Event\StatsUsage;
|
||||
|
|
@ -66,6 +67,7 @@ class Update extends Action
|
|||
->param('rollback', false, new Boolean(), 'Rollback transaction?', true)
|
||||
->inject('response')
|
||||
->inject('dbForProject')
|
||||
->inject('transactionState')
|
||||
->inject('queueForDeletes')
|
||||
->inject('queueForEvents')
|
||||
->inject('queueForStatsUsage')
|
||||
|
|
@ -81,6 +83,7 @@ class Update extends Action
|
|||
* @param bool $rollback
|
||||
* @param UtopiaResponse $response
|
||||
* @param Database $dbForProject
|
||||
* @param TransactionState $transactionState
|
||||
* @param Delete $queueForDeletes
|
||||
* @param Event $queueForEvents
|
||||
* @param StatsUsage $queueForStatsUsage
|
||||
|
|
@ -96,7 +99,7 @@ class Update extends Action
|
|||
* @throws Structure
|
||||
* @throws \Utopia\Exception
|
||||
*/
|
||||
public function action(string $transactionId, bool $commit, bool $rollback, UtopiaResponse $response, Database $dbForProject, Delete $queueForDeletes, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks): void
|
||||
public function action(string $transactionId, bool $commit, bool $rollback, UtopiaResponse $response, Database $dbForProject, TransactionState $transactionState, Delete $queueForDeletes, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks): void
|
||||
{
|
||||
if (!$commit && !$rollback) {
|
||||
throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Either commit or rollback must be true');
|
||||
|
|
@ -182,13 +185,13 @@ class Update extends Action
|
|||
$this->handleBulkCreateOperation($dbForProject, $collectionId, $data, $createdAt, $state);
|
||||
break;
|
||||
case 'bulkUpdate':
|
||||
$this->handleBulkUpdateOperation($dbForProject, $collectionId, $data, $createdAt, $state);
|
||||
$this->handleBulkUpdateOperation($dbForProject, $transactionState, $collectionId, $data, $createdAt, $state);
|
||||
break;
|
||||
case 'bulkUpsert':
|
||||
$this->handleBulkUpsertOperation($dbForProject, $collectionId, $data, $createdAt, $state);
|
||||
$this->handleBulkUpsertOperation($dbForProject, $transactionState, $collectionId, $data, $createdAt, $state);
|
||||
break;
|
||||
case 'bulkDelete':
|
||||
$this->handleBulkDeleteOperation($dbForProject, $collectionId, $data, $createdAt, $state);
|
||||
$this->handleBulkDeleteOperation($dbForProject, $transactionState, $collectionId, $data, $createdAt, $state);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
|
@ -348,6 +351,14 @@ class Update extends Action
|
|||
|
||||
/**
|
||||
* Handle create operation
|
||||
*
|
||||
* @param Database $dbForProject
|
||||
* @param string $collectionId
|
||||
* @param string|null $documentId
|
||||
* @param array $data
|
||||
* @param \DateTime $createdAt
|
||||
* @param array &$state
|
||||
* @return void
|
||||
* @throws \Utopia\Database\Exception
|
||||
*/
|
||||
private function handleCreateOperation(
|
||||
|
|
@ -371,6 +382,14 @@ class Update extends Action
|
|||
|
||||
/**
|
||||
* Handle update operation
|
||||
*
|
||||
* @param Database $dbForProject
|
||||
* @param string $collectionId
|
||||
* @param string $documentId
|
||||
* @param array $data
|
||||
* @param \DateTime $createdAt
|
||||
* @param array &$state
|
||||
* @return void
|
||||
* @throws ConflictException
|
||||
* @throws \Utopia\Database\Exception
|
||||
*/
|
||||
|
|
@ -410,6 +429,14 @@ class Update extends Action
|
|||
|
||||
/**
|
||||
* Handle upsert operation
|
||||
*
|
||||
* @param Database $dbForProject
|
||||
* @param string $collectionId
|
||||
* @param string|null $documentId
|
||||
* @param array $data
|
||||
* @param \DateTime $createdAt
|
||||
* @param array &$state
|
||||
* @return void
|
||||
* @throws \Utopia\Database\Exception
|
||||
*/
|
||||
private function handleUpsertOperation(
|
||||
|
|
@ -442,6 +469,15 @@ class Update extends Action
|
|||
|
||||
/**
|
||||
* Handle delete operation
|
||||
*
|
||||
* @param Database $dbForProject
|
||||
* @param string $collectionId
|
||||
* @param string $documentId
|
||||
* @param \DateTime $createdAt
|
||||
* @param array &$state
|
||||
* @return void
|
||||
* @throws \Utopia\Database\Exception
|
||||
* @throws NotFoundException
|
||||
*/
|
||||
private function handleDeleteOperation(
|
||||
Database $dbForProject,
|
||||
|
|
@ -473,6 +509,16 @@ class Update extends Action
|
|||
|
||||
/**
|
||||
* Handle increment operation
|
||||
*
|
||||
* @param Database $dbForProject
|
||||
* @param string $collectionId
|
||||
* @param string $documentId
|
||||
* @param array $data
|
||||
* @param \DateTime $createdAt
|
||||
* @param array &$state
|
||||
* @return void
|
||||
* @throws ConflictException
|
||||
* @throws \Utopia\Database\Exception
|
||||
*/
|
||||
private function handleIncrementOperation(
|
||||
Database $dbForProject,
|
||||
|
|
@ -510,6 +556,16 @@ class Update extends Action
|
|||
|
||||
/**
|
||||
* Handle decrement operation
|
||||
*
|
||||
* @param Database $dbForProject
|
||||
* @param string $collectionId
|
||||
* @param string $documentId
|
||||
* @param array $data
|
||||
* @param \DateTime $createdAt
|
||||
* @param array &$state
|
||||
* @return void
|
||||
* @throws ConflictException
|
||||
* @throws \Utopia\Database\Exception
|
||||
*/
|
||||
private function handleDecrementOperation(
|
||||
Database $dbForProject,
|
||||
|
|
@ -547,6 +603,14 @@ class Update extends Action
|
|||
|
||||
/**
|
||||
* Handle bulk create operation
|
||||
*
|
||||
* @param Database $dbForProject
|
||||
* @param string $collectionId
|
||||
* @param array $data
|
||||
* @param \DateTime $createdAt
|
||||
* @param array &$state
|
||||
* @return void
|
||||
* @throws \Utopia\Database\Exception
|
||||
*/
|
||||
private function handleBulkCreateOperation(
|
||||
Database $dbForProject,
|
||||
|
|
@ -573,22 +637,33 @@ class Update extends Action
|
|||
|
||||
/**
|
||||
* Handle bulk update operation with manual timestamp checking
|
||||
*
|
||||
* @param Database $dbForProject
|
||||
* @param TransactionState $transactionState
|
||||
* @param string $collectionId
|
||||
* @param array $data
|
||||
* @param \DateTime $createdAt
|
||||
* @param array &$state
|
||||
* @return void
|
||||
* @throws \Utopia\Database\Exception
|
||||
* @throws \Utopia\Database\Exception\Query
|
||||
* @throws ConflictException
|
||||
*/
|
||||
private function handleBulkUpdateOperation(
|
||||
Database $dbForProject,
|
||||
TransactionState $transactionState,
|
||||
string $collectionId,
|
||||
array $data,
|
||||
\DateTime $createdAt,
|
||||
array &$state
|
||||
): void {
|
||||
$queries = Query::parseQueries($data['queries'] ?? []);
|
||||
$updateData = new Document($data['data']);
|
||||
|
||||
// First, update documents in the committed database
|
||||
$dbForProject->updateDocuments(
|
||||
$collectionId,
|
||||
new Document($data['data']),
|
||||
$updateData,
|
||||
$queries,
|
||||
onNext: function (Document $updated, Document $old) use (&$state, $collectionId, $createdAt) {
|
||||
// Check if this document was created/modified in this transaction
|
||||
|
|
@ -605,14 +680,27 @@ class Update extends Action
|
|||
$state[$collectionId][$updated->getId()] = $updated;
|
||||
}
|
||||
);
|
||||
|
||||
// Also update documents in the transaction state that match the query
|
||||
$transactionState->applyBulkUpdateToState($collectionId, $updateData, $queries, $state);
|
||||
}
|
||||
|
||||
/**
|
||||
* Handle bulk upsert operation with manual timestamp checking
|
||||
*
|
||||
* @param Database $dbForProject
|
||||
* @param TransactionState $transactionState
|
||||
* @param string $collectionId
|
||||
* @param array $data
|
||||
* @param \DateTime $createdAt
|
||||
* @param array &$state
|
||||
* @return void
|
||||
* @throws ConflictException
|
||||
* @throws \Utopia\Database\Exception
|
||||
*/
|
||||
private function handleBulkUpsertOperation(
|
||||
Database $dbForProject,
|
||||
TransactionState $transactionState,
|
||||
string $collectionId,
|
||||
array $data,
|
||||
\DateTime $createdAt,
|
||||
|
|
@ -623,7 +711,11 @@ class Update extends Action
|
|||
return $doc instanceof Document ? $doc : new Document($doc);
|
||||
}, $data);
|
||||
|
||||
// Run bulk upsert without timestamp wrapper, checking manually in callback
|
||||
// First, apply upserts to documents in the transaction state
|
||||
// This ensures documents created in this transaction are updated properly
|
||||
$transactionState->applyBulkUpsertToState($collectionId, $documents, $state);
|
||||
|
||||
// Then run bulk upsert on committed database, checking manually in callback
|
||||
$dbForProject->upsertDocuments(
|
||||
$collectionId,
|
||||
$documents,
|
||||
|
|
@ -649,11 +741,21 @@ class Update extends Action
|
|||
|
||||
/**
|
||||
* Handle bulk delete operation with manual timestamp checking
|
||||
*
|
||||
* @param Database $dbForProject
|
||||
* @param TransactionState $transactionState
|
||||
* @param string $collectionId
|
||||
* @param array $data
|
||||
* @param \DateTime $createdAt
|
||||
* @param array &$state
|
||||
* @return void
|
||||
* @throws \Utopia\Database\Exception\Query
|
||||
* @throws ConflictException
|
||||
* @throws \Utopia\Database\Exception
|
||||
*/
|
||||
private function handleBulkDeleteOperation(
|
||||
Database $dbForProject,
|
||||
TransactionState $transactionState,
|
||||
string $collectionId,
|
||||
array $data,
|
||||
\DateTime $createdAt,
|
||||
|
|
@ -681,5 +783,8 @@ class Update extends Action
|
|||
}
|
||||
}
|
||||
);
|
||||
|
||||
// Also delete documents in the transaction state that match the query
|
||||
$transactionState->applyBulkDeleteToState($collectionId, $queries, $state);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
Loading…
Reference in a new issue