Merge pull request #9757 from appwrite/feat-pool-adapter

Feat pool adapter
This commit is contained in:
Jake Barnby 2025-05-20 08:25:34 +00:00 committed by GitHub
commit 2f6a82f4d5
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
22 changed files with 331 additions and 395 deletions

View file

@ -12,11 +12,13 @@ use Appwrite\Runtimes\Runtimes;
use Executor\Executor;
use Swoole\Runtime;
use Swoole\Timer;
use Utopia\Cache\Adapter\Pool as CachePool;
use Utopia\Cache\Adapter\Sharding;
use Utopia\Cache\Cache;
use Utopia\CLI\CLI;
use Utopia\CLI\Console;
use Utopia\Config\Config;
use Utopia\Database\Adapter\Pool as DatabasePool;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Database\Validator\Authorization;
@ -24,6 +26,7 @@ use Utopia\DSN\DSN;
use Utopia\Logger\Log;
use Utopia\Platform\Service;
use Utopia\Pools\Group;
use Utopia\Queue\Broker\Pool as BrokerPool;
use Utopia\Queue\Publisher;
use Utopia\Registry\Registry;
use Utopia\System\System;
@ -46,10 +49,7 @@ CLI::setResource('cache', function ($pools) {
$adapters = [];
foreach ($list as $value) {
$adapters[] = $pools
->get($value)
->pop()
->getResource();
$adapters[] = new CachePool($pools->get($value));
}
return new Cache(new Sharding($adapters));
@ -69,12 +69,8 @@ CLI::setResource('dbForPlatform', function ($pools, $cache) {
$attempts++;
try {
// Prepare database connection
$dbAdapter = $pools
->get('console')
->pop()
->getResource();
$dbForPlatform = new Database($dbAdapter, $cache);
$adapter = new DatabasePool($pools->get('console'));
$dbForPlatform = new Database($adapter, $cache);
$dbForPlatform
->setNamespace('_console')
@ -92,7 +88,6 @@ CLI::setResource('dbForPlatform', function ($pools, $cache) {
$ready = true;
} catch (\Throwable $err) {
Console::warning($err->getMessage());
$pools->get('console')->reclaim();
sleep($sleep);
}
} while ($attempts < $maxAttempts && !$ready);
@ -142,12 +137,8 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform
return $database;
}
$dbAdapter = $pools
->get($dsn->getHost())
->pop()
->getResource();
$database = new Database($dbAdapter, $cache);
$adapter = new DatabasePool($pools->get($dsn->getHost()));
$database = new Database($adapter, $cache);
$databases[$dsn->getHost()] = $database;
$sharedTables = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));
@ -173,21 +164,15 @@ CLI::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform
CLI::setResource('getLogsDB', function (Group $pools, Cache $cache) {
$database = null;
return function (?Document $project = null) use ($pools, $cache, $database) {
if ($database !== null && $project !== null && !$project->isEmpty() && $project->getId() !== 'console') {
$database->setTenant($project->getInternalId());
return $database;
}
$dbAdapter = $pools
->get('logs')
->pop()
->getResource();
$database = new Database(
$dbAdapter,
$cache
);
$adapter = new DatabasePool($pools->get('logs'));
$database = new Database($adapter, $cache);
$database
->setSharedTables(true)
@ -211,7 +196,7 @@ CLI::setResource('queueForStatsResources', function (Publisher $publisher) {
return new StatsResources($publisher);
}, ['publisher']);
CLI::setResource('publisher', function (Group $pools) {
return $pools->get('publisher')->pop()->getResource();
return new BrokerPool(publisher: $pools->get('publisher'));
}, ['pools']);
CLI::setResource('queueForFunctions', function (Publisher $publisher) {
return new Func($publisher);

View file

@ -3,13 +3,16 @@
use Appwrite\ClamAV\Network;
use Appwrite\Event\Event;
use Appwrite\Extend\Exception;
use Appwrite\PubSub\Adapter\Pool as PubSubPool;
use Appwrite\SDK\AuthType;
use Appwrite\SDK\ContentType;
use Appwrite\SDK\Method;
use Appwrite\SDK\Response as SDKResponse;
use Appwrite\Utopia\Response;
use Utopia\App;
use Utopia\Cache\Adapter\Pool as CachePool;
use Utopia\Config\Config;
use Utopia\Database\Adapter\Pool as DatabasePool;
use Utopia\Database\Document;
use Utopia\Domains\Validator\PublicDomain;
use Utopia\Pools\Group;
@ -34,8 +37,8 @@ App::get('/v1/health')
namespace: 'health',
group: 'health',
name: 'get',
auth: [AuthType::KEY],
description: '/docs/references/health/get.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
@ -70,11 +73,11 @@ App::get('/v1/health/db')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
group: 'health',
name: 'getDB',
description: '/docs/references/health/get-db.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
@ -86,8 +89,8 @@ App::get('/v1/health/db')
->inject('response')
->inject('pools')
->action(function (Response $response, Group $pools) {
$output = [];
$failures = [];
$configs = [
'Console.DB' => Config::getParam('pools-console'),
@ -97,7 +100,7 @@ App::get('/v1/health/db')
foreach ($configs as $key => $config) {
foreach ($config as $database) {
try {
$adapter = $pools->get($database)->pop()->getResource();
$adapter = new DatabasePool($pools->get($database));
$checkStart = \microtime(true);
@ -108,16 +111,16 @@ App::get('/v1/health/db')
'ping' => \round((\microtime(true) - $checkStart) / 1000)
]);
} else {
$failure[] = $database;
$failures[] = $database;
}
} catch (\Throwable $th) {
$failure[] = $database;
} catch (\Throwable) {
$failures[] = $database;
}
}
}
if (!empty($failure)) {
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'DB failure on: ' . implode(", ", $failure));
if (!empty($failures)) {
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'DB failure on: ' . implode(", ", $failures));
}
$response->dynamic(new Document([
@ -131,11 +134,11 @@ App::get('/v1/health/cache')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
group: 'health',
name: 'getCache',
description: '/docs/references/health/get-cache.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
@ -147,44 +150,39 @@ App::get('/v1/health/cache')
->inject('response')
->inject('pools')
->action(function (Response $response, Group $pools) {
$output = [];
$failures = [];
$configs = [
'Cache' => Config::getParam('pools-cache'),
];
foreach ($configs as $key => $config) {
foreach ($config as $database) {
foreach ($config as $cache) {
try {
/** @var \Utopia\Cache\Adapter $adapter */
$adapter = $pools->get($database)->pop()->getResource();
$adapter = new CachePool($pools->get($cache));
$checkStart = \microtime(true);
if ($adapter->ping()) {
$output[] = new Document([
'name' => $key . " ($database)",
'name' => $key . " ($cache)",
'status' => 'pass',
'ping' => \round((\microtime(true) - $checkStart) / 1000)
]);
} else {
$output[] = new Document([
'name' => $key . " ($database)",
'status' => 'fail',
'ping' => \round((\microtime(true) - $checkStart) / 1000)
]);
$failures[] = $cache;
}
} catch (\Throwable $th) {
$output[] = new Document([
'name' => $key . " ($database)",
'status' => 'fail',
'ping' => \round((\microtime(true) - $checkStart) / 1000)
]);
} catch (\Throwable) {
$failures[] = $cache;
}
}
}
if (!empty($failures)) {
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Cache failure on: ' . implode(", ", $failures));
}
$response->dynamic(new Document([
'statuses' => $output,
'total' => count($output),
@ -196,11 +194,11 @@ App::get('/v1/health/pubsub')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
group: 'health',
name: 'getPubSub',
description: '/docs/references/health/get-pubsub.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
@ -212,44 +210,39 @@ App::get('/v1/health/pubsub')
->inject('response')
->inject('pools')
->action(function (Response $response, Group $pools) {
$output = [];
$failures = [];
$configs = [
'PubSub' => Config::getParam('pools-pubsub'),
];
foreach ($configs as $key => $config) {
foreach ($config as $database) {
foreach ($config as $pubsub) {
try {
/** @var \Appwrite\PubSub\Adapter $adapter */
$adapter = $pools->get($database)->pop()->getResource();
$adapter = new PubSubPool($pools->get($pubsub));
$checkStart = \microtime(true);
if ($adapter->ping()) {
$output[] = new Document([
'name' => $key . " ($database)",
'name' => $key . " ($pubsub)",
'status' => 'pass',
'ping' => \round((\microtime(true) - $checkStart) / 1000)
]);
} else {
$output[] = new Document([
'name' => $key . " ($database)",
'status' => 'fail',
'ping' => \round((\microtime(true) - $checkStart) / 1000)
]);
$failures[] = $pubsub;
}
} catch (\Throwable $th) {
$output[] = new Document([
'name' => $key . " ($database)",
'status' => 'fail',
'ping' => \round((\microtime(true) - $checkStart) / 1000)
]);
} catch (\Throwable) {
$failures[] = $pubsub;
}
}
}
if (!empty($failures)) {
throw new Exception(Exception::GENERAL_SERVER_ERROR, 'Pubsub failure on: ' . implode(", ", $failures));
}
$response->dynamic(new Document([
'statuses' => $output,
'total' => count($output),
@ -261,11 +254,11 @@ App::get('/v1/health/time')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
group: 'health',
name: 'getTime',
description: '/docs/references/health/get-time.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
@ -325,11 +318,11 @@ App::get('/v1/health/queue/webhooks')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
group: 'queue',
name: 'getQueueWebhooks',
description: '/docs/references/health/get-queue-webhooks.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
@ -351,18 +344,18 @@ App::get('/v1/health/queue/webhooks')
}
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
}, ['response']);
});
App::get('/v1/health/queue/logs')
->desc('Get logs queue')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
group: 'queue',
name: 'getQueueLogs',
description: '/docs/references/health/get-queue-logs.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
@ -384,18 +377,18 @@ App::get('/v1/health/queue/logs')
}
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
}, ['response']);
});
App::get('/v1/health/certificate')
->desc('Get the SSL certificate for a domain')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
group: 'health',
name: 'getCertificate',
description: '/docs/references/health/get-certificate.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
@ -441,18 +434,18 @@ App::get('/v1/health/certificate')
'validTo' => $certificatePayload['validTo_time_t'],
'signatureTypeSN' => $certificatePayload['signatureTypeSN'],
]), Response::MODEL_HEALTH_CERTIFICATE);
}, ['response']);
});
App::get('/v1/health/queue/certificates')
->desc('Get certificates queue')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
group: 'queue',
name: 'getQueueCertificates',
description: '/docs/references/health/get-queue-certificates.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
@ -474,18 +467,18 @@ App::get('/v1/health/queue/certificates')
}
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
}, ['response']);
});
App::get('/v1/health/queue/builds')
->desc('Get builds queue')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
group: 'queue',
name: 'getQueueBuilds',
description: '/docs/references/health/get-queue-builds.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
@ -507,18 +500,18 @@ App::get('/v1/health/queue/builds')
}
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
}, ['response']);
});
App::get('/v1/health/queue/databases')
->desc('Get databases queue')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
group: 'queue',
name: 'getQueueDatabases',
description: '/docs/references/health/get-queue-databases.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
@ -541,18 +534,18 @@ App::get('/v1/health/queue/databases')
}
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
}, ['response']);
});
App::get('/v1/health/queue/deletes')
->desc('Get deletes queue')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
group: 'queue',
name: 'getQueueDeletes',
description: '/docs/references/health/get-queue-deletes.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
@ -574,18 +567,18 @@ App::get('/v1/health/queue/deletes')
}
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
}, ['response']);
});
App::get('/v1/health/queue/mails')
->desc('Get mails queue')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
group: 'queue',
name: 'getQueueMails',
description: '/docs/references/health/get-queue-mails.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
@ -607,18 +600,18 @@ App::get('/v1/health/queue/mails')
}
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
}, ['response']);
});
App::get('/v1/health/queue/messaging')
->desc('Get messaging queue')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
group: 'queue',
name: 'getQueueMessaging',
description: '/docs/references/health/get-queue-messaging.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
@ -640,18 +633,18 @@ App::get('/v1/health/queue/messaging')
}
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
}, ['response']);
});
App::get('/v1/health/queue/migrations')
->desc('Get migrations queue')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
group: 'queue',
name: 'getQueueMigrations',
description: '/docs/references/health/get-queue-migrations.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
@ -673,18 +666,18 @@ App::get('/v1/health/queue/migrations')
}
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
}, ['response']);
});
App::get('/v1/health/queue/functions')
->desc('Get functions queue')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
group: 'queue',
name: 'getQueueFunctions',
description: '/docs/references/health/get-queue-functions.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
@ -706,18 +699,18 @@ App::get('/v1/health/queue/functions')
}
$response->dynamic(new Document([ 'size' => $size ]), Response::MODEL_HEALTH_QUEUE);
}, ['response']);
});
App::get('/v1/health/queue/stats-resources')
->desc('Get stats resources queue')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
group: 'queue',
name: 'getQueueStatsResources',
description: '/docs/references/health/get-queue-stats-resources.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
@ -746,11 +739,11 @@ App::get('/v1/health/queue/stats-usage')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
group: 'queue',
name: 'getQueueUsage',
description: '/docs/references/health/get-queue-stats-usage.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
@ -779,11 +772,11 @@ App::get('/v1/health/storage/local')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
group: 'storage',
name: 'getStorageLocal',
description: '/docs/references/health/get-storage-local.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
@ -829,11 +822,11 @@ App::get('/v1/health/storage')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
group: 'storage',
name: 'getStorage',
description: '/docs/references/health/get-storage.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
@ -877,11 +870,11 @@ App::get('/v1/health/anti-virus')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
group: 'health',
name: 'getAntivirus',
description: '/docs/references/health/get-storage-anti-virus.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,
@ -923,11 +916,11 @@ App::get('/v1/health/queue/failed/:name')
->groups(['api', 'health'])
->label('scope', 'health.read')
->label('sdk', new Method(
auth: [AuthType::KEY],
namespace: 'health',
group: 'queue',
name: 'getFailedJobs',
description: '/docs/references/health/get-failed-queue-jobs.md',
auth: [AuthType::KEY],
responses: [
new SDKResponse(
code: Response::STATUS_CODE_OK,

View file

@ -24,6 +24,7 @@ use Utopia\App;
use Utopia\Audit\Audit;
use Utopia\Cache\Cache;
use Utopia\Config\Config;
use Utopia\Database\Adapter\Pool as DatabasePool;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
@ -223,7 +224,7 @@ App::post('/v1/projects')
$sharedTables = $sharedTablesV1 || $sharedTablesV2;
if (!$sharedTablesV2) {
$adapter = $pools->get($dsn->getHost())->pop()->getResource();
$adapter = new DatabasePool($pools->get($dsn->getHost()));
$dbForProject = new Database($adapter, $cache);
if ($sharedTables) {

View file

@ -14,9 +14,11 @@ use Utopia\App;
use Utopia\Audit\Audit;
use Utopia\CLI\Console;
use Utopia\Config\Config;
use Utopia\Database\Adapter\Pool as DatabasePool;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Exception\Duplicate as DuplicateException;
use Utopia\Database\Helpers\ID;
use Utopia\Database\Helpers\Permission;
use Utopia\Database\Helpers\Role;
@ -167,7 +169,7 @@ function createDatabase(App $app, string $resourceKey, string $dbName, array $co
$sleep = 1;
$attempts = 0;
do {
while (true) {
try {
$attempts++;
$resource = $app->getResource($resourceKey);
@ -176,13 +178,12 @@ function createDatabase(App $app, string $resourceKey, string $dbName, array $co
break; // exit loop on success
} catch (\Exception $e) {
Console::warning(" └── Database not ready. Retrying connection ({$attempts})...");
$pools->reclaim();
if ($attempts >= $max) {
throw new \Exception(' └── Failed to connect to database: ' . $e->getMessage());
}
sleep($sleep);
}
} while ($attempts < $max);
}
Console::success("[Setup] - $dbName database init started...");
@ -318,11 +319,7 @@ $http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $reg
$cache = $app->getResource('cache');
foreach ($sharedTablesV2 as $hostname) {
$adapter = $pools
->get($hostname)
->pop()
->getResource();
$adapter = new DatabasePool($pools->get($hostname));
$dbForProject = (new Database($adapter, $cache))
->setDatabase('appwrite')
->setSharedTables(true)
@ -332,7 +329,7 @@ $http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $reg
try {
Console::success('[Setup] - Creating project database: ' . $hostname . '...');
$dbForProject->create();
} catch (Duplicate) {
} catch (DuplicateException) {
Console::success('[Setup] - Skip: metadata table already exists');
}
@ -358,7 +355,6 @@ $http->on(Constant::EVENT_START, function (Server $http) use ($payloadSize, $reg
}
}
$pools->reclaim();
Console::success('[Setup] - Server database init completed...');
});
@ -473,6 +469,7 @@ $http->on(Constant::EVENT_REQUEST, function (SwooleRequest $swooleRequest, Swool
Console::error('[Error] Message: ' . $th->getMessage());
Console::error('[Error] File: ' . $th->getFile());
Console::error('[Error] Line: ' . $th->getLine());
Console::error('[Error] Trace: ' . $th->getTraceAsString());
$swooleResponse->setStatusCode(500);
@ -490,8 +487,6 @@ $http->on(Constant::EVENT_REQUEST, function (SwooleRequest $swooleRequest, Swool
];
$swooleResponse->end(\json_encode($output));
} finally {
$pools->reclaim();
}
});

View file

@ -216,13 +216,13 @@ $register->set('pools', function () {
'mysql',
'mariadb' => function () use ($dsnHost, $dsnPort, $dsnUser, $dsnPass, $dsnDatabase) {
return new PDOProxy(function () use ($dsnHost, $dsnPort, $dsnUser, $dsnPass, $dsnDatabase) {
return new PDO("mysql:host={$dsnHost};port={$dsnPort};dbname={$dsnDatabase};charset=utf8mb4", $dsnUser, $dsnPass, array(
return new PDO("mysql:host={$dsnHost};port={$dsnPort};dbname={$dsnDatabase};charset=utf8mb4", $dsnUser, $dsnPass, [
\PDO::ATTR_TIMEOUT => 3, // Seconds
\PDO::ATTR_PERSISTENT => false,
\PDO::ATTR_DEFAULT_FETCH_MODE => \PDO::FETCH_ASSOC,
\PDO::ATTR_EMULATE_PREPARES => true,
\PDO::ATTR_STRINGIFY_FETCHES => true
));
]);
});
},
'redis' => function () use ($dsnHost, $dsnPort, $dsnPass) {

View file

@ -24,10 +24,12 @@ use Appwrite\Utopia\Request;
use Executor\Executor;
use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis;
use Utopia\App;
use Utopia\Cache\Adapter\Pool as CachePool;
use Utopia\Cache\Adapter\Sharding;
use Utopia\Cache\Cache;
use Utopia\CLI\Console;
use Utopia\Config\Config;
use Utopia\Database\Adapter\Pool as DatabasePool;
use Utopia\Database\Database;
use Utopia\Database\Document;
use Utopia\Database\Helpers\ID;
@ -37,6 +39,7 @@ use Utopia\DSN\DSN;
use Utopia\Locale\Locale;
use Utopia\Logger\Log;
use Utopia\Pools\Group;
use Utopia\Queue\Broker\Pool as BrokerPool;
use Utopia\Queue\Publisher;
use Utopia\Storage\Device;
use Utopia\Storage\Device\AWS;
@ -72,10 +75,10 @@ App::setResource('localeCodes', function () {
// Queues
App::setResource('publisher', function (Group $pools) {
return $pools->get('publisher')->pop()->getResource();
return new BrokerPool(publisher: $pools->get('publisher'));
}, ['pools']);
App::setResource('consumer', function (Group $pools) {
return $pools->get('consumer')->pop()->getResource();
return new BrokerPool(consumer: $pools->get('consumer'));
}, ['pools']);
App::setResource('queueForMessaging', function (Publisher $publisher) {
return new Messaging($publisher);
@ -329,12 +332,8 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform
$dsn = new DSN('mysql://' . $project->getAttribute('database'));
}
$dbAdapter = $pools
->get($dsn->getHost())
->pop()
->getResource();
$database = new Database($dbAdapter, $cache);
$adapter = new DatabasePool($pools->get($dsn->getHost()));
$database = new Database($adapter, $cache);
$database
->setMetadata('host', \gethostname())
@ -360,12 +359,8 @@ App::setResource('dbForProject', function (Group $pools, Database $dbForPlatform
}, ['pools', 'dbForPlatform', 'cache', 'project']);
App::setResource('dbForPlatform', function (Group $pools, Cache $cache) {
$dbAdapter = $pools
->get('console')
->pop()
->getResource();
$database = new Database($dbAdapter, $cache);
$adapter = new DatabasePool($pools->get('console'));
$database = new Database($adapter, $cache);
$database
->setNamespace('_console')
@ -378,7 +373,7 @@ App::setResource('dbForPlatform', function (Group $pools, Cache $cache) {
}, ['pools', 'cache']);
App::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform, $cache) {
$databases = []; // TODO: @Meldiron This should probably be responsibility of utopia-php/pools
$databases = [];
return function (Document $project) use ($pools, $dbForPlatform, $cache, &$databases) {
if ($project->isEmpty() || $project->getId() === 'console') {
@ -420,12 +415,8 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform
return $database;
}
$dbAdapter = $pools
->get($dsn->getHost())
->pop()
->getResource();
$database = new Database($dbAdapter, $cache);
$adapter = new DatabasePool($pools->get($dsn->getHost()));
$database = new Database($adapter, $cache);
$databases[$dsn->getHost()] = $database;
$configure($database);
@ -435,21 +426,15 @@ App::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform
App::setResource('getLogsDB', function (Group $pools, Cache $cache) {
$database = null;
return function (?Document $project = null) use ($pools, $cache, $database) {
return function (?Document $project = null) use ($pools, $cache, &$database) {
if ($database !== null && $project !== null && !$project->isEmpty() && $project->getId() !== 'console') {
$database->setTenant($project->getInternalId());
return $database;
}
$dbAdapter = $pools
->get('logs')
->pop()
->getResource();
$database = new Database(
$dbAdapter,
$cache
);
$adapter = new DatabasePool($pools->get('logs'));
$database = new Database($adapter, $cache);
$database
->setSharedTables(true)
@ -473,10 +458,7 @@ App::setResource('cache', function (Group $pools, Telemetry $telemetry) {
$adapters = [];
foreach ($list as $value) {
$adapters[] = $pools
->get($value)
->pop()
->getResource();
$adapters[] = new CachePool($pools->get($value));
}
$cache = new Cache(new Sharding($adapters));

View file

@ -5,6 +5,7 @@ use Appwrite\Extend\Exception;
use Appwrite\Extend\Exception as AppwriteException;
use Appwrite\Messaging\Adapter\Realtime;
use Appwrite\Network\Validator\Origin;
use Appwrite\PubSub\Adapter\Pool as PubSubPool;
use Appwrite\Utopia\Request;
use Appwrite\Utopia\Response;
use Swoole\Http\Request as SwooleRequest;
@ -15,10 +16,12 @@ use Swoole\Timer;
use Utopia\Abuse\Abuse;
use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis;
use Utopia\App;
use Utopia\Cache\Adapter\Pool as CachePool;
use Utopia\Cache\Adapter\Sharding;
use Utopia\Cache\Cache;
use Utopia\CLI\Console;
use Utopia\Config\Config;
use Utopia\Database\Adapter\Pool as DatabasePool;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
@ -28,13 +31,15 @@ use Utopia\Database\Query;
use Utopia\Database\Validator\Authorization;
use Utopia\DSN\DSN;
use Utopia\Logger\Log;
use Utopia\Pools\Group;
use Utopia\Registry\Registry;
use Utopia\System\System;
use Utopia\Telemetry\Adapter\None as NoTelemetry;
use Utopia\WebSocket\Adapter;
use Utopia\WebSocket\Server;
/**
* @var \Utopia\Registry\Registry $register
* @var Registry $register
*/
require_once __DIR__ . '/init.php';
@ -46,17 +51,17 @@ if (!function_exists('getConsoleDB')) {
{
global $register;
/** @var \Utopia\Pools\Group $pools */
static $database = null;
if ($database !== null) {
return $database;
}
/** @var Group $pools */
$pools = $register->get('pools');
$dbAdapter = $pools
->get('console')
->pop()
->getResource()
;
$database = new Database($dbAdapter, getCache());
$adapter = new DatabasePool($pools->get('console'));
$database = new Database($adapter, getCache());
$database
->setNamespace('_console')
->setMetadata('host', \gethostname())
@ -72,7 +77,13 @@ if (!function_exists('getProjectDB')) {
{
global $register;
/** @var \Utopia\Pools\Group $pools */
static $databases = [];
if (isset($databases[$project->getInternalId()])) {
return $databases[$project->getInternalId()];
}
/** @var Group $pools */
$pools = $register->get('pools');
if ($project->isEmpty() || $project->getId() === 'console') {
@ -86,11 +97,7 @@ if (!function_exists('getProjectDB')) {
$dsn = new DSN('mysql://' . $project->getAttribute('database'));
}
$adapter = $pools
->get($dsn->getHost())
->pop()
->getResource();
$adapter = new DatabasePool($pools->get($dsn->getHost()));
$database = new Database($adapter, getCache());
$sharedTables = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));
@ -111,7 +118,7 @@ if (!function_exists('getProjectDB')) {
->setMetadata('host', \gethostname())
->setMetadata('project', $project->getId());
return $database;
return $databases[$project->getInternalId()] = $database;
}
}
@ -121,20 +128,22 @@ if (!function_exists('getCache')) {
{
global $register;
$pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */
static $cache = null;
if ($cache !== null) {
return $cache;
}
$pools = $register->get('pools'); /** @var Group $pools */
$list = Config::getParam('pools-cache', []);
$adapters = [];
foreach ($list as $value) {
$adapters[] = $pools
->get($value)
->pop()
->getResource()
;
$adapters[] = new CachePool($pools->get($value));
}
return new Cache(new Sharding($adapters));
return $cache = new Cache(new Sharding($adapters));
}
}
@ -142,6 +151,12 @@ if (!function_exists('getCache')) {
if (!function_exists('getRedis')) {
function getRedis(): \Redis
{
static $redis = null;
if ($redis !== null) {
return $redis;
}
$host = System::getEnv('_APP_REDIS_HOST', 'localhost');
$port = System::getEnv('_APP_REDIS_PORT', 6379);
$pass = System::getEnv('_APP_REDIS_PASS', '');
@ -160,21 +175,39 @@ if (!function_exists('getRedis')) {
if (!function_exists('getTimelimit')) {
function getTimelimit(): TimeLimitRedis
{
return new TimeLimitRedis("", 0, 1, getRedis());
static $timelimit = null;
if ($timelimit !== null) {
return $timelimit;
}
return $timelimit = new TimeLimitRedis("", 0, 1, getRedis());
}
}
if (!function_exists('getRealtime')) {
function getRealtime(): Realtime
{
return new Realtime();
static $realtime = null;
if ($realtime !== null) {
return $realtime;
}
return $realtime = new Realtime();
}
}
if (!function_exists('getTelemetry')) {
function getTelemetry(int $workerId): Utopia\Telemetry\Adapter
{
return new NoTelemetry();
static $telemetry = null;
if ($telemetry !== null) {
return $telemetry;
}
return $telemetry = new NoTelemetry();
}
}
@ -273,7 +306,6 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume
sleep(DATABASE_RECONNECT_SLEEP);
}
} while (true);
$register->get('pools')->reclaim();
});
/**
@ -299,9 +331,7 @@ $server->onStart(function () use ($stats, $register, $containerId, &$statsDocume
Authorization::skip(fn () => $database->updateDocument('realtime', $statsDocument->getId(), $statsDocument));
} catch (Throwable $th) {
call_user_func($logError, $th, "updateWorkerDocument");
} finally {
$register->get('pools')->reclaim();
$logError($th, "updateWorkerDocument");
}
});
}
@ -370,8 +400,6 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
'data' => $event['data']
]));
}
$register->get('pools')->reclaim();
}
}
/**
@ -407,8 +435,8 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
}
$start = time();
/** @var \Appwrite\PubSub\Adapter $pubsub */
$pubsub = $register->get('pools')->get('pubsub')->pop()->getResource();
$pubsub = new PubSubPool($register->get('pools')->get('pubsub'));
if ($pubsub->ping(true)) {
$attempts = 0;
Console::success('Pub/sub connection established (worker: ' . $workerId . ')');
@ -436,8 +464,6 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
$realtime->unsubscribe($connection);
$realtime->subscribe($projectId, $connection, $roles, $channels);
$register->get('pools')->reclaim();
}
}
@ -463,14 +489,12 @@ $server->onWorkerStart(function (int $workerId) use ($server, $register, $stats,
}
});
} catch (Throwable $th) {
call_user_func($logError, $th, "pubSubConnection");
$logError($th, "pubSubConnection");
Console::error('Pub/sub error: ' . $th->getMessage());
$attempts++;
sleep(DATABASE_RECONNECT_SLEEP);
continue;
} finally {
$register->get('pools')->reclaim();
}
}
@ -572,7 +596,7 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
$stats->incr($project->getId(), 'connections');
$stats->incr($project->getId(), 'connectionsTotal');
} catch (Throwable $th) {
call_user_func($logError, $th, "initServer");
$logError($th, "initServer");
// Handle SQL error code is 'HY000'
$code = $th->getCode();
@ -596,8 +620,6 @@ $server->onOpen(function (int $connection, SwooleRequest $request) use ($server,
Console::error('[Error] Code: ' . $response['data']['code']);
Console::error('[Error] Message: ' . $response['data']['message']);
}
} finally {
$register->get('pools')->reclaim();
}
});
@ -696,8 +718,6 @@ $server->onMessage(function (int $connection, string $message) use ($server, $re
if ($th->getCode() === 1008) {
$server->close($connection, $th->getCode());
}
} finally {
$register->get('pools')->reclaim();
}
});

View file

@ -20,10 +20,12 @@ use Appwrite\Platform\Appwrite;
use Executor\Executor;
use Swoole\Runtime;
use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis;
use Utopia\Cache\Adapter\Pool as CachePool;
use Utopia\Cache\Adapter\Sharding;
use Utopia\Cache\Cache;
use Utopia\CLI\Console;
use Utopia\Config\Config;
use Utopia\Database\Adapter\Pool as DatabasePool;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
@ -33,6 +35,7 @@ use Utopia\Logger\Log;
use Utopia\Logger\Logger;
use Utopia\Platform\Service;
use Utopia\Pools\Group;
use Utopia\Queue\Broker\Pool as BrokerPool;
use Utopia\Queue\Message;
use Utopia\Queue\Publisher;
use Utopia\Queue\Server;
@ -40,21 +43,17 @@ use Utopia\Registry\Registry;
use Utopia\System\System;
Authorization::disable();
Runtime::enableCoroutine(SWOOLE_HOOK_ALL);
Runtime::enableCoroutine();
Server::setResource('register', fn () => $register);
Server::setResource('dbForPlatform', function (Cache $cache, Registry $register) {
$pools = $register->get('pools');
$database = $pools
->get('console')
->pop()
->getResource();
$adapter = new DatabasePool($pools->get('console'));
$dbForPlatform = new Database($adapter, $cache);
$dbForPlatform->setNamespace('_console');
$adapter = new Database($database, $cache);
$adapter->setNamespace('_console');
return $adapter;
return $dbForPlatform;
}, ['cache', 'register']);
Server::setResource('project', function (Message $message, Database $dbForPlatform) {
@ -82,20 +81,9 @@ Server::setResource('dbForProject', function (Cache $cache, Registry $register,
$dsn = new DSN('mysql://' . $project->getAttribute('database'));
}
$adapter = $pools
->get($dsn->getHost())
->pop()
->getResource();
$adapter = new DatabasePool($pools->get($dsn->getHost()));
$database = new Database($adapter, $cache);
try {
$dsn = new DSN($project->getAttribute('database'));
} catch (\InvalidArgumentException) {
// TODO: Temporary until all projects are using shared tables
$dsn = new DSN('mysql://' . $project->getAttribute('database'));
}
$sharedTables = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));
if (\in_array($dsn->getHost(), $sharedTables)) {
@ -150,12 +138,8 @@ Server::setResource('getProjectDB', function (Group $pools, Database $dbForPlatf
return $database;
}
$dbAdapter = $pools
->get($dsn->getHost())
->pop()
->getResource();
$database = new Database($dbAdapter, $cache);
$adapter = new DatabasePool($pools->get($dsn->getHost()));
$database = new Database($adapter, $cache);
$databases[$dsn->getHost()] = $database;
@ -187,15 +171,8 @@ Server::setResource('getLogsDB', function (Group $pools, Cache $cache) {
return $database;
}
$dbAdapter = $pools
->get('logs')
->pop()
->getResource();
$database = new Database(
$dbAdapter,
$cache
);
$adapter = new DatabasePool($pools->get('logs'));
$database = new Database($adapter, $cache);
$database
->setSharedTables(true)
@ -233,11 +210,7 @@ Server::setResource('cache', function (Registry $register) {
$adapters = [];
foreach ($list as $value) {
$adapters[] = $pools
->get($value)
->pop()
->getResource()
;
$adapters[] = new CachePool($pools->get($value));
}
return new Cache(new Sharding($adapters));
@ -267,11 +240,11 @@ Server::setResource('timelimit', function (\Redis $redis) {
Server::setResource('log', fn () => new Log());
Server::setResource('publisher', function (Group $pools) {
return $pools->get('publisher')->pop()->getResource();
return new BrokerPool(publisher: $pools->get('publisher'));
}, ['pools']);
Server::setResource('consumer', function (Group $pools) {
return $pools->get('consumer')->pop()->getResource();
return new BrokerPool(consumer: $pools->get('consumer'));
}, ['pools']);
Server::setResource('queueForStatsUsage', function (Publisher $publisher) {
@ -448,13 +421,6 @@ try {
$worker = $platform->getWorker();
$worker
->shutdown()
->inject('pools')
->action(function (Group $pools) {
$pools->reclaim();
});
$worker
->error()
->inject('error')
@ -462,8 +428,7 @@ $worker
->inject('log')
->inject('pools')
->inject('project')
->action(function (Throwable $error, ?Logger $logger, Log $log, Group $pools, Document $project) use ($queueName) {
$pools->reclaim();
->action(function (Throwable $error, ?Logger $logger, Log $log, Group $pools, Document $project) use ($worker, $queueName) {
$version = System::getEnv('_APP_VERSION', 'UNKNOWN');
if ($logger) {

View file

@ -65,7 +65,7 @@
"utopia-php/platform": "0.7.*",
"utopia-php/pools": "0.8.*",
"utopia-php/preloader": "0.2.*",
"utopia-php/queue": "0.9.*",
"utopia-php/queue": "0.10.*",
"utopia-php/registry": "0.5.*",
"utopia-php/storage": "0.18.*",
"utopia-php/swoole": "0.8.*",

69
composer.lock generated
View file

@ -4,7 +4,7 @@
"Read more about it at https://getcomposer.org/doc/01-basic-usage.md#installing-dependencies",
"This file is @generated automatically"
],
"content-hash": "2c14e20244a06f508dd67cda717aefeb",
"content-hash": "63feb1a7cf4cfa2cc7fa0870236e61ea",
"packages": [
{
"name": "adhocore/jwt",
@ -3499,16 +3499,16 @@
},
{
"name": "utopia-php/database",
"version": "0.69.2",
"version": "0.69.5",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/database.git",
"reference": "60591ab073bb80bb9843338754b679bb8169e4ed"
"reference": "4abe53609dfc23b2ea82884d12b149df6a8af2f5"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/database/zipball/60591ab073bb80bb9843338754b679bb8169e4ed",
"reference": "60591ab073bb80bb9843338754b679bb8169e4ed",
"url": "https://api.github.com/repos/utopia-php/database/zipball/4abe53609dfc23b2ea82884d12b149df6a8af2f5",
"reference": "4abe53609dfc23b2ea82884d12b149df6a8af2f5",
"shasum": ""
},
"require": {
@ -3549,9 +3549,9 @@
],
"support": {
"issues": "https://github.com/utopia-php/database/issues",
"source": "https://github.com/utopia-php/database/tree/0.69.2"
"source": "https://github.com/utopia-php/database/tree/0.69.5"
},
"time": "2025-05-14T07:51:44+00:00"
"time": "2025-05-17T08:01:51+00:00"
},
{
"name": "utopia-php/domains",
@ -3701,16 +3701,16 @@
},
{
"name": "utopia-php/framework",
"version": "0.33.19",
"version": "0.33.20",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/http.git",
"reference": "64c7b7bb8a8595ffe875fa8d4b7705684dbf46c0"
"reference": "e1c7ab4e0b5b0a9a70256b1e00912e101e76a131"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/http/zipball/64c7b7bb8a8595ffe875fa8d4b7705684dbf46c0",
"reference": "64c7b7bb8a8595ffe875fa8d4b7705684dbf46c0",
"url": "https://api.github.com/repos/utopia-php/http/zipball/e1c7ab4e0b5b0a9a70256b1e00912e101e76a131",
"reference": "e1c7ab4e0b5b0a9a70256b1e00912e101e76a131",
"shasum": ""
},
"require": {
@ -3742,9 +3742,9 @@
],
"support": {
"issues": "https://github.com/utopia-php/http/issues",
"source": "https://github.com/utopia-php/http/tree/0.33.19"
"source": "https://github.com/utopia-php/http/tree/0.33.20"
},
"time": "2025-03-06T11:37:49+00:00"
"time": "2025-05-18T23:51:21+00:00"
},
{
"name": "utopia-php/image",
@ -4059,16 +4059,16 @@
},
{
"name": "utopia-php/platform",
"version": "0.7.4",
"version": "0.7.6",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/platform.git",
"reference": "a5b93d8177702ec458c3af9137663133c012b71b"
"reference": "6bc7fbb43ec2b7f9ee5bdef5d4b5e4a81860950b"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/platform/zipball/a5b93d8177702ec458c3af9137663133c012b71b",
"reference": "a5b93d8177702ec458c3af9137663133c012b71b",
"url": "https://api.github.com/repos/utopia-php/platform/zipball/6bc7fbb43ec2b7f9ee5bdef5d4b5e4a81860950b",
"reference": "6bc7fbb43ec2b7f9ee5bdef5d4b5e4a81860950b",
"shasum": ""
},
"require": {
@ -4077,11 +4077,11 @@
"php": ">=8.0",
"utopia-php/cli": "0.15.*",
"utopia-php/framework": "0.33.*",
"utopia-php/queue": "0.9.*"
"utopia-php/queue": "0.10.*"
},
"require-dev": {
"laravel/pint": "1.2.*",
"phpunit/phpunit": "^9.3"
"laravel/pint": "1.*",
"phpunit/phpunit": "9.*"
},
"type": "library",
"autoload": {
@ -4103,9 +4103,9 @@
],
"support": {
"issues": "https://github.com/utopia-php/platform/issues",
"source": "https://github.com/utopia-php/platform/tree/0.7.4"
"source": "https://github.com/utopia-php/platform/tree/0.7.6"
},
"time": "2025-03-13T13:00:12+00:00"
"time": "2025-05-18T20:31:24+00:00"
},
{
"name": "utopia-php/pools",
@ -4214,16 +4214,16 @@
},
{
"name": "utopia-php/queue",
"version": "0.9.1",
"version": "0.10.0",
"source": {
"type": "git",
"url": "https://github.com/utopia-php/queue.git",
"reference": "32b6f84c55aae761db5a5ae76cc91ca8dbc8bc32"
"reference": "0eccc559168ea72241c39a4c482d868314666be1"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/utopia-php/queue/zipball/32b6f84c55aae761db5a5ae76cc91ca8dbc8bc32",
"reference": "32b6f84c55aae761db5a5ae76cc91ca8dbc8bc32",
"url": "https://api.github.com/repos/utopia-php/queue/zipball/0eccc559168ea72241c39a4c482d868314666be1",
"reference": "0eccc559168ea72241c39a4c482d868314666be1",
"shasum": ""
},
"require": {
@ -4232,6 +4232,7 @@
"utopia-php/cli": "0.15.*",
"utopia-php/fetch": "0.4.*",
"utopia-php/framework": "0.33.*",
"utopia-php/pools": "0.8.*",
"utopia-php/telemetry": "0.1.*"
},
"require-dev": {
@ -4273,9 +4274,9 @@
],
"support": {
"issues": "https://github.com/utopia-php/queue/issues",
"source": "https://github.com/utopia-php/queue/tree/0.9.1"
"source": "https://github.com/utopia-php/queue/tree/0.10.0"
},
"time": "2025-03-28T19:49:36+00:00"
"time": "2025-04-17T12:15:52+00:00"
},
{
"name": "utopia-php/registry",
@ -4770,16 +4771,16 @@
"packages-dev": [
{
"name": "appwrite/sdk-generator",
"version": "0.40.16",
"version": "0.40.17",
"source": {
"type": "git",
"url": "https://github.com/appwrite/sdk-generator.git",
"reference": "f1f506da74033f0cb5a11e3dffcfd1ee8daf237d"
"reference": "7e333c1003bfd4763e4d6f3a0a799fde5e7bc4de"
},
"dist": {
"type": "zip",
"url": "https://api.github.com/repos/appwrite/sdk-generator/zipball/f1f506da74033f0cb5a11e3dffcfd1ee8daf237d",
"reference": "f1f506da74033f0cb5a11e3dffcfd1ee8daf237d",
"url": "https://api.github.com/repos/appwrite/sdk-generator/zipball/7e333c1003bfd4763e4d6f3a0a799fde5e7bc4de",
"reference": "7e333c1003bfd4763e4d6f3a0a799fde5e7bc4de",
"shasum": ""
},
"require": {
@ -4815,9 +4816,9 @@
"description": "Appwrite PHP library for generating API SDKs for multiple programming languages and platforms",
"support": {
"issues": "https://github.com/appwrite/sdk-generator/issues",
"source": "https://github.com/appwrite/sdk-generator/tree/0.40.16"
"source": "https://github.com/appwrite/sdk-generator/tree/0.40.17"
},
"time": "2025-05-09T12:06:09+00:00"
"time": "2025-05-16T15:10:54+00:00"
},
{
"name": "doctrine/annotations",

View file

@ -286,13 +286,6 @@ class Event
return $this;
}
public function setParamSensitive(string $key): self
{
$this->sensitive[$key] = true;
return $this;
}
/**
* Get param of event.
*

View file

@ -2,14 +2,14 @@
namespace Appwrite\Messaging\Adapter;
use Appwrite\Messaging\Adapter;
use Appwrite\Messaging\Adapter as MessagingAdapter;
use Appwrite\PubSub\Adapter\Pool as PubSubPool;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Helpers\ID;
use Utopia\Database\Helpers\Role;
use Utopia\Pools\Pool;
class Realtime extends Adapter
class Realtime extends MessagingAdapter
{
/**
* Connection Tree
@ -36,12 +36,12 @@ class Realtime extends Adapter
*/
public array $subscriptions = [];
private Pool $pubsubPool;
private PubSubPool $pubSubPool;
public function __construct()
{
global $register;
$this->pubsubPool = $register->get('pools')->get('pubsub');
$this->pubSubPool = new PubSubPool($register->get('pools')->get('pubsub'));
}
/**
@ -132,11 +132,12 @@ class Realtime extends Adapter
* Sends an event to the Realtime Server
* @param string $projectId
* @param array $payload
* @param string $event
* @param array $events
* @param array $channels
* @param array $roles
* @param array $options
* @return void
* @throws \Exception
*/
public function send(string $projectId, array $payload, array $events, array $channels, array $roles, array $options = []): void
{
@ -147,7 +148,7 @@ class Realtime extends Adapter
$permissionsChanged = array_key_exists('permissionsChanged', $options) && $options['permissionsChanged'];
$userId = array_key_exists('userId', $options) ? $options['userId'] : null;
$message = [
$this->pubSubPool->publish('realtime', json_encode([
'project' => $projectId,
'roles' => $roles,
'permissionsChanged' => $permissionsChanged,
@ -158,9 +159,7 @@ class Realtime extends Adapter
'timestamp' => DateTime::formatTz(DateTime::now()),
'payload' => $payload
]
];
$this->pubsubPool->use(fn (\Appwrite\PubSub\Adapter $pubsub) => $pubsub->publish('realtime', json_encode($message)));
]));
}
/**
@ -175,8 +174,9 @@ class Realtime extends Adapter
* - 1,121.328 ms (±0.84%) | 1,000,000 Connections / 10,000,000 Subscriptions
*
* @param array $event
* @return int[]|string[]
*/
public function getSubscribers(array $event)
public function getSubscribers(array $event): array
{
$receivers = [];
@ -230,7 +230,7 @@ class Realtime extends Adapter
foreach ($channels as $key => $value) {
switch (true) {
case strpos($key, 'account.') === 0:
case \str_starts_with($key, 'account.'):
unset($channels[$key]);
break;
@ -272,6 +272,7 @@ class Realtime extends Adapter
$channels[] = 'account.' . $parts[1];
$roles = [Role::user(ID::custom($parts[1]))->toString()];
break;
case 'migrations':
case 'rules':
$channels[] = 'console';
$channels[] = 'projects.' . $project->getId();
@ -352,12 +353,6 @@ class Realtime extends Adapter
$roles = [Role::team($project->getAttribute('teamId'))->toString()];
}
break;
case 'migrations':
$channels[] = 'console';
$channels[] = 'projects.' . $project->getId();
$projectId = 'console';
$roles = [Role::team($project->getAttribute('teamId'))->toString()];
break;
}

View file

@ -3,14 +3,19 @@
namespace Appwrite\Platform\Tasks;
use Appwrite\ClamAV\Network;
use Appwrite\PubSub\Adapter;
use Appwrite\PubSub\Adapter\Pool as PubSubPool;
use PHPMailer\PHPMailer\PHPMailer;
use Utopia\App;
use Utopia\Cache\Adapter\Pool as CachePool;
use Utopia\CLI\Console;
use Utopia\Config\Config;
use Utopia\Database\Adapter\Pool as DatabasePool;
use Utopia\Domains\Domain;
use Utopia\DSN\DSN;
use Utopia\Logger\Logger;
use Utopia\Platform\Action;
use Utopia\Pools\Group;
use Utopia\Queue\Broker\Pool as BrokerPool;
use Utopia\Registry\Registry;
use Utopia\Storage\Device\Local;
use Utopia\Storage\Storage;
@ -76,9 +81,9 @@ class Doctor extends Action
Console::log('🟢 Abuse protection is enabled');
}
$authWhitelistRoot = System::getEnv('_APP_CONSOLE_WHITELIST_ROOT', null);
$authWhitelistEmails = System::getEnv('_APP_CONSOLE_WHITELIST_EMAILS', null);
$authWhitelistIPs = System::getEnv('_APP_CONSOLE_WHITELIST_IPS', null);
$authWhitelistRoot = System::getEnv('_APP_CONSOLE_WHITELIST_ROOT');
$authWhitelistEmails = System::getEnv('_APP_CONSOLE_WHITELIST_EMAILS');
$authWhitelistIPs = System::getEnv('_APP_CONSOLE_WHITELIST_IPS');
if (
empty($authWhitelistRoot)
@ -114,19 +119,16 @@ class Doctor extends Action
} else {
Console::log('🟢 Logging adapter is enabled (' . $providerName . ')');
}
} catch (\Throwable $th) {
} catch (\Throwable) {
Console::log('🔴 Logging adapter is misconfigured');
}
\usleep(200 * 1000); // Sleep for 0.2 seconds
try {
Console::log("\n" . '[Connectivity]');
} catch (\Throwable $th) {
//throw $th;
}
Console::log("\n" . '[Connectivity]');
$pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */
/** @var Group $pools */
$pools = $register->get('pools');
$configs = [
'Console.DB' => Config::getParam('pools-console'),
@ -136,20 +138,22 @@ class Doctor extends Action
foreach ($configs as $key => $config) {
foreach ($config as $database) {
try {
$adapter = $pools->get($database)->pop()->getResource();
$adapter = new DatabasePool($pools->get($database));
if ($adapter->ping()) {
Console::success('🟢 ' . str_pad("{$key}({$database})", 50, '.') . 'connected');
} else {
Console::error('🔴 ' . str_pad("{$key}({$database})", 47, '.') . 'disconnected');
}
} catch (\Throwable $th) {
} catch (\Throwable) {
Console::error('🔴 ' . str_pad("{$key}.({$database})", 47, '.') . 'disconnected');
}
}
}
$pools = $register->get('pools'); /** @var \Utopia\Pools\Group $pools */
/** @var Group $pools */
$pools = $register->get('pools');
$configs = [
'Cache' => Config::getParam('pools-cache'),
'Queue' => Config::getParam('pools-queue'),
@ -159,15 +163,18 @@ class Doctor extends Action
foreach ($configs as $key => $config) {
foreach ($config as $pool) {
try {
/** @var Adapter $adapter */
$adapter = $pools->get($pool)->pop()->getResource();
$adapter = match($key) {
'Cache' => new CachePool($pools->get($pool)),
'Queue' => new BrokerPool($pools->get($pool)),
'PubSub' => new PubSubPool($pools->get($pool)),
};
if ($adapter->ping()) {
Console::success('🟢 ' . str_pad("{$key}({$pool})", 50, '.') . 'connected');
} else {
Console::error('🔴 ' . str_pad("{$key}({$pool})", 47, '.') . 'disconnected');
}
} catch (\Throwable $th) {
} catch (\Throwable) {
Console::error('🔴 ' . str_pad("{$key}({$pool})", 47, '.') . 'disconnected');
}
}
@ -185,13 +192,14 @@ class Doctor extends Action
} else {
Console::error('🔴 ' . str_pad("Antivirus", 47, '.') . 'disconnected');
}
} catch (\Throwable $th) {
} catch (\Throwable) {
Console::error('🔴 ' . str_pad("Antivirus", 47, '.') . 'disconnected');
}
}
try {
$mail = $register->get('smtp'); /* @var $mail \PHPMailer\PHPMailer\PHPMailer */
/* @var PHPMailer $mail */
$mail = $register->get('smtp');
$mail->addAddress('demo@example.com', 'Example.com');
$mail->Subject = 'Test SMTP Connection';
@ -200,7 +208,7 @@ class Doctor extends Action
$mail->send();
Console::success('🟢 ' . str_pad("SMTP", 50, '.') . 'connected');
} catch (\Throwable $th) {
} catch (\Throwable) {
Console::error('🔴 ' . str_pad("SMTP", 47, '.') . 'disconnected');
}
@ -274,7 +282,7 @@ class Doctor extends Action
Console::error('Failed to check for a newer version' . "\n");
}
}
} catch (\Throwable $th) {
} catch (\Throwable) {
Console::error('Failed to check for a newer version' . "\n");
}
}

View file

@ -12,6 +12,7 @@ use Utopia\Database\Query;
use Utopia\Database\Validator\Authorization;
use Utopia\Platform\Action;
use Utopia\Pools\Group;
use Utopia\Queue\Broker\Pool as BrokerPool;
use Utopia\System\System;
use Utopia\Telemetry\Adapter as Telemetry;
use Utopia\Telemetry\Gauge;
@ -24,6 +25,8 @@ abstract class ScheduleBase extends Action
protected array $schedules = [];
protected BrokerPool $publisher;
private ?Histogram $collectSchedulesTelemetryDuration = null;
private ?Gauge $collectSchedulesTelemetryCount = null;
private ?Gauge $scheduleTelemetryCount = null;
@ -68,6 +71,7 @@ abstract class ScheduleBase extends Action
Console::title(\ucfirst(static::getSupportedResource()) . ' scheduler V1');
Console::success(APP_NAME . ' ' . \ucfirst(static::getSupportedResource()) . ' scheduler v1 has started');
$this->publisher = new BrokerPool($pools->get('publisher'));
$this->scheduleTelemetryCount = $telemetry->createGauge('task.schedule.count');
$this->collectSchedulesTelemetryDuration = $telemetry->createHistogram('task.schedule.collect_schedules.duration', 's');
$this->collectSchedulesTelemetryCount = $telemetry->createGauge('task.schedule.collect_schedules.count');
@ -119,8 +123,6 @@ abstract class ScheduleBase extends Action
$schedule->getAttribute('resourceId')
);
$pools->reclaim();
return [
'$internalId' => $schedule->getInternalId(),
'$id' => $schedule->getId(),

View file

@ -29,9 +29,7 @@ class ScheduleExecutions extends ScheduleBase
protected function enqueueResources(Group $pools, Database $dbForPlatform, callable $getProjectDB): void
{
$queue = $pools->get('publisher')->pop();
$connection = $queue->getResource();
$queueForFunctions = new Func($connection);
$queueForFunctions = new Func($this->publisher);
$intervalEnd = (new \DateTime())->modify('+' . self::ENQUEUE_TIMER . ' seconds');
foreach ($this->schedules as $schedule) {
@ -85,7 +83,5 @@ class ScheduleExecutions extends ScheduleBase
unset($this->schedules[$schedule['$internalId']]);
}
$queue->reclaim();
}
}

View file

@ -79,9 +79,6 @@ class ScheduleFunctions extends ScheduleBase
\go(function () use ($delay, $schedules, $pools, $dbForPlatform) {
\sleep($delay); // in seconds
$queue = $pools->get('publisher')->pop();
$connection = $queue->getResource();
foreach ($schedules as $delayConfig) {
$scheduleKey = $delayConfig['key'];
// Ensure schedule was not deleted
@ -93,7 +90,7 @@ class ScheduleFunctions extends ScheduleBase
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
$queueForFunctions = new Func($connection);
$queueForFunctions = new Func($this->publisher);
$queueForFunctions
->setType('schedule')
@ -105,8 +102,6 @@ class ScheduleFunctions extends ScheduleBase
$this->recordEnqueueDelay($delayConfig['nextDate']);
}
$queue->reclaim();
});
}

View file

@ -41,9 +41,7 @@ class ScheduleMessages extends ScheduleBase
}
\go(function () use ($schedule, $scheduledAt, $pools, $dbForPlatform) {
$queue = $pools->get('publisher')->pop();
$connection = $queue->getResource();
$queueForMessaging = new Messaging($connection);
$queueForMessaging = new Messaging($this->publisher);
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
@ -58,7 +56,6 @@ class ScheduleMessages extends ScheduleBase
$schedule['$id'],
);
$queue->reclaim();
$this->recordEnqueueDelay($scheduledAt);
unset($this->schedules[$schedule['$internalId']]);
});

View file

@ -61,10 +61,7 @@ class Deletes extends Action
->inject('executionRetention')
->inject('auditRetention')
->inject('log')
->callback(
fn ($message, Document $project, Database $dbForPlatform, callable $getProjectDB, callable $getLogsDB, Device $deviceForFiles, Device $deviceForFunctions, Device $deviceForBuilds, Device $deviceForCache, CertificatesAdapter $certificates, Executor $executor, string $executionRetention, string $auditRetention, Log $log) =>
$this->action($message, $project, $dbForPlatform, $getProjectDB, $getLogsDB, $deviceForFiles, $deviceForFunctions, $deviceForBuilds, $deviceForCache, $certificates, $executor, $executionRetention, $auditRetention, $log)
);
->callback($this->action(...));
}
/**

View file

@ -325,7 +325,7 @@ class StatsUsage extends Action
break;
}
} catch (Throwable $e) {
console::error("[reducer] " . " {DateTime::now()} " . " {$project->getInternalId()} " . " {$e->getMessage()}");
Console::error("[reducer] " . " {DateTime::now()} " . " {$project->getInternalId()} " . " {$e->getMessage()}");
}
}
@ -344,7 +344,7 @@ class StatsUsage extends Action
continue;
}
console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys);
Console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys);
try {
foreach ($stats['keys'] ?? [] as $key => $value) {
@ -381,7 +381,7 @@ class StatsUsage extends Action
}
}
} catch (Exception $e) {
console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage());
Console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage());
}
}
@ -405,7 +405,7 @@ class StatsUsage extends Action
}
protected function prepareForLogsDB(Document $project, Document $stat)
protected function prepareForLogsDB(Document $project, Document $stat): void
{
if (System::getEnv('_APP_STATS_USAGE_DUAL_WRITING', 'disabled') === 'disabled') {
return;
@ -430,8 +430,7 @@ class StatsUsage extends Action
return;
}
$dbForLogs = call_user_func($this->getLogsDB);
$dbForLogs
$dbForLogs = ($this->getLogsDB)()
->setTenant(null)
->setTenantPerDocument(true);
@ -446,6 +445,5 @@ class StatsUsage extends Action
} catch (Throwable $th) {
Console::error($th->getMessage());
}
$this->register->get('pools')->get('logs')->reclaim();
}
}

View file

@ -70,9 +70,9 @@ class StatsUsageDump extends Action
];
/**
* @var callable
* @var callable(Document): Database
*/
protected mixed $getLogsDB;
protected $getLogsDB;
protected array $periods = [
'1h' => 'Y-m-d H:00',
@ -126,10 +126,10 @@ class StatsUsageDump extends Action
continue;
}
console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys);
Console::log('['.DateTime::now().'] Id: '.$project->getId(). ' InternalId: '.$project->getInternalId(). ' Db: '.$project->getAttribute('database').' ReceivedAt: '.$receivedAt. ' Keys: '.$numberOfKeys);
try {
/** @var \Utopia\Database\Database $dbForProject */
/** @var Database $dbForProject */
$dbForProject = $getProjectDB($project);
foreach ($stats['keys'] ?? [] as $key => $value) {
if ($value == 0) {
@ -169,7 +169,7 @@ class StatsUsageDump extends Action
}
}
} catch (\Exception $e) {
console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage());
Console::error('[' . DateTime::now() . '] project [' . $project->getInternalId() . '] database [' . $project['database'] . '] ' . ' ' . $e->getMessage());
}
}
}
@ -190,8 +190,7 @@ class StatsUsageDump extends Action
}
}
/** @var \Utopia\Database\Database $dbForLogs*/
$dbForLogs = call_user_func($this->getLogsDB, $project);
$dbForLogs = ($this->getLogsDB)($project);
try {
$dbForLogs->createOrUpdateDocumentsWithIncrease(
@ -203,7 +202,5 @@ class StatsUsageDump extends Action
} catch (\Throwable $th) {
Console::error($th->getMessage());
}
$this->register->get('pools')->get('logs')->reclaim();
}
}

View file

@ -0,0 +1,46 @@
<?php
namespace Appwrite\PubSub\Adapter;
use Appwrite\PubSub\Adapter;
use Utopia\Database\Exception as DatabaseException;
use Utopia\Pools\Pool as UtopiaPool;
class Pool implements Adapter
{
public function __construct(private UtopiaPool $pool)
{
}
public function ping($message = null): bool
{
return $this->delegate(__FUNCTION__, \func_get_args());
}
public function subscribe($channels, $callback): void
{
$this->delegate(__FUNCTION__, \func_get_args());
}
public function publish($channel, $message): void
{
$this->delegate(__FUNCTION__, \func_get_args());
}
/**
* Forward method calls to the internal adapter instance via the pool.
*
* Required because __call() can't be used to implement abstract methods.
*
* @param string $method
* @param array<mixed> $args
* @return mixed
* @throws DatabaseException
*/
public function delegate(string $method, array $args): mixed
{
return $this->pool->use(function (Adapter $adapter) use ($method, $args) {
return $adapter->{$method}(...$args);
});
}
}

View file

@ -35,7 +35,7 @@ services:
- VERSION=dev
restart: unless-stopped
ports:
- 9501:80
- "9501:80"
networks:
- appwrite
labels:
@ -52,15 +52,12 @@ services:
- ./phpunit.xml:/usr/src/code/phpunit.xml
- ./tests:/usr/src/code/tests
- ./app:/usr/src/code/app
# - ./vendor:/usr/src/code/vendor
- ./docs:/usr/src/code/docs
- ./public:/usr/src/code/public
- ./src:/usr/src/code/src
- ./debug:/tmp
depends_on:
- mariadb
- redis
# - clamav
environment:
- _APP_COMPRESSION_MIN_SIZE_BYTES
- _APP_ENV
@ -355,33 +352,6 @@ services:
volumes:
- appwrite-redis:/data:rw
# clamav:
# image: appwrite/clamav:1.2.0
# container_name: appwrite-clamav
# restart: unless-stopped
# networks:
# - appwrite
# volumes:
# - appwrite-uploads:/storage/uploads
# redis-commander:
# image: rediscommander/redis-commander:latest
# restart: unless-stopped
# networks:
# - appwrite
# environment:
# - REDIS_HOSTS=redis
# ports:
# - "8081:8081"
# webgrind:
# image: 'jokkedk/webgrind:latest'
# volumes:
# - './debug:/tmp'
# ports:
# - '3001:80'
networks:
gateway:
appwrite: