Add transaction staging support for document operations

Co-authored-by: jakeb994 <jakeb994@gmail.com>
This commit is contained in:
Cursor Agent 2025-08-11 08:43:30 +00:00
parent 480f8c97ca
commit 5886c53ce3
9 changed files with 373 additions and 0 deletions

View file

@ -94,6 +94,51 @@ class Decrement extends Action
throw new Exception($this->getParentNotFoundException());
}
// Handle transaction staging
if ($transactionId !== null) {
$transaction = $dbForProject->getDocument('transactions', $transactionId);
if ($transaction->isEmpty() || $transaction->getAttribute('status', '') !== 'pending') {
throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Invalid or nonpending transaction');
}
// Stage the operation in transaction logs
$staged = new Document([
'$id' => ID::unique(),
'databaseInternalId' => $database->getSequence(),
'collectionInternalId' => $collection->getSequence(),
'transactionInternalId' => $transaction->getSequence(),
'documentId' => $documentId,
'action' => 'decrement',
'data' => [
'attribute' => $attribute,
'value' => $value,
'min' => $min,
],
]);
$dbForProject->withTransaction(function () use ($dbForProject, $transactionId, $staged) {
$dbForProject->createDocument('transactionLogs', $staged);
$dbForProject->increaseDocumentAttribute(
'transactions',
$transactionId,
'operations',
1
);
});
// Return successful response without actually decrementing
$mockDocument = new Document([
'$id' => $documentId,
'$collectionId' => $collectionId,
'$databaseId' => $databaseId,
$attribute => $value, // Mock response - actual value would be computed during commit
]);
$response
->setStatusCode(SwooleResponse::STATUS_CODE_OK)
->dynamic($mockDocument, $this->getResponseModel());
return;
}
try {
$document = $dbForProject->decreaseDocumentAttribute(
collection: 'database_' . $database->getSequence() . '_collection_' . $collection->getSequence(),

View file

@ -94,6 +94,51 @@ class Increment extends Action
throw new Exception($this->getParentNotFoundException());
}
// Handle transaction staging
if ($transactionId !== null) {
$transaction = $dbForProject->getDocument('transactions', $transactionId);
if ($transaction->isEmpty() || $transaction->getAttribute('status', '') !== 'pending') {
throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Invalid or nonpending transaction');
}
// Stage the operation in transaction logs
$staged = new Document([
'$id' => ID::unique(),
'databaseInternalId' => $database->getSequence(),
'collectionInternalId' => $collection->getSequence(),
'transactionInternalId' => $transaction->getSequence(),
'documentId' => $documentId,
'action' => 'increment',
'data' => [
'attribute' => $attribute,
'value' => $value,
'max' => $max,
],
]);
$dbForProject->withTransaction(function () use ($dbForProject, $transactionId, $staged) {
$dbForProject->createDocument('transactionLogs', $staged);
$dbForProject->increaseDocumentAttribute(
'transactions',
$transactionId,
'operations',
1
);
});
// Return successful response without actually incrementing
$mockDocument = new Document([
'$id' => $documentId,
'$collectionId' => $collectionId,
'$databaseId' => $databaseId,
$attribute => $value, // Mock response - actual value would be computed during commit
]);
$response
->setStatusCode(SwooleResponse::STATUS_CODE_OK)
->dynamic($mockDocument, $this->getResponseModel());
return;
}
try {
$document = $dbForProject->increaseDocumentAttribute(
collection: 'database_' . $database->getSequence() . '_collection_' . $collection->getSequence(),

View file

@ -109,6 +109,44 @@ class Delete extends Action
throw new Exception(Exception::GENERAL_QUERY_INVALID, $e->getMessage());
}
// Handle transaction staging
if ($transactionId !== null) {
$transaction = $dbForProject->getDocument('transactions', $transactionId);
if ($transaction->isEmpty() || $transaction->getAttribute('status', '') !== 'pending') {
throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Invalid or nonpending transaction');
}
// Stage the operation in transaction logs
$staged = new Document([
'$id' => ID::unique(),
'databaseInternalId' => $database->getSequence(),
'collectionInternalId' => $collection->getSequence(),
'transactionInternalId' => $transaction->getSequence(),
'documentId' => null, // Bulk operation doesn't have specific document ID
'action' => 'bulkDelete',
'data' => [
'queries' => $queries,
],
]);
$dbForProject->withTransaction(function () use ($dbForProject, $transactionId, $staged) {
$dbForProject->createDocument('transactionLogs', $staged);
$dbForProject->increaseDocumentAttribute(
'transactions',
$transactionId,
'operations',
1
);
});
// Return successful response without actually deleting documents
$response->dynamic(new Document([
$this->getSdkGroup() => [],
'total' => 0, // Can't predict how many would be deleted
]), $this->getResponseModel());
return;
}
$documents = [];
try {

View file

@ -121,6 +121,45 @@ class Update extends Action
throw new Exception(Exception::GENERAL_QUERY_INVALID, $e->getMessage());
}
// Handle transaction staging
if ($transactionId !== null) {
$transaction = $dbForProject->getDocument('transactions', $transactionId);
if ($transaction->isEmpty() || $transaction->getAttribute('status', '') !== 'pending') {
throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Invalid or nonpending transaction');
}
// Stage the operation in transaction logs
$staged = new Document([
'$id' => ID::unique(),
'databaseInternalId' => $database->getSequence(),
'collectionInternalId' => $collection->getSequence(),
'transactionInternalId' => $transaction->getSequence(),
'documentId' => null, // Bulk operation doesn't have specific document ID
'action' => 'bulkUpdate',
'data' => [
'data' => $data,
'queries' => $queries,
],
]);
$dbForProject->withTransaction(function () use ($dbForProject, $transactionId, $staged) {
$dbForProject->createDocument('transactionLogs', $staged);
$dbForProject->increaseDocumentAttribute(
'transactions',
$transactionId,
'operations',
1
);
});
// Return successful response without actually updating documents
$response->dynamic(new Document([
$this->getSdkGroup() => [],
'total' => 0, // Can't predict how many would be updated
]), $this->getResponseModel());
return;
}
if ($data['$permissions']) {
$validator = new Permissions();
if (!$validator->isValid($data['$permissions'])) {

View file

@ -100,6 +100,45 @@ class Upsert extends Action
throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Bulk upsert is not supported for ' . $this->getSdkNamespace() . ' with relationship attributes');
}
// Handle transaction staging
if ($transactionId !== null) {
$transaction = $dbForProject->getDocument('transactions', $transactionId);
if ($transaction->isEmpty() || $transaction->getAttribute('status', '') !== 'pending') {
throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Invalid or nonpending transaction');
}
// Stage the operations in transaction logs
$staged = [];
foreach ($documents as $document) {
$staged[] = new Document([
'$id' => ID::unique(),
'databaseInternalId' => $database->getSequence(),
'collectionInternalId' => $collection->getSequence(),
'transactionInternalId' => $transaction->getSequence(),
'documentId' => $document['$id'] ?? ID::unique(),
'action' => 'upsert',
'data' => $document,
]);
}
$dbForProject->withTransaction(function () use ($dbForProject, $transactionId, $staged) {
$dbForProject->createDocuments('transactionLogs', $staged);
$dbForProject->increaseDocumentAttribute(
'transactions',
$transactionId,
'operations',
\count($staged)
);
});
// Return successful response without actually upserting documents
$response->dynamic(new Document([
$this->getSdkGroup() => [],
'total' => \count($documents),
]), $this->getResponseModel());
return;
}
foreach ($documents as $key => $document) {
$documents[$key] = new Document($document);
}

View file

@ -189,6 +189,57 @@ class Create extends Action
throw new Exception($this->getParentNotFoundException());
}
// Handle transaction staging
if ($transactionId !== null) {
$transaction = $dbForProject->getDocument('transactions', $transactionId);
if ($transaction->isEmpty() || $transaction->getAttribute('status', '') !== 'pending') {
throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Invalid or nonpending transaction');
}
// Stage the operation(s) in transaction logs
$staged = [];
foreach ($documents as $document) {
$staged[] = new Document([
'$id' => ID::unique(),
'databaseInternalId' => $database->getSequence(),
'collectionInternalId' => $collection->getSequence(),
'transactionInternalId' => $transaction->getSequence(),
'documentId' => $document['$id'] ?? $documentId ?? ID::unique(),
'action' => 'create',
'data' => $document,
]);
}
$dbForProject->withTransaction(function () use ($dbForProject, $transactionId, $staged) {
$dbForProject->createDocuments('transactionLogs', $staged);
$dbForProject->increaseDocumentAttribute(
'transactions',
$transactionId,
'operations',
\count($staged)
);
});
// Return successful response without actually creating documents
if ($isBulk) {
$response->dynamic(new Document([
$this->getSdkGroup() => [],
'total' => \count($documents),
]), $this->getBulkResponseModel());
} else {
$mockDocument = new Document([
'$id' => $documents[0]['$id'] ?? $documentId,
'$collectionId' => $collectionId,
'$databaseId' => $databaseId,
...$documents[0]
]);
$response
->setStatusCode(SwooleResponse::STATUS_CODE_CREATED)
->dynamic($mockDocument, $this->getResponseModel());
}
return;
}
$hasRelationships = \array_filter(
$collection->getAttribute('attributes', []),
fn ($attribute) => $attribute->getAttribute('type') === Database::VAR_RELATIONSHIP

View file

@ -104,6 +104,39 @@ class Delete extends Action
throw new Exception($this->getNotFoundException());
}
// Handle transaction staging
if ($transactionId !== null) {
$transaction = $dbForProject->getDocument('transactions', $transactionId);
if ($transaction->isEmpty() || $transaction->getAttribute('status', '') !== 'pending') {
throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Invalid or nonpending transaction');
}
// Stage the operation in transaction logs
$staged = new Document([
'$id' => ID::unique(),
'databaseInternalId' => $database->getSequence(),
'collectionInternalId' => $collection->getSequence(),
'transactionInternalId' => $transaction->getSequence(),
'documentId' => $documentId,
'action' => 'delete',
'data' => [],
]);
$dbForProject->withTransaction(function () use ($dbForProject, $transactionId, $staged) {
$dbForProject->createDocument('transactionLogs', $staged);
$dbForProject->increaseDocumentAttribute(
'transactions',
$transactionId,
'operations',
1
);
});
// Return successful response without actually deleting document
$response->noContent();
return;
}
try {
$dbForProject->withRequestTimestamp($requestTimestamp, function () use ($dbForProject, $database, $collection, $documentId) {
$dbForProject->deleteDocument(

View file

@ -128,6 +128,48 @@ class Update extends Action
throw new Exception($this->getNotFoundException());
}
// Handle transaction staging
if ($transactionId !== null) {
$transaction = $dbForProject->getDocument('transactions', $transactionId);
if ($transaction->isEmpty() || $transaction->getAttribute('status', '') !== 'pending') {
throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Invalid or nonpending transaction');
}
// Stage the operation in transaction logs
$staged = new Document([
'$id' => ID::unique(),
'databaseInternalId' => $database->getSequence(),
'collectionInternalId' => $collection->getSequence(),
'transactionInternalId' => $transaction->getSequence(),
'documentId' => $documentId,
'action' => 'update',
'data' => $data,
]);
$dbForProject->withTransaction(function () use ($dbForProject, $transactionId, $staged) {
$dbForProject->createDocument('transactionLogs', $staged);
$dbForProject->increaseDocumentAttribute(
'transactions',
$transactionId,
'operations',
1
);
});
// Return successful response without actually updating document
$mockDocument = new Document([
'$id' => $documentId,
'$collectionId' => $collectionId,
'$databaseId' => $databaseId,
...$document->getArrayCopy(),
...$data
]);
$response
->setStatusCode(SwooleResponse::STATUS_CODE_OK)
->dynamic($mockDocument, $this->getResponseModel());
return;
}
// Map aggregate permissions into the multiple permissions they represent.
$permissions = Permission::aggregate($permissions, [
Database::PERMISSION_READ,

View file

@ -111,6 +111,47 @@ class Upsert extends Action
throw new Exception($this->getParentNotFoundException());
}
// Handle transaction staging
if ($transactionId !== null) {
$transaction = $dbForProject->getDocument('transactions', $transactionId);
if ($transaction->isEmpty() || $transaction->getAttribute('status', '') !== 'pending') {
throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Invalid or nonpending transaction');
}
// Stage the operation in transaction logs
$staged = new Document([
'$id' => ID::unique(),
'databaseInternalId' => $database->getSequence(),
'collectionInternalId' => $collection->getSequence(),
'transactionInternalId' => $transaction->getSequence(),
'documentId' => $documentId,
'action' => 'upsert',
'data' => $data,
]);
$dbForProject->withTransaction(function () use ($dbForProject, $transactionId, $staged) {
$dbForProject->createDocument('transactionLogs', $staged);
$dbForProject->increaseDocumentAttribute(
'transactions',
$transactionId,
'operations',
1
);
});
// Return successful response without actually upserting document
$mockDocument = new Document([
'$id' => $documentId,
'$collectionId' => $collectionId,
'$databaseId' => $databaseId,
...$data
]);
$response
->setStatusCode(SwooleResponse::STATUS_CODE_CREATED)
->dynamic($mockDocument, $this->getResponseModel());
return;
}
$allowedPermissions = [
Database::PERMISSION_READ,
Database::PERMISSION_UPDATE,