diff --git a/src/Appwrite/Databases/TransactionState.php b/src/Appwrite/Databases/TransactionState.php index 1f8e7f65c8..85ecb41ae2 100644 --- a/src/Appwrite/Databases/TransactionState.php +++ b/src/Appwrite/Databases/TransactionState.php @@ -8,6 +8,11 @@ use Utopia\Database\Query; /** * Service for managing transaction state and providing transaction-aware document operations + * + * This class provides methods to: + * - Query documents with transaction awareness (getDocument, listDocuments, countDocuments) + * - Apply bulk operations to transaction state for cross-operation visibility + * - Replay transaction operations to build current state */ class TransactionState { @@ -18,161 +23,15 @@ class TransactionState $this->dbForProject = $dbForProject; } - /** - * Apply projection (select) semantics from queries to a document - */ - private function applyProjection(Document $doc, array $queries): Document - { - if (empty($queries)) { - return $doc; - } - - // Extract selections from queries - $selections = []; - foreach ($queries as $query) { - if ($query->getMethod() === Query::TYPE_SELECT) { - $values = $query->getValues(); - foreach ($values as $value) { - // Skip relationship selections (containing '.') - if (!\str_contains($value, '.')) { - $selections[] = $value; - } - } - } - } - - // If no selections or wildcard present, return document as-is - if (empty($selections) || \in_array('*', $selections)) { - return $doc; - } - - // Create a new document with only selected attributes - $projected = new Document(); - - // Always preserve internal attributes - $projected->setAttribute('$id', $doc->getId()); - $projected->setAttribute('$collection', $doc->getCollection()); - $projected->setAttribute('$createdAt', $doc->getCreatedAt()); - $projected->setAttribute('$updatedAt', $doc->getUpdatedAt()); - if ($doc->offsetExists('$permissions')) { - $projected->setAttribute('$permissions', $doc->getPermissions()); - } - - // Add selected attributes - foreach ($selections as $attribute) { - if ($doc->offsetExists($attribute)) { - $projected->setAttribute($attribute, $doc->getAttribute($attribute)); - } - } - - return $projected; - } - - /** - * Get the current state of a transaction by replaying its operations - */ - private function getTransactionState(string $transactionId): array - { - $transaction = $this->dbForProject->getDocument('transactions', $transactionId); - if ($transaction->isEmpty() || $transaction->getAttribute('status') !== 'pending') { - return []; - } - - // Fetch operations ordered by sequence to replay in exact order - $operations = $this->dbForProject->find('transactionLogs', [ - Query::equal('transactionInternalId', [$transaction->getSequence()]), - ]); - - $state = []; - - foreach ($operations as $operation) { - $databaseInternalId = $operation['databaseInternalId']; - $collectionInternalId = $operation['collectionInternalId']; - $collectionId = "database_{$databaseInternalId}_collection_{$collectionInternalId}"; - $documentId = $operation['documentId']; - $action = $operation['action']; - $data = $operation['data']; - - if ($data instanceof Document) { - $data = $data->getArrayCopy(); - } - - switch ($action) { - case 'create': - $docId = $documentId ?? ($data['$id'] ?? null); - if ($docId) { - $state[$collectionId][$docId] = [ - 'action' => 'create', - 'document' => new Document($data), - 'exists' => true - ]; - } - break; - case 'update': - if (isset($state[$collectionId][$documentId])) { - // Update existing document in transaction state - $existingDocument = $state[$collectionId][$documentId]['document']; - foreach ($data as $key => $value) { - if ($key !== '$id') { - $existingDocument->setAttribute($key, $value); - } - } - // Only set action to 'update' if it's not already 'create' or 'upsert' - $currentAction = $state[$collectionId][$documentId]['action']; - if ($currentAction !== 'create' && $currentAction !== 'upsert') { - $state[$collectionId][$documentId]['action'] = 'update'; - } - } else { - // Document doesn't exist in transaction state, will be merged with committed version - $state[$collectionId][$documentId] = [ - 'action' => 'update', - 'document' => new Document($data), - 'exists' => true - ]; - } - break; - - case 'upsert': - $docId = $documentId ?? ($data['$id'] ?? null); - if (!$docId) { - break; - } - $state[$collectionId][$docId] = [ - 'action' => 'upsert', - 'document' => new Document($data), - 'exists' => true - ]; - break; - - case 'delete': - $state[$collectionId][$documentId] = [ - 'action' => 'delete', - 'exists' => false - ]; - break; - - case 'bulkCreate': - if (is_array($data)) { - foreach ($data as $doc) { - if ($doc instanceof Document) { - $doc = $doc->getArrayCopy(); - } - $state[$collectionId][$doc['$id']] = [ - 'action' => 'create', - 'document' => new Document($doc), - 'exists' => true - ]; - } - } - break; - } - } - - return $state; - } /** * Get a document with transaction-aware logic + * + * @param string $collectionId Collection ID + * @param string $documentId Document ID + * @param string|null $transactionId Optional transaction ID + * @param array $queries Optional query filters + * @return Document */ public function getDocument( string $collectionId, @@ -187,7 +46,6 @@ class TransactionState $state = $this->getTransactionState($transactionId); - // Check if document exists in transaction state if (isset($state[$collectionId][$documentId])) { $docState = $state[$collectionId][$documentId]; @@ -212,8 +70,7 @@ class TransactionState $committedDoc->setAttribute($key, $value); } } - // committedDoc already has projection applied by dbForProject->getDocument() - // But we need to reapply in case transaction added new fields + // Reapply projection in case transaction added new fields return $this->applyProjection($committedDoc, $queries); } elseif ($docState['action'] === 'upsert') { // Upsert created a new document since committed doc doesn't exist @@ -228,6 +85,11 @@ class TransactionState /** * List documents with transaction-aware logic + * + * @param string $collectionId Collection ID + * @param string|null $transactionId Optional transaction ID + * @param array $queries Optional query filters + * @return array Array of Document objects */ public function listDocuments( string $collectionId, @@ -280,6 +142,11 @@ class TransactionState /** * Count documents with transaction-aware logic + * + * @param string $collectionId Collection ID + * @param string|null $transactionId Optional transaction ID + * @param array $queries Optional query filters + * @return int Document count */ public function countDocuments( string $collectionId, @@ -302,7 +169,6 @@ class TransactionState } // Build a set of committed document IDs that match the query - // We need to find which documents match the filters $committedDocs = $this->dbForProject->find($collectionId, $queries); $committedDocIds = []; foreach ($committedDocs as $doc) { @@ -320,9 +186,6 @@ class TransactionState } } elseif ($docState['action'] === 'create') { // Document was created in transaction - // We need to check if it would match the query filters - // For now, we'll conservatively add it if no filters are present - // or apply basic filter matching if ($this->documentMatchesFilters($docState['document'], $queries)) { $adjustedCount++; // New document that matches } @@ -348,7 +211,285 @@ class TransactionState } /** - * Check if a document matches filter queries (simplified implementation) + * Check if a document exists with transaction-aware logic + * + * @param string $collectionId Collection ID + * @param string $documentId Document ID + * @param string|null $transactionId Optional transaction ID + * @return bool True if document exists + */ + public function documentExists( + string $collectionId, + string $documentId, + ?string $transactionId = null + ): bool { + $doc = $this->getDocument($collectionId, $documentId, $transactionId); + return !$doc->isEmpty(); + } + + /** + * Apply bulk update to documents in transaction state that match queries + * + * This allows bulk operations within a transaction to see each other's changes. + * + * @param string $collectionId Collection ID + * @param Document $updateData Document with update values + * @param array $queries Query filters to match documents + * @param array &$state Transaction state (passed by reference) + * @return void + */ + public function applyBulkUpdateToState( + string $collectionId, + Document $updateData, + array $queries, + array &$state + ): void { + if (!isset($state[$collectionId])) { + return; + } + + foreach ($state[$collectionId] as $docId => $doc) { + if ($this->documentMatchesFilters($doc, $queries)) { + // Apply the update to the state document + foreach ($updateData->getArrayCopy() as $key => $value) { + if ($key !== '$id') { + $doc->setAttribute($key, $value); + } + } + } + } + } + + /** + * Apply bulk delete to documents in transaction state that match queries + * + * This allows bulk operations within a transaction to see each other's changes. + * + * @param string $collectionId Collection ID + * @param array $queries Query filters to match documents + * @param array &$state Transaction state (passed by reference) + * @return void + */ + public function applyBulkDeleteToState( + string $collectionId, + array $queries, + array &$state + ): void { + if (!isset($state[$collectionId])) { + return; + } + + foreach ($state[$collectionId] as $docId => $doc) { + if ($this->documentMatchesFilters($doc, $queries)) { + unset($state[$collectionId][$docId]); + } + } + } + + /** + * Apply bulk upsert to documents in transaction state + * + * This allows bulk operations within a transaction to see each other's changes. + * + * @param string $collectionId Collection ID + * @param array $documents Array of Document objects to upsert + * @param array &$state Transaction state (passed by reference) + * @return void + */ + public function applyBulkUpsertToState( + string $collectionId, + array $documents, + array &$state + ): void { + foreach ($documents as $doc) { + if (!($doc instanceof Document)) { + continue; + } + + $docId = $doc->getId(); + if (!$docId) { + continue; + } + + // If document exists in state, update it; otherwise it will be handled by DB upsert + if (isset($state[$collectionId][$docId])) { + // Apply updates to existing state document + foreach ($doc->getArrayCopy() as $key => $value) { + if ($key !== '$id') { + $state[$collectionId][$docId]->setAttribute($key, $value); + } + } + } + } + } + + /** + * Get the current state of a transaction by replaying its operations + * + * @param string $transactionId Transaction ID + * @return array State array with structure: [collectionId => [docId => ['action' => ..., 'document' => ..., 'exists' => ...]]] + */ + private function getTransactionState(string $transactionId): array + { + $transaction = $this->dbForProject->getDocument('transactions', $transactionId); + if ($transaction->isEmpty() || $transaction->getAttribute('status') !== 'pending') { + return []; + } + + // Fetch operations ordered by sequence to replay in exact order + $operations = $this->dbForProject->find('transactionLogs', [ + Query::equal('transactionInternalId', [$transaction->getSequence()]), + ]); + + $state = []; + + foreach ($operations as $operation) { + $databaseInternalId = $operation['databaseInternalId']; + $collectionInternalId = $operation['collectionInternalId']; + $collectionId = "database_{$databaseInternalId}_collection_{$collectionInternalId}"; + $documentId = $operation['documentId']; + $action = $operation['action']; + $data = $operation['data']; + + if ($data instanceof Document) { + $data = $data->getArrayCopy(); + } + + switch ($action) { + case 'create': + $docId = $documentId ?? ($data['$id'] ?? null); + if ($docId) { + $state[$collectionId][$docId] = [ + 'action' => 'create', + 'document' => new Document($data), + 'exists' => true + ]; + } + break; + + case 'update': + if (isset($state[$collectionId][$documentId])) { + // Update existing document in transaction state + $existingDocument = $state[$collectionId][$documentId]['document']; + foreach ($data as $key => $value) { + if ($key !== '$id') { + $existingDocument->setAttribute($key, $value); + } + } + // Only set action to 'update' if it's not already 'create' or 'upsert' + $currentAction = $state[$collectionId][$documentId]['action']; + if ($currentAction !== 'create' && $currentAction !== 'upsert') { + $state[$collectionId][$documentId]['action'] = 'update'; + } + } else { + // Document doesn't exist in transaction state, will be merged with committed version + $state[$collectionId][$documentId] = [ + 'action' => 'update', + 'document' => new Document($data), + 'exists' => true + ]; + } + break; + + case 'upsert': + $docId = $documentId ?? ($data['$id'] ?? null); + if (!$docId) { + break; + } + $state[$collectionId][$docId] = [ + 'action' => 'upsert', + 'document' => new Document($data), + 'exists' => true + ]; + break; + + case 'delete': + $state[$collectionId][$documentId] = [ + 'action' => 'delete', + 'exists' => false + ]; + break; + + case 'bulkCreate': + if (\is_array($data)) { + foreach ($data as $doc) { + if ($doc instanceof Document) { + $doc = $doc->getArrayCopy(); + } + $state[$collectionId][$doc['$id']] = [ + 'action' => 'create', + 'document' => new Document($doc), + 'exists' => true + ]; + } + } + break; + } + } + + return $state; + } + + /** + * Apply projection (select) semantics from queries to a document + * + * @param Document $doc Document to apply projection to + * @param array $queries Query array that may contain select queries + * @return Document Projected document + */ + private function applyProjection(Document $doc, array $queries): Document + { + if (empty($queries)) { + return $doc; + } + + // Extract selections from queries + $selections = []; + foreach ($queries as $query) { + if ($query->getMethod() === Query::TYPE_SELECT) { + $values = $query->getValues(); + foreach ($values as $value) { + // Skip relationship selections (containing '.') + if (!\str_contains($value, '.')) { + $selections[] = $value; + } + } + } + } + + // If no selections or wildcard present, return document as-is + if (empty($selections) || \in_array('*', $selections)) { + return $doc; + } + + // Create a new document with only selected attributes + $projected = new Document(); + + // Always preserve internal attributes + $projected->setAttribute('$id', $doc->getId()); + $projected->setAttribute('$collection', $doc->getCollection()); + $projected->setAttribute('$createdAt', $doc->getCreatedAt()); + $projected->setAttribute('$updatedAt', $doc->getUpdatedAt()); + if ($doc->offsetExists('$permissions')) { + $projected->setAttribute('$permissions', $doc->getPermissions()); + } + + // Add selected attributes + foreach ($selections as $attribute) { + if ($doc->offsetExists($attribute)) { + $projected->setAttribute($attribute, $doc->getAttribute($attribute)); + } + } + + return $projected; + } + + /** + * Check if a document matches filter queries + * + * @param Document $doc Document to check + * @param array $queries Query filters + * @return bool True if document matches all filters */ private function documentMatchesFilters(Document $doc, array $queries): bool { @@ -387,11 +528,13 @@ class TransactionState return false; } break; + case Query::TYPE_NOT_EQUAL: if (\in_array($docValue, $values)) { return false; } break; + case Query::TYPE_CONTAINS: $matches = false; foreach ($values as $value) { @@ -404,6 +547,7 @@ class TransactionState return false; } break; + case Query::TYPE_STARTS_WITH: $matches = false; foreach ($values as $value) { @@ -416,6 +560,7 @@ class TransactionState return false; } break; + case Query::TYPE_ENDS_WITH: $matches = false; foreach ($values as $value) { @@ -428,36 +573,43 @@ class TransactionState return false; } break; + case Query::TYPE_GREATER_THAN: if (!($docValue > $values[0])) { return false; } break; + case Query::TYPE_GREATER_THAN_EQUAL: if (!($docValue >= $values[0])) { return false; } break; + case Query::TYPE_LESSER_THAN: if (!($docValue < $values[0])) { return false; } break; + case Query::TYPE_LESSER_THAN_EQUAL: if (!($docValue <= $values[0])) { return false; } break; + case Query::TYPE_IS_NULL: if (!\is_null($docValue)) { return false; } break; + case Query::TYPE_IS_NOT_NULL: if (\is_null($docValue)) { return false; } break; + case Query::TYPE_BETWEEN: if (!($docValue >= $values[0] && $docValue <= $values[1])) { return false; @@ -468,16 +620,4 @@ class TransactionState return true; } - - /** - * Check if a document exists with transaction-aware logic - */ - public function documentExists( - string $collectionId, - string $documentId, - ?string $transactionId = null - ): bool { - $doc = $this->getDocument($collectionId, $documentId, $transactionId); - return !$doc->isEmpty(); - } } diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php index 37642a0cca..19ba8f10d8 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php @@ -2,6 +2,7 @@ namespace Appwrite\Platform\Modules\Databases\Http\Databases\Transactions; +use Appwrite\Databases\TransactionState; use Appwrite\Event\Delete; use Appwrite\Event\Event; use Appwrite\Event\StatsUsage; @@ -66,6 +67,7 @@ class Update extends Action ->param('rollback', false, new Boolean(), 'Rollback transaction?', true) ->inject('response') ->inject('dbForProject') + ->inject('transactionState') ->inject('queueForDeletes') ->inject('queueForEvents') ->inject('queueForStatsUsage') @@ -81,6 +83,7 @@ class Update extends Action * @param bool $rollback * @param UtopiaResponse $response * @param Database $dbForProject + * @param TransactionState $transactionState * @param Delete $queueForDeletes * @param Event $queueForEvents * @param StatsUsage $queueForStatsUsage @@ -96,7 +99,7 @@ class Update extends Action * @throws Structure * @throws \Utopia\Exception */ - public function action(string $transactionId, bool $commit, bool $rollback, UtopiaResponse $response, Database $dbForProject, Delete $queueForDeletes, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks): void + public function action(string $transactionId, bool $commit, bool $rollback, UtopiaResponse $response, Database $dbForProject, TransactionState $transactionState, Delete $queueForDeletes, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks): void { if (!$commit && !$rollback) { throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Either commit or rollback must be true'); @@ -182,13 +185,13 @@ class Update extends Action $this->handleBulkCreateOperation($dbForProject, $collectionId, $data, $createdAt, $state); break; case 'bulkUpdate': - $this->handleBulkUpdateOperation($dbForProject, $collectionId, $data, $createdAt, $state); + $this->handleBulkUpdateOperation($dbForProject, $transactionState, $collectionId, $data, $createdAt, $state); break; case 'bulkUpsert': - $this->handleBulkUpsertOperation($dbForProject, $collectionId, $data, $createdAt, $state); + $this->handleBulkUpsertOperation($dbForProject, $transactionState, $collectionId, $data, $createdAt, $state); break; case 'bulkDelete': - $this->handleBulkDeleteOperation($dbForProject, $collectionId, $data, $createdAt, $state); + $this->handleBulkDeleteOperation($dbForProject, $transactionState, $collectionId, $data, $createdAt, $state); break; } } @@ -348,6 +351,14 @@ class Update extends Action /** * Handle create operation + * + * @param Database $dbForProject + * @param string $collectionId + * @param string|null $documentId + * @param array $data + * @param \DateTime $createdAt + * @param array &$state + * @return void * @throws \Utopia\Database\Exception */ private function handleCreateOperation( @@ -371,6 +382,14 @@ class Update extends Action /** * Handle update operation + * + * @param Database $dbForProject + * @param string $collectionId + * @param string $documentId + * @param array $data + * @param \DateTime $createdAt + * @param array &$state + * @return void * @throws ConflictException * @throws \Utopia\Database\Exception */ @@ -410,6 +429,14 @@ class Update extends Action /** * Handle upsert operation + * + * @param Database $dbForProject + * @param string $collectionId + * @param string|null $documentId + * @param array $data + * @param \DateTime $createdAt + * @param array &$state + * @return void * @throws \Utopia\Database\Exception */ private function handleUpsertOperation( @@ -442,6 +469,15 @@ class Update extends Action /** * Handle delete operation + * + * @param Database $dbForProject + * @param string $collectionId + * @param string $documentId + * @param \DateTime $createdAt + * @param array &$state + * @return void + * @throws \Utopia\Database\Exception + * @throws NotFoundException */ private function handleDeleteOperation( Database $dbForProject, @@ -473,6 +509,16 @@ class Update extends Action /** * Handle increment operation + * + * @param Database $dbForProject + * @param string $collectionId + * @param string $documentId + * @param array $data + * @param \DateTime $createdAt + * @param array &$state + * @return void + * @throws ConflictException + * @throws \Utopia\Database\Exception */ private function handleIncrementOperation( Database $dbForProject, @@ -510,6 +556,16 @@ class Update extends Action /** * Handle decrement operation + * + * @param Database $dbForProject + * @param string $collectionId + * @param string $documentId + * @param array $data + * @param \DateTime $createdAt + * @param array &$state + * @return void + * @throws ConflictException + * @throws \Utopia\Database\Exception */ private function handleDecrementOperation( Database $dbForProject, @@ -547,6 +603,14 @@ class Update extends Action /** * Handle bulk create operation + * + * @param Database $dbForProject + * @param string $collectionId + * @param array $data + * @param \DateTime $createdAt + * @param array &$state + * @return void + * @throws \Utopia\Database\Exception */ private function handleBulkCreateOperation( Database $dbForProject, @@ -573,22 +637,33 @@ class Update extends Action /** * Handle bulk update operation with manual timestamp checking + * + * @param Database $dbForProject + * @param TransactionState $transactionState + * @param string $collectionId + * @param array $data + * @param \DateTime $createdAt + * @param array &$state + * @return void * @throws \Utopia\Database\Exception * @throws \Utopia\Database\Exception\Query * @throws ConflictException */ private function handleBulkUpdateOperation( Database $dbForProject, + TransactionState $transactionState, string $collectionId, array $data, \DateTime $createdAt, array &$state ): void { $queries = Query::parseQueries($data['queries'] ?? []); + $updateData = new Document($data['data']); + // First, update documents in the committed database $dbForProject->updateDocuments( $collectionId, - new Document($data['data']), + $updateData, $queries, onNext: function (Document $updated, Document $old) use (&$state, $collectionId, $createdAt) { // Check if this document was created/modified in this transaction @@ -605,14 +680,27 @@ class Update extends Action $state[$collectionId][$updated->getId()] = $updated; } ); + + // Also update documents in the transaction state that match the query + $transactionState->applyBulkUpdateToState($collectionId, $updateData, $queries, $state); } /** * Handle bulk upsert operation with manual timestamp checking + * + * @param Database $dbForProject + * @param TransactionState $transactionState + * @param string $collectionId + * @param array $data + * @param \DateTime $createdAt + * @param array &$state + * @return void * @throws ConflictException + * @throws \Utopia\Database\Exception */ private function handleBulkUpsertOperation( Database $dbForProject, + TransactionState $transactionState, string $collectionId, array $data, \DateTime $createdAt, @@ -623,7 +711,11 @@ class Update extends Action return $doc instanceof Document ? $doc : new Document($doc); }, $data); - // Run bulk upsert without timestamp wrapper, checking manually in callback + // First, apply upserts to documents in the transaction state + // This ensures documents created in this transaction are updated properly + $transactionState->applyBulkUpsertToState($collectionId, $documents, $state); + + // Then run bulk upsert on committed database, checking manually in callback $dbForProject->upsertDocuments( $collectionId, $documents, @@ -649,11 +741,21 @@ class Update extends Action /** * Handle bulk delete operation with manual timestamp checking + * + * @param Database $dbForProject + * @param TransactionState $transactionState + * @param string $collectionId + * @param array $data + * @param \DateTime $createdAt + * @param array &$state + * @return void * @throws \Utopia\Database\Exception\Query * @throws ConflictException + * @throws \Utopia\Database\Exception */ private function handleBulkDeleteOperation( Database $dbForProject, + TransactionState $transactionState, string $collectionId, array $data, \DateTime $createdAt, @@ -681,5 +783,8 @@ class Update extends Action } } ); + + // Also delete documents in the transaction state that match the query + $transactionState->applyBulkDeleteToState($collectionId, $queries, $state); } }