From 61b073a6a227be9ff4626718a183532440010a06 Mon Sep 17 00:00:00 2001 From: Jake Barnby Date: Tue, 7 Oct 2025 18:53:38 +1300 Subject: [PATCH] Clean up --- composer.lock | 24 ++-- src/Appwrite/Databases/TransactionState.php | 29 ++--- .../Transactions/Operations/Create.php | 1 - .../Http/Databases/Transactions/Update.php | 106 +++++++----------- 4 files changed, 71 insertions(+), 89 deletions(-) diff --git a/composer.lock b/composer.lock index 8450c9df98..bad4ed32af 100644 --- a/composer.lock +++ b/composer.lock @@ -1927,16 +1927,16 @@ }, { "name": "phpseclib/phpseclib", - "version": "3.0.46", + "version": "3.0.47", "source": { "type": "git", "url": "https://github.com/phpseclib/phpseclib.git", - "reference": "56483a7de62a6c2a6635e42e93b8a9e25d4f0ec6" + "reference": "9d6ca36a6c2dd434765b1071b2644a1c683b385d" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/phpseclib/phpseclib/zipball/56483a7de62a6c2a6635e42e93b8a9e25d4f0ec6", - "reference": "56483a7de62a6c2a6635e42e93b8a9e25d4f0ec6", + "url": "https://api.github.com/repos/phpseclib/phpseclib/zipball/9d6ca36a6c2dd434765b1071b2644a1c683b385d", + "reference": "9d6ca36a6c2dd434765b1071b2644a1c683b385d", "shasum": "" }, "require": { @@ -2017,7 +2017,7 @@ ], "support": { "issues": "https://github.com/phpseclib/phpseclib/issues", - "source": "https://github.com/phpseclib/phpseclib/tree/3.0.46" + "source": "https://github.com/phpseclib/phpseclib/tree/3.0.47" }, "funding": [ { @@ -2033,7 +2033,7 @@ "type": "tidelift" } ], - "time": "2025-06-26T16:29:55+00:00" + "time": "2025-10-06T01:07:24+00:00" }, { "name": "psr/container", @@ -3635,16 +3635,16 @@ }, { "name": "utopia-php/database", - "version": "2.2.0", + "version": "2.3.1", "source": { "type": "git", "url": "https://github.com/utopia-php/database.git", - "reference": "19a3ab2ae99578861dd1a7b4fdbfb62b37d09447" + "reference": "a91e04080d7f13c35c4885dea0ffebc33cd33e1f" }, "dist": { "type": "zip", - "url": "https://api.github.com/repos/utopia-php/database/zipball/19a3ab2ae99578861dd1a7b4fdbfb62b37d09447", - "reference": "19a3ab2ae99578861dd1a7b4fdbfb62b37d09447", + "url": "https://api.github.com/repos/utopia-php/database/zipball/a91e04080d7f13c35c4885dea0ffebc33cd33e1f", + "reference": "a91e04080d7f13c35c4885dea0ffebc33cd33e1f", "shasum": "" }, "require": { @@ -3685,9 +3685,9 @@ ], "support": { "issues": "https://github.com/utopia-php/database/issues", - "source": "https://github.com/utopia-php/database/tree/2.2.0" + "source": "https://github.com/utopia-php/database/tree/2.3.1" }, - "time": "2025-09-26T02:23:30+00:00" + "time": "2025-10-06T04:29:14+00:00" }, { "name": "utopia-php/detector", diff --git a/src/Appwrite/Databases/TransactionState.php b/src/Appwrite/Databases/TransactionState.php index 2d35ce92ae..522e160a87 100644 --- a/src/Appwrite/Databases/TransactionState.php +++ b/src/Appwrite/Databases/TransactionState.php @@ -45,32 +45,27 @@ class TransactionState ?string $transactionId = null, array $queries = [] ): Document { - // If no transaction, use normal database retrieval if ($transactionId === null) { return $this->dbForProject->getDocument($collectionId, $documentId, $queries); } $state = $this->getTransactionState($transactionId); - // Check if document exists in transaction state if (isset($state[$collectionId][$documentId])) { $docState = $state[$collectionId][$documentId]; if (!$docState['exists']) { - // Document was deleted in transaction return new Document(); } if ($docState['action'] === 'create') { - // Document was created in transaction, return the created version return $this->applyProjection($docState['document'], $queries); } if ($docState['action'] === 'update' || $docState['action'] === 'upsert') { - // This is an update to an existing document, merge with committed version + // Merge with committed version $committedDoc = $this->dbForProject->getDocument($collectionId, $documentId, $queries); if (!$committedDoc->isEmpty()) { - // Apply the updates from transaction foreach ($docState['document']->getAttributes() as $key => $value) { if ($key !== '$id') { $committedDoc->setAttribute($key, $value); @@ -79,13 +74,11 @@ class TransactionState // Reapply projection in case transaction added new fields return $this->applyProjection($committedDoc, $queries); } elseif ($docState['action'] === 'upsert') { - // Upsert created a new document since committed doc doesn't exist return $this->applyProjection($docState['document'], $queries); } } } - // Document not affected by transaction, return committed version return $this->dbForProject->getDocument($collectionId, $documentId, $queries); } @@ -307,18 +300,21 @@ class TransactionState /** * Apply bulk upsert to documents in transaction state * - * This allows bulk operations within a transaction to see each other's changes. + * This merges partial upsert data with full documents from transaction state, + * preventing validation errors when upserting documents created in the same transaction. * * @param string $collectionId Collection ID - * @param array $documents Array of Document objects to upsert + * @param array $documents Array of Document objects to upsert (can be partial) * @param array &$state Transaction state (passed by reference) - * @return void + * @return array Merged documents ready for database upsert */ public function applyBulkUpsertToState( string $collectionId, array $documents, array &$state - ): void { + ): array { + $mergedDocuments = []; + foreach ($documents as $doc) { if (!($doc instanceof Document)) { continue; @@ -329,7 +325,7 @@ class TransactionState continue; } - // If document exists in state, update it; otherwise it will be handled by DB upsert + // If document exists in state, update it and use the merged version if (isset($state[$collectionId][$docId])) { // Apply updates to existing state document foreach ($doc->getArrayCopy() as $key => $value) { @@ -337,8 +333,15 @@ class TransactionState $state[$collectionId][$docId]->setAttribute($key, $value); } } + // Use the full merged document from state + $mergedDocuments[] = $state[$collectionId][$docId]; + } else { + // Document not in state - use original partial data for DB upsert + $mergedDocuments[] = $doc; } } + + return $mergedDocuments; } /** diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Operations/Create.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Operations/Create.php index 5f6fa4f1a6..c3ba45bdce 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Operations/Create.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Operations/Create.php @@ -113,7 +113,6 @@ class Create extends Action throw new Exception(Exception::COLLECTION_NOT_FOUND); } - // Check if collection has relationships for bulk operations if (\in_array($operation['action'], ['bulkCreate', 'bulkUpdate', 'bulkUpsert', 'bulkDelete'])) { $hasRelationships = \array_filter( $collection->getAttribute('attributes', []), diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php index d522c699a8..e33a731d0e 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Databases/Transactions/Update.php @@ -103,7 +103,6 @@ class Update extends Action */ public function action(string $transactionId, bool $commit, bool $rollback, UtopiaResponse $response, Database $dbForProject, Document $user, TransactionState $transactionState, Delete $queueForDeletes, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks): void { - if (!$commit && !$rollback) { throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Either commit or rollback must be true'); } @@ -128,8 +127,6 @@ class Update extends Action if ($commit) { $operations = []; - - // Track metrics for usage stats $totalOperations = 0; $databaseOperations = []; @@ -139,15 +136,12 @@ class Update extends Action 'status' => 'committing', ]))); - // Fetch operations ordered by sequence by default to replay operations in exact order they were created $operations = Authorization::skip(fn () => $dbForProject->find('transactionLogs', [ Query::equal('transactionInternalId', [$transaction->getSequence()]), Query::orderAsc(), Query::limit(PHP_INT_MAX), ])); - - // Track transaction state for cross-operation visibility $state = []; foreach ($operations as $operation) { @@ -159,7 +153,6 @@ class Update extends Action $action = $operation['action']; $data = $operation['data']; - // For delete operations, fetch the document before deleting for realtime events if ($action === 'delete' && $documentId && empty($data)) { $doc = $dbForProject->getDocument($collectionId, $documentId); if (!$doc->isEmpty()) { @@ -168,7 +161,6 @@ class Update extends Action } } - // Track operations for stats $totalOperations++; $databaseOperations[$databaseInternalId] = ($databaseOperations[$databaseInternalId] ?? 0) + 1; @@ -176,7 +168,6 @@ class Update extends Action $data = $data->getArrayCopy(); } - // Execute the operation based on its type switch ($action) { case 'create': $this->handleCreateOperation($dbForProject, $collectionId, $documentId, $data, $createdAt, $state); @@ -217,44 +208,37 @@ class Update extends Action new Document(['status' => 'committed']) )); - // Clear the transaction logs $queueForDeletes ->setType(DELETE_TYPE_DOCUMENT) ->setDocument($transaction); }); } catch (NotFoundException $e) { - // Transaction has been rolled back, now mark it as failed Authorization::skip(fn () => $dbForProject->updateDocument('transactions', $transactionId, new Document([ 'status' => 'failed', ]))); throw new Exception(Exception::DOCUMENT_NOT_FOUND, previous: $e); } catch (DuplicateException|ConflictException $e) { - // Transaction has been rolled back, now mark it as failed Authorization::skip(fn () => $dbForProject->updateDocument('transactions', $transactionId, new Document([ 'status' => 'failed', ]))); throw new Exception(Exception::TRANSACTION_CONFLICT, previous: $e); } catch (StructureException $e) { - // Transaction has been rolled back, now mark it as failed Authorization::skip(fn () => $dbForProject->updateDocument('transactions', $transactionId, new Document([ 'status' => 'failed', ]))); throw new Exception(Exception::DOCUMENT_INVALID_STRUCTURE, $e->getMessage()); } catch (LimitException $e) { - // Transaction has been rolled back, now mark it as failed Authorization::skip(fn () => $dbForProject->updateDocument('transactions', $transactionId, new Document([ 'status' => 'failed', ]))); throw new Exception(Exception::ATTRIBUTE_LIMIT_EXCEEDED, $e->getMessage()); } catch (TransactionException $e) { - // Transaction has been rolled back, now mark it as failed Authorization::skip(fn () => $dbForProject->updateDocument('transactions', $transactionId, new Document([ 'status' => 'failed', ]))); throw new Exception(Exception::TRANSACTION_FAILED, $e->getMessage()); } catch (QueryException $e) { - // Transaction has been rolled back, now mark it as failed Authorization::skip(fn () => $dbForProject->updateDocument('transactions', $transactionId, new Document([ 'status' => 'failed', ]))); @@ -264,7 +248,6 @@ class Update extends Action $queueForStatsUsage ->addMetric(METRIC_DATABASES_OPERATIONS_WRITES, $totalOperations); - // Add per-database metrics foreach ($databaseOperations as $sequence => $count) { $queueForStatsUsage->addMetric( str_replace('{databaseInternalId}', $sequence, METRIC_DATABASE_ID_OPERATIONS_WRITES), @@ -272,8 +255,6 @@ class Update extends Action ); } - // Trigger realtime events for each operation - foreach ($operations as $operation) { $databaseInternalId = $operation['databaseInternalId']; $collectionInternalId = $operation['collectionInternalId']; @@ -316,7 +297,6 @@ class Update extends Action $eventAction = 'create'; $docId = $documentId ?? $data['$id'] ?? null; if ($docId) { - // Fetch the created document from the database $doc = $dbForProject->getDocument($collectionId, $docId); if (!$doc->isEmpty()) { $documentsToTrigger[] = $doc; @@ -328,7 +308,6 @@ class Update extends Action case 'decrement': $eventAction = 'update'; if ($documentId) { - // Fetch the updated document from the database $doc = $dbForProject->getDocument($collectionId, $documentId); if (!$doc->isEmpty()) { $documentsToTrigger[] = $doc; @@ -338,15 +317,13 @@ class Update extends Action case 'delete': $eventAction = 'delete'; if ($documentId && !empty($data)) { - // For delete, use the fetched document data (fetched before deletion) $documentsToTrigger[] = new Document(array_merge($data, ['$id' => $documentId])); } break; case 'upsert': - $eventAction = 'update'; // Upsert is treated as update for events + $eventAction = 'update'; $docId = $documentId ?? $data['$id'] ?? null; if ($docId) { - // Fetch the upserted document from the database $doc = $dbForProject->getDocument($collectionId, $docId); if (!$doc->isEmpty()) { $documentsToTrigger[] = $doc; @@ -360,15 +337,11 @@ class Update extends Action break; } - // Trigger events for each document - $eventString = "databases.[databaseId].{$contextKey}s.[{$groupId}].{$resourcePlural}.[{$resourceId}]." . $eventAction; $queueForEvents->setEvent($eventString); foreach ($documentsToTrigger as $doc) { - - // Add table/collection IDs to the payload for realtime channels $payload = $doc->getArrayCopy(); $payload['$tableId'] = $collection->getId(); $payload['$collectionId'] = $collection->getId(); @@ -464,7 +437,6 @@ class Update extends Action $dependent = isset($state[$collectionId][$documentId]); if ($dependent) { - // Update the state document directly without timestamp wrapper $state[$collectionId][$documentId] = $dbForProject->updateDocument( $collectionId, $documentId, @@ -473,7 +445,6 @@ class Update extends Action return; } - // Use timestamp wrapper for independent operations $dbForProject->withRequestTimestamp($createdAt, function () use ($dbForProject, $collectionId, $documentId, $data, &$state) { $document = $dbForProject->updateDocument( $collectionId, @@ -510,15 +481,21 @@ class Update extends Action $dependent = isset($state[$collectionId][$documentId]); if ($dependent) { - // Upsert the state document directly without timestamp wrapper + // Merge partial upsert data with full document from transaction state + $existingDoc = $state[$collectionId][$documentId]; + foreach ($data as $key => $value) { + if ($key !== '$id') { + $existingDoc->setAttribute($key, $value); + } + } + $state[$collectionId][$documentId] = $dbForProject->upsertDocument( $collectionId, - new Document($data), + $existingDoc, ); return; } - // Use timestamp wrapper for independent operations $dbForProject->withRequestTimestamp($createdAt, function () use ($dbForProject, $collectionId, $documentId, $data, &$state) { $state[$collectionId][$documentId] = $dbForProject->upsertDocument( $collectionId, @@ -549,13 +526,11 @@ class Update extends Action $dependent = isset($state[$collectionId][$documentId]); if ($dependent) { - // Delete without timestamp wrapper $dbForProject->deleteDocument($collectionId, $documentId); unset($state[$collectionId][$documentId]); return; } - // Use timestamp wrapper for independent operations $dbForProject->withRequestTimestamp($createdAt, function () use ($dbForProject, $collectionId, $documentId, &$state) { $deleted = $dbForProject->deleteDocument($collectionId, $documentId); if (!$deleted) { @@ -591,7 +566,6 @@ class Update extends Action $dependent = isset($state[$collectionId][$documentId]); if ($dependent) { - // Increment without timestamp wrapper $state[$collectionId][$documentId] = $dbForProject->increaseDocumentAttribute( collection: $collectionId, id: $documentId, @@ -602,7 +576,6 @@ class Update extends Action return; } - // Use timestamp wrapper for independent operations $dbForProject->withRequestTimestamp($createdAt, function () use ($dbForProject, $collectionId, $documentId, $data) { $dbForProject->increaseDocumentAttribute( collection: $collectionId, @@ -638,7 +611,6 @@ class Update extends Action $dependent = isset($state[$collectionId][$documentId]); if ($dependent) { - // Decrement without timestamp wrapper $state[$collectionId][$documentId] = $dbForProject->decreaseDocumentAttribute( collection: $collectionId, id: $documentId, @@ -649,8 +621,6 @@ class Update extends Action return; } - // Use timestamp wrapper for independent operations - // Use timestamp wrapper for independent operations $dbForProject->withRequestTimestamp($createdAt, function () use ($dbForProject, $collectionId, $documentId, $data, &$state) { $state[$collectionId][$documentId] = $dbForProject->decreaseDocumentAttribute( collection: $collectionId, @@ -681,7 +651,6 @@ class Update extends Action array &$state ): void { $dbForProject->withRequestTimestamp($createdAt, function () use ($dbForProject, $collectionId, $data, &$state) { - // Convert data arrays to Document objects if needed $documents = \array_map(function ($doc) { return $doc instanceof Document ? $doc : new Document($doc); }, $data); @@ -721,29 +690,50 @@ class Update extends Action $queries = Query::parseQueries($data['queries'] ?? []); $updateData = new Document($data['data']); - // First, update documents in the committed database + $dependentDocs = []; + + $transactionState->applyBulkUpdateToState($collectionId, $updateData, $queries, $state); + + // Clone the document before passing to updateDocuments to prevent mutation + // The database layer mutates the input document, which would corrupt transaction state $dbForProject->updateDocuments( $collectionId, - $updateData, + clone $updateData, $queries, - onNext: function (Document $updated, Document $old) use (&$state, $collectionId, $createdAt) { - // Check if this document was created/modified in this transaction + onNext: function (Document $updated, Document $old) use (&$state, $collectionId, $createdAt, &$dependentDocs) { $dependent = isset($state[$collectionId][$updated->getId()]); - // If not in transaction state, check for timestamp conflicts - if (!$dependent) { + if ($dependent) { + $dependentDocs[] = $updated->getId(); + } else { $oldUpdatedAt = new \DateTime($old->getUpdatedAt()); if ($oldUpdatedAt > $createdAt) { throw new ConflictException('Document was updated after the request timestamp'); } + $state[$collectionId][$updated->getId()] = $updated; } - - $state[$collectionId][$updated->getId()] = $updated; } ); - // Also update documents in the transaction state that match the query - $transactionState->applyBulkUpdateToState($collectionId, $updateData, $queries, $state); + // Re-write dependent documents from state to database to fix partial updates + if (!empty($dependentDocs)) { + $documentsToRewrite = []; + foreach ($dependentDocs as $docId) { + if (isset($state[$collectionId][$docId])) { + $documentsToRewrite[] = $state[$collectionId][$docId]; + } + } + + if (!empty($documentsToRewrite)) { + $dbForProject->upsertDocuments( + $collectionId, + $documentsToRewrite, + onNext: function (Document $upserted) use (&$state, $collectionId) { + $state[$collectionId][$upserted->getId()] = $upserted; + } + ); + } + } } /** @@ -767,25 +757,19 @@ class Update extends Action \DateTime $createdAt, array &$state ): void { - // Convert data arrays to Document objects if needed $documents = \array_map(function ($doc) { return $doc instanceof Document ? $doc : new Document($doc); }, $data); - // First, apply upserts to documents in the transaction state - // This ensures documents created in this transaction are updated properly - $transactionState->applyBulkUpsertToState($collectionId, $documents, $state); + $mergedDocuments = $transactionState->applyBulkUpsertToState($collectionId, $documents, $state); - // Then run bulk upsert on committed database, checking manually in callback $dbForProject->upsertDocuments( $collectionId, - $documents, + $mergedDocuments, onNext: function (Document $upserted, ?Document $old) use (&$state, $collectionId, $createdAt) { if ($old !== null) { - // This is an update - check if document was created/modified in this transaction $dependent = isset($state[$collectionId][$upserted->getId()]); - // If not in transaction state, check for timestamp conflicts if (!$dependent) { $oldUpdatedAt = new \DateTime($old->getUpdatedAt()); if ($oldUpdatedAt > $createdAt) { @@ -794,7 +778,6 @@ class Update extends Action } } - // If $old is null, this is a create operation - no timestamp check needed $state[$collectionId][$upserted->getId()] = $upserted; } ); @@ -830,7 +813,6 @@ class Update extends Action onNext: function (Document $deleted, Document $old) use (&$state, $collectionId, $createdAt) { $dependent = isset($state[$collectionId][$deleted->getId()]); - // If not in transaction state, check for timestamp conflicts if (!$dependent) { $oldUpdatedAt = new \DateTime($old->getUpdatedAt()); if ($oldUpdatedAt > $createdAt) { @@ -838,14 +820,12 @@ class Update extends Action } } - // Remove from state after successful deletion if (isset($state[$collectionId][$deleted->getId()])) { unset($state[$collectionId][$deleted->getId()]); } } ); - // Also delete documents in the transaction state that match the query $transactionState->applyBulkDeleteToState($collectionId, $queries, $state); } }