Fix tests

This commit is contained in:
Jake Barnby 2025-10-04 00:46:13 +13:00
parent deb3d75375
commit 89a47953d8
No known key found for this signature in database
GPG key ID: C437A8CC85B96E9C
2 changed files with 77 additions and 56 deletions

View file

@ -188,6 +188,8 @@ class TransactionState
$adjustedCount = $baseCount;
$filters = $this->extractFilters($queries);
// Apply transaction state changes to the count
foreach ($state[$collectionId] as $docId => $docState) {
if (!$docState['exists']) {
@ -197,13 +199,13 @@ class TransactionState
}
} elseif ($docState['action'] === 'create') {
// Document was created in transaction
if ($this->documentMatchesFilters($docState['document'], $queries)) {
if ($this->documentMatchesFilters($docState['document'], $filters)) {
$adjustedCount++; // New document that matches
}
} elseif ($docState['action'] === 'update' || $docState['action'] === 'upsert') {
// Document was updated/upserted
$wasInResults = isset($committedDocIds[$docId]);
$nowMatches = $this->documentMatchesFilters($docState['document'], $queries);
$nowMatches = $this->documentMatchesFilters($docState['document'], $filters);
if (!$wasInResults && $nowMatches && $docState['action'] === 'upsert') {
$adjustedCount++; // Upsert created new document that matches
@ -259,8 +261,10 @@ class TransactionState
return;
}
$filters = $this->extractFilters($queries);
foreach ($state[$collectionId] as $docId => $doc) {
if ($this->documentMatchesFilters($doc, $queries)) {
if ($this->documentMatchesFilters($doc, $filters)) {
// Apply the update to the state document
foreach ($updateData->getArrayCopy() as $key => $value) {
if ($key !== '$id') {
@ -290,8 +294,10 @@ class TransactionState
return;
}
$filters = $this->extractFilters($queries);
foreach ($state[$collectionId] as $docId => $doc) {
if ($this->documentMatchesFilters($doc, $queries)) {
if ($this->documentMatchesFilters($doc, $filters)) {
unset($state[$collectionId][$docId]);
}
}
@ -501,19 +507,17 @@ class TransactionState
}
/**
* Check if a document matches filter queries
* Extract only filter queries from a query array
*
* @param Document $doc Document to check
* @param array $queries Query filters
* @return bool True if document matches all filters
* @param array $queries Query array
* @return array Filtered queries
*/
private function documentMatchesFilters(Document $doc, array $queries): bool
private function extractFilters(array $queries): array
{
// Extract filter queries
$filters = [];
foreach ($queries as $query) {
$method = $query->getMethod();
// Only process filter queries, not limit/offset/cursor/select
// Only process filter queries, not limit/offset/cursor/select/order
if (!\in_array($method, [
Query::TYPE_LIMIT,
Query::TYPE_OFFSET,
@ -526,7 +530,18 @@ class TransactionState
$filters[] = $query;
}
}
return $filters;
}
/**
* Check if a document matches filter queries
*
* @param Document $doc Document to check
* @param array $filters Pre-filtered Query filters (use extractFilters first)
* @return bool True if document matches all filters
*/
private function documentMatchesFilters(Document $doc, array $filters): bool
{
// If no filters, document matches
if (empty($filters)) {
return true;

View file

@ -129,22 +129,22 @@ class Update extends Action
$totalOperations = 0;
$databaseOperations = [];
$dbForProject->withTransaction(function () use ($dbForProject, $transactionState, $queueForDeletes, $transactionId, &$transaction, &$operations, &$totalOperations, &$databaseOperations, $queueForEvents, $queueForStatsUsage, $queueForRealtime, $queueForFunctions, $queueForWebhooks) {
$dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'committing',
]));
try {
$dbForProject->withTransaction(function () use ($dbForProject, $transactionState, $queueForDeletes, $transactionId, &$transaction, &$operations, &$totalOperations, &$databaseOperations, $queueForEvents, $queueForStatsUsage, $queueForRealtime, $queueForFunctions, $queueForWebhooks) {
$dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'committing',
]));
// Fetch operations ordered by sequence by default to replay operations in exact order they were created
$operations = $dbForProject->find('transactionLogs', [
Query::equal('transactionInternalId', [$transaction->getSequence()]),
Query::orderAsc(),
Query::limit(PHP_INT_MAX),
]);
// Fetch operations ordered by sequence by default to replay operations in exact order they were created
$operations = $dbForProject->find('transactionLogs', [
Query::equal('transactionInternalId', [$transaction->getSequence()]),
Query::orderAsc(),
Query::limit(PHP_INT_MAX),
]);
// Track transaction state for cross-operation visibility
$state = [];
// Track transaction state for cross-operation visibility
$state = [];
try {
foreach ($operations as $operation) {
$databaseInternalId = $operation['databaseInternalId'];
$collectionInternalId = $operation['collectionInternalId'];
@ -207,38 +207,44 @@ class Update extends Action
$queueForDeletes
->setType(DELETE_TYPE_DOCUMENT)
->setDocument($transaction);
} catch (NotFoundException $e) {
$dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'failed',
]));
throw new Exception(Exception::DOCUMENT_NOT_FOUND, previous: $e);
} catch (DuplicateException|ConflictException $e) {
$dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'failed',
]));
throw new Exception(Exception::TRANSACTION_CONFLICT, previous: $e);
} catch (StructureException $e) {
$dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'failed',
]));
throw new Exception(Exception::DOCUMENT_INVALID_STRUCTURE, $e->getMessage());
} catch (LimitException $e) {
$dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'failed',
]));
throw new Exception(Exception::ATTRIBUTE_LIMIT_EXCEEDED, $e->getMessage());
} catch (TransactionException $e) {
$dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'failed',
]));
throw new Exception(Exception::TRANSACTION_FAILED, $e->getMessage());
} catch (QueryException $e) {
$dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'failed',
]));
throw new Exception(Exception::GENERAL_QUERY_INVALID, $e->getMessage());
}
});
});
} catch (NotFoundException $e) {
// Transaction has been rolled back, now mark it as failed
$dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'failed',
]));
throw new Exception(Exception::DOCUMENT_NOT_FOUND, previous: $e);
} catch (DuplicateException|ConflictException $e) {
// Transaction has been rolled back, now mark it as failed
$dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'failed',
]));
throw new Exception(Exception::TRANSACTION_CONFLICT, previous: $e);
} catch (StructureException $e) {
// Transaction has been rolled back, now mark it as failed
$dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'failed',
]));
throw new Exception(Exception::DOCUMENT_INVALID_STRUCTURE, $e->getMessage());
} catch (LimitException $e) {
// Transaction has been rolled back, now mark it as failed
$dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'failed',
]));
throw new Exception(Exception::ATTRIBUTE_LIMIT_EXCEEDED, $e->getMessage());
} catch (TransactionException $e) {
// Transaction has been rolled back, now mark it as failed
$dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'failed',
]));
throw new Exception(Exception::TRANSACTION_FAILED, $e->getMessage());
} catch (QueryException $e) {
// Transaction has been rolled back, now mark it as failed
$dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'failed',
]));
throw new Exception(Exception::GENERAL_QUERY_INVALID, $e->getMessage());
}
$queueForStatsUsage
->addMetric(METRIC_DATABASES_OPERATIONS_WRITES, $totalOperations);