sync against 1.6x

This commit is contained in:
shimon 2024-09-15 17:40:55 +02:00
parent a00c771be0
commit bee844fcdb
23 changed files with 448 additions and 189 deletions

View file

@ -109,8 +109,8 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole,
if (isset($databases[$dsn->getHost()])) { if (isset($databases[$dsn->getHost()])) {
$database = $databases[$dsn->getHost()]; $database = $databases[$dsn->getHost()];
if ($dsn->getHost() === System::getEnv('_APP_DATABASE_SHARED_TABLES', '')) { $sharedTablesKeys = explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));
$database if (in_array($dsn->getHost(), $sharedTablesKeys)) { $database
->setSharedTables(true) ->setSharedTables(true)
->setTenant($project->getInternalId()) ->setTenant($project->getInternalId())
->setNamespace($dsn->getParam('namespace')); ->setNamespace($dsn->getParam('namespace'));
@ -133,8 +133,8 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole,
$databases[$dsn->getHost()] = $database; $databases[$dsn->getHost()] = $database;
if ($dsn->getHost() === System::getEnv('_APP_DATABASE_SHARED_TABLES', '')) { $sharedTablesKeys = explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));
$database if (in_array($dsn->getHost(), $sharedTablesKeys)) { $database
->setSharedTables(true) ->setSharedTables(true)
->setTenant($project->getInternalId()) ->setTenant($project->getInternalId())
->setNamespace($dsn->getParam('namespace')); ->setNamespace($dsn->getParam('namespace'));

36
app/config/bla Normal file
View file

@ -0,0 +1,36 @@
version: '3.8'
services:
db_15:
image: mysql:8.0.36-debian
container_name: mysql_db_15
environment:
MYSQL_ROOT_PASSWORD: lJVYGNZTOTF4VsGSnz5CWoVp
MYSQL_DATABASE: appwrite
MYSQL_USER: appwrite_user
MYSQL_PASSWORD: Fy8i367HpJd8EB1VGos3vsf
volumes:
- /mnt/db-15:/var/lib/mysql
ports:
- "3307:3306"
networks:
- mysql_network
db_16:
image: mysql:8.0.36-debian
container_name: mysql_db_16
environment:
MYSQL_ROOT_PASSWORD: d1RNc7NsnsQ8I1OQd0eeLWr0
MYSQL_DATABASE: appwrite
MYSQL_USER: appwrite_user
MYSQL_PASSWORD: KlQ4NMyl46YPumqSA8CUOLjO
volumes:
- /mnt/db-16:/var/lib/mysql
ports:
- "3308:3306"
networks:
- mysql_network
networks:
mysql_network:
driver: bridge

1
app/console Submodule

@ -0,0 +1 @@
Subproject commit 0959b594b32f176819d4afb3a769afea212db789

View file

@ -183,7 +183,8 @@ App::post('/v1/functions')
->inject('queueForBuilds') ->inject('queueForBuilds')
->inject('dbForConsole') ->inject('dbForConsole')
->inject('gitHub') ->inject('gitHub')
->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $templateRepository, string $templateOwner, string $templateRootDirectory, string $templateVersion, string $specification, Request $request, Response $response, Database $dbForProject, Document $project, Document $user, Event $queueForEvents, Build $queueForBuilds, Database $dbForConsole, GitHub $github) use ($redeployVcs) { ->inject('realtimeConnection')
->action(function (string $functionId, string $name, string $runtime, array $execute, array $events, string $schedule, int $timeout, bool $enabled, bool $logging, string $entrypoint, string $commands, array $scopes, string $installationId, string $providerRepositoryId, string $providerBranch, bool $providerSilentMode, string $providerRootDirectory, string $templateRepository, string $templateOwner, string $templateRootDirectory, string $templateVersion, string $specification, Request $request, Response $response, Database $dbForProject, Document $project, Document $user, Event $queueForEvents, Build $queueForBuilds, Database $dbForConsole, GitHub $githubgithub, Callable $realtimeConnection) use ($redeployVcs) {
$functionId = ($functionId == 'unique()') ? ID::unique() : $functionId; $functionId = ($functionId == 'unique()') ? ID::unique() : $functionId;
$allowList = \array_filter(\explode(',', System::getEnv('_APP_FUNCTIONS_RUNTIMES', ''))); $allowList = \array_filter(\explode(',', System::getEnv('_APP_FUNCTIONS_RUNTIMES', '')));
@ -374,6 +375,7 @@ App::post('/v1/functions')
project: $project project: $project
); );
Realtime::send( Realtime::send(
redis: $realtimeConnection($queueForEvents->getSourceRegion()),
projectId: 'console', projectId: 'console',
payload: $rule->getArrayCopy(), payload: $rule->getArrayCopy(),
events: $allEvents, events: $allEvents,
@ -381,6 +383,7 @@ App::post('/v1/functions')
roles: $target['roles'] roles: $target['roles']
); );
Realtime::send( Realtime::send(
redis: $realtimeConnection($queueForEvents->getSourceRegion()),
projectId: $project->getId(), projectId: $project->getId(),
payload: $rule->getArrayCopy(), payload: $rule->getArrayCopy(),
events: $allEvents, events: $allEvents,

View file

@ -125,6 +125,13 @@ App::post('/v1/projects')
if ($index !== false) { if ($index !== false) {
$dsn = $databases[$index]; $dsn = $databases[$index];
} else { } else {
if ($region !== 'default') {
$databases = array_filter($databases, function ($value) use ($region) {
return str_contains($value, $region);
});
}
$dsn = $databases[array_rand($databases)]; $dsn = $databases[array_rand($databases)];
} }
@ -133,11 +140,13 @@ App::post('/v1/projects')
} }
// TODO: Temporary until all projects are using shared tables. // TODO: Temporary until all projects are using shared tables.
if ($dsn === System::getEnv('_APP_DATABASE_SHARED_TABLES', '')) { $sharedTablesKeys = explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));
if (in_array($dsn, $sharedTablesKeys)) {
$schema = 'appwrite'; $schema = 'appwrite';
$database = 'appwrite'; $database = 'appwrite';
$namespace = System::getEnv('_APP_DATABASE_SHARED_NAMESPACE', ''); $namespace = System::getEnv('_APP_DATABASE_SHARED_NAMESPACE', '');
$dsn = $schema . '://' . System::getEnv('_APP_DATABASE_SHARED_TABLES', '') . '?database=' . $database; $dsn = $schema . '://' . $dsn . '?database=' . $database;
if (!empty($namespace)) { if (!empty($namespace)) {
$dsn .= '&namespace=' . $namespace; $dsn .= '&namespace=' . $namespace;
@ -192,8 +201,8 @@ App::post('/v1/projects')
$adapter = $pools->get($dsn->getHost())->pop()->getResource(); $adapter = $pools->get($dsn->getHost())->pop()->getResource();
$dbForProject = new Database($adapter, $cache); $dbForProject = new Database($adapter, $cache);
if ($dsn->getHost() === System::getEnv('_APP_DATABASE_SHARED_TABLES', '')) { $sharedTablesKeys = explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));
$dbForProject if (in_array($dsn->getHost(), $sharedTablesKeys)) { $dbForProject
->setSharedTables(true) ->setSharedTables(true)
->setTenant($project->getInternalId()) ->setTenant($project->getInternalId())
->setNamespace($dsn->getParam('namespace')); ->setNamespace($dsn->getParam('namespace'));

View file

@ -590,7 +590,8 @@ App::shutdown()
->inject('queueForFunctions') ->inject('queueForFunctions')
->inject('mode') ->inject('mode')
->inject('dbForConsole') ->inject('dbForConsole')
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Database $dbForProject, Func $queueForFunctions, string $mode, Database $dbForConsole) use ($parseLabel) { ->inject('realtimeConnection')
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Database $dbForProject, Func $queueForFunctions, string $mode, Database $dbForConsole, callable $realtimeConnection) use ($parseLabel) {
$responsePayload = $response->getPayload(); $responsePayload = $response->getPayload();
@ -636,6 +637,7 @@ App::shutdown()
); );
Realtime::send( Realtime::send(
redis: $realtimeConnection($queueForEvents->getSourceRegion()),
projectId: $target['projectId'] ?? $project->getId(), projectId: $target['projectId'] ?? $project->getId(),
payload: $queueForEvents->getRealtimePayload(), payload: $queueForEvents->getRealtimePayload(),
events: $allEvents, events: $allEvents,

View file

@ -89,6 +89,10 @@ use Utopia\Validator\Range;
use Utopia\Validator\URL; use Utopia\Validator\URL;
use Utopia\Validator\WhiteList; use Utopia\Validator\WhiteList;
use Utopia\VCS\Adapter\Git\GitHub as VcsGitHub; use Utopia\VCS\Adapter\Git\GitHub as VcsGitHub;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Utopia\Request;
const APP_NAME = 'Appwrite'; const APP_NAME = 'Appwrite';
const APP_DOMAIN = 'appwrite.io'; const APP_DOMAIN = 'appwrite.io';
@ -1390,8 +1394,8 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForConsole,
$dsn = new DSN('mysql://' . $project->getAttribute('database')); $dsn = new DSN('mysql://' . $project->getAttribute('database'));
} }
if ($dsn->getHost() === System::getEnv('_APP_DATABASE_SHARED_TABLES', '')) { $sharedTablesKeys = explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));
$database if (in_array($dsn->getHost(), $sharedTablesKeys)) { $database
->setSharedTables(true) ->setSharedTables(true)
->setTenant($project->getInternalId()) ->setTenant($project->getInternalId())
->setNamespace($dsn->getParam('namespace')); ->setNamespace($dsn->getParam('namespace'));
@ -1443,8 +1447,8 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForConsole,
->setMetadata('project', $project->getId()) ->setMetadata('project', $project->getId())
->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS); ->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS);
if ($dsn->getHost() === System::getEnv('_APP_DATABASE_SHARED_TABLES', '')) { $sharedTablesKeys = explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));
$database if (in_array($dsn->getHost(), $sharedTablesKeys)) { $database
->setSharedTables(true) ->setSharedTables(true)
->setTenant($project->getInternalId()) ->setTenant($project->getInternalId())
->setNamespace($dsn->getParam('namespace')); ->setNamespace($dsn->getParam('namespace'));
@ -1494,23 +1498,33 @@ App::setResource('deviceForLocal', function () {
return new Local(); return new Local();
}); });
App::setResource('deviceForFiles', function ($project) { App::setResource('deviceForFiles', function ($project, $connectionString) {
return getDevice(APP_STORAGE_UPLOADS . '/app-' . $project->getId()); return getDevice(APP_STORAGE_UPLOADS.'/app-'.$project->getId(), $connectionString);
}, ['project']); }, ['project', 'connectionString']);
App::setResource('deviceForFunctions', function ($project) { App::setResource('deviceForFunctions', function ($project, $connectionString) {
return getDevice(APP_STORAGE_FUNCTIONS . '/app-' . $project->getId()); return getDevice(APP_STORAGE_FUNCTIONS.'/app-'.$project->getId(), $connectionString);
}, ['project']); }, ['project', 'connectionString']);
App::setResource('deviceForBuilds', function ($project) { App::setResource('deviceForBuilds', function ($project, $connectionString) {
return getDevice(APP_STORAGE_BUILDS . '/app-' . $project->getId()); return getDevice(APP_STORAGE_BUILDS.'/app-'.$project->getId(), $connectionString);
}, ['project']); }, ['project', 'connectionString']);
function getDevice($root): Device App::setResource('connectionString', function () {
return System::getEnv('_APP_CONNECTIONS_STORAGE', '');
});
App::setResource('realtimeConnection',function ($pools) {
return function () use ($pools) {
return $pools->get('pubsub')->pop()->getResource();
};
}, ['pools']);
function getDevice(string $root, string $connectionString = ''): Device
{ {
$connection = System::getEnv('_APP_CONNECTIONS_STORAGE', '');
if (!empty($connection)) { if (! empty($connectionString)) {
$acl = 'private'; $acl = 'private';
$device = Storage::DEVICE_LOCAL; $device = Storage::DEVICE_LOCAL;
$accessKey = ''; $accessKey = '';
@ -1519,7 +1533,7 @@ function getDevice($root): Device
$region = ''; $region = '';
try { try {
$dsn = new DSN($connection); $dsn = new DSN($connectionString);
$device = $dsn->getScheme(); $device = $dsn->getScheme();
$accessKey = $dsn->getUser() ?? ''; $accessKey = $dsn->getUser() ?? '';
$accessSecret = $dsn->getPassword() ?? ''; $accessSecret = $dsn->getPassword() ?? '';

View file

@ -92,8 +92,8 @@ if (!function_exists("getProjectDB")) {
$database = new Database($adapter, getCache()); $database = new Database($adapter, getCache());
if ($dsn->getHost() === System::getEnv('_APP_DATABASE_SHARED_TABLES', '')) { $sharedTablesKeys = explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));
$database if (in_array($dsn->getHost(), $sharedTablesKeys)) { $database
->setSharedTables(true) ->setSharedTables(true)
->setTenant($project->getInternalId()) ->setTenant($project->getInternalId())
->setNamespace($dsn->getParam('namespace')); ->setNamespace($dsn->getParam('namespace'));
@ -135,6 +135,16 @@ if (!function_exists("getCache")) {
} }
} }
if (!function_exists("getPubSub")) {
function getPubSub(): \Redis
{
global $register;
$pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */
return $pools->get('pubsub')->pop()->getResource();
}
}
$realtime = new Realtime(); $realtime = new Realtime();
/** /**
@ -354,7 +364,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
} }
$start = time(); $start = time();
$redis = $register->get('pools')->get('pubsub')->pop()->getResource(); /** @var Redis $redis */ $redis = getPubSub(); /** @var \Redis $redis */
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1); $redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
if ($redis->ping(true)) { if ($redis->ping(true)) {

View file

@ -93,8 +93,8 @@ Server::setResource('dbForProject', function (Cache $cache, Registry $register,
$dsn = new DSN('mysql://' . $project->getAttribute('database')); $dsn = new DSN('mysql://' . $project->getAttribute('database'));
} }
if ($dsn->getHost() === System::getEnv('_APP_DATABASE_SHARED_TABLES', '')) { $sharedTablesKeys = explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));
$database if (in_array($dsn->getHost(), $sharedTablesKeys)) { $database
->setSharedTables(true) ->setSharedTables(true)
->setTenant($project->getInternalId()) ->setTenant($project->getInternalId())
->setNamespace($dsn->getParam('namespace')); ->setNamespace($dsn->getParam('namespace'));
@ -126,8 +126,8 @@ Server::setResource('getProjectDB', function (Group $pools, Database $dbForConso
if (isset($databases[$dsn->getHost()])) { if (isset($databases[$dsn->getHost()])) {
$database = $databases[$dsn->getHost()]; $database = $databases[$dsn->getHost()];
if ($dsn->getHost() === System::getEnv('_APP_DATABASE_SHARED_TABLES', '')) { $sharedTablesKeys = explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));
$database if (in_array($dsn->getHost(), $sharedTablesKeys)) { $database
->setSharedTables(true) ->setSharedTables(true)
->setTenant($project->getInternalId()) ->setTenant($project->getInternalId())
->setNamespace($dsn->getParam('namespace')); ->setNamespace($dsn->getParam('namespace'));
@ -150,8 +150,8 @@ Server::setResource('getProjectDB', function (Group $pools, Database $dbForConso
$databases[$dsn->getHost()] = $database; $databases[$dsn->getHost()] = $database;
if ($dsn->getHost() === System::getEnv('_APP_DATABASE_SHARED_TABLES', '')) { $sharedTablesKeys = explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));
$database if (in_array($dsn->getHost(), $sharedTablesKeys)) { $database
->setSharedTables(true) ->setSharedTables(true)
->setTenant($project->getInternalId()) ->setTenant($project->getInternalId())
->setNamespace($dsn->getParam('namespace')); ->setNamespace($dsn->getParam('namespace'));
@ -256,22 +256,27 @@ Server::setResource('pools', function (Registry $register) {
return $register->get('pools'); return $register->get('pools');
}, ['register']); }, ['register']);
Server::setResource('deviceForFunctions', function (Document $project) { Server::setResource('deviceForFunctions', function (Document $project, $connectionString) {
return getDevice(APP_STORAGE_FUNCTIONS . '/app-' . $project->getId()); return getDevice(APP_STORAGE_FUNCTIONS.'/app-'.$project->getId(), $connectionString);
}, ['project']); }, ['project', 'connectionString']);
Server::setResource('deviceForFiles', function (Document $project) { Server::setResource('deviceForFiles', function (Document $project, $connectionString) {
return getDevice(APP_STORAGE_UPLOADS . '/app-' . $project->getId()); return getDevice(APP_STORAGE_UPLOADS.'/app-'.$project->getId(), $connectionString);
}, ['project']); }, ['project', 'connectionString']);
Server::setResource('deviceForBuilds', function (Document $project) { Server::setResource('deviceForBuilds', function (Document $project, $connectionString) {
return getDevice(APP_STORAGE_BUILDS . '/app-' . $project->getId()); return getDevice(APP_STORAGE_BUILDS.'/app-'.$project->getId(), $connectionString);
}, ['project']); }, ['project', 'connectionString']);
Server::setResource('deviceForCache', function (Document $project) { Server::setResource('deviceForCache', function (Document $project, $connectionString) {
return getDevice(APP_STORAGE_CACHE . '/app-' . $project->getId()); return getDevice(APP_STORAGE_CACHE.'/app-'.$project->getId(), $connectionString);
}, ['project']); }, ['project', 'connectionString']);
Server::setResource('realtimeConnection',function ($pools) {
return function () use ($pools) {
return $pools->get('pubsub')->pop()->getResource();
};
}, ['pools']);
$pools = $register->get('pools'); $pools = $register->get('pools');
$platform = new Appwrite(); $platform = new Appwrite();

235
composer.lock generated
View file

@ -1130,20 +1130,20 @@
}, },
{ {
"name": "symfony/polyfill-mbstring", "name": "symfony/polyfill-mbstring",
"version": "v1.30.0", "version": "v1.31.0",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/symfony/polyfill-mbstring.git", "url": "https://github.com/symfony/polyfill-mbstring.git",
"reference": "fd22ab50000ef01661e2a31d850ebaa297f8e03c" "reference": "85181ba99b2345b0ef10ce42ecac37612d9fd341"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/symfony/polyfill-mbstring/zipball/fd22ab50000ef01661e2a31d850ebaa297f8e03c", "url": "https://api.github.com/repos/symfony/polyfill-mbstring/zipball/85181ba99b2345b0ef10ce42ecac37612d9fd341",
"reference": "fd22ab50000ef01661e2a31d850ebaa297f8e03c", "reference": "85181ba99b2345b0ef10ce42ecac37612d9fd341",
"shasum": "" "shasum": ""
}, },
"require": { "require": {
"php": ">=7.1" "php": ">=7.2"
}, },
"provide": { "provide": {
"ext-mbstring": "*" "ext-mbstring": "*"
@ -1190,7 +1190,7 @@
"shim" "shim"
], ],
"support": { "support": {
"source": "https://github.com/symfony/polyfill-mbstring/tree/v1.30.0" "source": "https://github.com/symfony/polyfill-mbstring/tree/v1.31.0"
}, },
"funding": [ "funding": [
{ {
@ -1206,24 +1206,24 @@
"type": "tidelift" "type": "tidelift"
} }
], ],
"time": "2024-06-19T12:30:46+00:00" "time": "2024-09-09T11:45:10+00:00"
}, },
{ {
"name": "symfony/polyfill-php80", "name": "symfony/polyfill-php80",
"version": "v1.30.0", "version": "v1.31.0",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/symfony/polyfill-php80.git", "url": "https://github.com/symfony/polyfill-php80.git",
"reference": "77fa7995ac1b21ab60769b7323d600a991a90433" "reference": "60328e362d4c2c802a54fcbf04f9d3fb892b4cf8"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/symfony/polyfill-php80/zipball/77fa7995ac1b21ab60769b7323d600a991a90433", "url": "https://api.github.com/repos/symfony/polyfill-php80/zipball/60328e362d4c2c802a54fcbf04f9d3fb892b4cf8",
"reference": "77fa7995ac1b21ab60769b7323d600a991a90433", "reference": "60328e362d4c2c802a54fcbf04f9d3fb892b4cf8",
"shasum": "" "shasum": ""
}, },
"require": { "require": {
"php": ">=7.1" "php": ">=7.2"
}, },
"type": "library", "type": "library",
"extra": { "extra": {
@ -1270,7 +1270,7 @@
"shim" "shim"
], ],
"support": { "support": {
"source": "https://github.com/symfony/polyfill-php80/tree/v1.30.0" "source": "https://github.com/symfony/polyfill-php80/tree/v1.31.0"
}, },
"funding": [ "funding": [
{ {
@ -1286,7 +1286,7 @@
"type": "tidelift" "type": "tidelift"
} }
], ],
"time": "2024-05-31T15:07:36+00:00" "time": "2024-09-09T11:45:10+00:00"
}, },
{ {
"name": "thecodingmachine/safe", "name": "thecodingmachine/safe",
@ -2993,16 +2993,16 @@
"packages-dev": [ "packages-dev": [
{ {
"name": "appwrite/sdk-generator", "name": "appwrite/sdk-generator",
"version": "0.39.19", "version": "0.39.21",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/appwrite/sdk-generator.git", "url": "https://github.com/appwrite/sdk-generator.git",
"reference": "d5653a2f744d2c297d44f99ff68bfc26c1a3b804" "reference": "9754b190d33aaad56fdb8defc94f90248184c5ac"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/appwrite/sdk-generator/zipball/d5653a2f744d2c297d44f99ff68bfc26c1a3b804", "url": "https://api.github.com/repos/appwrite/sdk-generator/zipball/9754b190d33aaad56fdb8defc94f90248184c5ac",
"reference": "d5653a2f744d2c297d44f99ff68bfc26c1a3b804", "reference": "9754b190d33aaad56fdb8defc94f90248184c5ac",
"shasum": "" "shasum": ""
}, },
"require": { "require": {
@ -3011,12 +3011,12 @@
"ext-mbstring": "*", "ext-mbstring": "*",
"matthiasmullie/minify": "1.3.*", "matthiasmullie/minify": "1.3.*",
"php": ">=8.0", "php": ">=8.0",
"twig/twig": "v3.8.*" "twig/twig": "3.14.*"
}, },
"require-dev": { "require-dev": {
"brianium/paratest": "v7.4.*", "brianium/paratest": "7.*",
"phpunit/phpunit": "10.5.*", "phpunit/phpunit": "11.*",
"squizlabs/php_codesniffer": "3.9.*" "squizlabs/php_codesniffer": "3.*"
}, },
"type": "library", "type": "library",
"autoload": { "autoload": {
@ -3038,22 +3038,22 @@
"description": "Appwrite PHP library for generating API SDKs for multiple programming languages and platforms", "description": "Appwrite PHP library for generating API SDKs for multiple programming languages and platforms",
"support": { "support": {
"issues": "https://github.com/appwrite/sdk-generator/issues", "issues": "https://github.com/appwrite/sdk-generator/issues",
"source": "https://github.com/appwrite/sdk-generator/tree/0.39.19" "source": "https://github.com/appwrite/sdk-generator/tree/0.39.21"
}, },
"time": "2024-08-30T12:04:18+00:00" "time": "2024-09-10T08:49:29+00:00"
}, },
{ {
"name": "doctrine/annotations", "name": "doctrine/annotations",
"version": "2.0.1", "version": "2.0.2",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/doctrine/annotations.git", "url": "https://github.com/doctrine/annotations.git",
"reference": "e157ef3f3124bbf6fe7ce0ffd109e8a8ef284e7f" "reference": "901c2ee5d26eb64ff43c47976e114bf00843acf7"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/doctrine/annotations/zipball/e157ef3f3124bbf6fe7ce0ffd109e8a8ef284e7f", "url": "https://api.github.com/repos/doctrine/annotations/zipball/901c2ee5d26eb64ff43c47976e114bf00843acf7",
"reference": "e157ef3f3124bbf6fe7ce0ffd109e8a8ef284e7f", "reference": "901c2ee5d26eb64ff43c47976e114bf00843acf7",
"shasum": "" "shasum": ""
}, },
"require": { "require": {
@ -3065,10 +3065,10 @@
"require-dev": { "require-dev": {
"doctrine/cache": "^2.0", "doctrine/cache": "^2.0",
"doctrine/coding-standard": "^10", "doctrine/coding-standard": "^10",
"phpstan/phpstan": "^1.8.0", "phpstan/phpstan": "^1.10.28",
"phpunit/phpunit": "^7.5 || ^8.5 || ^9.5", "phpunit/phpunit": "^7.5 || ^8.5 || ^9.5",
"symfony/cache": "^5.4 || ^6", "symfony/cache": "^5.4 || ^6.4 || ^7",
"vimeo/psalm": "^4.10" "vimeo/psalm": "^4.30 || ^5.14"
}, },
"suggest": { "suggest": {
"php": "PHP 8.0 or higher comes with attributes, a native replacement for annotations" "php": "PHP 8.0 or higher comes with attributes, a native replacement for annotations"
@ -3114,9 +3114,9 @@
], ],
"support": { "support": {
"issues": "https://github.com/doctrine/annotations/issues", "issues": "https://github.com/doctrine/annotations/issues",
"source": "https://github.com/doctrine/annotations/tree/2.0.1" "source": "https://github.com/doctrine/annotations/tree/2.0.2"
}, },
"time": "2023-02-02T22:02:53+00:00" "time": "2024-09-05T10:17:24+00:00"
}, },
{ {
"name": "doctrine/deprecations", "name": "doctrine/deprecations",
@ -4185,16 +4185,16 @@
}, },
{ {
"name": "phpstan/phpdoc-parser", "name": "phpstan/phpdoc-parser",
"version": "1.30.0", "version": "1.30.1",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/phpstan/phpdoc-parser.git", "url": "https://github.com/phpstan/phpdoc-parser.git",
"reference": "5ceb0e384997db59f38774bf79c2a6134252c08f" "reference": "51b95ec8670af41009e2b2b56873bad96682413e"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/phpstan/phpdoc-parser/zipball/5ceb0e384997db59f38774bf79c2a6134252c08f", "url": "https://api.github.com/repos/phpstan/phpdoc-parser/zipball/51b95ec8670af41009e2b2b56873bad96682413e",
"reference": "5ceb0e384997db59f38774bf79c2a6134252c08f", "reference": "51b95ec8670af41009e2b2b56873bad96682413e",
"shasum": "" "shasum": ""
}, },
"require": { "require": {
@ -4226,9 +4226,9 @@
"description": "PHPDoc parser with support for nullable, intersection and generic types", "description": "PHPDoc parser with support for nullable, intersection and generic types",
"support": { "support": {
"issues": "https://github.com/phpstan/phpdoc-parser/issues", "issues": "https://github.com/phpstan/phpdoc-parser/issues",
"source": "https://github.com/phpstan/phpdoc-parser/tree/1.30.0" "source": "https://github.com/phpstan/phpdoc-parser/tree/1.30.1"
}, },
"time": "2024-08-29T09:54:52+00:00" "time": "2024-09-07T20:13:05+00:00"
}, },
{ {
"name": "phpunit/php-code-coverage", "name": "phpunit/php-code-coverage",
@ -4756,16 +4756,16 @@
}, },
{ {
"name": "psr/log", "name": "psr/log",
"version": "3.0.1", "version": "3.0.2",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/php-fig/log.git", "url": "https://github.com/php-fig/log.git",
"reference": "79dff0b268932c640297f5208d6298f71855c03e" "reference": "f16e1d5863e37f8d8c2a01719f5b34baa2b714d3"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/php-fig/log/zipball/79dff0b268932c640297f5208d6298f71855c03e", "url": "https://api.github.com/repos/php-fig/log/zipball/f16e1d5863e37f8d8c2a01719f5b34baa2b714d3",
"reference": "79dff0b268932c640297f5208d6298f71855c03e", "reference": "f16e1d5863e37f8d8c2a01719f5b34baa2b714d3",
"shasum": "" "shasum": ""
}, },
"require": { "require": {
@ -4800,9 +4800,9 @@
"psr-3" "psr-3"
], ],
"support": { "support": {
"source": "https://github.com/php-fig/log/tree/3.0.1" "source": "https://github.com/php-fig/log/tree/3.0.2"
}, },
"time": "2024-08-21T13:31:24+00:00" "time": "2024-09-11T13:17:53+00:00"
}, },
{ {
"name": "sebastian/cli-parser", "name": "sebastian/cli-parser",
@ -6222,20 +6222,20 @@
}, },
{ {
"name": "symfony/polyfill-ctype", "name": "symfony/polyfill-ctype",
"version": "v1.30.0", "version": "v1.31.0",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/symfony/polyfill-ctype.git", "url": "https://github.com/symfony/polyfill-ctype.git",
"reference": "0424dff1c58f028c451efff2045f5d92410bd540" "reference": "a3cc8b044a6ea513310cbd48ef7333b384945638"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/symfony/polyfill-ctype/zipball/0424dff1c58f028c451efff2045f5d92410bd540", "url": "https://api.github.com/repos/symfony/polyfill-ctype/zipball/a3cc8b044a6ea513310cbd48ef7333b384945638",
"reference": "0424dff1c58f028c451efff2045f5d92410bd540", "reference": "a3cc8b044a6ea513310cbd48ef7333b384945638",
"shasum": "" "shasum": ""
}, },
"require": { "require": {
"php": ">=7.1" "php": ">=7.2"
}, },
"provide": { "provide": {
"ext-ctype": "*" "ext-ctype": "*"
@ -6281,7 +6281,7 @@
"portable" "portable"
], ],
"support": { "support": {
"source": "https://github.com/symfony/polyfill-ctype/tree/v1.30.0" "source": "https://github.com/symfony/polyfill-ctype/tree/v1.31.0"
}, },
"funding": [ "funding": [
{ {
@ -6297,24 +6297,24 @@
"type": "tidelift" "type": "tidelift"
} }
], ],
"time": "2024-05-31T15:07:36+00:00" "time": "2024-09-09T11:45:10+00:00"
}, },
{ {
"name": "symfony/polyfill-intl-grapheme", "name": "symfony/polyfill-intl-grapheme",
"version": "v1.30.0", "version": "v1.31.0",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/symfony/polyfill-intl-grapheme.git", "url": "https://github.com/symfony/polyfill-intl-grapheme.git",
"reference": "64647a7c30b2283f5d49b874d84a18fc22054b7a" "reference": "b9123926e3b7bc2f98c02ad54f6a4b02b91a8abe"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/symfony/polyfill-intl-grapheme/zipball/64647a7c30b2283f5d49b874d84a18fc22054b7a", "url": "https://api.github.com/repos/symfony/polyfill-intl-grapheme/zipball/b9123926e3b7bc2f98c02ad54f6a4b02b91a8abe",
"reference": "64647a7c30b2283f5d49b874d84a18fc22054b7a", "reference": "b9123926e3b7bc2f98c02ad54f6a4b02b91a8abe",
"shasum": "" "shasum": ""
}, },
"require": { "require": {
"php": ">=7.1" "php": ">=7.2"
}, },
"suggest": { "suggest": {
"ext-intl": "For best performance" "ext-intl": "For best performance"
@ -6359,7 +6359,7 @@
"shim" "shim"
], ],
"support": { "support": {
"source": "https://github.com/symfony/polyfill-intl-grapheme/tree/v1.30.0" "source": "https://github.com/symfony/polyfill-intl-grapheme/tree/v1.31.0"
}, },
"funding": [ "funding": [
{ {
@ -6375,24 +6375,24 @@
"type": "tidelift" "type": "tidelift"
} }
], ],
"time": "2024-05-31T15:07:36+00:00" "time": "2024-09-09T11:45:10+00:00"
}, },
{ {
"name": "symfony/polyfill-intl-normalizer", "name": "symfony/polyfill-intl-normalizer",
"version": "v1.30.0", "version": "v1.31.0",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/symfony/polyfill-intl-normalizer.git", "url": "https://github.com/symfony/polyfill-intl-normalizer.git",
"reference": "a95281b0be0d9ab48050ebd988b967875cdb9fdb" "reference": "3833d7255cc303546435cb650316bff708a1c75c"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/symfony/polyfill-intl-normalizer/zipball/a95281b0be0d9ab48050ebd988b967875cdb9fdb", "url": "https://api.github.com/repos/symfony/polyfill-intl-normalizer/zipball/3833d7255cc303546435cb650316bff708a1c75c",
"reference": "a95281b0be0d9ab48050ebd988b967875cdb9fdb", "reference": "3833d7255cc303546435cb650316bff708a1c75c",
"shasum": "" "shasum": ""
}, },
"require": { "require": {
"php": ">=7.1" "php": ">=7.2"
}, },
"suggest": { "suggest": {
"ext-intl": "For best performance" "ext-intl": "For best performance"
@ -6440,7 +6440,7 @@
"shim" "shim"
], ],
"support": { "support": {
"source": "https://github.com/symfony/polyfill-intl-normalizer/tree/v1.30.0" "source": "https://github.com/symfony/polyfill-intl-normalizer/tree/v1.31.0"
}, },
"funding": [ "funding": [
{ {
@ -6456,7 +6456,83 @@
"type": "tidelift" "type": "tidelift"
} }
], ],
"time": "2024-05-31T15:07:36+00:00" "time": "2024-09-09T11:45:10+00:00"
},
{
"name": "symfony/polyfill-php81",
"version": "v1.31.0",
"source": {
"type": "git",
"url": "https://github.com/symfony/polyfill-php81.git",
"reference": "4a4cfc2d253c21a5ad0e53071df248ed48c6ce5c"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/symfony/polyfill-php81/zipball/4a4cfc2d253c21a5ad0e53071df248ed48c6ce5c",
"reference": "4a4cfc2d253c21a5ad0e53071df248ed48c6ce5c",
"shasum": ""
},
"require": {
"php": ">=7.2"
},
"type": "library",
"extra": {
"thanks": {
"name": "symfony/polyfill",
"url": "https://github.com/symfony/polyfill"
}
},
"autoload": {
"files": [
"bootstrap.php"
],
"psr-4": {
"Symfony\\Polyfill\\Php81\\": ""
},
"classmap": [
"Resources/stubs"
]
},
"notification-url": "https://packagist.org/downloads/",
"license": [
"MIT"
],
"authors": [
{
"name": "Nicolas Grekas",
"email": "p@tchwork.com"
},
{
"name": "Symfony Community",
"homepage": "https://symfony.com/contributors"
}
],
"description": "Symfony polyfill backporting some PHP 8.1+ features to lower PHP versions",
"homepage": "https://symfony.com",
"keywords": [
"compatibility",
"polyfill",
"portable",
"shim"
],
"support": {
"source": "https://github.com/symfony/polyfill-php81/tree/v1.31.0"
},
"funding": [
{
"url": "https://symfony.com/sponsor",
"type": "custom"
},
{
"url": "https://github.com/fabpot",
"type": "github"
},
{
"url": "https://tidelift.com/funding/github/packagist/symfony/symfony",
"type": "tidelift"
}
],
"time": "2024-09-09T11:45:10+00:00"
}, },
{ {
"name": "symfony/process", "name": "symfony/process",
@ -6790,30 +6866,37 @@
}, },
{ {
"name": "twig/twig", "name": "twig/twig",
"version": "v3.8.0", "version": "v3.14.0",
"source": { "source": {
"type": "git", "type": "git",
"url": "https://github.com/twigphp/Twig.git", "url": "https://github.com/twigphp/Twig.git",
"reference": "9d15f0ac07f44dc4217883ec6ae02fd555c6f71d" "reference": "126b2c97818dbff0cdf3fbfc881aedb3d40aae72"
}, },
"dist": { "dist": {
"type": "zip", "type": "zip",
"url": "https://api.github.com/repos/twigphp/Twig/zipball/9d15f0ac07f44dc4217883ec6ae02fd555c6f71d", "url": "https://api.github.com/repos/twigphp/Twig/zipball/126b2c97818dbff0cdf3fbfc881aedb3d40aae72",
"reference": "9d15f0ac07f44dc4217883ec6ae02fd555c6f71d", "reference": "126b2c97818dbff0cdf3fbfc881aedb3d40aae72",
"shasum": "" "shasum": ""
}, },
"require": { "require": {
"php": ">=7.2.5", "php": ">=8.0.2",
"symfony/deprecation-contracts": "^2.5|^3",
"symfony/polyfill-ctype": "^1.8", "symfony/polyfill-ctype": "^1.8",
"symfony/polyfill-mbstring": "^1.3", "symfony/polyfill-mbstring": "^1.3",
"symfony/polyfill-php80": "^1.22" "symfony/polyfill-php81": "^1.29"
}, },
"require-dev": { "require-dev": {
"psr/container": "^1.0|^2.0", "psr/container": "^1.0|^2.0",
"symfony/phpunit-bridge": "^5.4.9|^6.3|^7.0" "symfony/phpunit-bridge": "^5.4.9|^6.4|^7.0"
}, },
"type": "library", "type": "library",
"autoload": { "autoload": {
"files": [
"src/Resources/core.php",
"src/Resources/debug.php",
"src/Resources/escaper.php",
"src/Resources/string_loader.php"
],
"psr-4": { "psr-4": {
"Twig\\": "src/" "Twig\\": "src/"
} }
@ -6846,7 +6929,7 @@
], ],
"support": { "support": {
"issues": "https://github.com/twigphp/Twig/issues", "issues": "https://github.com/twigphp/Twig/issues",
"source": "https://github.com/twigphp/Twig/tree/v3.8.0" "source": "https://github.com/twigphp/Twig/tree/v3.14.0"
}, },
"funding": [ "funding": [
{ {
@ -6858,7 +6941,7 @@
"type": "tidelift" "type": "tidelift"
} }
], ],
"time": "2023-11-21T18:54:41+00:00" "time": "2024-09-09T17:55:12+00:00"
}, },
{ {
"name": "webmozart/glob", "name": "webmozart/glob",
@ -6936,5 +7019,5 @@
"platform-overrides": { "platform-overrides": {
"php": "8.3" "php": "8.3"
}, },
"plugin-api-version": "2.6.0" "plugin-api-version": "2.2.0"
} }

View file

@ -115,6 +115,7 @@ class Build extends Event
$client = new Client($this->queue, $this->connection); $client = new Client($this->queue, $this->connection);
return $client->enqueue([ return $client->enqueue([
'sourceRegion' => System::getEnv('_APP_REGION', 'default'),
'project' => $this->project, 'project' => $this->project,
'resource' => $this->resource, 'resource' => $this->resource,
'deployment' => $this->deployment, 'deployment' => $this->deployment,

View file

@ -77,6 +77,7 @@ class Certificate extends Event
$client = new Client($this->queue, $this->connection); $client = new Client($this->queue, $this->connection);
return $client->enqueue([ return $client->enqueue([
'sourceRegion' => System::getEnv('_APP_REGION', 'default'),
'project' => $this->project, 'project' => $this->project,
'domain' => $this->domain, 'domain' => $this->domain,
'skipRenewCheck' => $this->skipRenewCheck 'skipRenewCheck' => $this->skipRenewCheck

View file

@ -121,6 +121,7 @@ class Database extends Event
try { try {
$result = $client->enqueue([ $result = $client->enqueue([
'sourceRegion' => System::getEnv('_APP_REGION', 'default'),
'project' => $this->project, 'project' => $this->project,
'user' => $this->user, 'user' => $this->user,
'type' => $this->type, 'type' => $this->type,

View file

@ -6,6 +6,7 @@ use InvalidArgumentException;
use Utopia\Database\Document; use Utopia\Database\Document;
use Utopia\Queue\Client; use Utopia\Queue\Client;
use Utopia\Queue\Connection; use Utopia\Queue\Connection;
use Utopia\System\System;
class Event class Event
{ {
@ -109,6 +110,11 @@ class Event
return $this->event; return $this->event;
} }
public function getSourceRegion(): string
{
return System::getEnv('_APP_REGION', 'default');
}
/** /**
* Set project for this event. * Set project for this event.
* *
@ -301,6 +307,7 @@ class Event
$client = new Client($this->queue, $this->connection); $client = new Client($this->queue, $this->connection);
return $client->enqueue([ return $client->enqueue([
'sourceRegion' => $this->getSourceRegion(),
'project' => $this->project, 'project' => $this->project,
'user' => $this->user, 'user' => $this->user,
'payload' => $this->payload, 'payload' => $this->payload,

View file

@ -79,6 +79,8 @@ class Migration extends Event
$client = new Client($this->queue, $this->connection); $client = new Client($this->queue, $this->connection);
return $client->enqueue([ return $client->enqueue([
'sourceRegion' => $this->getSourceRegion(),
'project' => $this->project, 'project' => $this->project,
'user' => $this->user, 'user' => $this->user,
'migration' => $this->migration 'migration' => $this->migration

View file

@ -5,6 +5,8 @@ namespace Appwrite\Messaging;
abstract class Adapter abstract class Adapter
{ {
abstract public function subscribe(string $projectId, mixed $identifier, array $roles, array $channels): void; abstract public function subscribe(string $projectId, mixed $identifier, array $roles, array $channels): void;
abstract public function unsubscribe(mixed $identifier): void; abstract public function unsubscribe(mixed $identifier): void;
abstract public static function send(string $projectId, array $payload, array $events, array $channels, array $roles, array $options): void;
abstract public static function send(\redis $redis, string $projectId, array $payload, array $events, array $channels, array $roles, array $options): void;
} }

View file

@ -122,15 +122,17 @@ class Realtime extends Adapter
/** /**
* Sends an event to the Realtime Server * Sends an event to the Realtime Server
* @param \Redis $redis
* @param string $projectId * @param string $projectId
* @param array $payload * @param array $payload
* @param string $event * @param array $events
* @param array $channels * @param array $channels
* @param array $roles * @param array $roles
* @param array $options * @param array $options
* @return void * @return void
* @throws \RedisException
*/ */
public static function send(string $projectId, array $payload, array $events, array $channels, array $roles, array $options = []): void public static function send(\Redis $redis, string $projectId, array $payload, array $events, array $channels, array $roles, array $options = []): void
{ {
if (empty($channels) || empty($roles) || empty($projectId)) { if (empty($channels) || empty($roles) || empty($projectId)) {
return; return;
@ -139,8 +141,6 @@ class Realtime extends Adapter
$permissionsChanged = array_key_exists('permissionsChanged', $options) && $options['permissionsChanged']; $permissionsChanged = array_key_exists('permissionsChanged', $options) && $options['permissionsChanged'];
$userId = array_key_exists('userId', $options) ? $options['userId'] : null; $userId = array_key_exists('userId', $options) ? $options['userId'] : null;
$redis = new \Redis(); //TODO: make this part of the constructor
$redis->connect(System::getEnv('_APP_REDIS_HOST', ''), System::getEnv('_APP_REDIS_PORT', ''));
$redis->publish('realtime', json_encode([ $redis->publish('realtime', json_encode([
'project' => $projectId, 'project' => $projectId,
'roles' => $roles, 'roles' => $roles,

View file

@ -33,6 +33,11 @@ use Utopia\VCS\Adapter\Git\GitHub;
class Builds extends Action class Builds extends Action
{ {
/**
* @var mixed|string
*/
protected string $sourceRegion;
public static function getName(): string public static function getName(): string
{ {
return 'builds'; return 'builds';
@ -54,7 +59,8 @@ class Builds extends Action
->inject('dbForProject') ->inject('dbForProject')
->inject('deviceForFunctions') ->inject('deviceForFunctions')
->inject('log') ->inject('log')
->callback(fn ($message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Usage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log) => $this->action($message, $dbForConsole, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $deviceForFunctions, $log)); ->inject('realtimeConnection')
->callback(fn ($message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Usage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log, Callable $realtimeConnection) => $this->action($message, $dbForConsole, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $deviceForFunctions, $log, $realtimeConnection));
} }
/** /**
@ -67,10 +73,11 @@ class Builds extends Action
* @param Database $dbForProject * @param Database $dbForProject
* @param Device $deviceForFunctions * @param Device $deviceForFunctions
* @param Log $log * @param Log $log
* @param callable $realtimeConnection
* @return void * @return void
* @throws \Utopia\Database\Exception * @throws \Utopia\Database\Exception
*/ */
public function action(Message $message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log): void public function action(Message $message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log, Callable $realtimeConnection): void
{ {
$payload = $message->getPayload() ?? []; $payload = $message->getPayload() ?? [];
@ -84,6 +91,7 @@ class Builds extends Action
$deployment = new Document($payload['deployment'] ?? []); $deployment = new Document($payload['deployment'] ?? []);
$template = new Document($payload['template'] ?? []); $template = new Document($payload['template'] ?? []);
$this->sourceRegion = $payload['sourceRegion'] ?? 'default';
$log->addTag('projectId', $project->getId()); $log->addTag('projectId', $project->getId());
$log->addTag('type', $type); $log->addTag('type', $type);
@ -92,7 +100,7 @@ class Builds extends Action
case BUILD_TYPE_RETRY: case BUILD_TYPE_RETRY:
Console::info('Creating build for deployment: ' . $deployment->getId()); Console::info('Creating build for deployment: ' . $deployment->getId());
$github = new GitHub($cache); $github = new GitHub($cache);
$this->buildDeployment($deviceForFunctions, $queueForFunctions, $queueForEvents, $queueForUsage, $dbForConsole, $dbForProject, $github, $project, $resource, $deployment, $template, $log); $this->buildDeployment($deviceForFunctions, $queueForFunctions, $queueForEvents, $queueForUsage, $dbForConsole, $dbForProject, $github, $project, $resource, $deployment, $template, $log, $realtimeConnection);
break; break;
default: default:
@ -117,7 +125,7 @@ class Builds extends Action
* @throws \Utopia\Database\Exception * @throws \Utopia\Database\Exception
* @throws Exception * @throws Exception
*/ */
protected function buildDeployment(Device $deviceForFunctions, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Database $dbForConsole, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, Log $log): void protected function buildDeployment(Device $deviceForFunctions, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Database $dbForConsole, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, Log $log, Callable $realtimeConnection): void
{ {
$executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST')); $executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST'));
@ -376,6 +384,7 @@ class Builds extends Action
project: $project project: $project
); );
Realtime::send( Realtime::send(
redis: $realtimeConnection($this->sourceRegion),
projectId: 'console', projectId: 'console',
payload: $build->getArrayCopy(), payload: $build->getArrayCopy(),
events: $allEvents, events: $allEvents,
@ -454,6 +463,7 @@ class Builds extends Action
); );
Realtime::send( Realtime::send(
redis: $realtimeConnection($this->sourceRegion),
projectId: 'console', projectId: 'console',
payload: $build->getArrayCopy(), payload: $build->getArrayCopy(),
events: $allEvents, events: $allEvents,
@ -552,12 +562,12 @@ class Builds extends Action
$err = $error; $err = $error;
} }
}), }),
Co\go(function () use ($executor, $project, $deployment, &$response, &$build, $dbForProject, $allEvents, &$err, &$isCanceled) { Co\go(function () use ($realtimeConnection, $executor, $project, $deployment, &$response, &$build, $dbForProject, $allEvents, &$err, &$isCanceled) {
try { try {
$executor->getLogs( $executor->getLogs(
deploymentId: $deployment->getId(), deploymentId: $deployment->getId(),
projectId: $project->getId(), projectId: $project->getId(),
callback: function ($logs) use (&$response, &$err, &$build, $dbForProject, $allEvents, $project, &$isCanceled) { callback: function ($logs) use ($realtimeConnection, &$response, &$err, &$build, $dbForProject, $allEvents, $project, &$isCanceled) {
if ($isCanceled) { if ($isCanceled) {
return; return;
} }
@ -591,6 +601,7 @@ class Builds extends Action
project: $project project: $project
); );
Realtime::send( Realtime::send(
redis: $realtimeConnection($this->sourceRegion),
projectId: 'console', projectId: 'console',
payload: $build->getArrayCopy(), payload: $build->getArrayCopy(),
events: $allEvents, events: $allEvents,
@ -693,6 +704,7 @@ class Builds extends Action
project: $project project: $project
); );
Realtime::send( Realtime::send(
redis: $realtimeConnection($this->sourceRegion),
projectId: 'console', projectId: 'console',
payload: $build->getArrayCopy(), payload: $build->getArrayCopy(),
events: $allEvents, events: $allEvents,

View file

@ -30,6 +30,11 @@ use Utopia\System\System;
class Certificates extends Action class Certificates extends Action
{ {
/**
* @var mixed|string
*/
protected string $sourceRegion;
public static function getName(): string public static function getName(): string
{ {
return 'certificates'; return 'certificates';
@ -48,7 +53,8 @@ class Certificates extends Action
->inject('queueForEvents') ->inject('queueForEvents')
->inject('queueForFunctions') ->inject('queueForFunctions')
->inject('log') ->inject('log')
->callback(fn (Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log) => $this->action($message, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions, $log)); ->inject('realtimeConnection')
->callback(fn (Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, Callable $realtimeConnection) => $this->action($message, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions, $log, $realtimeConnection));
} }
/** /**
@ -58,11 +64,12 @@ class Certificates extends Action
* @param Event $queueForEvents * @param Event $queueForEvents
* @param Func $queueForFunctions * @param Func $queueForFunctions
* @param Log $log * @param Log $log
* @param callable $realtimeConnection
* @return void * @return void
* @throws Throwable * @throws Throwable
* @throws \Utopia\Database\Exception * @throws \Utopia\Database\Exception
*/ */
public function action(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log): void public function action(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, Callable $realtimeConnection): void
{ {
$payload = $message->getPayload() ?? []; $payload = $message->getPayload() ?? [];
@ -74,9 +81,9 @@ class Certificates extends Action
$domain = new Domain($document->getAttribute('domain', '')); $domain = new Domain($document->getAttribute('domain', ''));
$skipRenewCheck = $payload['skipRenewCheck'] ?? false; $skipRenewCheck = $payload['skipRenewCheck'] ?? false;
$this->sourceRegion = $payload['sourceRegion'] ?? 'default';
$log->addTag('domain', $domain->get()); $log->addTag('domain', $domain->get());
$this->execute($domain, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions, $log,$realtimeConnection, $skipRenewCheck);
$this->execute($domain, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions, $log, $skipRenewCheck);
} }
/** /**
@ -85,12 +92,17 @@ class Certificates extends Action
* @param Mail $queueForMails * @param Mail $queueForMails
* @param Event $queueForEvents * @param Event $queueForEvents
* @param Func $queueForFunctions * @param Func $queueForFunctions
* @param Log $log
* @param callable $realtimeConnection
* @param bool $skipRenewCheck * @param bool $skipRenewCheck
* @return void * @return void
* @throws Authorization
* @throws Conflict
* @throws Structure
* @throws Throwable * @throws Throwable
* @throws \Utopia\Database\Exception * @throws \Utopia\Database\Exception
*/ */
private function execute(Domain $domain, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, bool $skipRenewCheck = false): void protected function execute(Domain $domain, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, Callable $realtimeConnection, bool $skipRenewCheck = false): void
{ {
/** /**
* 1. Read arguments and validate domain * 1. Read arguments and validate domain
@ -193,7 +205,7 @@ class Certificates extends Action
$certificate->setAttribute('updated', DateTime::now()); $certificate->setAttribute('updated', DateTime::now());
// Save all changes we made to certificate document into database // Save all changes we made to certificate document into database
$this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForConsole, $queueForEvents, $queueForFunctions); $this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForConsole, $queueForEvents, $queueForFunctions, $realtimeConnection);
} }
} }
@ -212,7 +224,7 @@ class Certificates extends Action
* @throws Conflict * @throws Conflict
* @throws Structure * @throws Structure
*/ */
private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions): void protected function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Callable $realtimeConnection): void
{ {
// Check if update or insert required // Check if update or insert required
$certificateDocument = $dbForConsole->findOne('certificates', [Query::equal('domain', [$domain])]); $certificateDocument = $dbForConsole->findOne('certificates', [Query::equal('domain', [$domain])]);
@ -226,7 +238,7 @@ class Certificates extends Action
} }
$certificateId = $certificate->getId(); $certificateId = $certificate->getId();
$this->updateDomainDocuments($certificateId, $domain, $success, $dbForConsole, $queueForEvents, $queueForFunctions); $this->updateDomainDocuments($certificateId, $domain, $success, $dbForConsole, $queueForEvents, $queueForFunctions, $realtimeConnection);
} }
/** /**
@ -234,7 +246,7 @@ class Certificates extends Action
* *
* @return null|string Returns main domain. If null, there is no main domain yet. * @return null|string Returns main domain. If null, there is no main domain yet.
*/ */
private function getMainDomain(): ?string protected function getMainDomain(): ?string
{ {
$envDomain = System::getEnv('_APP_DOMAIN', ''); $envDomain = System::getEnv('_APP_DOMAIN', '');
if (!empty($envDomain) && $envDomain !== 'localhost') { if (!empty($envDomain) && $envDomain !== 'localhost') {
@ -255,7 +267,7 @@ class Certificates extends Action
* @return void * @return void
* @throws Exception * @throws Exception
*/ */
private function validateDomain(Domain $domain, bool $isMainDomain, Log $log): void protected function validateDomain(Domain $domain, bool $isMainDomain, Log $log): void
{ {
if (empty($domain->get())) { if (empty($domain->get())) {
throw new Exception('Missing certificate domain.'); throw new Exception('Missing certificate domain.');
@ -299,7 +311,7 @@ class Certificates extends Action
* @return bool True, if certificate needs to be renewed * @return bool True, if certificate needs to be renewed
* @throws Exception * @throws Exception
*/ */
private function isRenewRequired(string $domain, Log $log): bool protected function isRenewRequired(string $domain, Log $log): bool
{ {
$certPath = APP_STORAGE_CERTIFICATES . '/' . $domain . '/cert.pem'; $certPath = APP_STORAGE_CERTIFICATES . '/' . $domain . '/cert.pem';
if (\file_exists($certPath)) { if (\file_exists($certPath)) {
@ -333,7 +345,7 @@ class Certificates extends Action
* @return array Named array with keys 'stdout' and 'stderr', both string * @return array Named array with keys 'stdout' and 'stderr', both string
* @throws Exception * @throws Exception
*/ */
private function issueCertificate(string $folder, string $domain, string $email): array protected function issueCertificate(string $folder, string $domain, string $email): array
{ {
$stdout = ''; $stdout = '';
$stderr = ''; $stderr = '';
@ -363,7 +375,7 @@ class Certificates extends Action
* @return string * @return string
* @throws \Utopia\Database\Exception * @throws \Utopia\Database\Exception
*/ */
private function getRenewDate(string $domain): string protected function getRenewDate(string $domain): string
{ {
$certPath = APP_STORAGE_CERTIFICATES . '/' . $domain . '/cert.pem'; $certPath = APP_STORAGE_CERTIFICATES . '/' . $domain . '/cert.pem';
$certData = openssl_x509_parse(file_get_contents($certPath)); $certData = openssl_x509_parse(file_get_contents($certPath));
@ -381,7 +393,7 @@ class Certificates extends Action
* @return void * @return void
* @throws Exception * @throws Exception
*/ */
private function applyCertificateFiles(string $folder, string $domain, array $letsEncryptData): void protected function applyCertificateFiles(string $folder, string $domain, array $letsEncryptData): void
{ {
// Prepare folder in storage for domain // Prepare folder in storage for domain
@ -432,7 +444,7 @@ class Certificates extends Action
* @return void * @return void
* @throws Exception * @throws Exception
*/ */
private function notifyError(string $domain, string $errorMessage, int $attempt, Mail $queueForMails): void protected function notifyError(string $domain, string $errorMessage, int $attempt, Mail $queueForMails): void
{ {
// Log error into console // Log error into console
Console::warning('Cannot renew domain (' . $domain . ') on attempt no. ' . $attempt . ' certificate: ' . $errorMessage); Console::warning('Cannot renew domain (' . $domain . ') on attempt no. ' . $attempt . ' certificate: ' . $errorMessage);
@ -475,7 +487,7 @@ class Certificates extends Action
* *
* @return void * @return void
*/ */
private function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions): void protected function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Callable $realtimeConnection): void
{ {
$rule = $dbForConsole->findOne('rules', [ $rule = $dbForConsole->findOne('rules', [
@ -525,6 +537,7 @@ class Certificates extends Action
project: $project project: $project
); );
Realtime::send( Realtime::send(
redis: $realtimeConnection($this->sourceRegion),
projectId: 'console', projectId: 'console',
payload: $rule->getArrayCopy(), payload: $rule->getArrayCopy(),
events: $allEvents, events: $allEvents,
@ -532,6 +545,7 @@ class Certificates extends Action
roles: $target['roles'] roles: $target['roles']
); );
Realtime::send( Realtime::send(
redis: $realtimeConnection($this->sourceRegion),
projectId: $project->getId(), projectId: $project->getId(),
payload: $rule->getArrayCopy(), payload: $rule->getArrayCopy(),
events: $allEvents, events: $allEvents,

View file

@ -21,6 +21,11 @@ use Utopia\Queue\Message;
class Databases extends Action class Databases extends Action
{ {
/**
* @var array|mixed
*/
protected string $sourceRegion;
public static function getName(): string public static function getName(): string
{ {
return 'databases'; return 'databases';
@ -37,7 +42,8 @@ class Databases extends Action
->inject('dbForConsole') ->inject('dbForConsole')
->inject('dbForProject') ->inject('dbForProject')
->inject('log') ->inject('log')
->callback(fn (Message $message, Database $dbForConsole, Database $dbForProject, Log $log) => $this->action($message, $dbForConsole, $dbForProject, $log)); ->inject('realtimeConnection')
->callback(fn (Message $message, Database $dbForConsole, Database $dbForProject, Log $log, callable $realtimeConnection) => $this->action($message, $dbForConsole, $dbForProject, $log, $realtimeConnection));
} }
/** /**
@ -45,10 +51,16 @@ class Databases extends Action
* @param Database $dbForConsole * @param Database $dbForConsole
* @param Database $dbForProject * @param Database $dbForProject
* @param Log $log * @param Log $log
* @param callable $realtimeConnection
* @return void * @return void
* @throws \Exception * @throws Authorization
* @throws Conflict
* @throws DatabaseException
* @throws Restricted
* @throws Structure
* @throws Exception
*/ */
public function action(Message $message, Database $dbForConsole, Database $dbForProject, Log $log): void public function action(Message $message, Database $dbForConsole, Database $dbForProject, Log $log, callable $realtimeConnection): void
{ {
$payload = $message->getPayload() ?? []; $payload = $message->getPayload() ?? [];
@ -62,6 +74,7 @@ class Databases extends Action
$document = new Document($payload['document'] ?? []); $document = new Document($payload['document'] ?? []);
$database = new Document($payload['database'] ?? []); $database = new Document($payload['database'] ?? []);
$this->sourceRegion = $payload['sourceRegion'] ?? 'default';
$log->addTag('projectId', $project->getId()); $log->addTag('projectId', $project->getId());
$log->addTag('type', $type); $log->addTag('type', $type);
@ -74,10 +87,10 @@ class Databases extends Action
match (\strval($type)) { match (\strval($type)) {
DATABASE_TYPE_DELETE_DATABASE => $this->deleteDatabase($database, $project, $dbForProject), DATABASE_TYPE_DELETE_DATABASE => $this->deleteDatabase($database, $project, $dbForProject),
DATABASE_TYPE_DELETE_COLLECTION => $this->deleteCollection($database, $collection, $project, $dbForProject), DATABASE_TYPE_DELETE_COLLECTION => $this->deleteCollection($database, $collection, $project, $dbForProject),
DATABASE_TYPE_CREATE_ATTRIBUTE => $this->createAttribute($database, $collection, $document, $project, $dbForConsole, $dbForProject), DATABASE_TYPE_CREATE_ATTRIBUTE => $this->createAttribute($database, $collection, $document, $project, $dbForConsole, $dbForProject, $realtimeConnection),
DATABASE_TYPE_DELETE_ATTRIBUTE => $this->deleteAttribute($database, $collection, $document, $project, $dbForConsole, $dbForProject), DATABASE_TYPE_DELETE_ATTRIBUTE => $this->deleteAttribute($database, $collection, $document, $project, $dbForConsole, $dbForProject, $realtimeConnection),
DATABASE_TYPE_CREATE_INDEX => $this->createIndex($database, $collection, $document, $project, $dbForConsole, $dbForProject), DATABASE_TYPE_CREATE_INDEX => $this->createIndex($database, $collection, $document, $project, $dbForConsole, $dbForProject, $realtimeConnection),
DATABASE_TYPE_DELETE_INDEX => $this->deleteIndex($database, $collection, $document, $project, $dbForConsole, $dbForProject), DATABASE_TYPE_DELETE_INDEX => $this->deleteIndex($database, $collection, $document, $project, $dbForConsole, $dbForProject, $realtimeConnection),
default => throw new \Exception('No database operation for type: ' . \strval($type)), default => throw new \Exception('No database operation for type: ' . \strval($type)),
}; };
} }
@ -89,12 +102,15 @@ class Databases extends Action
* @param Document $project * @param Document $project
* @param Database $dbForConsole * @param Database $dbForConsole
* @param Database $dbForProject * @param Database $dbForProject
* @param callable $realtimeConnection
* @return void * @return void
* @throws Authorization * @throws Authorization
* @throws Conflict * @throws Conflict
* @throws \Exception * @throws DatabaseException
* @throws Structure
* @throws Exception
*/ */
private function createAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForConsole, Database $dbForProject): void private function createAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForConsole, Database $dbForProject, callable $realtimeConnection): void
{ {
if ($collection->isEmpty()) { if ($collection->isEmpty()) {
throw new Exception('Missing collection'); throw new Exception('Missing collection');
@ -195,7 +211,7 @@ class Databases extends Action
); );
} }
} finally { } finally {
$this->trigger($database, $collection, $attribute, $project, $projectId, $events); $this->trigger($database, $collection, $attribute, $project, $projectId, $events, $realtimeConnection);
} }
if ($type === Database::VAR_RELATIONSHIP && $options['twoWay']) { if ($type === Database::VAR_RELATIONSHIP && $options['twoWay']) {
@ -212,12 +228,16 @@ class Databases extends Action
* @param Document $project * @param Document $project
* @param Database $dbForConsole * @param Database $dbForConsole
* @param Database $dbForProject * @param Database $dbForProject
* @param callable $realtimeConnection
* @return void * @return void
* @throws Authorization * @throws Authorization
* @throws Conflict * @throws Conflict
* @throws \Exception * @throws DatabaseException
**/ * @throws Restricted
private function deleteAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForConsole, Database $dbForProject): void * @throws Structure
* @throws Exception
*/
private function deleteAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForConsole, Database $dbForProject, callable $realtimeConnection): void
{ {
if ($collection->isEmpty()) { if ($collection->isEmpty()) {
throw new Exception('Missing collection'); throw new Exception('Missing collection');
@ -296,7 +316,7 @@ class Databases extends Action
); );
} }
} finally { } finally {
$this->trigger($database, $collection, $attribute, $project, $projectId, $events); $this->trigger($database, $collection, $attribute, $project, $projectId, $events, $realtimeConnection);
} }
// The underlying database removes/rebuilds indexes when attribute is removed // The underlying database removes/rebuilds indexes when attribute is removed
@ -366,13 +386,15 @@ class Databases extends Action
* @param Document $project * @param Document $project
* @param Database $dbForConsole * @param Database $dbForConsole
* @param Database $dbForProject * @param Database $dbForProject
* @param callable $realtimeConnection
* @return void * @return void
* @throws Authorization * @throws Authorization
* @throws Conflict * @throws Conflict
* @throws Structure
* @throws DatabaseException * @throws DatabaseException
* @throws Structure
* @throws Exception
*/ */
private function createIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForConsole, Database $dbForProject): void private function createIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForConsole, Database $dbForProject, callable $realtimeConnection): void
{ {
if ($collection->isEmpty()) { if ($collection->isEmpty()) {
throw new Exception('Missing collection'); throw new Exception('Missing collection');
@ -414,7 +436,7 @@ class Databases extends Action
$index->setAttribute('status', 'failed') $index->setAttribute('status', 'failed')
); );
} finally { } finally {
$this->trigger($database, $collection, $index, $project, $projectId, $events); $this->trigger($database, $collection, $index, $project, $projectId, $events, $realtimeConnection);
} }
$dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collectionId); $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collectionId);
@ -427,13 +449,15 @@ class Databases extends Action
* @param Document $project * @param Document $project
* @param Database $dbForConsole * @param Database $dbForConsole
* @param Database $dbForProject * @param Database $dbForProject
* @param callable $realtimeConnection
* @return void * @return void
* @throws Authorization * @throws Authorization
* @throws Conflict * @throws Conflict
* @throws Structure
* @throws DatabaseException * @throws DatabaseException
* @throws Structure
* @throws Exception
*/ */
private function deleteIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForConsole, Database $dbForProject): void private function deleteIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForConsole, Database $dbForProject, callable $realtimeConnection): void
{ {
if ($collection->isEmpty()) { if ($collection->isEmpty()) {
throw new Exception('Missing collection'); throw new Exception('Missing collection');
@ -472,7 +496,7 @@ class Databases extends Action
$index->setAttribute('status', 'stuck') $index->setAttribute('status', 'stuck')
); );
} finally { } finally {
$this->trigger($database, $collection, $index, $project, $projectId, $events); $this->trigger($database, $collection, $index, $project, $projectId, $events, $realtimeConnection);
} }
$dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collection->getId()); $dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collection->getId());
@ -612,13 +636,18 @@ class Databases extends Action
Console::info("Deleted {$count} document by group in " . ($executionEnd - $executionStart) . " seconds"); Console::info("Deleted {$count} document by group in " . ($executionEnd - $executionStart) . " seconds");
} }
/**
* @throws \RedisException
* @throws Exception
*/
protected function trigger( protected function trigger(
Document $database, Document $database,
Document $collection, Document $collection,
Document $attribute, Document $attribute,
Document $project, Document $project,
string $projectId, string $projectId,
array $events array $events,
callable $realtimeConnection
): void { ): void {
$target = Realtime::fromPayload( $target = Realtime::fromPayload(
// Pass first, most verbose event pattern // Pass first, most verbose event pattern
@ -627,6 +656,7 @@ class Databases extends Action
project: $project, project: $project,
); );
Realtime::send( Realtime::send(
redis: $realtimeConnection($this->sourceRegion),
projectId: 'console', projectId: 'console',
payload: $attribute->getArrayCopy(), payload: $attribute->getArrayCopy(),
events: $events, events: $events,

View file

@ -497,19 +497,20 @@ class Deletes extends Action
]; ];
$limit = \count($projectCollectionIds) + 25; $limit = \count($projectCollectionIds) + 25;
$sharedTablesKeys = explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));
while (true) { while (true) {
$collections = $dbForProject->listCollections($limit); $collections = $dbForProject->listCollections($limit);
foreach ($collections as $collection) { foreach ($collections as $collection) {
if ($dsn->getHost() !== System::getEnv('_APP_DATABASE_SHARED_TABLES', '') || !\in_array($collection->getId(), $projectCollectionIds)) { if (! in_array($dsn->getHost(), $sharedTablesKeys) || !\in_array($collection->getId(), $projectCollectionIds)) {
$dbForProject->deleteCollection($collection->getId()); $dbForProject->deleteCollection($collection->getId()); $dbForProject->deleteCollection($collection->getId());
} else { } else {
$this->deleteByGroup($collection->getId(), [], database: $dbForProject); $this->deleteByGroup($collection->getId(), [], database: $dbForProject);
} }
} }
if ($dsn->getHost() === System::getEnv('_APP_DATABASE_SHARED_TABLES', '')) { if (in_array($dsn->getHost(), $sharedTablesKeys)) {
$collectionsIds = \array_map(fn ($collection) => $collection->getId(), $collections); $collectionsIds = \array_map(fn ($collection) => $collection->getId(), $collections);
if (empty(\array_diff($collectionsIds, $projectCollectionIds))) { if (empty(\array_diff($collectionsIds, $projectCollectionIds))) {
@ -558,7 +559,8 @@ class Deletes extends Action
], $dbForConsole); ], $dbForConsole);
// Delete metadata table // Delete metadata table
if ($dsn->getHost() !== System::getEnv('_APP_DATABASE_SHARED_TABLES', '')) { System::getEnv('_APP_DATABASE_SHARED_TABLES', '');
if (! in_array($dsn, $sharedTablesKeys)) {
$dbForProject->deleteCollection('_metadata'); $dbForProject->deleteCollection('_metadata');
} else { } else {
$this->deleteByGroup('_metadata', [], $dbForProject); $this->deleteByGroup('_metadata', [], $dbForProject);

View file

@ -28,6 +28,11 @@ use Utopia\System\System;
class Functions extends Action class Functions extends Action
{ {
/**
* @var mixed|string
*/
protected string $sourceRegion;
public static function getName(): string public static function getName(): string
{ {
return 'functions'; return 'functions';
@ -47,7 +52,8 @@ class Functions extends Action
->inject('queueForEvents') ->inject('queueForEvents')
->inject('queueForUsage') ->inject('queueForUsage')
->inject('log') ->inject('log')
->callback(fn (Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log) => $this->action($message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForUsage, $log)); ->inject('realtimeConnection')
->callback(fn (Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log, Callable $realtimeConnection) => $this->action($message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForUsage, $log, $realtimeConnection));
} }
/** /**
@ -63,7 +69,7 @@ class Functions extends Action
* @throws \Utopia\Database\Exception * @throws \Utopia\Database\Exception
* @throws Conflict * @throws Conflict
*/ */
public function action(Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log): void public function action(Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log, Callable $realtimeConnection): void
{ {
$payload = $message->getPayload() ?? []; $payload = $message->getPayload() ?? [];
@ -93,6 +99,9 @@ class Functions extends Action
return; return;
} }
$this->sourceRegion = $payload['sourceRegion'] ?? 'default';
if ($function->isEmpty() && !empty($functionId)) { if ($function->isEmpty() && !empty($functionId)) {
$function = $dbForProject->getDocument('functions', $functionId); $function = $dbForProject->getDocument('functions', $functionId);
} }
@ -125,6 +134,7 @@ class Functions extends Action
Console::success('Iterating function: ' . $function->getAttribute('name')); Console::success('Iterating function: ' . $function->getAttribute('name'));
$this->execute( $this->execute(
realtimeConnection: $realtimeConnection,
log: $log, log: $log,
dbForProject: $dbForProject, dbForProject: $dbForProject,
queueForFunctions: $queueForFunctions, queueForFunctions: $queueForFunctions,
@ -161,6 +171,7 @@ class Functions extends Action
$execution = new Document($payload['execution'] ?? []); $execution = new Document($payload['execution'] ?? []);
$user = new Document($payload['user'] ?? []); $user = new Document($payload['user'] ?? []);
$this->execute( $this->execute(
realtimeConnection: $realtimeConnection,
log: $log, log: $log,
dbForProject: $dbForProject, dbForProject: $dbForProject,
queueForFunctions: $queueForFunctions, queueForFunctions: $queueForFunctions,
@ -183,6 +194,7 @@ class Functions extends Action
case 'schedule': case 'schedule':
$execution = new Document($payload['execution'] ?? []); $execution = new Document($payload['execution'] ?? []);
$this->execute( $this->execute(
realtimeConnection: $realtimeConnection,
log: $log, log: $log,
dbForProject: $dbForProject, dbForProject: $dbForProject,
queueForFunctions: $queueForFunctions, queueForFunctions: $queueForFunctions,
@ -292,6 +304,7 @@ class Functions extends Action
* @throws Conflict * @throws Conflict
*/ */
private function execute( private function execute(
Callable $realtimeConnection,
Log $log, Log $log,
Database $dbForProject, Database $dbForProject,
Func $queueForFunctions, Func $queueForFunctions,
@ -583,6 +596,7 @@ class Functions extends Action
payload: $execution payload: $execution
); );
Realtime::send( Realtime::send(
redis: $realtimeConnection($this->sourceRegion),
projectId: 'console', projectId: 'console',
payload: $execution->getArrayCopy(), payload: $execution->getArrayCopy(),
events: $allEvents, events: $allEvents,
@ -590,6 +604,7 @@ class Functions extends Action
roles: $target['roles'] roles: $target['roles']
); );
Realtime::send( Realtime::send(
redis: $realtimeConnection($this->sourceRegion),
projectId: $project->getId(), projectId: $project->getId(),
payload: $execution->getArrayCopy(), payload: $execution->getArrayCopy(),
events: $allEvents, events: $allEvents,

View file

@ -32,6 +32,10 @@ class Migrations extends Action
{ {
private ?Database $dbForProject = null; private ?Database $dbForProject = null;
private ?Database $dbForConsole = null; private ?Database $dbForConsole = null;
/**
* @var string
*/
protected string $sourceRegion;
public static function getName(): string public static function getName(): string
{ {
@ -49,7 +53,8 @@ class Migrations extends Action
->inject('dbForProject') ->inject('dbForProject')
->inject('dbForConsole') ->inject('dbForConsole')
->inject('log') ->inject('log')
->callback(fn (Message $message, Database $dbForProject, Database $dbForConsole, Log $log) => $this->action($message, $dbForProject, $dbForConsole, $log)); ->inject('realtimeConnection')
->callback(fn (Message $message, Database $dbForProject, Database $dbForConsole, Log $log, Callable $realtimeConnection) => $this->action($message, $dbForProject, $dbForConsole, $log, $realtimeConnection));
} }
/** /**
@ -60,7 +65,7 @@ class Migrations extends Action
* @return void * @return void
* @throws Exception * @throws Exception
*/ */
public function action(Message $message, Database $dbForProject, Database $dbForConsole, Log $log): void public function action(Message $message, Database $dbForProject, Database $dbForConsole, Log $log, Callable $realtimeConnection): void
{ {
$payload = $message->getPayload() ?? []; $payload = $message->getPayload() ?? [];
@ -76,6 +81,7 @@ class Migrations extends Action
return; return;
} }
$this->sourceRegion = $payload['sourceRegion'] ?? 'default';
$this->dbForProject = $dbForProject; $this->dbForProject = $dbForProject;
$this->dbForConsole = $dbForConsole; $this->dbForConsole = $dbForConsole;
@ -89,7 +95,7 @@ class Migrations extends Action
$log->addTag('migrationId', $migration->getId()); $log->addTag('migrationId', $migration->getId());
$log->addTag('projectId', $project->getId()); $log->addTag('projectId', $project->getId());
$this->processMigration($project, $migration, $log); $this->processMigration($project, $migration, $log, $realtimeConnection);
} }
/** /**
@ -134,7 +140,7 @@ class Migrations extends Action
* @throws \Utopia\Database\Exception * @throws \Utopia\Database\Exception
* @throws Exception * @throws Exception
*/ */
protected function updateMigrationDocument(Document $migration, Document $project): Document protected function updateMigrationDocument(Document $migration, Document $project, Callable $realtimeConnection): Document
{ {
/** Trigger Realtime */ /** Trigger Realtime */
$allEvents = Event::generateEvents('migrations.[migrationId].update', [ $allEvents = Event::generateEvents('migrations.[migrationId].update', [
@ -148,6 +154,7 @@ class Migrations extends Action
); );
Realtime::send( Realtime::send(
redis: $realtimeConnection($this->sourceRegion),
projectId: 'console', projectId: 'console',
payload: $migration->getArrayCopy(), payload: $migration->getArrayCopy(),
events: $allEvents, events: $allEvents,
@ -156,6 +163,7 @@ class Migrations extends Action
); );
Realtime::send( Realtime::send(
redis: $realtimeConnection($this->sourceRegion),
projectId: $project->getId(), projectId: $project->getId(),
payload: $migration->getArrayCopy(), payload: $migration->getArrayCopy(),
events: $allEvents, events: $allEvents,
@ -236,6 +244,7 @@ class Migrations extends Action
* @param Document $project * @param Document $project
* @param Document $migration * @param Document $migration
* @param Log $log * @param Log $log
* @param callable $realtimeConnection
* @return void * @return void
* @throws Authorization * @throws Authorization
* @throws Conflict * @throws Conflict
@ -243,7 +252,7 @@ class Migrations extends Action
* @throws Structure * @throws Structure
* @throws \Utopia\Database\Exception * @throws \Utopia\Database\Exception
*/ */
protected function processMigration(Document $project, Document $migration, Log $log): void protected function processMigration(Document $project, Document $migration, Log $log, Callable $realtimeConnection): void
{ {
/** /**
* @var Document $migrationDocument * @var Document $migrationDocument
@ -259,7 +268,7 @@ class Migrations extends Action
$migrationDocument->setAttribute('stage', 'processing'); $migrationDocument->setAttribute('stage', 'processing');
$migrationDocument->setAttribute('status', 'processing'); $migrationDocument->setAttribute('status', 'processing');
$log->addBreadcrumb(new Breadcrumb("debug", "migration", "Migration hit stage 'processing'", \microtime(true))); $log->addBreadcrumb(new Breadcrumb("debug", "migration", "Migration hit stage 'processing'", \microtime(true)));
$this->updateMigrationDocument($migrationDocument, $projectDocument); $this->updateMigrationDocument($migrationDocument, $projectDocument, $realtimeConnection);
$log->addTag('type', $migrationDocument->getAttribute('source')); $log->addTag('type', $migrationDocument->getAttribute('source'));
@ -281,12 +290,12 @@ class Migrations extends Action
/** Start Transfer */ /** Start Transfer */
$migrationDocument->setAttribute('stage', 'migrating'); $migrationDocument->setAttribute('stage', 'migrating');
$log->addBreadcrumb(new Breadcrumb("debug", "migration", "Migration hit stage 'migrating'", \microtime(true))); $log->addBreadcrumb(new Breadcrumb("debug", "migration", "Migration hit stage 'migrating'", \microtime(true)));
$this->updateMigrationDocument($migrationDocument, $projectDocument); $this->updateMigrationDocument($migrationDocument, $projectDocument, $realtimeConnection);
$transfer->run($migrationDocument->getAttribute('resources'), function () use ($migrationDocument, $transfer, $projectDocument) { $transfer->run($migrationDocument->getAttribute('resources'), function () use ($realtimeConnection, $migrationDocument, $transfer, $projectDocument) {
$migrationDocument->setAttribute('resourceData', json_encode($transfer->getCache())); $migrationDocument->setAttribute('resourceData', json_encode($transfer->getCache()));
$migrationDocument->setAttribute('statusCounters', json_encode($transfer->getStatusCounters())); $migrationDocument->setAttribute('statusCounters', json_encode($transfer->getStatusCounters()));
$this->updateMigrationDocument($migrationDocument, $projectDocument); $this->updateMigrationDocument($migrationDocument, $projectDocument, $realtimeConnection);
}); });
$sourceErrors = $source->getErrors(); $sourceErrors = $source->getErrors();
@ -309,7 +318,7 @@ class Migrations extends Action
$migrationDocument->setAttribute('errors', $errorMessages); $migrationDocument->setAttribute('errors', $errorMessages);
$log->addExtra('migrationErrors', json_encode($errorMessages)); $log->addExtra('migrationErrors', json_encode($errorMessages));
$this->updateMigrationDocument($migrationDocument, $projectDocument); $this->updateMigrationDocument($migrationDocument, $projectDocument, $realtimeConnection);
return; return;
} }
@ -352,7 +361,7 @@ class Migrations extends Action
$this->removeAPIKey($tempAPIKey); $this->removeAPIKey($tempAPIKey);
} }
if ($migrationDocument) { if ($migrationDocument) {
$this->updateMigrationDocument($migrationDocument, $projectDocument); $this->updateMigrationDocument($migrationDocument, $projectDocument, $realtimeConnection);
if ($migrationDocument->getAttribute('status', '') == 'failed') { if ($migrationDocument->getAttribute('status', '') == 'failed') {
throw new Exception("Migration failed"); throw new Exception("Migration failed");