Add transaction support for databases with staging and commit/rollback

Co-authored-by: jakeb994 <jakeb994@gmail.com>
This commit is contained in:
Cursor Agent 2025-08-11 08:25:58 +00:00
parent cbf210b9e6
commit 480f8c97ca
24 changed files with 684 additions and 9 deletions

View file

@ -74,6 +74,7 @@ class Decrement extends Action
->param('attribute', '', new Key(), 'Attribute key.')
->param('value', 1, new Numeric(), 'Value to increment the attribute by. The value must be a number.', true)
->param('min', null, new Numeric(), 'Minimum value for the attribute. If the current value is lesser than this value, an exception will be thrown.', true)
->param('transactionId', null, new UID(), 'Transaction ID for staging the operation.', true)
->inject('response')
->inject('dbForProject')
->inject('queueForEvents')
@ -81,7 +82,7 @@ class Decrement extends Action
->callback($this->action(...));
}
public function action(string $databaseId, string $collectionId, string $documentId, string $attribute, int|float $value, int|float|null $min, UtopiaResponse $response, Database $dbForProject, Event $queueForEvents, StatsUsage $queueForStatsUsage): void
public function action(string $databaseId, string $collectionId, string $documentId, string $attribute, int|float $value, int|float|null $min, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, Event $queueForEvents, StatsUsage $queueForStatsUsage): void
{
$database = Authorization::skip(fn () => $dbForProject->getDocument('databases', $databaseId));
if ($database->isEmpty()) {

View file

@ -74,6 +74,7 @@ class Increment extends Action
->param('attribute', '', new Key(), 'Attribute key.')
->param('value', 1, new Numeric(), 'Value to increment the attribute by. The value must be a number.', true)
->param('max', null, new Numeric(), 'Maximum value for the attribute. If the current value is greater than this value, an error will be thrown.', true)
->param('transactionId', null, new UID(), 'Transaction ID for staging the operation.', true)
->inject('response')
->inject('dbForProject')
->inject('queueForEvents')
@ -81,7 +82,7 @@ class Increment extends Action
->callback($this->action(...));
}
public function action(string $databaseId, string $collectionId, string $documentId, string $attribute, int|float $value, int|float|null $max, UtopiaResponse $response, Database $dbForProject, Event $queueForEvents, StatsUsage $queueForStatsUsage): void
public function action(string $databaseId, string $collectionId, string $documentId, string $attribute, int|float $value, int|float|null $max, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, Event $queueForEvents, StatsUsage $queueForStatsUsage): void
{
$database = Authorization::skip(fn () => $dbForProject->getDocument('databases', $databaseId));
if ($database->isEmpty()) {

View file

@ -70,6 +70,7 @@ class Delete extends Action
->param('databaseId', '', new UID(), 'Database ID.')
->param('collectionId', '', new UID(), 'Collection ID. You can create a new collection using the Database service [server integration](https://appwrite.io/docs/server/databases#databasesCreateCollection).')
->param('queries', [], new ArrayList(new Text(APP_LIMIT_ARRAY_ELEMENT_SIZE), APP_LIMIT_ARRAY_PARAMS_SIZE), 'Array of query strings generated using the Query class provided by the SDK. [Learn more about queries](https://appwrite.io/docs/queries). Maximum of ' . APP_LIMIT_ARRAY_PARAMS_SIZE . ' queries are allowed, each ' . APP_LIMIT_ARRAY_ELEMENT_SIZE . ' characters long.', true)
->param('transactionId', null, new UID(), 'Transaction ID for staging the operation.', true)
->inject('response')
->inject('dbForProject')
->inject('queueForStatsUsage')
@ -81,7 +82,7 @@ class Delete extends Action
->callback($this->action(...));
}
public function action(string $databaseId, string $collectionId, array $queries, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan): void
public function action(string $databaseId, string $collectionId, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan): void
{
$database = $dbForProject->getDocument('databases', $databaseId);
if ($database->isEmpty()) {

View file

@ -74,6 +74,7 @@ class Update extends Action
->param('collectionId', '', new UID(), 'Collection ID.')
->param('data', [], new JSON(), 'Document data as JSON object. Include only attribute and value pairs to be updated.', true)
->param('queries', [], new ArrayList(new Text(APP_LIMIT_ARRAY_ELEMENT_SIZE), APP_LIMIT_ARRAY_PARAMS_SIZE), 'Array of query strings generated using the Query class provided by the SDK. [Learn more about queries](https://appwrite.io/docs/queries). Maximum of ' . APP_LIMIT_ARRAY_PARAMS_SIZE . ' queries are allowed, each ' . APP_LIMIT_ARRAY_ELEMENT_SIZE . ' characters long.', true)
->param('transactionId', null, new UID(), 'Transaction ID for staging the operation.', true)
->inject('response')
->inject('dbForProject')
->inject('queueForStatsUsage')
@ -85,7 +86,7 @@ class Update extends Action
->callback($this->action(...));
}
public function action(string $databaseId, string $collectionId, string|array $data, array $queries, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan): void
public function action(string $databaseId, string $collectionId, string|array $data, array $queries, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, Event $queueForEvents, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks, array $plan): void
{
$data = \is_string($data)
? \json_decode($data, true)

View file

@ -71,6 +71,7 @@ class Upsert extends Action
->param('databaseId', '', new UID(), 'Database ID.')
->param('collectionId', '', new UID(), 'Collection ID.')
->param('documents', [], fn (array $plan) => new ArrayList(new JSON(), $plan['databasesBatchSize'] ?? APP_LIMIT_DATABASE_BATCH), 'Array of document data as JSON objects. May contain partial documents.', false, ['plan'])
->param('transactionId', null, new UID(), 'Transaction ID for staging the operation.', true)
->inject('response')
->inject('dbForProject')
->inject('queueForStatsUsage')
@ -78,7 +79,7 @@ class Upsert extends Action
->callback($this->action(...));
}
public function action(string $databaseId, string $collectionId, array $documents, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, array $plan): void
public function action(string $databaseId, string $collectionId, array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, StatsUsage $queueForStatsUsage, array $plan): void
{
$database = $dbForProject->getDocument('databases', $databaseId);
if ($database->isEmpty()) {

View file

@ -117,6 +117,7 @@ class Create extends Action
->param('data', [], new JSON(), 'Document data as JSON object.', true)
->param('permissions', null, new Permissions(APP_LIMIT_ARRAY_PARAMS_SIZE, [Database::PERMISSION_READ, Database::PERMISSION_UPDATE, Database::PERMISSION_DELETE, Database::PERMISSION_WRITE]), 'An array of permissions strings. By default, only the current user is granted all permissions. [Learn more about permissions](https://appwrite.io/docs/permissions).', true)
->param('documents', [], fn (array $plan) => new ArrayList(new JSON(), $plan['databasesBatchSize'] ?? APP_LIMIT_DATABASE_BATCH), 'Array of documents data as JSON objects.', true, ['plan'])
->param('transactionId', null, new UID(), 'Transaction ID for staging the operation.', true)
->inject('response')
->inject('dbForProject')
->inject('user')
@ -127,7 +128,7 @@ class Create extends Action
->inject('queueForWebhooks')
->callback($this->action(...));
}
public function action(string $databaseId, string $documentId, string $collectionId, string|array $data, ?array $permissions, ?array $documents, UtopiaResponse $response, Database $dbForProject, Document $user, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks): void
public function action(string $databaseId, string $documentId, string $collectionId, string|array $data, ?array $permissions, ?array $documents, ?string $transactionId, UtopiaResponse $response, Database $dbForProject, Document $user, Event $queueForEvents, StatsUsage $queueForStatsUsage, Event $queueForRealtime, Event $queueForFunctions, Event $queueForWebhooks): void
{
$data = \is_string($data)
? \json_decode($data, true)

View file

@ -71,6 +71,7 @@ class Delete extends Action
->param('databaseId', '', new UID(), 'Database ID.')
->param('collectionId', '', new UID(), 'Collection ID. You can create a new collection using the Database service [server integration](https://appwrite.io/docs/server/databases#databasesCreateCollection).')
->param('documentId', '', new UID(), 'Document ID.')
->param('transactionId', null, new UID(), 'Transaction ID for staging the operation.', true)
->inject('requestTimestamp')
->inject('response')
->inject('dbForProject')
@ -79,7 +80,7 @@ class Delete extends Action
->callback($this->action(...));
}
public function action(string $databaseId, string $collectionId, string $documentId, ?\DateTime $requestTimestamp, UtopiaResponse $response, Database $dbForProject, Event $queueForEvents, StatsUsage $queueForStatsUsage): void
public function action(string $databaseId, string $collectionId, string $documentId, ?string $transactionId, ?\DateTime $requestTimestamp, UtopiaResponse $response, Database $dbForProject, Event $queueForEvents, StatsUsage $queueForStatsUsage): void
{
$database = Authorization::skip(fn () => $dbForProject->getDocument('databases', $databaseId));

View file

@ -77,6 +77,7 @@ class Update extends Action
->param('documentId', '', new UID(), 'Document ID.')
->param('data', [], new JSON(), 'Document data as JSON object. Include only attribute and value pairs to be updated.', true)
->param('permissions', null, new Permissions(APP_LIMIT_ARRAY_PARAMS_SIZE, [Database::PERMISSION_READ, Database::PERMISSION_UPDATE, Database::PERMISSION_DELETE, Database::PERMISSION_WRITE]), 'An array of permissions strings. By default, the current permissions are inherited. [Learn more about permissions](https://appwrite.io/docs/permissions).', true)
->param('transactionId', null, new UID(), 'Transaction ID for staging the operation.', true)
->inject('requestTimestamp')
->inject('response')
->inject('dbForProject')
@ -85,7 +86,7 @@ class Update extends Action
->callback($this->action(...));
}
public function action(string $databaseId, string $collectionId, string $documentId, string|array $data, ?array $permissions, ?\DateTime $requestTimestamp, UtopiaResponse $response, Database $dbForProject, Event $queueForEvents, StatsUsage $queueForStatsUsage): void
public function action(string $databaseId, string $collectionId, string $documentId, string|array $data, ?array $permissions, ?string $transactionId, ?\DateTime $requestTimestamp, UtopiaResponse $response, Database $dbForProject, Event $queueForEvents, StatsUsage $queueForStatsUsage): void
{
$data = (\is_string($data)) ? \json_decode($data, true) : $data; // Cast to JSON array

View file

@ -80,6 +80,7 @@ class Upsert extends Action
->param('documentId', '', new CustomId(), 'Document ID.')
->param('data', [], new JSON(), 'Document data as JSON object. Include all required attributes of the document to be created or updated.')
->param('permissions', null, new Permissions(APP_LIMIT_ARRAY_PARAMS_SIZE, [Database::PERMISSION_READ, Database::PERMISSION_UPDATE, Database::PERMISSION_DELETE, Database::PERMISSION_WRITE]), 'An array of permissions strings. By default, the current permissions are inherited. [Learn more about permissions](https://appwrite.io/docs/permissions).', true)
->param('transactionId', null, new UID(), 'Transaction ID for staging the operation.', true)
->inject('requestTimestamp')
->inject('response')
->inject('user')
@ -89,7 +90,7 @@ class Upsert extends Action
->callback($this->action(...));
}
public function action(string $databaseId, string $collectionId, string $documentId, string|array $data, ?array $permissions, ?\DateTime $requestTimestamp, UtopiaResponse $response, Document $user, Database $dbForProject, Event $queueForEvents, StatsUsage $queueForStatsUsage): void
public function action(string $databaseId, string $collectionId, string $documentId, string|array $data, ?array $permissions, ?string $transactionId, ?\DateTime $requestTimestamp, UtopiaResponse $response, Document $user, Database $dbForProject, Event $queueForEvents, StatsUsage $queueForStatsUsage): void
{
$data = (\is_string($data)) ? \json_decode($data, true) : $data; // Cast to JSON array

View file

@ -56,6 +56,7 @@ class Delete extends DocumentsDelete
->param('databaseId', '', new UID(), 'Database ID.')
->param('tableId', '', new UID(), 'Table ID. You can create a new table using the Database service [server integration](https://appwrite.io/docs/server/tables#tablesCreate).')
->param('queries', [], new ArrayList(new Text(APP_LIMIT_ARRAY_ELEMENT_SIZE), APP_LIMIT_ARRAY_PARAMS_SIZE), 'Array of query strings generated using the Query class provided by the SDK. [Learn more about queries](https://appwrite.io/docs/queries). Maximum of ' . APP_LIMIT_ARRAY_PARAMS_SIZE . ' queries are allowed, each ' . APP_LIMIT_ARRAY_ELEMENT_SIZE . ' characters long.', true)
->param('transactionId', null, new UID(), 'Transaction ID for staging the operation.', true)
->inject('response')
->inject('dbForProject')
->inject('queueForStatsUsage')

View file

@ -58,6 +58,7 @@ class Update extends DocumentsUpdate
->param('tableId', '', new UID(), 'Table ID.')
->param('data', [], new JSON(), 'Row data as JSON object. Include only column and value pairs to be updated.', true)
->param('queries', [], new ArrayList(new Text(APP_LIMIT_ARRAY_ELEMENT_SIZE), APP_LIMIT_ARRAY_PARAMS_SIZE), 'Array of query strings generated using the Query class provided by the SDK. [Learn more about queries](https://appwrite.io/docs/queries). Maximum of ' . APP_LIMIT_ARRAY_PARAMS_SIZE . ' queries are allowed, each ' . APP_LIMIT_ARRAY_ELEMENT_SIZE . ' characters long.', true)
->param('transactionId', null, new UID(), 'Transaction ID for staging the operation.', true)
->inject('response')
->inject('dbForProject')
->inject('queueForStatsUsage')

View file

@ -58,6 +58,7 @@ class Upsert extends DocumentsUpsert
->param('databaseId', '', new UID(), 'Database ID.')
->param('tableId', '', new UID(), 'Table ID.')
->param('rows', [], fn (array $plan) => new ArrayList(new JSON(), $plan['databasesBatchSize'] ?? APP_LIMIT_DATABASE_BATCH), 'Array of row data as JSON objects. May contain partial rows.', false, ['plan'])
->param('transactionId', null, new UID(), 'Transaction ID for staging the operation.', true)
->inject('response')
->inject('dbForProject')
->inject('queueForStatsUsage')

View file

@ -60,6 +60,7 @@ class Decrement extends DecrementDocumentAttribute
->param('column', '', new Key(), 'Column key.')
->param('value', 1, new Numeric(), 'Value to increment the column by. The value must be a number.', true)
->param('min', null, new Numeric(), 'Minimum value for the column. If the current value is lesser than this value, an exception will be thrown.', true)
->param('transactionId', null, new UID(), 'Transaction ID for staging the operation.', true)
->inject('response')
->inject('dbForProject')
->inject('queueForEvents')

View file

@ -60,6 +60,7 @@ class Increment extends IncrementDocumentAttribute
->param('column', '', new Key(), 'Column key.')
->param('value', 1, new Numeric(), 'Value to increment the column by. The value must be a number.', true)
->param('max', null, new Numeric(), 'Maximum value for the column. If the current value is greater than this value, an error will be thrown.', true)
->param('transactionId', null, new UID(), 'Transaction ID for staging the operation.', true)
->inject('response')
->inject('dbForProject')
->inject('queueForEvents')

View file

@ -96,6 +96,7 @@ class Create extends DocumentCreate
->param('data', [], new JSON(), 'Row data as JSON object.', true)
->param('permissions', null, new Permissions(APP_LIMIT_ARRAY_PARAMS_SIZE, [Database::PERMISSION_READ, Database::PERMISSION_UPDATE, Database::PERMISSION_DELETE, Database::PERMISSION_WRITE]), 'An array of permissions strings. By default, only the current user is granted all permissions. [Learn more about permissions](https://appwrite.io/docs/permissions).', true)
->param('rows', [], fn (array $plan) => new ArrayList(new JSON(), $plan['databasesBatchSize'] ?? APP_LIMIT_DATABASE_BATCH), 'Array of documents data as JSON objects.', true, ['plan'])
->param('transactionId', null, new UID(), 'Transaction ID for staging the operation.', true)
->inject('response')
->inject('dbForProject')
->inject('user')

View file

@ -61,6 +61,7 @@ class Delete extends DocumentDelete
->param('databaseId', '', new UID(), 'Database ID.')
->param('tableId', '', new UID(), 'Table ID. You can create a new table using the Database service [server integration](https://appwrite.io/docs/server/tables#tablesCreate).')
->param('rowId', '', new UID(), 'Row ID.')
->param('transactionId', null, new UID(), 'Transaction ID for staging the operation.', true)
->inject('requestTimestamp')
->inject('response')
->inject('dbForProject')

View file

@ -60,6 +60,7 @@ class Update extends DocumentUpdate
->param('rowId', '', new UID(), 'Row ID.')
->param('data', [], new JSON(), 'Row data as JSON object. Include only columns and value pairs to be updated.', true)
->param('permissions', null, new Permissions(APP_LIMIT_ARRAY_PARAMS_SIZE, [Database::PERMISSION_READ, Database::PERMISSION_UPDATE, Database::PERMISSION_DELETE, Database::PERMISSION_WRITE]), 'An array of permissions strings. By default, the current permissions are inherited. [Learn more about permissions](https://appwrite.io/docs/permissions).', true)
->param('transactionId', null, new UID(), 'Transaction ID for staging the operation.', true)
->inject('requestTimestamp')
->inject('response')
->inject('dbForProject')

View file

@ -62,6 +62,7 @@ class Upsert extends DocumentUpsert
->param('rowId', '', new UID(), 'Row ID.')
->param('data', [], new JSON(), 'Row data as JSON object. Include all required columns of the row to be created or updated.', true)
->param('permissions', null, new Permissions(APP_LIMIT_ARRAY_PARAMS_SIZE, [Database::PERMISSION_READ, Database::PERMISSION_UPDATE, Database::PERMISSION_DELETE, Database::PERMISSION_WRITE]), 'An array of permissions strings. By default, the current permissions are inherited. [Learn more about permissions](https://appwrite.io/docs/permissions).', true)
->param('transactionId', null, new UID(), 'Transaction ID for staging the operation.', true)
->inject('requestTimestamp')
->inject('response')
->inject('user')

View file

@ -0,0 +1,117 @@
<?php
namespace Appwrite\Platform\Modules\Databases\Http\Transactions;
use Appwrite\Extend\Exception;
use Appwrite\Platform\Action;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
use Appwrite\SDK\Method;
use Appwrite\SDK\Response as SDKResponse;
use Appwrite\Utopia\Database\Validator\Operation;
use Appwrite\Utopia\Response as UtopiaResponse;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Database\Helpers\ID;
use Utopia\Database\Validator\UID;
use Utopia\Swoole\Response as SwooleResponse;
use Utopia\Validator\ArrayList;
class AddOperations extends Action
{
public static function getName(): string
{
return 'createOperations';
}
protected function getResponseModel(): string
{
return UtopiaResponse::MODEL_TRANSACTION;
}
public function __construct()
{
$this
->setHttpMethod(self::HTTP_REQUEST_METHOD_POST)
->setHttpPath('/v1/databases/transactions/:transactionId/operations')
->desc('Add operations to transaction')
->groups(['api', 'database', 'transactions'])
->label('scope', 'transactions.write')
->label('resourceType', RESOURCE_TYPE_DATABASES)
->label('sdk', new Method(
namespace: 'databases',
group: 'transactions',
name: 'createOperations',
description: '/docs/references/databases/create-operations.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: SwooleResponse::STATUS_CODE_CREATED,
model: UtopiaResponse::MODEL_TRANSACTION,
)
],
contentType: ContentType::JSON
))
->param('transactionId', '', new UID(), 'Transaction ID.')
->param('operations', [], new ArrayList(new Operation()), 'Array of staged operations.', true)
->inject('response')
->inject('dbForProject')
->inject('plan')
->callback($this->action(...));
}
public function action(string $transactionId, array $operations, UtopiaResponse $response, Database $dbForProject, array $plan): void
{
$transaction = $dbForProject->getDocument('transactions', $transactionId);
if ($transaction->isEmpty() || $transaction->getAttribute('status', '') !== 'pending') {
throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Invalid or nonpending transaction');
}
$maxBatch = $plan['databasesBatchSize'] ?? APP_LIMIT_DATABASE_BATCH;
$existing = $transaction->getAttribute('operations', 0);
if (($existing + \count($operations)) > $maxBatch) {
throw new Exception(
Exception::TRANSACTION_LIMIT_EXCEEDED,
'Transaction already has ' . $existing . ' operations, adding ' . \count($operations) . ' would exceed the maximum of ' . $maxBatch
);
}
$databases = $collections = $staged = [];
foreach ($operations as $operation) {
$database = $databases[$operation['databaseId']] ??= $dbForProject->getDocument('databases', $operation['databaseId']);
if ($database->isEmpty()) {
throw new Exception(Exception::DATABASE_NOT_FOUND);
}
$collection = $collections[$operation['collectionId']] ??= $dbForProject->getDocument('database_' . $database->getSequence(), $operation['collectionId']);
if ($collection->isEmpty()) {
throw new Exception(Exception::COLLECTION_NOT_FOUND);
}
$staged[] = new Document([
'$id' => ID::unique(),
'databaseInternalId' => $database->getSequence(),
'collectionInternalId' => $collection->getSequence(),
'transactionInternalId' => $transaction->getSequence(),
'documentId' => $operation['documentId'] ?? ID::unique(),
'action' => $operation['action'],
'data' => $operation['data'] ?? new \stdClass(),
]);
}
$dbForProject->withTransaction(function () use ($dbForProject, $transactionId, $staged, $existing, $operations) {
$dbForProject->createDocuments('transactionLogs', $staged);
$dbForProject->increaseDocumentAttribute(
'transactions',
$transactionId,
'operations',
\count($operations)
);
});
$response
->setStatusCode(SwooleResponse::STATUS_CODE_CREATED)
->dynamic($transaction, UtopiaResponse::MODEL_TRANSACTION);
}
}

View file

@ -0,0 +1,73 @@
<?php
namespace Appwrite\Platform\Modules\Databases\Http\Transactions;
use Appwrite\Platform\Action;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
use Appwrite\SDK\Method;
use Appwrite\SDK\Response as SDKResponse;
use Appwrite\Utopia\Response as UtopiaResponse;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Database\Helpers\ID;
use Utopia\Database\Validator\UID;
use Utopia\DateTime\DateTime;
use Utopia\Swoole\Response as SwooleResponse;
use Utopia\Validator\Range;
class Create extends Action
{
public static function getName(): string
{
return 'createTransaction';
}
protected function getResponseModel(): string
{
return UtopiaResponse::MODEL_TRANSACTION;
}
public function __construct()
{
$this
->setHttpMethod(self::HTTP_REQUEST_METHOD_POST)
->setHttpPath('/v1/databases/transactions')
->desc('Create transaction')
->groups(['api', 'database', 'transactions'])
->label('scope', 'transactions.write')
->label('resourceType', RESOURCE_TYPE_DATABASES)
->label('sdk', new Method(
namespace: 'databases',
group: 'transactions',
name: 'createTransaction',
description: '/docs/references/databases/create-transaction.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: SwooleResponse::STATUS_CODE_CREATED,
model: UtopiaResponse::MODEL_TRANSACTION,
)
],
contentType: ContentType::JSON
))
->param('ttl', APP_DATABASE_TXN_TTL_DEFAULT, new Range(min: APP_DATABASE_TXN_TTL_MIN, max: APP_DATABASE_TXN_TTL_MAX), 'Seconds before the transaction expires.', true)
->inject('response')
->inject('dbForProject')
->callback($this->action(...));
}
public function action(int $ttl, UtopiaResponse $response, Database $dbForProject): void
{
$transaction = $dbForProject->createDocument('transactions', new Document([
'$id' => ID::unique(),
'status' => 'pending',
'operations' => 0,
'expiresAt' => DateTime::addSeconds(new \DateTime(), $ttl),
]));
$response
->setStatusCode(SwooleResponse::STATUS_CODE_CREATED)
->dynamic($transaction, UtopiaResponse::MODEL_TRANSACTION);
}
}

View file

@ -0,0 +1,76 @@
<?php
namespace Appwrite\Platform\Modules\Databases\Http\Transactions;
use Appwrite\Extend\Exception;
use Appwrite\Platform\Action;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
use Appwrite\SDK\Method;
use Appwrite\SDK\Response as SDKResponse;
use Appwrite\Utopia\Response as UtopiaResponse;
use Utopia\Database\Database;
use Utopia\Database\Query;
use Utopia\Database\Validator\UID;
use Utopia\Swoole\Response as SwooleResponse;
class Delete extends Action
{
public static function getName(): string
{
return 'deleteTransaction';
}
protected function getResponseModel(): string
{
return UtopiaResponse::MODEL_NONE;
}
public function __construct()
{
$this
->setHttpMethod(self::HTTP_REQUEST_METHOD_DELETE)
->setHttpPath('/v1/databases/transactions/:transactionId')
->desc('Delete transaction')
->groups(['api', 'database', 'transactions'])
->label('scope', 'transactions.write')
->label('resourceType', RESOURCE_TYPE_DATABASES)
->label('sdk', new Method(
namespace: 'databases',
group: 'transactions',
name: 'deleteTransaction',
description: '/docs/references/databases/delete-transaction.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: SwooleResponse::STATUS_CODE_NOCONTENT,
model: UtopiaResponse::MODEL_NONE,
)
],
contentType: ContentType::NONE
))
->param('transactionId', '', new UID(), 'Transaction ID.')
->inject('response')
->inject('dbForProject')
->callback($this->action(...));
}
public function action(string $transactionId, UtopiaResponse $response, Database $dbForProject): void
{
$transaction = $dbForProject->getDocument('transactions', $transactionId);
if ($transaction->isEmpty()) {
throw new Exception(Exception::TRANSACTION_NOT_FOUND);
}
if (!$dbForProject->deleteDocument('transactions', $transactionId)) {
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Failed to remove transaction from DB');
}
$dbForProject->deleteDocuments('transactionLogs', [
Query::equal('transactionInternalId', [$transaction->getSequence()]),
]);
$response->noContent();
}
}

View file

@ -0,0 +1,69 @@
<?php
namespace Appwrite\Platform\Modules\Databases\Http\Transactions;
use Appwrite\Extend\Exception;
use Appwrite\Platform\Action;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
use Appwrite\SDK\Method;
use Appwrite\SDK\Response as SDKResponse;
use Appwrite\Utopia\Response as UtopiaResponse;
use Utopia\Database\Database;
use Utopia\Database\Validator\UID;
use Utopia\Swoole\Response as SwooleResponse;
class Get extends Action
{
public static function getName(): string
{
return 'getTransaction';
}
protected function getResponseModel(): string
{
return UtopiaResponse::MODEL_TRANSACTION;
}
public function __construct()
{
$this
->setHttpMethod(self::HTTP_REQUEST_METHOD_GET)
->setHttpPath('/v1/databases/transactions/:transactionId')
->desc('Get transaction')
->groups(['api', 'database', 'transactions'])
->label('scope', 'transactions.read')
->label('resourceType', RESOURCE_TYPE_DATABASES)
->label('sdk', new Method(
namespace: 'databases',
group: 'transactions',
name: 'getTransaction',
description: '/docs/references/databases/get-transaction.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: SwooleResponse::STATUS_CODE_OK,
model: UtopiaResponse::MODEL_TRANSACTION,
)
],
contentType: ContentType::JSON
))
->param('transactionId', '', new UID(), 'Transaction ID.')
->inject('response')
->inject('dbForProject')
->callback($this->action(...));
}
public function action(string $transactionId, UtopiaResponse $response, Database $dbForProject): void
{
$transaction = $dbForProject->getDocument('transactions', $transactionId);
if ($transaction->isEmpty()) {
throw new Exception(Exception::TRANSACTION_NOT_FOUND);
}
$response
->setStatusCode(SwooleResponse::STATUS_CODE_OK)
->dynamic($transaction, UtopiaResponse::MODEL_TRANSACTION);
}
}

View file

@ -0,0 +1,249 @@
<?php
namespace Appwrite\Platform\Modules\Databases\Http\Transactions;
use Appwrite\Extend\Exception;
use Appwrite\Platform\Action;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
use Appwrite\SDK\Method;
use Appwrite\SDK\Response as SDKResponse;
use Appwrite\Utopia\Response as UtopiaResponse;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Database\Exception\Conflict as ConflictException;
use Utopia\Database\Exception\Duplicate as DuplicateException;
use Utopia\Database\Helpers\ID;
use Utopia\Database\Query;
use Utopia\Database\Validator\UID;
use Utopia\Swoole\Response as SwooleResponse;
use Utopia\Validator\Boolean;
class Update extends Action
{
public static function getName(): string
{
return 'updateTransaction';
}
protected function getResponseModel(): string
{
return UtopiaResponse::MODEL_TRANSACTION;
}
public function __construct()
{
$this
->setHttpMethod(self::HTTP_REQUEST_METHOD_PATCH)
->setHttpPath('/v1/databases/transactions/:transactionId')
->desc('Update transaction')
->groups(['api', 'database', 'transactions'])
->label('scope', 'collections.write')
->label('resourceType', RESOURCE_TYPE_DATABASES)
->label('sdk', new Method(
namespace: 'databases',
group: 'transactions',
name: 'updateTransaction',
description: '/docs/references/databases/update-transaction.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: SwooleResponse::STATUS_CODE_OK,
model: UtopiaResponse::MODEL_TRANSACTION,
)
],
contentType: ContentType::JSON
))
->param('transactionId', '', new UID(), 'Transaction ID.')
->param('commit', false, new Boolean(), 'Commit transaction?', true)
->param('rollback', false, new Boolean(), 'Rollback transaction?', true)
->inject('requestTimestamp')
->inject('response')
->inject('dbForProject')
->inject('project')
->callback($this->action(...));
}
public function action(string $transactionId, bool $commit, bool $rollback, ?\DateTime $requestTimestamp, UtopiaResponse $response, Database $dbForProject, Document $project): void
{
if (!$commit && !$rollback) {
throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Either commit or rollback must be true');
}
if ($commit && $rollback) {
throw new Exception(Exception::GENERAL_BAD_REQUEST, 'Cannot commit and rollback at the same time');
}
$transaction = $dbForProject->getDocument('transactions', $transactionId);
if ($transaction->isEmpty()) {
throw new Exception(Exception::TRANSACTION_NOT_FOUND);
}
if ($transaction->getAttribute('status', '') !== 'pending') {
throw new Exception(Exception::TRANSACTION_NOT_READY);
}
$now = new \DateTime();
$expiresAt = new \DateTime($transaction->getAttribute('expiresAt', 'now'));
if ($now > $expiresAt) {
throw new Exception(Exception::TRANSACTION_EXPIRED);
}
if ($commit) {
$dbForProject->withTransaction(function () use ($dbForProject, $transactionId, $transaction, $requestTimestamp) {
$dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'committing',
]));
$operations = $dbForProject->find('transactionLogs', [
Query::equal('transactionInternalId', [$transaction->getSequence()]),
]);
$creates
= $updates
= $deletes
= $increments
= $decrements
= $bulkUpdates
= $bulkDeletes
= [];
foreach ($operations as $operation) {
$databaseInternalId = $operation['databaseInternalId'];
$collectionInternalId = $operation['collectionInternalId'];
$documentId = $operation['documentId'];
switch ($operation['action']) {
case 'create':
$creates[$databaseInternalId][$collectionInternalId][] = new Document([
'$id' => $documentId ?? ID::unique(),
...$operation['data']
]);
break;
case 'update':
case 'upsert':
$updates[$databaseInternalId][$collectionInternalId][] = new Document([
'$id' => $documentId,
...$operation['data'],
]);
break;
case 'delete':
$deletes[$databaseInternalId][$collectionInternalId][] = $documentId;
break;
case 'increment':
$increments[$databaseInternalId][$collectionInternalId][] = [
'attribute' => $operation['data']['attribute'],
'value' => $operation['data']['value'] ?? 1,
'max' => $operation['data']['max'] ?? null,
];
break;
case 'decrement':
$decrements[$databaseInternalId][$collectionInternalId][] = [
'attribute' => $operation['data']['attribute'],
'value' => $operation['data']['value'] ?? 1,
'min' => $operation['data']['min'] ?? null,
];
break;
case 'bulkUpdate':
$bulkUpdates[$databaseInternalId][$collectionInternalId][] = [
'data' => $operation['data']['data'] ?? null,
'queries' => $operation['data']['queries'] ?? [],
];
break;
case 'bulkDelete':
$bulkDeletes[$databaseInternalId][$collectionInternalId][] = [
'queries' => $operation['data']['queries'] ?? [],
];
break;
}
}
try {
foreach ($creates as $dbId => $cols) {
foreach ($cols as $colId => $docs) {
$dbForProject->createDocuments("database_{$dbId}_collection_{$colId}", $docs);
}
}
foreach ($updates as $dbId => $cols) {
foreach ($cols as $colId => $docs) {
$dbForProject->createOrUpdateDocuments("database_{$dbId}_collection_{$colId}", $docs);
}
}
foreach ($deletes as $dbId => $cols) {
foreach ($cols as $colId => $ids) {
$dbForProject->deleteDocuments("database_{$dbId}_collection_{$colId}", [
Query::equal('$id', $ids),
]);
}
}
foreach ($increments as $dbId => $cols) {
foreach ($cols as $colId => $increments) {
foreach ($increments as $increment) {
$dbForProject->increaseDocumentAttribute(
"database_{$dbId}_collection_{$colId}",
$increment['attribute'],
$increment['value'],
$increment['max']
);
}
}
}
foreach ($decrements as $dbId => $cols) {
foreach ($cols as $colId => $decrements) {
foreach ($decrements as $decrement) {
$dbForProject->decreaseDocumentAttribute(
"database_{$dbId}_collection_{$colId}",
$decrement['attribute'],
$decrement['value'],
$decrement['min']
);
}
}
}
foreach ($bulkUpdates as $dbId => $cols) {
foreach ($cols as $colId => $updates) {
foreach ($updates as $update) {
$dbForProject->updateDocuments("database_{$dbId}_collection_{$colId}", $update['data'], $update['queries']);
}
}
}
foreach ($bulkDeletes as $dbId => $cols) {
foreach ($cols as $colId => $deletes) {
foreach ($deletes as $delete) {
$dbForProject->deleteDocuments("database_{$dbId}_collection_{$colId}", $delete['queries']);
}
}
}
$dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'committed',
]));
$dbForProject->deleteDocuments('transactionLogs', [
Query::equal('transactionInternalId', [$transaction->getSequence()]),
]);
} catch (DuplicateException|ConflictException) {
$dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'failed',
]));
throw new Exception(Exception::TRANSACTION_CONFLICT);
}
});
$transaction = $dbForProject->getDocument('transactions', $transactionId);
}
if ($rollback) {
$dbForProject->deleteDocuments('transactionLogs', [
Query::equal('transactionInternalId', [$transaction->getSequence()]),
]);
$transaction = $dbForProject->updateDocument('transactions', $transactionId, new Document([
'status' => 'rolledBack',
]));
}
$response
->setStatusCode(SwooleResponse::STATUS_CODE_OK)
->dynamic($transaction, UtopiaResponse::MODEL_TRANSACTION);
}
}

View file

@ -0,0 +1,73 @@
<?php
namespace Appwrite\Platform\Modules\Databases\Http\Transactions;
use Appwrite\Extend\Exception;
use Appwrite\Platform\Action;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
use Appwrite\SDK\Method;
use Appwrite\SDK\Response as SDKResponse;
use Appwrite\Utopia\Database\Validator\Queries\Transactions;
use Appwrite\Utopia\Response as UtopiaResponse;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Database\Exception\Query as QueryException;
use Utopia\Database\Query;
use Utopia\Swoole\Response as SwooleResponse;
class XList extends Action
{
public static function getName(): string
{
return 'listTransactions';
}
protected function getResponseModel(): string
{
return UtopiaResponse::MODEL_TRANSACTION_LIST;
}
public function __construct()
{
$this
->setHttpMethod(self::HTTP_REQUEST_METHOD_GET)
->setHttpPath('/v1/databases/transactions')
->desc('List transactions')
->groups(['api', 'database', 'transactions'])
->label('scope', 'transactions.read')
->label('resourceType', RESOURCE_TYPE_DATABASES)
->label('sdk', new Method(
namespace: 'databases',
group: 'transactions',
name: 'listTransactions',
description: '/docs/references/databases/list-transactions.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: SwooleResponse::STATUS_CODE_OK,
model: UtopiaResponse::MODEL_TRANSACTION_LIST,
)
],
contentType: ContentType::JSON
))
->param('queries', [], new Transactions(), 'Array of query strings generated using the Query class provided by the SDK. [Learn more about queries](https://appwrite.io/docs/queries).', true)
->inject('response')
->inject('dbForProject')
->callback($this->action(...));
}
public function action(array $queries, UtopiaResponse $response, Database $dbForProject): void
{
try {
$queries = Query::parseQueries($queries);
} catch (QueryException $e) {
throw new Exception(Exception::GENERAL_QUERY_INVALID, $e->getMessage());
}
$response->dynamic(new Document([
'transactions' => $dbForProject->find('transactions', $queries),
'total' => $dbForProject->count('transactions', $queries),
]), UtopiaResponse::MODEL_TRANSACTION_LIST);
}
}