Rollback cross regions connections

This commit is contained in:
shimon 2024-10-31 17:43:49 +02:00
parent b00b609621
commit 16a2d61874
17 changed files with 119 additions and 232 deletions

View file

@ -162,23 +162,15 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForConsole,
CLI::setResource('queue', function (Group $pools) {
return $pools->get('queue')->pop()->getResource();
}, ['pools']);
CLI::setResource('poolForQueue', function (Group $pools) {
return $pools->get('queue');
}, ['pools']);
CLI::setResource('queueForFunctions', function (Connection $queue) {
return new Func($queue);
}, ['queue']);
CLI::setResource('queueForDeletes', function (Connection $queue) {
return new Delete($queue);
}, ['queue']);
CLI::setResource('queueForCertificates', function (Connection $queue) {
return new Certificate($queue);
}, ['queue']);
CLI::setResource('logError', function (Registry $register) {
return function (Throwable $error, string $namespace, string $action) use ($register) {
$logger = $register->get('logger');

View file

@ -162,7 +162,6 @@ App::init()
->inject('mode')
->inject('team')
->action(function (App $utopia, Request $request, Database $dbForConsole, Document $project, Document $user, ?Document $session, array $servers, string $mode, Document $team) {
$route = $utopia->getRoute();
if ($project->isEmpty()) {
@ -364,6 +363,7 @@ App::init()
->inject('dbForProject')
->inject('mode')
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Messaging $queueForMessaging, Audit $queueForAudits, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Usage $queueForUsage, Database $dbForProject, string $mode) use ($databaseListener) {
$route = $utopia->getRoute();
if (
@ -461,7 +461,6 @@ App::init()
->on(Database::EVENT_DOCUMENT_DELETE, 'calculate-usage', fn ($event, $document) => $databaseListener($event, $document, $project, $queueForUsage, $dbForProject));
$useCache = $route->getLabel('cache', false);
if ($useCache) {
$key = md5($request->getURI() . '*' . implode('*', $request->getParams()) . '*' . APP_CACHE_BUSTER);
$cacheLog = Authorization::skip(fn () => $dbForProject->getDocument('cache', $key));
@ -470,8 +469,8 @@ App::init()
);
$timestamp = 60 * 60 * 24 * 30;
$data = $cache->load($key, $timestamp);
if (!empty($data) && !$cacheLog->isEmpty()) {
$timerStart = \microtime(true);
$parts = explode('/', $cacheLog->getAttribute('resourceType'));
$type = $parts[0] ?? null;
@ -552,7 +551,6 @@ App::shutdown()
->inject('project')
->inject('dbForProject')
->action(function (App $utopia, Request $request, Response $response, Document $project, Database $dbForProject) {
$route = $utopia->getRoute();
$sessionLimit = $project->getAttribute('auths', [])['maxSessions'] ?? APP_LIMIT_USER_SESSIONS_DEFAULT;
$session = $response->getPayload();
$userId = $session['userId'] ?? '';
@ -597,9 +595,8 @@ App::shutdown()
->inject('queueForFunctions')
->inject('mode')
->inject('dbForConsole')
->inject('realtimeConnection')
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Database $dbForProject, Func $queueForFunctions, string $mode, Database $dbForConsole, callable $realtimeConnection) use ($parseLabel) {
$route = $utopia->getRoute();
->action(function (App $utopia, Request $request, Response $response, Document $project, Document $user, Event $queueForEvents, Audit $queueForAudits, Usage $queueForUsage, Delete $queueForDeletes, EventDatabase $queueForDatabase, Build $queueForBuilds, Messaging $queueForMessaging, Database $dbForProject, Func $queueForFunctions, string $mode, Database $dbForConsole) use ($parseLabel) {
$responsePayload = $response->getPayload();
if (!empty($queueForEvents->getEvent())) {
@ -645,7 +642,6 @@ App::shutdown()
);
Realtime::send(
redis: $realtimeConnection($queueForEvents->getSourceRegion()),
projectId: $target['projectId'] ?? $project->getId(),
payload: $queueForEvents->getRealtimePayload(),
events: $allEvents,
@ -713,11 +709,9 @@ App::shutdown()
* Cache label
*/
$useCache = $route->getLabel('cache', false);
if ($useCache) {
$resource = $resourceType = null;
$data = $response->getPayload();
if (!empty($data['payload'])) {
$pattern = $route->getLabel('cache.resource', null);
if (!empty($pattern)) {
@ -734,7 +728,6 @@ App::shutdown()
$cacheLog = Authorization::skip(fn () => $dbForProject->getDocument('cache', $key));
$accessedAt = $cacheLog->getAttribute('accessedAt', '');
$now = DateTime::now();
if ($cacheLog->isEmpty()) {
Authorization::skip(fn () => $dbForProject->createDocument('cache', new Document([
'$id' => $key,
@ -761,6 +754,8 @@ App::shutdown()
}
}
if ($project->getId() !== 'console') {
if (!Auth::isPrivilegedUser(Authorization::getRoles())) {
$fileSize = 0;

View file

@ -1530,33 +1530,23 @@ App::setResource('deviceForLocal', function () {
return new Local();
});
App::setResource('deviceForFiles', function ($project, $connectionString) {
return getDevice(APP_STORAGE_UPLOADS.'/app-'.$project->getId(), $connectionString);
}, ['project', 'connectionString']);
App::setResource('deviceForFiles', function ($project) {
return getDevice(APP_STORAGE_UPLOADS . '/app-' . $project->getId());
}, ['project']);
App::setResource('deviceForFunctions', function ($project, $connectionString) {
return getDevice(APP_STORAGE_FUNCTIONS.'/app-'.$project->getId(), $connectionString);
}, ['project', 'connectionString']);
App::setResource('deviceForFunctions', function ($project) {
return getDevice(APP_STORAGE_FUNCTIONS . '/app-' . $project->getId());
}, ['project']);
App::setResource('deviceForBuilds', function ($project, $connectionString) {
return getDevice(APP_STORAGE_BUILDS.'/app-'.$project->getId(), $connectionString);
}, ['project', 'connectionString']);
App::setResource('deviceForBuilds', function ($project) {
return getDevice(APP_STORAGE_BUILDS . '/app-' . $project->getId());
}, ['project']);
App::setResource('connectionString', function () {
return System::getEnv('_APP_CONNECTIONS_STORAGE', '');
});
App::setResource('realtimeConnection',function ($pools) {
return function () use ($pools) {
return $pools->get('pubsub')->pop()->getResource();
};
}, ['pools']);
function getDevice(string $root, string $connectionString = ''): Device
function getDevice(string $root, string $connection = ''): Device
{
$connection = !empty($connection) ? $connection : System::getEnv('_APP_CONNECTIONS_STORAGE', '');
if (! empty($connectionString)) {
if (!empty($connection)) {
$acl = 'private';
$device = Storage::DEVICE_LOCAL;
$accessKey = '';
@ -1565,7 +1555,7 @@ function getDevice(string $root, string $connectionString = ''): Device
$region = '';
try {
$dsn = new DSN($connectionString);
$dsn = new DSN($connection);
$device = $dsn->getScheme();
$accessKey = $dsn->getUser() ?? '';
$accessSecret = $dsn->getPassword() ?? '';
@ -1826,9 +1816,6 @@ App::setResource('team', function (Document $project, Database $dbForConsole, Ap
]);
});
if (!$team) {
$team = new Document([]);
}
return $team;
}, ['project', 'dbForConsole', 'utopia', 'request']);

View file

@ -137,17 +137,14 @@ if (!function_exists('getCache')) {
}
}
if (!function_exists("getPubSub")) {
function getPubSub(): \Redis
if (!function_exists('getRealtime')) {
function getRealtime(): Realtime
{
global $register;
$pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */
return $pools->get('pubsub')->pop()->getResource();
return new Realtime();
}
}
$realtime = new Realtime();
$realtime = getRealtime();
/**
* Table for statistics across all workers.
@ -370,7 +367,7 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
}
$start = time();
$redis = getPubSub(); /** @var \Redis $redis */
$redis = $register->get('pools')->get('pubsub')->pop()->getResource(); /** @var Redis $redis */
$redis->setOption(Redis::OPT_READ_TIMEOUT, -1);
if ($redis->ping(true)) {

View file

@ -262,20 +262,20 @@ Server::setResource('pools', function (Registry $register) {
return $register->get('pools');
}, ['register']);
Server::setResource('deviceForFunctions', function (Document $project, $connectionString) {
return getDevice(APP_STORAGE_FUNCTIONS.'/app-'.$project->getId(), $connectionString);
}, ['project', 'connectionString']);
Server::setResource('deviceForFunctions', function (Document $project) {
return getDevice(APP_STORAGE_FUNCTIONS . '/app-' . $project->getId());
}, ['project']);
Server::setResource('deviceForFiles', function (Document $project, $connectionString) {
return getDevice(APP_STORAGE_UPLOADS.'/app-'.$project->getId(), $connectionString);
}, ['project', 'connectionString']);
Server::setResource('deviceForFiles', function (Document $project) {
return getDevice(APP_STORAGE_UPLOADS . '/app-' . $project->getId());
}, ['project']);
Server::setResource('deviceForBuilds', function (Document $project, $connectionString) {
return getDevice(APP_STORAGE_BUILDS.'/app-'.$project->getId(), $connectionString);
}, ['project', 'connectionString']);
Server::setResource('deviceForBuilds', function (Document $project) {
return getDevice(APP_STORAGE_BUILDS . '/app-' . $project->getId());
}, ['project']);
Server::setResource('deviceForCache', function (Document $project) {
return getDevice(APP_STORAGE_CACHE.'/app-'.$project->getId());
return getDevice(APP_STORAGE_CACHE . '/app-' . $project->getId());
}, ['project']);
Server::setResource(
@ -283,11 +283,6 @@ Server::setResource(
fn () => fn (Document $project, string $resourceType, ?string $resourceId) => false
);
Server::setResource('realtimeConnection',function ($pools) {
return function () use ($pools) {
return $pools->get('pubsub')->pop()->getResource();
};
}, ['pools']);
$pools = $register->get('pools');
$platform = new Appwrite();

View file

@ -122,17 +122,15 @@ class Realtime extends Adapter
/**
* Sends an event to the Realtime Server
* @param \Redis $redis
* @param string $projectId
* @param array $payload
* @param array $events
* @param string $event
* @param array $channels
* @param array $roles
* @param array $options
* @return void
* @throws \RedisException
*/
public static function send(\Redis $redis, string $projectId, array $payload, array $events, array $channels, array $roles, array $options = []): void
public static function send(string $projectId, array $payload, array $events, array $channels, array $roles, array $options = []): void
{
if (empty($channels) || empty($roles) || empty($projectId)) {
return;
@ -140,8 +138,9 @@ class Realtime extends Adapter
$permissionsChanged = array_key_exists('permissionsChanged', $options) && $options['permissionsChanged'];
$userId = array_key_exists('userId', $options) ? $options['userId'] : null;
var_dump($redis->getHost());
var_dump($channels);
$redis = new \Redis(); //TODO: make this part of the constructor
$redis->connect(System::getEnv('_APP_REDIS_HOST', ''), System::getEnv('_APP_REDIS_PORT', ''));
$redis->publish('realtime', json_encode([
'project' => $projectId,
'roles' => $roles,

View file

@ -11,7 +11,6 @@ use Utopia\Database\Exception;
use Utopia\Database\Query;
use Utopia\Platform\Action;
use Utopia\Pools\Group;
use Utopia\Pools\Pool;
use Utopia\System\System;
use function Swoole\Coroutine\run;
@ -26,7 +25,7 @@ abstract class ScheduleBase extends Action
abstract public static function getName(): string;
abstract public static function getSupportedResource(): string;
abstract public static function getCollectionId(): string;
abstract protected function enqueueResources(\Utopia\Pools\Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB): void;
abstract protected function enqueueResources(Group $pools, Database $dbForConsole, callable $getProjectDB): void;
public function __construct()
{
@ -34,10 +33,10 @@ abstract class ScheduleBase extends Action
$this
->desc("Execute {$type}s scheduled in Appwrite")
->inject('poolForQueue')
->inject('pools')
->inject('dbForConsole')
->inject('getProjectDB')
->callback(fn (Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB) => $this->action($poolForQueue, $dbForConsole, $getProjectDB));
->callback(fn (Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB));
}
/**
@ -45,7 +44,7 @@ abstract class ScheduleBase extends Action
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker.
*/
public function action(Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB): void
public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void
{
Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1');
Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started');
@ -114,17 +113,17 @@ abstract class ScheduleBase extends Action
$latestDocument = \end($results);
}
$poolForQueue->reclaim();
$pools->reclaim();
Console::success("{$total} resources were loaded in " . (\microtime(true) - $loadStart) . " seconds");
Console::success("Starting timers at " . DateTime::now());
run(function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $poolForQueue, $getProjectDB) {
run(function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools, $getProjectDB) {
/**
* The timer synchronize $schedules copy with database collection.
*/
Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $poolForQueue) {
Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools) {
$time = DateTime::now();
$timerStart = \microtime(true);
@ -173,17 +172,17 @@ abstract class ScheduleBase extends Action
$lastSyncUpdate = $time;
$timerEnd = \microtime(true);
$poolForQueue->reclaim();
$pools->reclaim();
Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds");
});
Timer::tick(
static::ENQUEUE_TIMER * 1000,
fn () => $this->enqueueResources($poolForQueue, $dbForConsole, $getProjectDB)
fn () => $this->enqueueResources($pools, $dbForConsole, $getProjectDB)
);
$this->enqueueResources($poolForQueue, $dbForConsole, $getProjectDB);
$this->enqueueResources($pools, $dbForConsole, $getProjectDB);
});
}
}

View file

@ -6,7 +6,6 @@ use Appwrite\Event\Func;
use Swoole\Coroutine as Co;
use Utopia\Database\Database;
use Utopia\Pools\Group;
use Utopia\Pools\Pool;
class ScheduleExecutions extends ScheduleBase
{
@ -28,9 +27,9 @@ class ScheduleExecutions extends ScheduleBase
return 'executions';
}
protected function enqueueResources(\Utopia\Pools\Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB): void
protected function enqueueResources(Group $pools, Database $dbForConsole, callable $getProjectDB): void
{
$queue = $poolForQueue->pop();
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource();
$queueForFunctions = new Func($connection);
$intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds');

View file

@ -8,7 +8,6 @@ use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Pools\Group;
use Utopia\Pools\Pool;
class ScheduleFunctions extends ScheduleBase
{
@ -32,7 +31,7 @@ class ScheduleFunctions extends ScheduleBase
return 'functions';
}
protected function enqueueResources(\Utopia\Pools\Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB): void
protected function enqueueResources(Group $pools, Database $dbForConsole, callable $getProjectDB): void
{
$timerStart = \microtime(true);
$time = DateTime::now();
@ -71,10 +70,10 @@ class ScheduleFunctions extends ScheduleBase
}
foreach ($delayedExecutions as $delay => $scheduleKeys) {
\go(function () use ($delay, $scheduleKeys, $poolForQueue) {
\go(function () use ($delay, $scheduleKeys, $pools) {
\sleep($delay); // in seconds
$queue = $poolForQueue->pop();
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource();
foreach ($scheduleKeys as $scheduleKey) {

View file

@ -5,7 +5,6 @@ namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Messaging;
use Utopia\Database\Database;
use Utopia\Pools\Group;
use Utopia\Pools\Pool;
class ScheduleMessages extends ScheduleBase
{
@ -27,7 +26,7 @@ class ScheduleMessages extends ScheduleBase
return 'messages';
}
protected function enqueueResources(\Utopia\Pools\Pool $poolForQueue, Database $dbForConsole, callable $getProjectDB): void
protected function enqueueResources(Group $pools, Database $dbForConsole, callable $getProjectDB): void
{
foreach ($this->schedules as $schedule) {
if (!$schedule['active']) {
@ -41,8 +40,8 @@ class ScheduleMessages extends ScheduleBase
continue;
}
\go(function () use ($schedule, $poolForQueue, $dbForConsole) {
$queue = $poolForQueue->pop();
\go(function () use ($schedule, $pools, $dbForConsole) {
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource();
$queueForMessaging = new Messaging($connection);

View file

@ -33,11 +33,6 @@ use Utopia\VCS\Adapter\Git\GitHub;
class Builds extends Action
{
/**
* @var mixed|string
*/
protected string $sourceRegion;
public static function getName(): string
{
return 'builds';
@ -59,8 +54,7 @@ class Builds extends Action
->inject('dbForProject')
->inject('deviceForFunctions')
->inject('log')
->inject('realtimeConnection')
->callback(fn ($message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Usage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log, Callable $realtimeConnection) => $this->action($message, $dbForConsole, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $deviceForFunctions, $log, $realtimeConnection));
->callback(fn ($message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Usage $usage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log) => $this->action($message, $dbForConsole, $queueForEvents, $queueForFunctions, $usage, $cache, $dbForProject, $deviceForFunctions, $log));
}
/**
@ -73,11 +67,10 @@ class Builds extends Action
* @param Database $dbForProject
* @param Device $deviceForFunctions
* @param Log $log
* @param callable $realtimeConnection
* @return void
* @throws \Utopia\Database\Exception
*/
public function action(Message $message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log, Callable $realtimeConnection): void
public function action(Message $message, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Usage $queueForUsage, Cache $cache, Database $dbForProject, Device $deviceForFunctions, Log $log): void
{
$payload = $message->getPayload() ?? [];
@ -90,7 +83,6 @@ class Builds extends Action
$resource = new Document($payload['resource'] ?? []);
$deployment = new Document($payload['deployment'] ?? []);
$template = new Document($payload['template'] ?? []);
$this->sourceRegion = $payload['sourceRegion'] ?? 'default';
$log->addTag('projectId', $project->getId());
$log->addTag('type', $type);
@ -100,7 +92,7 @@ class Builds extends Action
case BUILD_TYPE_RETRY:
Console::info('Creating build for deployment: ' . $deployment->getId());
$github = new GitHub($cache);
$this->buildDeployment($deviceForFunctions, $queueForFunctions, $queueForEvents, $queueForUsage, $dbForConsole, $dbForProject, $github, $project, $resource, $deployment, $template, $log, $realtimeConnection);
$this->buildDeployment($deviceForFunctions, $queueForFunctions, $queueForEvents, $queueForUsage, $dbForConsole, $dbForProject, $github, $project, $resource, $deployment, $template, $log);
break;
default:
@ -125,7 +117,7 @@ class Builds extends Action
* @throws \Utopia\Database\Exception
* @throws Exception
*/
protected function buildDeployment(Device $deviceForFunctions, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Database $dbForConsole, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, Log $log, Callable $realtimeConnection): void
protected function buildDeployment(Device $deviceForFunctions, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Database $dbForConsole, Database $dbForProject, GitHub $github, Document $project, Document $function, Document $deployment, Document $template, Log $log): void
{
$executor = new Executor(System::getEnv('_APP_EXECUTOR_HOST'));
@ -384,7 +376,6 @@ class Builds extends Action
project: $project
);
Realtime::send(
redis: $realtimeConnection($this->sourceRegion),
projectId: 'console',
payload: $build->getArrayCopy(),
events: $allEvents,
@ -463,7 +454,6 @@ class Builds extends Action
);
Realtime::send(
redis: $realtimeConnection($this->sourceRegion),
projectId: 'console',
payload: $build->getArrayCopy(),
events: $allEvents,
@ -562,12 +552,12 @@ class Builds extends Action
$err = $error;
}
}),
Co\go(function () use ($realtimeConnection, $executor, $project, $deployment, &$response, &$build, $dbForProject, $allEvents, &$err, &$isCanceled) {
Co\go(function () use ($executor, $project, $deployment, &$response, &$build, $dbForProject, $allEvents, &$err, &$isCanceled) {
try {
$executor->getLogs(
deploymentId: $deployment->getId(),
projectId: $project->getId(),
callback: function ($logs) use ($realtimeConnection, &$response, &$err, &$build, $dbForProject, $allEvents, $project, &$isCanceled) {
callback: function ($logs) use (&$response, &$err, &$build, $dbForProject, $allEvents, $project, &$isCanceled) {
if ($isCanceled) {
return;
}
@ -601,7 +591,6 @@ class Builds extends Action
project: $project
);
Realtime::send(
redis: $realtimeConnection($this->sourceRegion),
projectId: 'console',
payload: $build->getArrayCopy(),
events: $allEvents,
@ -704,7 +693,6 @@ class Builds extends Action
project: $project
);
Realtime::send(
redis: $realtimeConnection($this->sourceRegion),
projectId: 'console',
payload: $build->getArrayCopy(),
events: $allEvents,

View file

@ -30,11 +30,6 @@ use Utopia\System\System;
class Certificates extends Action
{
/**
* @var mixed|string
*/
protected string $sourceRegion;
public static function getName(): string
{
return 'certificates';
@ -53,8 +48,7 @@ class Certificates extends Action
->inject('queueForEvents')
->inject('queueForFunctions')
->inject('log')
->inject('realtimeConnection')
->callback(fn (Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, Callable $realtimeConnection) => $this->action($message, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions, $log, $realtimeConnection));
->callback(fn (Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log) => $this->action($message, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions, $log));
}
/**
@ -64,12 +58,11 @@ class Certificates extends Action
* @param Event $queueForEvents
* @param Func $queueForFunctions
* @param Log $log
* @param callable $realtimeConnection
* @return void
* @throws Throwable
* @throws \Utopia\Database\Exception
*/
public function action(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, Callable $realtimeConnection): void
public function action(Message $message, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log): void
{
$payload = $message->getPayload() ?? [];
@ -80,10 +73,10 @@ class Certificates extends Action
$document = new Document($payload['domain'] ?? []);
$domain = new Domain($document->getAttribute('domain', ''));
$skipRenewCheck = $payload['skipRenewCheck'] ?? false;
$this->sourceRegion = $payload['sourceRegion'] ?? 'default';
$log->addTag('domain', $domain->get());
$this->execute($domain, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions, $log,$realtimeConnection, $skipRenewCheck);
$this->execute($domain, $dbForConsole, $queueForMails, $queueForEvents, $queueForFunctions, $log, $skipRenewCheck);
}
/**
@ -92,17 +85,12 @@ class Certificates extends Action
* @param Mail $queueForMails
* @param Event $queueForEvents
* @param Func $queueForFunctions
* @param Log $log
* @param callable $realtimeConnection
* @param bool $skipRenewCheck
* @return void
* @throws Authorization
* @throws Conflict
* @throws Structure
* @throws Throwable
* @throws \Utopia\Database\Exception
*/
protected function execute(Domain $domain, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, Callable $realtimeConnection, bool $skipRenewCheck = false): void
private function execute(Domain $domain, Database $dbForConsole, Mail $queueForMails, Event $queueForEvents, Func $queueForFunctions, Log $log, bool $skipRenewCheck = false): void
{
/**
* 1. Read arguments and validate domain
@ -138,7 +126,7 @@ class Certificates extends Action
$certificate = $dbForConsole->findOne('certificates', [Query::equal('domain', [$domain->get()])]);
// If we don't have certificate for domain yet, let's create new document. At the end we save it
if (!$certificate) {
if ($certificate->isEmpty()) {
$certificate = new Document();
$certificate->setAttribute('domain', $domain->get());
}
@ -174,6 +162,7 @@ class Certificates extends Action
$logs = 'Certificate successfully generated.';
$certificate->setAttribute('logs', \mb_strcut($logs, 0, 1000000));// Limit to 1MB
// Give certificates to Traefik
$this->applyCertificateFiles($folder, $domain->get(), $letsEncryptData);
@ -204,7 +193,7 @@ class Certificates extends Action
$certificate->setAttribute('updated', DateTime::now());
// Save all changes we made to certificate document into database
$this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForConsole, $queueForEvents, $queueForFunctions, $realtimeConnection);
$this->saveCertificateDocument($domain->get(), $certificate, $success, $dbForConsole, $queueForEvents, $queueForFunctions);
}
}
@ -223,11 +212,11 @@ class Certificates extends Action
* @throws Conflict
* @throws Structure
*/
protected function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Callable $realtimeConnection): void
private function saveCertificateDocument(string $domain, Document $certificate, bool $success, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions): void
{
// Check if update or insert required
$certificateDocument = $dbForConsole->findOne('certificates', [Query::equal('domain', [$domain])]);
if (!empty($certificateDocument) && !$certificateDocument->isEmpty()) {
if (!$certificateDocument->isEmpty()) {
// Merge new data with current data
$certificate = new Document(\array_merge($certificateDocument->getArrayCopy(), $certificate->getArrayCopy()));
$certificate = $dbForConsole->updateDocument('certificates', $certificate->getId(), $certificate);
@ -237,7 +226,7 @@ class Certificates extends Action
}
$certificateId = $certificate->getId();
$this->updateDomainDocuments($certificateId, $domain, $success, $dbForConsole, $queueForEvents, $queueForFunctions, $realtimeConnection);
$this->updateDomainDocuments($certificateId, $domain, $success, $dbForConsole, $queueForEvents, $queueForFunctions);
}
/**
@ -245,7 +234,7 @@ class Certificates extends Action
*
* @return null|string Returns main domain. If null, there is no main domain yet.
*/
protected function getMainDomain(): ?string
private function getMainDomain(): ?string
{
$envDomain = System::getEnv('_APP_DOMAIN', '');
if (!empty($envDomain) && $envDomain !== 'localhost') {
@ -266,7 +255,7 @@ class Certificates extends Action
* @return void
* @throws Exception
*/
protected function validateDomain(Domain $domain, bool $isMainDomain, Log $log): void
private function validateDomain(Domain $domain, bool $isMainDomain, Log $log): void
{
if (empty($domain->get())) {
throw new Exception('Missing certificate domain.');
@ -310,7 +299,7 @@ class Certificates extends Action
* @return bool True, if certificate needs to be renewed
* @throws Exception
*/
protected function isRenewRequired(string $domain, Log $log): bool
private function isRenewRequired(string $domain, Log $log): bool
{
$certPath = APP_STORAGE_CERTIFICATES . '/' . $domain . '/cert.pem';
if (\file_exists($certPath)) {
@ -344,7 +333,7 @@ class Certificates extends Action
* @return array Named array with keys 'stdout' and 'stderr', both string
* @throws Exception
*/
protected function issueCertificate(string $folder, string $domain, string $email): array
private function issueCertificate(string $folder, string $domain, string $email): array
{
$stdout = '';
$stderr = '';
@ -374,7 +363,7 @@ class Certificates extends Action
* @return string
* @throws \Utopia\Database\Exception
*/
protected function getRenewDate(string $domain): string
private function getRenewDate(string $domain): string
{
$certPath = APP_STORAGE_CERTIFICATES . '/' . $domain . '/cert.pem';
$certData = openssl_x509_parse(file_get_contents($certPath));
@ -392,7 +381,7 @@ class Certificates extends Action
* @return void
* @throws Exception
*/
protected function applyCertificateFiles(string $folder, string $domain, array $letsEncryptData): void
private function applyCertificateFiles(string $folder, string $domain, array $letsEncryptData): void
{
// Prepare folder in storage for domain
@ -443,7 +432,7 @@ class Certificates extends Action
* @return void
* @throws Exception
*/
protected function notifyError(string $domain, string $errorMessage, int $attempt, Mail $queueForMails): void
private function notifyError(string $domain, string $errorMessage, int $attempt, Mail $queueForMails): void
{
// Log error into console
Console::warning('Cannot renew domain (' . $domain . ') on attempt no. ' . $attempt . ' certificate: ' . $errorMessage);
@ -486,14 +475,14 @@ class Certificates extends Action
*
* @return void
*/
protected function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions, Callable $realtimeConnection): void
private function updateDomainDocuments(string $certificateId, string $domain, bool $success, Database $dbForConsole, Event $queueForEvents, Func $queueForFunctions): void
{
$rule = $dbForConsole->findOne('rules', [
Query::equal('domain', [$domain]),
]);
if ($rule !== false && !$rule->isEmpty()) {
if (!$rule->isEmpty()) {
$rule->setAttribute('certificateId', $certificateId);
$rule->setAttribute('status', $success ? 'verified' : 'unverified');
$dbForConsole->updateDocument('rules', $rule->getId(), $rule);
@ -536,7 +525,6 @@ class Certificates extends Action
project: $project
);
Realtime::send(
redis: $realtimeConnection($this->sourceRegion),
projectId: 'console',
payload: $rule->getArrayCopy(),
events: $allEvents,
@ -544,7 +532,6 @@ class Certificates extends Action
roles: $target['roles']
);
Realtime::send(
redis: $realtimeConnection($this->sourceRegion),
projectId: $project->getId(),
payload: $rule->getArrayCopy(),
events: $allEvents,

View file

@ -17,15 +17,9 @@ use Utopia\Database\Query;
use Utopia\Logger\Log;
use Utopia\Platform\Action;
use Utopia\Queue\Message;
use Utopia\System\System;
class Databases extends Action
{
/**
* @var array|mixed
*/
protected string $sourceRegion;
public static function getName(): string
{
return 'databases';
@ -42,8 +36,7 @@ class Databases extends Action
->inject('dbForConsole')
->inject('dbForProject')
->inject('log')
->inject('realtimeConnection')
->callback(fn (Message $message, Database $dbForConsole, Database $dbForProject, Log $log, callable $realtimeConnection) => $this->action($message, $dbForConsole, $dbForProject, $log, $realtimeConnection));
->callback(fn (Message $message, Database $dbForConsole, Database $dbForProject, Log $log) => $this->action($message, $dbForConsole, $dbForProject, $log));
}
/**
@ -51,11 +44,10 @@ class Databases extends Action
* @param Database $dbForConsole
* @param Database $dbForProject
* @param Log $log
* @param callable $realtimeConnection
* @return void
* @throws \Exception
*/
public function action(Message $message, Database $dbForConsole, Database $dbForProject, Log $log, callable $realtimeConnection): void
public function action(Message $message, Database $dbForConsole, Database $dbForProject, Log $log): void
{
$payload = $message->getPayload() ?? [];
@ -68,7 +60,6 @@ class Databases extends Action
$collection = new Document($payload['collection'] ?? []);
$document = new Document($payload['document'] ?? []);
$database = new Document($payload['database'] ?? []);
$this->sourceRegion = $payload['sourceRegion'] ?? null;
$log->addTag('projectId', $project->getId());
$log->addTag('type', $type);
@ -82,10 +73,10 @@ class Databases extends Action
match (\strval($type)) {
DATABASE_TYPE_DELETE_DATABASE => $this->deleteDatabase($database, $project, $dbForProject),
DATABASE_TYPE_DELETE_COLLECTION => $this->deleteCollection($database, $collection, $project, $dbForProject),
DATABASE_TYPE_CREATE_ATTRIBUTE => $this->createAttribute($database, $collection, $document, $project, $dbForConsole, $dbForProject, $realtimeConnection),
DATABASE_TYPE_DELETE_ATTRIBUTE => $this->deleteAttribute($database, $collection, $document, $project, $dbForConsole, $dbForProject, $realtimeConnection),
DATABASE_TYPE_CREATE_INDEX => $this->createIndex($database, $collection, $document, $project, $dbForConsole, $dbForProject, $realtimeConnection),
DATABASE_TYPE_DELETE_INDEX => $this->deleteIndex($database, $collection, $document, $project, $dbForConsole, $dbForProject, $realtimeConnection),
DATABASE_TYPE_CREATE_ATTRIBUTE => $this->createAttribute($database, $collection, $document, $project, $dbForConsole, $dbForProject),
DATABASE_TYPE_DELETE_ATTRIBUTE => $this->deleteAttribute($database, $collection, $document, $project, $dbForConsole, $dbForProject),
DATABASE_TYPE_CREATE_INDEX => $this->createIndex($database, $collection, $document, $project, $dbForConsole, $dbForProject),
DATABASE_TYPE_DELETE_INDEX => $this->deleteIndex($database, $collection, $document, $project, $dbForConsole, $dbForProject),
default => throw new \Exception('No database operation for type: ' . \strval($type)),
};
}
@ -97,13 +88,12 @@ class Databases extends Action
* @param Document $project
* @param Database $dbForConsole
* @param Database $dbForProject
* @param callable $realtimeConnection
* @return void
* @throws Authorization
* @throws Conflict
* @throws \Exception
*/
private function createAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForConsole, Database $dbForProject, callable $realtimeConnection): void
private function createAttribute(Document $database, Document $collection, Document $attribute, Document $project, Database $dbForConsole, Database $dbForProject): void
{
if ($collection->isEmpty()) {
throw new Exception('Missing collection');
@ -144,6 +134,7 @@ class Databases extends Action
$options = $attribute->getAttribute('options', []);
$project = $dbForConsole->getDocument('projects', $projectId);
try {
switch ($type) {
case Database::VAR_RELATIONSHIP:
@ -203,7 +194,7 @@ class Databases extends Action
);
}
} finally {
$this->trigger($database, $collection, $attribute, $project, $projectId, $events, $realtimeConnection);
$this->trigger($database, $collection, $attribute, $project, $projectId, $events);
}
if ($type === Database::VAR_RELATIONSHIP && $options['twoWay']) {
@ -220,7 +211,6 @@ class Databases extends Action
* @param Document $project
* @param Database $dbForConsole
* @param Database $dbForProject
* @param callable $realtimeConnection
* @return void
* @throws Authorization
* @throws Conflict
@ -305,7 +295,7 @@ class Databases extends Action
);
}
} finally {
$this->trigger($database, $collection, $attribute, $project, $projectId, $events, $realtimeConnection);
$this->trigger($database, $collection, $attribute, $project, $projectId, $events);
}
// The underlying database removes/rebuilds indexes when attribute is removed
@ -375,14 +365,13 @@ class Databases extends Action
* @param Document $project
* @param Database $dbForConsole
* @param Database $dbForProject
* @param callable $realtimeConnection
* @return void
* @throws Authorization
* @throws Conflict
* @throws Structure
* @throws DatabaseException
*/
private function createIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForConsole, Database $dbForProject, callable $realtimeConnection): void
private function createIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForConsole, Database $dbForProject): void
{
if ($collection->isEmpty()) {
throw new Exception('Missing collection');
@ -424,7 +413,7 @@ class Databases extends Action
$index->setAttribute('status', 'failed')
);
} finally {
$this->trigger($database, $collection, $index, $project, $projectId, $events, $realtimeConnection);
$this->trigger($database, $collection, $index, $project, $projectId, $events);
}
$dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collectionId);
@ -437,14 +426,13 @@ class Databases extends Action
* @param Document $project
* @param Database $dbForConsole
* @param Database $dbForProject
* @param callable $realtimeConnection
* @return void
* @throws Authorization
* @throws Conflict
* @throws Structure
* @throws DatabaseException
*/
private function deleteIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForConsole, Database $dbForProject, callable $realtimeConnection): void
private function deleteIndex(Document $database, Document $collection, Document $index, Document $project, Database $dbForConsole, Database $dbForProject): void
{
if ($collection->isEmpty()) {
throw new Exception('Missing collection');
@ -483,7 +471,7 @@ class Databases extends Action
$index->setAttribute('status', 'stuck')
);
} finally {
$this->trigger($database, $collection, $index, $project, $projectId, $events, $realtimeConnection);
$this->trigger($database, $collection, $index, $project, $projectId, $events);
}
$dbForProject->purgeCachedDocument('database_' . $database->getInternalId(), $collection->getId());
@ -606,20 +594,14 @@ class Databases extends Action
Console::info("Deleted {$count} document by group in " . ($executionEnd - $executionStart) . " seconds");
}
/**
* @throws \RedisException
* @throws Exception
*/
protected function trigger(
Document $database,
Document $collection,
Document $attribute,
Document $project,
string $projectId,
array $events,
callable $realtimeConnection
array $events
): void {
var_dump('Send message to realtime region='.$this->sourceRegion);
$target = Realtime::fromPayload(
// Pass first, most verbose event pattern
event: $events[0],
@ -627,7 +609,6 @@ class Databases extends Action
project: $project,
);
Realtime::send(
redis: $realtimeConnection($this->sourceRegion),
projectId: 'console',
payload: $attribute->getArrayCopy(),
events: $events,

View file

@ -494,7 +494,6 @@ class Deletes extends Action
];
$limit = \count($projectCollectionIds) + 25;
$sharedTables = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));
while (true) {
$collections = $dbForProject->listCollections($limit);

View file

@ -28,11 +28,6 @@ use Utopia\System\System;
class Functions extends Action
{
/**
* @var mixed|string
*/
protected string $sourceRegion;
public static function getName(): string
{
return 'functions';
@ -54,12 +49,10 @@ class Functions extends Action
->inject('queueForUsage')
->inject('log')
->inject('isResourceBlocked')
->inject('realtimeConnection')
->callback(fn (Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log, callable $isResourceBlocked, Callable $realtimeConnection) => $this->action($project, $message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForUsage, $log, $isResourceBlocked, $realtimeConnection));
->callback(fn (Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log, callable $isResourceBlocked) => $this->action($project, $message, $dbForProject, $queueForFunctions, $queueForEvents, $queueForUsage, $log, $isResourceBlocked));
}
public function action(Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log, callable $isResourceBlocked, Callable $realtimeConnection): void
public function action(Document $project, Message $message, Database $dbForProject, Func $queueForFunctions, Event $queueForEvents, Usage $queueForUsage, Log $log, callable $isResourceBlocked): void
{
$payload = $message->getPayload() ?? [];
@ -104,8 +97,6 @@ class Functions extends Action
return;
}
$this->sourceRegion = $payload['sourceRegion'] ?? 'default';
if ($function->isEmpty() && !empty($functionId)) {
$function = $dbForProject->getDocument('functions', $functionId);
}
@ -143,7 +134,6 @@ class Functions extends Action
Console::success('Iterating function: ' . $function->getAttribute('name'));
$this->execute(
realtimeConnection: $realtimeConnection,
log: $log,
dbForProject: $dbForProject,
queueForFunctions: $queueForFunctions,
@ -184,7 +174,6 @@ class Functions extends Action
$execution = new Document($payload['execution'] ?? []);
$user = new Document($payload['user'] ?? []);
$this->execute(
realtimeConnection: $realtimeConnection,
log: $log,
dbForProject: $dbForProject,
queueForFunctions: $queueForFunctions,
@ -207,7 +196,6 @@ class Functions extends Action
case 'schedule':
$execution = new Document($payload['execution'] ?? []);
$this->execute(
realtimeConnection: $realtimeConnection,
log: $log,
dbForProject: $dbForProject,
queueForFunctions: $queueForFunctions,
@ -317,7 +305,6 @@ class Functions extends Action
* @throws Conflict
*/
private function execute(
Callable $realtimeConnection,
Log $log,
Database $dbForProject,
Func $queueForFunctions,
@ -610,7 +597,6 @@ class Functions extends Action
project: $project
);
Realtime::send(
redis: $realtimeConnection($this->sourceRegion),
projectId: 'console',
payload: $execution->getArrayCopy(),
events: $allEvents,
@ -618,7 +604,6 @@ class Functions extends Action
roles: $target['roles']
);
Realtime::send(
redis: $realtimeConnection($this->sourceRegion),
projectId: $project->getId(),
payload: $execution->getArrayCopy(),
events: $allEvents,

View file

@ -178,7 +178,7 @@ class Messaging extends Action
Query::equal('type', [$providerType]),
]);
if ($default === false || $default->isEmpty()) {
if ($default->isEmpty()) {
$dbForProject->updateDocument('messages', $message->getId(), $message->setAttributes([
'status' => MessageStatus::FAILED,
'deliveryErrors' => ['No enabled provider found.']

View file

@ -36,10 +36,6 @@ class Migrations extends Action
protected Database $dbForConsole;
protected Document $project;
/**
* @var string
*/
protected string $sourceRegion;
public static function getName(): string
{
@ -57,14 +53,13 @@ class Migrations extends Action
->inject('dbForProject')
->inject('dbForConsole')
->inject('log')
->inject('realtimeConnection')
->callback(fn (Message $message, Database $dbForProject, Database $dbForConsole, Log $log, Callable $realtimeConnection) => $this->action($message, $dbForProject, $dbForConsole, $log, $realtimeConnection));
->callback(fn (Message $message, Database $dbForProject, Database $dbForConsole, Log $log) => $this->action($message, $dbForProject, $dbForConsole, $log));
}
/**
* @throws Exception
*/
public function action(Message $message, Database $dbForProject, Database $dbForConsole, Log $log, Callable $realtimeConnection): void
public function action(Message $message, Database $dbForProject, Database $dbForConsole, Log $log): void
{
$payload = $message->getPayload() ?? [];
@ -80,7 +75,6 @@ class Migrations extends Action
return;
}
$this->sourceRegion = $payload['sourceRegion'] ?? 'default';
$this->dbForProject = $dbForProject;
$this->dbForConsole = $dbForConsole;
$this->project = $project;
@ -95,7 +89,7 @@ class Migrations extends Action
$log->addTag('migrationId', $migration->getId());
$log->addTag('projectId', $project->getId());
$this->processMigration($migration, $log, $realtimeConnection);
$this->processMigration($migration, $log);
}
/**
@ -130,7 +124,7 @@ class Migrations extends Action
),
SourceAppwrite::getName() => new SourceAppwrite(
$credentials['projectId'],
$credentials['endpoint'],
$credentials['endpoint'] === 'http://localhost/v1' ? 'http://appwrite/v1' : $credentials['endpoint'],
$credentials['apiKey'],
),
default => throw new \Exception('Invalid source type'),
@ -140,16 +134,15 @@ class Migrations extends Action
/**
* @throws Exception
*/
protected function processDestination(Document $migration): Destination
protected function processDestination(Document $migration, string $apiKey): Destination
{
$destination = $migration->getAttribute('destination');
$credentials = $migration->getAttribute('credentials');
return match ($destination) {
DestinationAppwrite::getName() => new DestinationAppwrite(
$credentials['projectId'],
$credentials['endpoint'],
$credentials['apiKey'],
$this->project->getId(),
'http://appwrite/v1',
$apiKey,
$this->dbForProject,
Config::getParam('collections', [])['databases']['collections'],
),
@ -164,7 +157,7 @@ class Migrations extends Action
* @throws \Utopia\Database\Exception
* @throws Exception
*/
protected function updateMigrationDocument(Document $migration, Document $project, Callable $realtimeConnection): Document
protected function updateMigrationDocument(Document $migration, Document $project): Document
{
/** Trigger Realtime */
$allEvents = Event::generateEvents('migrations.[migrationId].update', [
@ -178,7 +171,6 @@ class Migrations extends Action
);
Realtime::send(
redis: $realtimeConnection($this->sourceRegion),
projectId: 'console',
payload: $migration->getArrayCopy(),
events: $allEvents,
@ -187,7 +179,6 @@ class Migrations extends Action
);
Realtime::send(
redis: $realtimeConnection($this->sourceRegion),
projectId: $project->getId(),
payload: $migration->getArrayCopy(),
events: $allEvents,
@ -261,11 +252,6 @@ class Migrations extends Action
}
/**
* @param Document $project
* @param Document $migration
* @param Log $log
* @param callable $realtimeConnection
* @return void
* @throws Authorization
* @throws Conflict
* @throws Restricted
@ -273,7 +259,7 @@ class Migrations extends Action
* @throws \Utopia\Database\Exception
* @throws Exception
*/
protected function processMigration(Document $migration, Log $log, Callable $realtimeConnection): void
protected function processMigration(Document $migration, Log $log): void
{
$project = $this->project;
$projectDocument = $this->dbForConsole->getDocument('projects', $project->getId());
@ -285,8 +271,8 @@ class Migrations extends Action
$migration = $this->dbForProject->getDocument('migrations', $migration->getId());
if (
$migration->getAttribute('source') === SourceAppwrite::getName() ||
$migration->getAttribute('destination') === DestinationAppwrite::getName()
$migration->getAttribute('source') === SourceAppwrite::getName() &&
empty($migration->getAttribute('credentials', []))
) {
$credentials = $migration->getAttribute('credentials', []);
@ -299,12 +285,12 @@ class Migrations extends Action
$migration->setAttribute('stage', 'processing');
$migration->setAttribute('status', 'processing');
$this->updateMigrationDocument($migration, $projectDocument, $realtimeConnection);
$this->updateMigrationDocument($migration, $projectDocument);
$log->addTag('type', $migration->getAttribute('source'));
$source = $this->processSource($migration);
$destination = $this->processDestination($migration);
$destination = $this->processDestination($migration, $tempAPIKey->getAttribute('secret'));
$source->report();
@ -315,14 +301,14 @@ class Migrations extends Action
/** Start Transfer */
$migration->setAttribute('stage', 'migrating');
$this->updateMigrationDocument($migration, $projectDocument, $realtimeConnection);
$this->updateMigrationDocument($migration, $projectDocument);
$transfer->run(
$migration->getAttribute('resources'),
function () use ($migration, $transfer, $projectDocument) {
$migration->setAttribute('resourceData', json_encode($transfer->getCache()));
$migration->setAttribute('statusCounters', json_encode($transfer->getStatusCounters()));
$this->updateMigrationDocument($migration, $projectDocument, $realtimeConnection);
$this->updateMigrationDocument($migration, $projectDocument);
},
$migration->getAttribute('resourceId'),
$migration->getAttribute('resourceType')
@ -361,7 +347,7 @@ class Migrations extends Action
$migration->setAttribute('errors', $errorMessages);
$log->addExtra('migrationErrors', json_encode($errorMessages));
$this->updateMigrationDocument($migration, $projectDocument, $realtimeConnection);
$this->updateMigrationDocument($migration, $projectDocument);
return;
}
@ -402,7 +388,7 @@ class Migrations extends Action
$this->removeAPIKey($tempAPIKey);
}
$this->updateMigrationDocument($migration, $projectDocument, $realtimeConnection);
$this->updateMigrationDocument($migration, $projectDocument);
if ($migration->getAttribute('status', '') === 'failed') {
Console::error('Migration('.$migration->getInternalId().':'.$migration->getId().') failed, Project('.$this->project->getInternalId().':'.$this->project->getId().')');