diff --git a/src/Appwrite/Platform/Modules/Databases/Http/Transactions/Update.php b/src/Appwrite/Platform/Modules/Databases/Http/Transactions/Update.php index d32e8d4c20..8793847a38 100644 --- a/src/Appwrite/Platform/Modules/Databases/Http/Transactions/Update.php +++ b/src/Appwrite/Platform/Modules/Databases/Http/Transactions/Update.php @@ -93,124 +93,84 @@ class Update extends Action 'status' => 'committing', ])); + // Fetch operations ordered by creation time to maintain exact sequence $operations = $dbForProject->find('transactionLogs', [ Query::equal('transactionInternalId', [$transaction->getSequence()]), + Query::orderAsc('$createdAt'), ]); - $creates - = $updates - = $deletes - = $increments - = $decrements - = $bulkUpdates - = $bulkDeletes - = []; - - foreach ($operations as $operation) { - $databaseInternalId = $operation['databaseInternalId']; - $collectionInternalId = $operation['collectionInternalId']; - $documentId = $operation['documentId']; - - switch ($operation['action']) { - case 'create': - $creates[$databaseInternalId][$collectionInternalId][] = new Document([ - '$id' => $documentId ?? ID::unique(), - ...$operation['data'] - ]); - break; - case 'update': - case 'upsert': - $updates[$databaseInternalId][$collectionInternalId][] = new Document([ - '$id' => $documentId, - ...$operation['data'], - ]); - break; - case 'delete': - $deletes[$databaseInternalId][$collectionInternalId][] = $documentId; - break; - case 'increment': - $increments[$databaseInternalId][$collectionInternalId][] = [ - 'attribute' => $operation['data']['attribute'], - 'value' => $operation['data']['value'] ?? 1, - 'max' => $operation['data']['max'] ?? null, - ]; - break; - case 'decrement': - $decrements[$databaseInternalId][$collectionInternalId][] = [ - 'attribute' => $operation['data']['attribute'], - 'value' => $operation['data']['value'] ?? 1, - 'min' => $operation['data']['min'] ?? null, - ]; - break; - case 'bulkUpdate': - $bulkUpdates[$databaseInternalId][$collectionInternalId][] = [ - 'data' => $operation['data']['data'] ?? null, - 'queries' => $operation['data']['queries'] ?? [], - ]; - break; - case 'bulkDelete': - $bulkDeletes[$databaseInternalId][$collectionInternalId][] = [ - 'queries' => $operation['data']['queries'] ?? [], - ]; - break; - } - } - try { - foreach ($creates as $dbId => $cols) { - foreach ($cols as $colId => $docs) { - $dbForProject->createDocuments("database_{$dbId}_collection_{$colId}", $docs); - } - } - foreach ($updates as $dbId => $cols) { - foreach ($cols as $colId => $docs) { - $dbForProject->createOrUpdateDocuments("database_{$dbId}_collection_{$colId}", $docs); - } - } - foreach ($deletes as $dbId => $cols) { - foreach ($cols as $colId => $ids) { - $dbForProject->deleteDocuments("database_{$dbId}_collection_{$colId}", [ - Query::equal('$id', $ids), - ]); - } - } - foreach ($increments as $dbId => $cols) { - foreach ($cols as $colId => $increments) { - foreach ($increments as $increment) { - $dbForProject->increaseDocumentAttribute( - "database_{$dbId}_collection_{$colId}", - $increment['attribute'], - $increment['value'], - $increment['max'] - ); + // Replay operations in exact order they were created + foreach ($operations as $operation) { + $databaseInternalId = $operation['databaseInternalId']; + $collectionInternalId = $operation['collectionInternalId']; + $documentId = $operation['documentId']; + $action = $operation['action']; + $data = $operation['data']; + $operationCreatedAt = new \DateTime($operation['$createdAt']); + + $collectionName = "database_{$databaseInternalId}_collection_{$collectionInternalId}"; + + // Wrap each operation with the timestamp from when it was logged + $dbForProject->withRequestTimestamp($operationCreatedAt, function () use ($dbForProject, $action, $collectionName, $documentId, $data) { + switch ($action) { + case 'create': + $document = new Document([ + '$id' => $documentId ?? ID::unique(), + ...$data + ]); + $dbForProject->createDocument($collectionName, $document); + break; + + case 'update': + case 'upsert': + $document = new Document([ + '$id' => $documentId, + ...$data, + ]); + $dbForProject->createOrUpdateDocument($collectionName, $document); + break; + + case 'delete': + $dbForProject->deleteDocument($collectionName, $documentId); + break; + + case 'increment': + $dbForProject->increaseDocumentAttribute( + collection: $collectionName, + id: $documentId, + attribute: $data['attribute'], + value: $data['value'] ?? 1, + max: $data['max'] ?? null + ); + break; + + case 'decrement': + $dbForProject->decreaseDocumentAttribute( + collection: $collectionName, + id: $documentId, + attribute: $data['attribute'], + value: $data['value'] ?? 1, + min: $data['min'] ?? null + ); + break; + + case 'bulkUpdate': + $dbForProject->updateDocuments( + $collectionName, + $data['data'] ?? null, + $data['queries'] ?? [] + ); + break; + + case 'bulkDelete': + $dbForProject->deleteDocuments( + $collectionName, + $data['queries'] ?? [] + ); + break; } - } - } - foreach ($decrements as $dbId => $cols) { - foreach ($cols as $colId => $decrements) { - foreach ($decrements as $decrement) { - $dbForProject->decreaseDocumentAttribute( - "database_{$dbId}_collection_{$colId}", - $decrement['attribute'], - $decrement['value'], - $decrement['min'] - ); - } - } - } - foreach ($bulkUpdates as $dbId => $cols) { - foreach ($cols as $colId => $updates) { - foreach ($updates as $update) { - $dbForProject->updateDocuments("database_{$dbId}_collection_{$colId}", $update['data'], $update['queries']); - } - } - } - foreach ($bulkDeletes as $dbId => $cols) { - foreach ($cols as $colId => $deletes) { - foreach ($deletes as $delete) { - $dbForProject->deleteDocuments("database_{$dbId}_collection_{$colId}", $delete['queries']); - } - } + }); } $dbForProject->updateDocument('transactions', $transactionId, new Document([