Merge pull request #10305 from appwrite/cursor/replay-database-transaction-logs-sequentially-d222

Replay database transaction logs sequentially
This commit is contained in:
Jake Barnby 2025-08-13 01:37:20 +12:00 committed by GitHub
commit 7ca9754ebf
No known key found for this signature in database
GPG key ID: B5690EEEBB952194

View file

@ -93,124 +93,84 @@ class Update extends Action
'status' => 'committing',
]));
// Fetch operations ordered by creation time to maintain exact sequence
$operations = $dbForProject->find('transactionLogs', [
Query::equal('transactionInternalId', [$transaction->getSequence()]),
Query::orderAsc('$createdAt'),
]);
$creates
= $updates
= $deletes
= $increments
= $decrements
= $bulkUpdates
= $bulkDeletes
= [];
foreach ($operations as $operation) {
$databaseInternalId = $operation['databaseInternalId'];
$collectionInternalId = $operation['collectionInternalId'];
$documentId = $operation['documentId'];
switch ($operation['action']) {
case 'create':
$creates[$databaseInternalId][$collectionInternalId][] = new Document([
'$id' => $documentId ?? ID::unique(),
...$operation['data']
]);
break;
case 'update':
case 'upsert':
$updates[$databaseInternalId][$collectionInternalId][] = new Document([
'$id' => $documentId,
...$operation['data'],
]);
break;
case 'delete':
$deletes[$databaseInternalId][$collectionInternalId][] = $documentId;
break;
case 'increment':
$increments[$databaseInternalId][$collectionInternalId][] = [
'attribute' => $operation['data']['attribute'],
'value' => $operation['data']['value'] ?? 1,
'max' => $operation['data']['max'] ?? null,
];
break;
case 'decrement':
$decrements[$databaseInternalId][$collectionInternalId][] = [
'attribute' => $operation['data']['attribute'],
'value' => $operation['data']['value'] ?? 1,
'min' => $operation['data']['min'] ?? null,
];
break;
case 'bulkUpdate':
$bulkUpdates[$databaseInternalId][$collectionInternalId][] = [
'data' => $operation['data']['data'] ?? null,
'queries' => $operation['data']['queries'] ?? [],
];
break;
case 'bulkDelete':
$bulkDeletes[$databaseInternalId][$collectionInternalId][] = [
'queries' => $operation['data']['queries'] ?? [],
];
break;
}
}
try {
foreach ($creates as $dbId => $cols) {
foreach ($cols as $colId => $docs) {
$dbForProject->createDocuments("database_{$dbId}_collection_{$colId}", $docs);
}
}
foreach ($updates as $dbId => $cols) {
foreach ($cols as $colId => $docs) {
$dbForProject->createOrUpdateDocuments("database_{$dbId}_collection_{$colId}", $docs);
}
}
foreach ($deletes as $dbId => $cols) {
foreach ($cols as $colId => $ids) {
$dbForProject->deleteDocuments("database_{$dbId}_collection_{$colId}", [
Query::equal('$id', $ids),
]);
}
}
foreach ($increments as $dbId => $cols) {
foreach ($cols as $colId => $increments) {
foreach ($increments as $increment) {
$dbForProject->increaseDocumentAttribute(
"database_{$dbId}_collection_{$colId}",
$increment['attribute'],
$increment['value'],
$increment['max']
);
// Replay operations in exact order they were created
foreach ($operations as $operation) {
$databaseInternalId = $operation['databaseInternalId'];
$collectionInternalId = $operation['collectionInternalId'];
$documentId = $operation['documentId'];
$action = $operation['action'];
$data = $operation['data'];
$operationCreatedAt = new \DateTime($operation['$createdAt']);
$collectionName = "database_{$databaseInternalId}_collection_{$collectionInternalId}";
// Wrap each operation with the timestamp from when it was logged
$dbForProject->withRequestTimestamp($operationCreatedAt, function () use ($dbForProject, $action, $collectionName, $documentId, $data) {
switch ($action) {
case 'create':
$document = new Document([
'$id' => $documentId ?? ID::unique(),
...$data
]);
$dbForProject->createDocument($collectionName, $document);
break;
case 'update':
case 'upsert':
$document = new Document([
'$id' => $documentId,
...$data,
]);
$dbForProject->createOrUpdateDocument($collectionName, $document);
break;
case 'delete':
$dbForProject->deleteDocument($collectionName, $documentId);
break;
case 'increment':
$dbForProject->increaseDocumentAttribute(
collection: $collectionName,
id: $documentId,
attribute: $data['attribute'],
value: $data['value'] ?? 1,
max: $data['max'] ?? null
);
break;
case 'decrement':
$dbForProject->decreaseDocumentAttribute(
collection: $collectionName,
id: $documentId,
attribute: $data['attribute'],
value: $data['value'] ?? 1,
min: $data['min'] ?? null
);
break;
case 'bulkUpdate':
$dbForProject->updateDocuments(
$collectionName,
$data['data'] ?? null,
$data['queries'] ?? []
);
break;
case 'bulkDelete':
$dbForProject->deleteDocuments(
$collectionName,
$data['queries'] ?? []
);
break;
}
}
}
foreach ($decrements as $dbId => $cols) {
foreach ($cols as $colId => $decrements) {
foreach ($decrements as $decrement) {
$dbForProject->decreaseDocumentAttribute(
"database_{$dbId}_collection_{$colId}",
$decrement['attribute'],
$decrement['value'],
$decrement['min']
);
}
}
}
foreach ($bulkUpdates as $dbId => $cols) {
foreach ($cols as $colId => $updates) {
foreach ($updates as $update) {
$dbForProject->updateDocuments("database_{$dbId}_collection_{$colId}", $update['data'], $update['queries']);
}
}
}
foreach ($bulkDeletes as $dbId => $cols) {
foreach ($cols as $colId => $deletes) {
foreach ($deletes as $delete) {
$dbForProject->deleteDocuments("database_{$dbId}_collection_{$colId}", $delete['queries']);
}
}
});
}
$dbForProject->updateDocument('transactions', $transactionId, new Document([