This commit is contained in:
Jake Barnby 2025-10-07 18:53:38 +13:00
parent a06d96699b
commit 61b073a6a2
No known key found for this signature in database
GPG key ID: C437A8CC85B96E9C
4 changed files with 71 additions and 89 deletions

24
composer.lock generated
View file

@ -1927,16 +1927,16 @@
},
{
"name": "phpseclib/phpseclib",
"version": "3.0.46",
"version": "3.0.47",
"source": {
"type": "git",
"url": "https://github.com/phpseclib/phpseclib.git",
"reference": "56483a7de62a6c2a6635e42e93b8a9e25d4f0ec6"
"reference": "9d6ca36a6c2dd434765b1071b2644a1c683b385d"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/phpseclib/phpseclib/zipball/56483a7de62a6c2a6635e42e93b8a9e25d4f0ec6",
"reference": "56483a7de62a6c2a6635e42e93b8a9e25d4f0ec6",
"url": "https://api.github.com/repos/phpseclib/phpseclib/zipball/9d6ca36a6c2dd434765b1071b2644a1c683b385d",
"reference": "9d6ca36a6c2dd434765b1071b2644a1c683b385d",
"shasum": ""
},
"require": {
@ -2017,7 +2017,7 @@
],
"support": {
"issues": "https://github.com/phpseclib/phpseclib/issues",
"source": "https://github.com/phpseclib/phpseclib/tree/3.0.46"
"source": "https://github.com/phpseclib/phpseclib/tree/3.0.47"
},
"funding": [
{
@ -2033,7 +2033,7 @@
"type": "tidelift"
}
],
"time": "2025-06-26T16:29:55+00:00"
"time": "2025-10-06T01:07:24+00:00"
},
{
"name": "psr/container",
@ -3635,16 +3635,16 @@
},
{
"name": "utopia-php/database",
"version": "2.2.0",
"version": "2.3.1",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/database.git",
"reference": "19a3ab2ae99578861dd1a7b4fdbfb62b37d09447"
"reference": "a91e04080d7f13c35c4885dea0ffebc33cd33e1f"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/database/zipball/19a3ab2ae99578861dd1a7b4fdbfb62b37d09447",
"reference": "19a3ab2ae99578861dd1a7b4fdbfb62b37d09447",
"url": "https://api.github.com/repos/utopia-php/database/zipball/a91e04080d7f13c35c4885dea0ffebc33cd33e1f",
"reference": "a91e04080d7f13c35c4885dea0ffebc33cd33e1f",
"shasum": ""
},
"require": {
@ -3685,9 +3685,9 @@
],
"support": {
"issues": "https://github.com/utopia-php/database/issues",
"source": "https://github.com/utopia-php/database/tree/2.2.0"
"source": "https://github.com/utopia-php/database/tree/2.3.1"
},
"time": "2025-09-26T02:23:30+00:00"
"time": "2025-10-06T04:29:14+00:00"
},
{
"name": "utopia-php/detector",

View file

@ -45,32 +45,27 @@ class TransactionState
?string $transactionId = null,
array $queries = []
): Document {
// If no transaction, use normal database retrieval
if ($transactionId === null) {
return $this->dbForProject->getDocument($collectionId, $documentId, $queries);
}
$state = $this->getTransactionState($transactionId);
// Check if document exists in transaction state
if (isset($state[$collectionId][$documentId])) {
$docState = $state[$collectionId][$documentId];
if (!$docState['exists']) {
// Document was deleted in transaction
return new Document();
}
if ($docState['action'] === 'create') {
// Document was created in transaction, return the created version
return $this->applyProjection($docState['document'], $queries);
}
if ($docState['action'] === 'update' || $docState['action'] === 'upsert') {
// This is an update to an existing document, merge with committed version
// Merge with committed version
$committedDoc = $this->dbForProject->getDocument($collectionId, $documentId, $queries);
if (!$committedDoc->isEmpty()) {
// Apply the updates from transaction
foreach ($docState['document']->getAttributes() as $key => $value) {
if ($key !== '$id') {
$committedDoc->setAttribute($key, $value);
@ -79,13 +74,11 @@ class TransactionState
// Reapply projection in case transaction added new fields
return $this->applyProjection($committedDoc, $queries);
} elseif ($docState['action'] === 'upsert') {
// Upsert created a new document since committed doc doesn't exist
return $this->applyProjection($docState['document'], $queries);
}
}
}
// Document not affected by transaction, return committed version
return $this->dbForProject->getDocument($collectionId, $documentId, $queries);
}
@ -307,18 +300,21 @@ class TransactionState
/**
* Apply bulk upsert to documents in transaction state
*
* This allows bulk operations within a transaction to see each other's changes.
* This merges partial upsert data with full documents from transaction state,
* preventing validation errors when upserting documents created in the same transaction.
*
* @param string $collectionId Collection ID
* @param array $documents Array of Document objects to upsert
* @param array $documents Array of Document objects to upsert (can be partial)
* @param array &$state Transaction state (passed by reference)
* @return void
* @return array Merged documents ready for database upsert
*/
public function applyBulkUpsertToState(
string $collectionId,
array $documents,
array &$state
): void {
): array {
$mergedDocuments = [];
foreach ($documents as $doc) {
if (!($doc instanceof Document)) {
continue;
@ -329,7 +325,7 @@ class TransactionState
continue;
}
// If document exists in state, update it; otherwise it will be handled by DB upsert
// If document exists in state, update it and use the merged version
if (isset($state[$collectionId][$docId])) {
// Apply updates to existing state document
foreach ($doc->getArrayCopy() as $key => $value) {
@ -337,8 +333,15 @@ class TransactionState
$state[$collectionId][$docId]->setAttribute($key, $value);
}
}
// Use the full merged document from state
$mergedDocuments[] = $state[$collectionId][$docId];
} else {
// Document not in state - use original partial data for DB upsert
$mergedDocuments[] = $doc;
}
}
return $mergedDocuments;
}
/**

View file

@ -113,7 +113,6 @@ class Create extends Action
throw new Exception(Exception::COLLECTION_NOT_FOUND);
}
// Check if collection has relationships for bulk operations
if (\in_array($operation['action'], ['bulkCreate', 'bulkUpdate', 'bulkUpsert', 'bulkDelete'])) {
$hasRelationships = \array_filter(
$collection->getAttribute('attributes', []),

View file

@ -103,7 +103,6 @@ class Update extends Action
*/
public function action(string $transactionId, bool $commit, bool $rollback, UtopiaResponse $response, Database $dbForProject, Document $user, TransactionState $transactionState, Delete $queueForDeletes, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks): void
{
if (!$commit && !$rollback) {
throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Either commit or rollback must be true');
}
@ -128,8 +127,6 @@ class Update extends Action
if ($commit) {
$operations = [];
// Track metrics for usage stats
$totalOperations = 0;
$databaseOperations = [];
@ -139,15 +136,12 @@ class Update extends Action
'status' => 'committing',
])));
// Fetch operations ordered by sequence by default to replay operations in exact order they were created
$operations = Authorization::skip(fn () => $dbForProject->find('transactionLogs', [
Query::equal('transactionInternalId', [$transaction->getSequence()]),
Query::orderAsc(),
Query::limit(PHP_INT_MAX),
]));
// Track transaction state for cross-operation visibility
$state = [];
foreach ($operations as $operation) {
@ -159,7 +153,6 @@ class Update extends Action
$action = $operation['action'];
$data = $operation['data'];
// For delete operations, fetch the document before deleting for realtime events
if ($action === 'delete' && $documentId && empty($data)) {
$doc = $dbForProject->getDocument($collectionId, $documentId);
if (!$doc->isEmpty()) {
@ -168,7 +161,6 @@ class Update extends Action
}
}
// Track operations for stats
$totalOperations++;
$databaseOperations[$databaseInternalId] = ($databaseOperations[$databaseInternalId] ?? 0) + 1;
@ -176,7 +168,6 @@ class Update extends Action
$data = $data->getArrayCopy();
}
// Execute the operation based on its type
switch ($action) {
case 'create':
$this->handleCreateOperation($dbForProject, $collectionId, $documentId, $data, $createdAt, $state);
@ -217,44 +208,37 @@ class Update extends Action
new Document(['status' => 'committed'])
));
// Clear the transaction logs
$queueForDeletes
->setType(DELETE_TYPE_DOCUMENT)
->setDocument($transaction);
});
} catch (NotFoundException $e) {
// Transaction has been rolled back, now mark it as failed
Authorization::skip(fn () => $dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'failed',
])));
throw new Exception(Exception::DOCUMENT_NOT_FOUND, previous: $e);
} catch (DuplicateException|ConflictException $e) {
// Transaction has been rolled back, now mark it as failed
Authorization::skip(fn () => $dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'failed',
])));
throw new Exception(Exception::TRANSACTION_CONFLICT, previous: $e);
} catch (StructureException $e) {
// Transaction has been rolled back, now mark it as failed
Authorization::skip(fn () => $dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'failed',
])));
throw new Exception(Exception::DOCUMENT_INVALID_STRUCTURE, $e->getMessage());
} catch (LimitException $e) {
// Transaction has been rolled back, now mark it as failed
Authorization::skip(fn () => $dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'failed',
])));
throw new Exception(Exception::ATTRIBUTE_LIMIT_EXCEEDED, $e->getMessage());
} catch (TransactionException $e) {
// Transaction has been rolled back, now mark it as failed
Authorization::skip(fn () => $dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'failed',
])));
throw new Exception(Exception::TRANSACTION_FAILED, $e->getMessage());
} catch (QueryException $e) {
// Transaction has been rolled back, now mark it as failed
Authorization::skip(fn () => $dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'failed',
])));
@ -264,7 +248,6 @@ class Update extends Action
$queueForStatsUsage
->addMetric(METRIC_DATABASES_OPERATIONS_WRITES, $totalOperations);
// Add per-database metrics
foreach ($databaseOperations as $sequence => $count) {
$queueForStatsUsage->addMetric(
str_replace('{databaseInternalId}', $sequence, METRIC_DATABASE_ID_OPERATIONS_WRITES),
@ -272,8 +255,6 @@ class Update extends Action
);
}
// Trigger realtime events for each operation
foreach ($operations as $operation) {
$databaseInternalId = $operation['databaseInternalId'];
$collectionInternalId = $operation['collectionInternalId'];
@ -316,7 +297,6 @@ class Update extends Action
$eventAction = 'create';
$docId = $documentId ?? $data['$id'] ?? null;
if ($docId) {
// Fetch the created document from the database
$doc = $dbForProject->getDocument($collectionId, $docId);
if (!$doc->isEmpty()) {
$documentsToTrigger[] = $doc;
@ -328,7 +308,6 @@ class Update extends Action
case 'decrement':
$eventAction = 'update';
if ($documentId) {
// Fetch the updated document from the database
$doc = $dbForProject->getDocument($collectionId, $documentId);
if (!$doc->isEmpty()) {
$documentsToTrigger[] = $doc;
@ -338,15 +317,13 @@ class Update extends Action
case 'delete':
$eventAction = 'delete';
if ($documentId && !empty($data)) {
// For delete, use the fetched document data (fetched before deletion)
$documentsToTrigger[] = new Document(array_merge($data, ['$id' => $documentId]));
}
break;
case 'upsert':
$eventAction = 'update'; // Upsert is treated as update for events
$eventAction = 'update';
$docId = $documentId ?? $data['$id'] ?? null;
if ($docId) {
// Fetch the upserted document from the database
$doc = $dbForProject->getDocument($collectionId, $docId);
if (!$doc->isEmpty()) {
$documentsToTrigger[] = $doc;
@ -360,15 +337,11 @@ class Update extends Action
break;
}
// Trigger events for each document
$eventString = "databases.[databaseId].{$contextKey}s.[{$groupId}].{$resourcePlural}.[{$resourceId}]." . $eventAction;
$queueForEvents->setEvent($eventString);
foreach ($documentsToTrigger as $doc) {
// Add table/collection IDs to the payload for realtime channels
$payload = $doc->getArrayCopy();
$payload['$tableId'] = $collection->getId();
$payload['$collectionId'] = $collection->getId();
@ -464,7 +437,6 @@ class Update extends Action
$dependent = isset($state[$collectionId][$documentId]);
if ($dependent) {
// Update the state document directly without timestamp wrapper
$state[$collectionId][$documentId] = $dbForProject->updateDocument(
$collectionId,
$documentId,
@ -473,7 +445,6 @@ class Update extends Action
return;
}
// Use timestamp wrapper for independent operations
$dbForProject->withRequestTimestamp($createdAt, function () use ($dbForProject, $collectionId, $documentId, $data, &$state) {
$document = $dbForProject->updateDocument(
$collectionId,
@ -510,15 +481,21 @@ class Update extends Action
$dependent = isset($state[$collectionId][$documentId]);
if ($dependent) {
// Upsert the state document directly without timestamp wrapper
// Merge partial upsert data with full document from transaction state
$existingDoc = $state[$collectionId][$documentId];
foreach ($data as $key => $value) {
if ($key !== '$id') {
$existingDoc->setAttribute($key, $value);
}
}
$state[$collectionId][$documentId] = $dbForProject->upsertDocument(
$collectionId,
new Document($data),
$existingDoc,
);
return;
}
// Use timestamp wrapper for independent operations
$dbForProject->withRequestTimestamp($createdAt, function () use ($dbForProject, $collectionId, $documentId, $data, &$state) {
$state[$collectionId][$documentId] = $dbForProject->upsertDocument(
$collectionId,
@ -549,13 +526,11 @@ class Update extends Action
$dependent = isset($state[$collectionId][$documentId]);
if ($dependent) {
// Delete without timestamp wrapper
$dbForProject->deleteDocument($collectionId, $documentId);
unset($state[$collectionId][$documentId]);
return;
}
// Use timestamp wrapper for independent operations
$dbForProject->withRequestTimestamp($createdAt, function () use ($dbForProject, $collectionId, $documentId, &$state) {
$deleted = $dbForProject->deleteDocument($collectionId, $documentId);
if (!$deleted) {
@ -591,7 +566,6 @@ class Update extends Action
$dependent = isset($state[$collectionId][$documentId]);
if ($dependent) {
// Increment without timestamp wrapper
$state[$collectionId][$documentId] = $dbForProject->increaseDocumentAttribute(
collection: $collectionId,
id: $documentId,
@ -602,7 +576,6 @@ class Update extends Action
return;
}
// Use timestamp wrapper for independent operations
$dbForProject->withRequestTimestamp($createdAt, function () use ($dbForProject, $collectionId, $documentId, $data) {
$dbForProject->increaseDocumentAttribute(
collection: $collectionId,
@ -638,7 +611,6 @@ class Update extends Action
$dependent = isset($state[$collectionId][$documentId]);
if ($dependent) {
// Decrement without timestamp wrapper
$state[$collectionId][$documentId] = $dbForProject->decreaseDocumentAttribute(
collection: $collectionId,
id: $documentId,
@ -649,8 +621,6 @@ class Update extends Action
return;
}
// Use timestamp wrapper for independent operations
// Use timestamp wrapper for independent operations
$dbForProject->withRequestTimestamp($createdAt, function () use ($dbForProject, $collectionId, $documentId, $data, &$state) {
$state[$collectionId][$documentId] = $dbForProject->decreaseDocumentAttribute(
collection: $collectionId,
@ -681,7 +651,6 @@ class Update extends Action
array &$state
): void {
$dbForProject->withRequestTimestamp($createdAt, function () use ($dbForProject, $collectionId, $data, &$state) {
// Convert data arrays to Document objects if needed
$documents = \array_map(function ($doc) {
return $doc instanceof Document ? $doc : new Document($doc);
}, $data);
@ -721,29 +690,50 @@ class Update extends Action
$queries = Query::parseQueries($data['queries'] ?? []);
$updateData = new Document($data['data']);
// First, update documents in the committed database
$dependentDocs = [];
$transactionState->applyBulkUpdateToState($collectionId, $updateData, $queries, $state);
// Clone the document before passing to updateDocuments to prevent mutation
// The database layer mutates the input document, which would corrupt transaction state
$dbForProject->updateDocuments(
$collectionId,
$updateData,
clone $updateData,
$queries,
onNext: function (Document $updated, Document $old) use (&$state, $collectionId, $createdAt) {
// Check if this document was created/modified in this transaction
onNext: function (Document $updated, Document $old) use (&$state, $collectionId, $createdAt, &$dependentDocs) {
$dependent = isset($state[$collectionId][$updated->getId()]);
// If not in transaction state, check for timestamp conflicts
if (!$dependent) {
if ($dependent) {
$dependentDocs[] = $updated->getId();
} else {
$oldUpdatedAt = new \DateTime($old->getUpdatedAt());
if ($oldUpdatedAt > $createdAt) {
throw new ConflictException('Document was updated after the request timestamp');
}
$state[$collectionId][$updated->getId()] = $updated;
}
$state[$collectionId][$updated->getId()] = $updated;
}
);
// Also update documents in the transaction state that match the query
$transactionState->applyBulkUpdateToState($collectionId, $updateData, $queries, $state);
// Re-write dependent documents from state to database to fix partial updates
if (!empty($dependentDocs)) {
$documentsToRewrite = [];
foreach ($dependentDocs as $docId) {
if (isset($state[$collectionId][$docId])) {
$documentsToRewrite[] = $state[$collectionId][$docId];
}
}
if (!empty($documentsToRewrite)) {
$dbForProject->upsertDocuments(
$collectionId,
$documentsToRewrite,
onNext: function (Document $upserted) use (&$state, $collectionId) {
$state[$collectionId][$upserted->getId()] = $upserted;
}
);
}
}
}
/**
@ -767,25 +757,19 @@ class Update extends Action
\DateTime $createdAt,
array &$state
): void {
// Convert data arrays to Document objects if needed
$documents = \array_map(function ($doc) {
return $doc instanceof Document ? $doc : new Document($doc);
}, $data);
// First, apply upserts to documents in the transaction state
// This ensures documents created in this transaction are updated properly
$transactionState->applyBulkUpsertToState($collectionId, $documents, $state);
$mergedDocuments = $transactionState->applyBulkUpsertToState($collectionId, $documents, $state);
// Then run bulk upsert on committed database, checking manually in callback
$dbForProject->upsertDocuments(
$collectionId,
$documents,
$mergedDocuments,
onNext: function (Document $upserted, ?Document $old) use (&$state, $collectionId, $createdAt) {
if ($old !== null) {
// This is an update - check if document was created/modified in this transaction
$dependent = isset($state[$collectionId][$upserted->getId()]);
// If not in transaction state, check for timestamp conflicts
if (!$dependent) {
$oldUpdatedAt = new \DateTime($old->getUpdatedAt());
if ($oldUpdatedAt > $createdAt) {
@ -794,7 +778,6 @@ class Update extends Action
}
}
// If $old is null, this is a create operation - no timestamp check needed
$state[$collectionId][$upserted->getId()] = $upserted;
}
);
@ -830,7 +813,6 @@ class Update extends Action
onNext: function (Document $deleted, Document $old) use (&$state, $collectionId, $createdAt) {
$dependent = isset($state[$collectionId][$deleted->getId()]);
// If not in transaction state, check for timestamp conflicts
if (!$dependent) {
$oldUpdatedAt = new \DateTime($old->getUpdatedAt());
if ($oldUpdatedAt > $createdAt) {
@ -838,14 +820,12 @@ class Update extends Action
}
}
// Remove from state after successful deletion
if (isset($state[$collectionId][$deleted->getId()])) {
unset($state[$collectionId][$deleted->getId()]);
}
}
);
// Also delete documents in the transaction state that match the query
$transactionState->applyBulkDeleteToState($collectionId, $queries, $state);
}
}