This commit is contained in:
shimon 2024-10-22 19:39:57 +03:00
parent f8b2878943
commit 54555cb3cd
7 changed files with 83 additions and 76 deletions

View file

@ -207,7 +207,8 @@ App::post('/v1/projects')
if (\in_array($dsn->getHost(), $sharedTables)) {
$dbForProject
->setSharedTables(true)
->setTenant($globalCollections ? null : $project->getInternalId())
->setTenant($project->getInternalId())
//->setTenant($globalCollections ? null : $project->getInternalId())
->setNamespace($dsn->getParam('namespace'));
} else {
$dbForProject
@ -218,46 +219,50 @@ App::post('/v1/projects')
$create = true;
try {
// try {
$dbForProject->create();
} catch (Duplicate) {
$create = false;
}
// } catch (Duplicate) {
// $create = false;
//}
if ($create || !$globalCollections) {
$audit = new Audit($dbForProject);
$audit->setup();
$audit = new Audit($dbForProject);
$audit->setup();
$abuse = new TimeLimit('', 0, 1, $dbForProject);
$abuse->setup();
$abuse = new TimeLimit('', 0, 1, $dbForProject);
$abuse->setup();
/** @var array $collections */
$collections = Config::getParam('collections', [])['projects'] ?? [];
/** @var array $collections */
$collections = Config::getParam('collections', [])['projects'] ?? [];
foreach ($collections as $key => $collection) {
if (($collection['$collection'] ?? '') !== Database::METADATA) {
continue;
}
foreach ($collections as $key => $collection) {
if (($collection['$collection'] ?? '') !== Database::METADATA) {
continue;
}
$attributes = \array_map(fn ($attribute) => new Document($attribute), $collection['attributes']);
$indexes = \array_map(fn (array $index) => new Document($index), $collection['indexes']);
$attributes = \array_map(function (array $attribute) {
return new Document($attribute);
}, $collection['attributes']);
$indexes = \array_map(function (array $index) {
return new Document($index);
}, $collection['indexes']);
try {
$dbForProject->createCollection($key, $attributes, $indexes);
} catch (Duplicate) {
if (!$globalCollections) {
$dbForProject->createDocument(Database::METADATA, new Document([
'$id' => ID::custom($key),
'$permissions' => [Permission::create(Role::any())],
'name' => $key,
'attributes' => $attributes,
'indexes' => $indexes,
'documentSecurity' => true
]));
}
// if (!$globalCollections) {
// $dbForProject->createDocument(Database::METADATA, new Document([
// '$id' => ID::custom($key),
// '$permissions' => [Permission::create(Role::any())],
// 'name' => $key,
// 'attributes' => $attributes,
// 'indexes' => $indexes,
// 'documentSecurity' => true
// ]));
// }
}
}
}
// }
// Hook allowing instant project mirroring during migration
// Outside of migration, hook is not registered and has no effect

View file

@ -51,7 +51,7 @@
"utopia-php/cache": "0.10.*",
"utopia-php/cli": "0.15.*",
"utopia-php/config": "0.2.*",
"utopia-php/database": "dev-feat-migrations as 0.53.6",
"utopia-php/database": "0.53.6",
"utopia-php/domains": "0.5.*",
"utopia-php/dsn": "0.2.1",
"utopia-php/framework": "0.33.*",

39
composer.lock generated
View file

@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "5cb1524d4677594554ab26d05b3c38e6",
"content-hash": "a81b2ddbd465059b986947b63435e2bc",
"packages": [
{
"name": "adhocore/jwt",
@ -1724,16 +1724,16 @@
},
{
"name": "utopia-php/database",
"version": "dev-feat-migrations",
"version": "0.53.6",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/database.git",
"reference": "5b9462f8abc12c2c7ab2dac99aa6f68aa2e27365"
"reference": "feddc8e808eaea9a11c65cca3f01683def422f52"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/database/zipball/5b9462f8abc12c2c7ab2dac99aa6f68aa2e27365",
"reference": "5b9462f8abc12c2c7ab2dac99aa6f68aa2e27365",
"url": "https://api.github.com/repos/utopia-php/database/zipball/feddc8e808eaea9a11c65cca3f01683def422f52",
"reference": "feddc8e808eaea9a11c65cca3f01683def422f52",
"shasum": ""
},
"require": {
@ -1774,9 +1774,9 @@
],
"support": {
"issues": "https://github.com/utopia-php/database/issues",
"source": "https://github.com/utopia-php/database/tree/feat-migrations"
"source": "https://github.com/utopia-php/database/tree/0.53.6"
},
"time": "2024-10-22T08:05:24+00:00"
"time": "2024-10-08T02:18:46+00:00"
},
{
"name": "utopia-php/domains",
@ -2341,16 +2341,16 @@
},
{
"name": "utopia-php/platform",
"version": "0.7.0",
"version": "0.7.1",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/platform.git",
"reference": "beeea0f2c9bce14a6869fc5c87a1047cdecb5c52"
"reference": "3433a0f1a54988f2a59c735f507745cb2c24638a"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/platform/zipball/beeea0f2c9bce14a6869fc5c87a1047cdecb5c52",
"reference": "beeea0f2c9bce14a6869fc5c87a1047cdecb5c52",
"url": "https://api.github.com/repos/utopia-php/platform/zipball/3433a0f1a54988f2a59c735f507745cb2c24638a",
"reference": "3433a0f1a54988f2a59c735f507745cb2c24638a",
"shasum": ""
},
"require": {
@ -2385,9 +2385,9 @@
],
"support": {
"issues": "https://github.com/utopia-php/platform/issues",
"source": "https://github.com/utopia-php/platform/tree/0.7.0"
"source": "https://github.com/utopia-php/platform/tree/0.7.1"
},
"time": "2024-05-08T17:00:55+00:00"
"time": "2024-10-22T10:27:49+00:00"
},
{
"name": "utopia-php/pools",
@ -7003,18 +7003,9 @@
"time": "2024-03-07T20:33:40+00:00"
}
],
"aliases": [
{
"package": "utopia-php/database",
"version": "dev-feat-migrations",
"alias": "0.53.6",
"alias_normalized": "0.53.6.0"
}
],
"aliases": [],
"minimum-stability": "stable",
"stability-flags": {
"utopia-php/database": 20
},
"stability-flags": [],
"prefer-stable": false,
"prefer-lowest": false,
"platform": {

View file

@ -11,7 +11,6 @@ use Utopia\Database\Exception;
use Utopia\Database\Query;
use Utopia\Platform\Action;
use Utopia\Pools\Group;
use Utopia\Queue\Connection;
use Utopia\System\System;
use function Swoole\Coroutine\run;
@ -26,7 +25,7 @@ abstract class ScheduleBase extends Action
abstract public static function getName(): string;
abstract public static function getSupportedResource(): string;
abstract public static function getCollectionId(): string;
abstract protected function enqueueResources(Connection $queue, Database $dbForConsole);
abstract protected function enqueueResources(Group $pools, Database $dbForConsole, callable $getProjectDB): void;
public function __construct()
{
@ -34,10 +33,10 @@ abstract class ScheduleBase extends Action
$this
->desc("Execute {$type}s scheduled in Appwrite")
->inject('queue')
->inject('pools')
->inject('dbForConsole')
->inject('getProjectDB')
->callback(fn (Connection $queue, Database $dbForConsole, callable $getProjectDB) => $this->action($queue, $dbForConsole, $getProjectDB));
->callback(fn (Group $pools, Database $dbForConsole, callable $getProjectDB) => $this->action($pools, $dbForConsole, $getProjectDB));
}
/**
@ -45,7 +44,7 @@ abstract class ScheduleBase extends Action
* 2. Create timer that sync all changes from 'schedules' collection to local copy. Only reading changes thanks to 'resourceUpdatedAt' attribute
* 3. Create timer that prepares coroutines for soon-to-execute schedules. When it's ready, coroutine sleeps until exact time before sending request to worker.
*/
public function action(Connection $queue, Database $dbForConsole, callable $getProjectDB): void
public function action(Group $pools, Database $dbForConsole, callable $getProjectDB): void
{
Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1');
Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started');
@ -114,15 +113,17 @@ abstract class ScheduleBase extends Action
$latestDocument = \end($results);
}
$pools->reclaim();
Console::success("{$total} resources were loaded in " . (\microtime(true) - $loadStart) . " seconds");
Console::success("Starting timers at " . DateTime::now());
run(function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $queue) {
run(function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools, $getProjectDB) {
/**
* The timer synchronize $schedules copy with database collection.
*/
Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $queue) {
Timer::tick(static::UPDATE_TIMER * 1000, function () use ($dbForConsole, &$lastSyncUpdate, $getSchedule, $pools) {
$time = DateTime::now();
$timerStart = \microtime(true);
@ -171,15 +172,17 @@ abstract class ScheduleBase extends Action
$lastSyncUpdate = $time;
$timerEnd = \microtime(true);
$pools->reclaim();
Console::log("Sync tick: {$total} schedules were updated in " . ($timerEnd - $timerStart) . " seconds");
});
Timer::tick(
static::ENQUEUE_TIMER * 1000,
fn () => $this->enqueueResources($queue, $dbForConsole)
fn () => $this->enqueueResources($pools, $dbForConsole, $getProjectDB)
);
$this->enqueueResources($queue, $dbForConsole);
$this->enqueueResources($pools, $dbForConsole, $getProjectDB);
});
}
}

View file

@ -6,7 +6,6 @@ use Appwrite\Event\Func;
use Swoole\Coroutine as Co;
use Utopia\Database\Database;
use Utopia\Pools\Group;
use Utopia\Queue\Connection;
class ScheduleExecutions extends ScheduleBase
{
@ -28,10 +27,11 @@ class ScheduleExecutions extends ScheduleBase
return 'executions';
}
protected function enqueueResources(Connection $queue, Database $dbForConsole): void
protected function enqueueResources(Group $pools, Database $dbForConsole, callable $getProjectDB): void
{
$queueForFunctions = new Func($queue);
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource();
$queueForFunctions = new Func($connection);
$intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds');
foreach ($this->schedules as $schedule) {
@ -81,5 +81,7 @@ class ScheduleExecutions extends ScheduleBase
unset($this->schedules[$schedule['$internalId']]);
}
$queue->reclaim();
}
}

View file

@ -8,7 +8,6 @@ use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Pools\Group;
use Utopia\Queue\Connection;
class ScheduleFunctions extends ScheduleBase
{
@ -32,7 +31,7 @@ class ScheduleFunctions extends ScheduleBase
return 'functions';
}
protected function enqueueResources(Connection $queue, Database $dbForConsole): void
protected function enqueueResources(Group $pools, Database $dbForConsole, callable $getProjectDB): void
{
$timerStart = \microtime(true);
$time = DateTime::now();
@ -71,9 +70,12 @@ class ScheduleFunctions extends ScheduleBase
}
foreach ($delayedExecutions as $delay => $scheduleKeys) {
\go(function () use ($delay, $scheduleKeys, $queue) {
\go(function () use ($delay, $scheduleKeys, $pools) {
\sleep($delay); // in seconds
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource();
foreach ($scheduleKeys as $scheduleKey) {
// Ensure schedule was not deleted
if (!\array_key_exists($scheduleKey, $this->schedules)) {
@ -82,7 +84,7 @@ class ScheduleFunctions extends ScheduleBase
$schedule = $this->schedules[$scheduleKey];
$queueForFunctions = new Func($queue);
$queueForFunctions = new Func($connection);
$queueForFunctions
->setType('schedule')
@ -92,6 +94,8 @@ class ScheduleFunctions extends ScheduleBase
->setProject($schedule['project'])
->trigger();
}
$queue->reclaim();
});
}

View file

@ -5,7 +5,6 @@ namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Messaging;
use Utopia\Database\Database;
use Utopia\Pools\Group;
use Utopia\Queue\Connection;
class ScheduleMessages extends ScheduleBase
{
@ -27,7 +26,7 @@ class ScheduleMessages extends ScheduleBase
return 'messages';
}
protected function enqueueResources(Connection $queue, Database $dbForConsole): void
protected function enqueueResources(Group $pools, Database $dbForConsole, callable $getProjectDB): void
{
foreach ($this->schedules as $schedule) {
if (!$schedule['active']) {
@ -41,9 +40,10 @@ class ScheduleMessages extends ScheduleBase
continue;
}
\go(function () use ($schedule, $queue, $dbForConsole) {
$queueForMessaging = new Messaging($queue);
\go(function () use ($schedule, $pools, $dbForConsole) {
$queue = $pools->get('queue')->pop();
$connection = $queue->getResource();
$queueForMessaging = new Messaging($connection);
$queueForMessaging
->setType(MESSAGE_SEND_TYPE_EXTERNAL)
@ -56,6 +56,8 @@ class ScheduleMessages extends ScheduleBase
$schedule['$id'],
);
$queue->reclaim();
unset($this->schedules[$schedule['$internalId']]);
});
}