Remove all remaining pool usages

This commit is contained in:
Jake Barnby 2025-04-29 21:28:53 +12:00
parent 4d92af5ba6
commit 02b1c82226
No known key found for this signature in database
GPG key ID: C437A8CC85B96E9C
4 changed files with 60 additions and 54 deletions

View file

@ -3,17 +3,19 @@
namespace Appwrite\Platform\Tasks;
use Appwrite\ClamAV\Network;
use Appwrite\PubSub\Adapter as PubSubAdapter;
use Appwrite\PubSub\Adapter\Pool as PubSubPool;
use PHPMailer\PHPMailer\PHPMailer;
use Utopia\App;
use Utopia\Cache\Adapter\Pool as CachePool;
use Utopia\CLI\Console;
use Utopia\Config\Config;
use Utopia\Database\Adapter as DatabaseAdapter;
use Utopia\Database\Adapter\Pool as DatabasePool;
use Utopia\Domains\Domain;
use Utopia\DSN\DSN;
use Utopia\Logger\Logger;
use Utopia\Platform\Action;
use Utopia\Pools\Group;
use Utopia\Queue\Broker\Pool as BrokerPool;
use Utopia\Registry\Registry;
use Utopia\Storage\Device\Local;
use Utopia\Storage\Storage;
@ -136,13 +138,13 @@ class Doctor extends Action
foreach ($configs as $key => $config) {
foreach ($config as $database) {
try {
$pools->get($database)->use(function (DatabaseAdapter $adapter) use ($key, $database) {
if ($adapter->ping()) {
Console::success('🟢 ' . str_pad("{$key}({$database})", 50, '.') . 'connected');
} else {
Console::error('🔴 ' . str_pad("{$key}({$database})", 47, '.') . 'disconnected');
}
});
$adapter = new DatabasePool($pools->get($database));
if ($adapter->ping()) {
Console::success('🟢 ' . str_pad("{$key}({$database})", 50, '.') . 'connected');
} else {
Console::error('🔴 ' . str_pad("{$key}({$database})", 47, '.') . 'disconnected');
}
} catch (\Throwable) {
Console::error('🔴 ' . str_pad("{$key}.({$database})", 47, '.') . 'disconnected');
}
@ -161,13 +163,17 @@ class Doctor extends Action
foreach ($configs as $key => $config) {
foreach ($config as $pool) {
try {
$pools->get($pool)->use(function (PubSubAdapter $adapter) use ($key, $pool) {
if ($adapter->ping()) {
Console::success('🟢 ' . str_pad("{$key}({$pool})", 50, '.') . 'connected');
} else {
Console::error('🔴 ' . str_pad("{$key}({$pool})", 47, '.') . 'disconnected');
}
});
$adapter = match($key) {
'Cache' => new CachePool($pools->get($pool)),
'Queue' => new BrokerPool($pools->get($pool)),
'PubSub' => new PubSubPool($pools->get($pool)),
};
if ($adapter->ping()) {
Console::success('🟢 ' . str_pad("{$key}({$pool})", 50, '.') . 'connected');
} else {
Console::error('🔴 ' . str_pad("{$key}({$pool})", 47, '.') . 'disconnected');
}
} catch (\Throwable) {
Console::error('🔴 ' . str_pad("{$key}({$pool})", 47, '.') . 'disconnected');
}

View file

@ -3,9 +3,9 @@
namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Func;
use Swoole\Coroutine as Co;
use Utopia\Database\Database;
use Utopia\Pools\Group;
use Utopia\Queue\Broker\Pool as BrokerPool;
use Utopia\Queue\Publisher;
class ScheduleExecutions extends ScheduleBase
@ -58,24 +58,23 @@ class ScheduleExecutions extends ScheduleBase
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
\go(function () use ($schedule, $delay, $data, $pools) {
Co::sleep($delay);
\Co::sleep($delay);
$pools->get('publisher')->use(function (Publisher $publisher) use ($schedule, $data) {
$queueForFunctions = new Func($publisher);
$publisher = new BrokerPool($pools->get('publisher'));
$queueForFunctions->setType('schedule')
// Set functionId instead of function as we don't have $dbForProject
// TODO: Refactor to use function instead of functionId
->setFunctionId($schedule['resource']['functionId'])
->setExecution($schedule['resource'])
->setMethod($data['method'] ?? 'POST')
->setPath($data['path'] ?? '/')
->setHeaders($data['headers'] ?? [])
->setBody($data['body'] ?? '')
->setProject($schedule['project'])
->setUserId($data['userId'] ?? '')
->trigger();
});
$queueForFunctions = new Func($publisher);
$queueForFunctions->setType('schedule')
// Set functionId instead of function as we don't have $dbForProject
// TODO: Refactor to use function instead of functionId
->setFunctionId($schedule['resource']['functionId'])
->setExecution($schedule['resource'])
->setMethod($data['method'] ?? 'POST')
->setPath($data['path'] ?? '/')
->setHeaders($data['headers'] ?? [])
->setBody($data['body'] ?? '')
->setProject($schedule['project'])
->setUserId($data['userId'] ?? '');
});
$dbForPlatform->deleteDocument(

View file

@ -8,6 +8,7 @@ use Utopia\CLI\Console;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Pools\Group;
use Utopia\Queue\Broker\Pool as BrokerPool;
use Utopia\Queue\Publisher;
class ScheduleFunctions extends ScheduleBase
@ -72,8 +73,7 @@ class ScheduleFunctions extends ScheduleBase
foreach ($delayedExecutions as $delay => $scheduleKeys) {
\go(function () use ($delay, $scheduleKeys, $pools, $dbForPlatform) {
\sleep($delay); // in seconds
\Co::sleep($delay); // in seconds
foreach ($scheduleKeys as $scheduleKey) {
// Ensure schedule was not deleted
@ -85,17 +85,17 @@ class ScheduleFunctions extends ScheduleBase
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
$pools->get('publisher')->use(function (Publisher $publisher) use ($schedule) {
$queueForFunctions = new Func($publisher);
$publisher = new BrokerPool($pools->get('publisher'));
$queueForFunctions
->setType('schedule')
->setFunction($schedule['resource'])
->setMethod('POST')
->setPath('/')
->setProject($schedule['project'])
->trigger();
});
$queueForFunctions = new Func($publisher);
$queueForFunctions
->setType('schedule')
->setFunction($schedule['resource'])
->setMethod('POST')
->setPath('/')
->setProject($schedule['project'])
->trigger();
}
});
}

View file

@ -5,6 +5,7 @@ namespace Appwrite\Platform\Tasks;
use Appwrite\Event\Messaging;
use Utopia\Database\Database;
use Utopia\Pools\Group;
use Utopia\Queue\Broker\Pool as BrokerPool;
use Utopia\Queue\Publisher;
class ScheduleMessages extends ScheduleBase
@ -42,17 +43,17 @@ class ScheduleMessages extends ScheduleBase
}
\go(function () use ($schedule, $pools, $dbForPlatform) {
$pools->get('publisher')->use(function (Publisher $publisher) use ($schedule, $dbForPlatform) {
$queueForMessaging = new Messaging($publisher);
$publisher = new BrokerPool($pools->get('publisher'));
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
$queueForMessaging = new Messaging($publisher);
$queueForMessaging
->setType(MESSAGE_SEND_TYPE_EXTERNAL)
->setMessageId($schedule['resourceId'])
->setProject($schedule['project'])
->trigger();
});
$this->updateProjectAccess($schedule['project'], $dbForPlatform);
$queueForMessaging
->setType(MESSAGE_SEND_TYPE_EXTERNAL)
->setMessageId($schedule['resourceId'])
->setProject($schedule['project'])
->trigger();
$dbForPlatform->deleteDocument(
'schedules',