appwrite/app/worker.php
ArnabChatterjee20k f6655343c7 pr comments addressed
* Added new exception `MIGRATION_DATABASE_TYPE_UNSUPPORTED` with proper error metadata and HTTP 400 response.
* Replaced generic CSV database type errors with the new migration-specific exception for clearer error handling.
* Added support for `DOCUMENTSDB` in migration transfer resource service mapping.
* Fixed Appwrite report initialization by correctly injecting `getDatabasesDB`.
* Updated database adapter initialization to conditionally disable attribute support for `DOCUMENTSDB`.
* Moved `setSupportForAttributes` logic from pool initialization to database resource creation.
* Removed duplicate `getDatabasesDB` resource definition and redundant database event listener setup.
* Cleaned up unused variables and minor code inconsistencies.
* Fixed docblock formatting in `TransactionState`.
* Adjusted metrics handling in VectorDB embeddings text creation (removed unnecessary trigger/reset flow).
2026-03-10 14:30:57 +05:30

641 lines
23 KiB
PHP
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

<?php
require_once __DIR__ . '/init.php';
use Appwrite\Certificates\LetsEncrypt;
use Appwrite\Event\Audit;
use Appwrite\Event\Build;
use Appwrite\Event\Certificate;
use Appwrite\Event\Database as EventDatabase;
use Appwrite\Event\Delete;
use Appwrite\Event\Event;
use Appwrite\Event\Func;
use Appwrite\Event\Mail;
use Appwrite\Event\Messaging;
use Appwrite\Event\Migration;
use Appwrite\Event\Realtime;
use Appwrite\Event\Screenshot;
use Appwrite\Event\StatsUsage;
use Appwrite\Event\Webhook;
use Appwrite\Platform\Appwrite;
use Appwrite\Utopia\Database\Documents\User;
use Executor\Executor;
use Swoole\Runtime;
use Utopia\Abuse\Adapters\TimeLimit\Redis as TimeLimitRedis;
use Utopia\Audit\Adapter\Database as AdapterDatabase;
use Utopia\Audit\Audit as UtopiaAudit;
use Utopia\Cache\Adapter\Pool as CachePool;
use Utopia\Cache\Adapter\Sharding;
use Utopia\Cache\Cache;
use Utopia\Config\Config;
use Utopia\Console;
use Utopia\Database\Adapter\Pool as DatabasePool;
use Utopia\Database\Database;
use Utopia\Database\DateTime;
use Utopia\Database\Document;
use Utopia\Database\Validator\Authorization;
use Utopia\DSN\DSN;
use Utopia\Logger\Log;
use Utopia\Logger\Logger;
use Utopia\Platform\Service;
use Utopia\Pools\Group;
use Utopia\Queue\Broker\Pool as BrokerPool;
use Utopia\Queue\Message;
use Utopia\Queue\Publisher;
use Utopia\Queue\Server;
use Utopia\Registry\Registry;
use Utopia\Storage\Device\Telemetry as TelemetryDevice;
use Utopia\System\System;
use Utopia\Telemetry\Adapter as Telemetry;
use Utopia\Telemetry\Adapter\None as NoTelemetry;
// Enable Swoole's runtime coroutine support before any resource is registered.
Runtime::enableCoroutine();

require_once __DIR__ . '/init/span.php';

// The registry is provided as a global; it is exposed to the queue server as the 'register' resource.
global $register;

Server::setResource('register', function () use ($register) {
    return $register;
});

// Authorization validator that is switched off via disable() before being handed to consumers.
Server::setResource('authorization', function () {
    $validator = new Authorization();
    $validator->disable();

    return $validator;
}, []);
// Database handle for platform-level data: uses the 'console' pool and the '_console' namespace.
Server::setResource('dbForPlatform', function (Cache $cache, Registry $register, Authorization $authorization) {
    $consolePool = $register->get('pools')->get('console');
    $platformDb = new Database(new DatabasePool($consolePool), $cache);

    $platformDb
        ->setDatabase(APP_DATABASE)
        ->setAuthorization($authorization)
        ->setNamespace('_console')
        ->setDocumentType('users', User::class);

    return $platformDb;
}, ['cache', 'register', 'authorization']);
// Resolves the project document referenced by the current queue message.
// The 'console' project is returned as received in the payload; any other project is
// loaded from the platform database's 'projects' collection by ID.
Server::setResource('project', function (Message $message, Database $dbForPlatform) {
    $payload = $message->getPayload() ?? [];
    $fromPayload = new Document($payload['project'] ?? []);
    $projectId = $fromPayload->getId();

    return $projectId === 'console'
        ? $fromPayload
        : $dbForPlatform->getDocument('projects', $projectId);
}, ['message', 'dbForPlatform']);
// Database handle scoped to the current project.
// Falls back to the platform database when the project is empty or is the console project.
Server::setResource('dbForProject', function (Cache $cache, Registry $register, Message $message, Document $project, Database $dbForPlatform, Authorization $authorization) {
    $isPlatformScope = $project->isEmpty() || $project->getId() === 'console';
    if ($isPlatformScope) {
        return $dbForPlatform;
    }

    $pools = $register->get('pools');
    $rawDsn = $project->getAttribute('database');

    try {
        $dsn = new DSN($rawDsn);
    } catch (\InvalidArgumentException) {
        // TODO: Temporary until all projects are using shared tables
        $dsn = new DSN('mysql://' . $rawDsn);
    }

    $host = $dsn->getHost();
    $projectDb = new Database(new DatabasePool($pools->get($host)), $cache);
    $projectDb->setDocumentType('users', User::class);

    // Hosts listed in _APP_DATABASE_SHARED_TABLES use shared tables with a per-project tenant;
    // every other host gets a per-project namespace instead.
    $sharedHosts = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));
    $usesSharedTables = \in_array($host, $sharedHosts);

    if (!$usesSharedTables) {
        $projectDb
            ->setSharedTables(false)
            ->setTenant(null)
            ->setNamespace('_' . $project->getSequence());
    } else {
        $projectDb
            ->setSharedTables(true)
            ->setTenant((int)$project->getSequence())
            ->setNamespace($dsn->getParam('namespace'));
    }

    $projectDb
        ->setDatabase(APP_DATABASE)
        ->setAuthorization($authorization)
        ->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS_WORKER);

    return $projectDb;
}, ['cache', 'register', 'message', 'project', 'dbForPlatform', 'authorization']);
// Factory resource: returns a callable that yields the Database for an arbitrary project.
// Database instances are kept per DSN host and re-pointed at the requested project on every call.
Server::setResource('getProjectDB', function (Group $pools, Database $dbForPlatform, $cache, Authorization $authorization) {
    // Instances built so far, keyed by DSN host.
    // TODO: @Meldiron This should probably be responsibility of utopia-php/pools
    $instances = [];

    // Applies shared-table/tenant/namespace settings for the given project to a Database.
    $scopeToProject = static function (Database $db, DSN $dsn, Document $project): void {
        $sharedHosts = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));

        if (\in_array($dsn->getHost(), $sharedHosts)) {
            $db
                ->setSharedTables(true)
                ->setTenant((int)$project->getSequence())
                ->setNamespace($dsn->getParam('namespace'));
            return;
        }

        $db
            ->setSharedTables(false)
            ->setTenant(null)
            ->setNamespace('_' . $project->getSequence());
    };

    return function (Document $project) use ($pools, $dbForPlatform, $cache, $authorization, $scopeToProject, &$instances): Database {
        if ($project->isEmpty() || $project->getId() === 'console') {
            return $dbForPlatform;
        }

        $rawDsn = $project->getAttribute('database');
        try {
            $dsn = new DSN($rawDsn);
        } catch (\InvalidArgumentException) {
            // TODO: Temporary until all projects are using shared tables
            $dsn = new DSN('mysql://' . $rawDsn);
        }

        $host = $dsn->getHost();

        // Reuse path: an instance for this host already exists, so only re-scope it.
        if (isset($instances[$host])) {
            $existing = $instances[$host];
            $existing->setAuthorization($authorization);
            $scopeToProject($existing, $dsn, $project);

            return $existing;
        }

        // First request for this host: build, remember, scope, then apply the common settings.
        $created = new Database(new DatabasePool($pools->get($host)), $cache);
        $instances[$host] = $created;
        $scopeToProject($created, $dsn, $project);

        $created
            ->setDatabase(APP_DATABASE)
            ->setAuthorization($authorization)
            ->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS_WORKER);

        return $created;
    };
}, ['pools', 'dbForPlatform', 'cache', 'authorization']);
// Factory resource: returns a callable that yields the logs Database (pool 'logs',
// namespace 'logsV1', shared tables enabled), optionally scoped to a project tenant.
//
// The callable accepts an optional project Document. When a non-empty, non-console
// project is given, the database tenant is set from the project's sequence.
Server::setResource('getLogsDB', function (Group $pools, Cache $cache, Authorization $authorization) {
    // Memoised instance shared across invocations of the returned callable.
    $database = null;

    // $database must be captured by reference: a by-value capture is reset to null on
    // every invocation, which made the reuse branch below unreachable and caused a new
    // Database to be built on every call.
    return function (?Document $project = null) use ($pools, $cache, &$database, $authorization) {
        $hasProject = $project !== null && !$project->isEmpty() && $project->getId() !== 'console';

        // Reuse the memoised instance and just switch it to the requested tenant.
        if ($database !== null && $hasProject) {
            $database->setTenant((int)$project->getSequence());
            return $database;
        }

        $adapter = new DatabasePool($pools->get('logs'));
        $database = new Database($adapter, $cache);
        $database
            ->setDatabase(APP_DATABASE)
            ->setAuthorization($authorization)
            ->setSharedTables(true)
            ->setNamespace('logsV1')
            ->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS_WORKER)
            ->setMaxQueryValues(APP_DATABASE_QUERY_MAX_VALUES_WORKER);

        // set tenant
        if ($hasProject) {
            $database->setTenant((int)$project->getSequence());
        }

        return $database;
    };
}, ['pools', 'cache', 'authorization']);
// Factory resource: returns a callable that builds a Database for a given database document.
// The connection pool is chosen from the database document's DSN, while the
// shared-table/tenant/namespace settings come from the project's DSN.
Server::setResource('getDatabasesDB', function (Cache $cache, Registry $register, Document $project, Authorization $authorization) {
    return function (Document $database, ?Document $projectDocument = null) use ($cache, $register, $project, $authorization): Database {
        // Default to the injected project when the caller does not pass one.
        if ($projectDocument === null) {
            $projectDocument = $project;
        }

        $databaseType = $database->getAttribute('database', '');

        // Backwards compatibility: older or seeded legacy databases may not have a DSN stored
        // in the "database" attribute. In that case, fall back to the project's database DSN.
        if ($databaseType === '') {
            $databaseType = $projectDocument->getAttribute('database', '');
        }

        try {
            $databaseDSN = new DSN($databaseType);
        } catch (\InvalidArgumentException) {
            $databaseDSN = new DSN('mysql://'.$databaseType);
        }

        $projectRawDsn = $projectDocument->getAttribute('database');
        try {
            $dsn = new DSN($projectRawDsn);
        } catch (\InvalidArgumentException) {
            // Temporary fallback until all projects use shared tables
            $dsn = new DSN('mysql://' . $projectRawDsn);
        }

        $pool = $register->get('pools')->get($databaseDSN->getHost());
        $adapter = new DatabasePool($pool);

        // NOTE(review): $databaseType is the raw "database" attribute that is also parsed as a
        // DSN above; confirm that comparing the raw value (rather than a part of the parsed DSN)
        // against DOCUMENTSDB is what is intended.
        $adapter->setSupportForAttributes($databaseType !== DOCUMENTSDB);

        $handle = new Database($adapter, $cache);
        $handle
            ->setDatabase(APP_DATABASE)
            ->setAuthorization($authorization);

        $sharedHosts = \explode(',', System::getEnv('_APP_DATABASE_SHARED_TABLES', ''));
        $usesSharedTables = \in_array($dsn->getHost(), $sharedHosts, true);

        if (!$usesSharedTables) {
            $handle
                ->setSharedTables(false)
                ->setTenant(null)
                ->setNamespace('_' . $projectDocument->getSequence());
        } else {
            $handle
                ->setSharedTables(true)
                ->setTenant((int) $projectDocument->getSequence())
                ->setNamespace($dsn->getParam('namespace'));
        }

        $handle->setTimeout(APP_DATABASE_TIMEOUT_MILLISECONDS_WORKER);

        return $handle;
    };
}, ['cache', 'register', 'project', 'authorization']);
// Abuse retention cut-off: current Unix time minus the configured number of seconds.
Server::setResource('abuseRetention', function () {
    $now = time();
    $window = (int) System::getEnv('_APP_MAINTENANCE_RETENTION_ABUSE', 86400); // 1 day

    return $now - $window;
});

// Audit retention cut-off; the console project uses its own env var and default.
Server::setResource('auditRetention', function (Document $project) {
    [$envKey, $fallback] = $project->getId() === 'console'
        ? ['_APP_MAINTENANCE_RETENTION_AUDIT_CONSOLE', 15778800] // 6 months
        : ['_APP_MAINTENANCE_RETENTION_AUDIT', 1209600]; // 14 days

    return DateTime::addSeconds(new \DateTime(), -1 * System::getEnv($envKey, $fallback));
}, ['project']);

// Execution retention cut-off.
Server::setResource('executionRetention', function () {
    $reference = new \DateTime();
    $offset = -1 * System::getEnv('_APP_MAINTENANCE_RETENTION_EXECUTION', 1209600); // 14 days

    return DateTime::addSeconds($reference, $offset);
});
// Cache resource: one pool-backed adapter per entry in the 'pools-cache' config,
// combined behind a Sharding adapter.
Server::setResource('cache', function (Registry $register) {
    $pools = $register->get('pools');
    $poolNames = Config::getParam('pools-cache', []);

    $shards = [];
    foreach ($poolNames as $poolName) {
        $shards[] = new CachePool($pools->get($poolName));
    }

    $sharding = new Sharding($shards);

    return new Cache($sharding);
}, ['register']);
// Raw phpredis client built from the _APP_REDIS_* environment variables.
Server::setResource('redis', function () {
    $host = System::getEnv('_APP_REDIS_HOST', 'localhost');
    $port = (int)System::getEnv('_APP_REDIS_PORT', 6379);
    $password = System::getEnv('_APP_REDIS_PASS', '');

    $client = new \Redis();
    // Errors raised by pconnect() are suppressed here, as in the original bootstrap.
    @$client->pconnect($host, $port);

    // Only authenticate when a non-empty password is configured.
    if ($password) {
        $client->auth($password);
    }

    $client->setOption(\Redis::OPT_READ_TIMEOUT, -1);

    return $client;
});

// Factory for Redis-backed time limit adapters sharing the client above.
Server::setResource('timelimit', function (\Redis $redis) {
    return fn (string $key, int $limit, int $time) => new TimeLimitRedis($key, $limit, $time, $redis);
}, ['redis']);
// Fresh Log instance for error reporting.
Server::setResource('log', function () {
    return new Log();
});

// Queue publisher backed by the 'publisher' pool.
Server::setResource(
    'publisher',
    fn (Group $pools) => new BrokerPool(publisher: $pools->get('publisher')),
    ['pools']
);

// The specialised publisher resources below all resolve to the shared 'publisher' resource.
Server::setResource('publisherDatabases', fn (BrokerPool $publisher) => $publisher, ['publisher']);
Server::setResource('publisherFunctions', fn (BrokerPool $publisher) => $publisher, ['publisher']);
Server::setResource('publisherMigrations', fn (BrokerPool $publisher) => $publisher, ['publisher']);
Server::setResource('publisherStatsUsage', fn (BrokerPool $publisher) => $publisher, ['publisher']);
Server::setResource('publisherMessaging', fn (BrokerPool $publisher) => $publisher, ['publisher']);

// Queue consumer backed by the 'consumer' pool.
Server::setResource(
    'consumer',
    fn (Group $pools) => new BrokerPool(consumer: $pools->get('consumer')),
    ['pools']
);

// The specialised consumer resources below all resolve to the shared 'consumer' resource.
Server::setResource('consumerDatabases', fn (BrokerPool $consumer) => $consumer, ['consumer']);
Server::setResource('consumerMigrations', fn (BrokerPool $consumer) => $consumer, ['consumer']);
Server::setResource('consumerStatsUsage', fn (BrokerPool $consumer) => $consumer, ['consumer']);
// Event queue resources. Each one wraps the shared 'publisher' resource in its event class,
// except queueForRealtime, which is constructed without a publisher.
Server::setResource('queueForStatsUsage', fn (Publisher $publisher) => new StatsUsage($publisher), ['publisher']);
Server::setResource('queueForDatabase', fn (Publisher $publisher) => new EventDatabase($publisher), ['publisher']);
Server::setResource('queueForMessaging', fn (Publisher $publisher) => new Messaging($publisher), ['publisher']);
Server::setResource('queueForMails', fn (Publisher $publisher) => new Mail($publisher), ['publisher']);
Server::setResource('queueForBuilds', fn (Publisher $publisher) => new Build($publisher), ['publisher']);
Server::setResource('queueForScreenshots', fn (Publisher $publisher) => new Screenshot($publisher), ['publisher']);
Server::setResource('queueForDeletes', fn (Publisher $publisher) => new Delete($publisher), ['publisher']);
Server::setResource('queueForEvents', fn (Publisher $publisher) => new Event($publisher), ['publisher']);
Server::setResource('queueForAudits', fn (Publisher $publisher) => new Audit($publisher), ['publisher']);
Server::setResource('queueForWebhooks', fn (Publisher $publisher) => new Webhook($publisher), ['publisher']);
Server::setResource('queueForFunctions', fn (Publisher $publisher) => new Func($publisher), ['publisher']);
Server::setResource('queueForRealtime', fn () => new Realtime(), []);
Server::setResource('queueForCertificates', fn (Publisher $publisher) => new Certificate($publisher), ['publisher']);
Server::setResource('queueForMigrations', fn (Publisher $publisher) => new Migration($publisher), ['publisher']);
// Logger and connection pools are both read straight from the registry.
Server::setResource('logger', fn (Registry $register) => $register->get('logger'), ['register']);
Server::setResource('pools', fn (Registry $register) => $register->get('pools'), ['register']);

// Telemetry resource: the no-op adapter.
Server::setResource('telemetry', function () {
    return new NoTelemetry();
});
// Storage devices. Each resource wraps getDevice(<storage root>/app-<project id>)
// in a TelemetryDevice; only the storage root constant differs between them.
Server::setResource(
    'deviceForSites',
    fn (Document $project, Telemetry $telemetry) => new TelemetryDevice(
        $telemetry,
        getDevice(APP_STORAGE_SITES . '/app-' . $project->getId())
    ),
    ['project', 'telemetry']
);
Server::setResource(
    'deviceForMigrations',
    fn (Document $project, Telemetry $telemetry) => new TelemetryDevice(
        $telemetry,
        getDevice(APP_STORAGE_IMPORTS . '/app-' . $project->getId())
    ),
    ['project', 'telemetry']
);
Server::setResource(
    'deviceForFunctions',
    fn (Document $project, Telemetry $telemetry) => new TelemetryDevice(
        $telemetry,
        getDevice(APP_STORAGE_FUNCTIONS . '/app-' . $project->getId())
    ),
    ['project', 'telemetry']
);
Server::setResource(
    'deviceForFiles',
    fn (Document $project, Telemetry $telemetry) => new TelemetryDevice(
        $telemetry,
        getDevice(APP_STORAGE_UPLOADS . '/app-' . $project->getId())
    ),
    ['project', 'telemetry']
);
Server::setResource(
    'deviceForBuilds',
    fn (Document $project, Telemetry $telemetry) => new TelemetryDevice(
        $telemetry,
        getDevice(APP_STORAGE_BUILDS . '/app-' . $project->getId())
    ),
    ['project', 'telemetry']
);
Server::setResource(
    'deviceForCache',
    fn (Document $project, Telemetry $telemetry) => new TelemetryDevice(
        $telemetry,
        getDevice(APP_STORAGE_CACHE . '/app-' . $project->getId())
    ),
    ['project', 'telemetry']
);
// Resource-blocking check: this implementation always answers false.
Server::setResource('isResourceBlocked', function () {
    return function (Document $project, string $resourceType, ?string $resourceId) {
        return false;
    };
});

// Plan resource: always an empty array here.
Server::setResource('plan', fn (array $plan = []) => []);
// LetsEncrypt certificate adapter. The contact email comes from _APP_EMAIL_CERTIFICATES,
// falling back to _APP_SYSTEM_SECURITY_EMAIL_ADDRESS; an empty value raises an Exception.
Server::setResource('certificates', function () {
    $fallbackEmail = System::getEnv('_APP_SYSTEM_SECURITY_EMAIL_ADDRESS');
    $email = System::getEnv('_APP_EMAIL_CERTIFICATES', $fallbackEmail);

    if (!empty($email)) {
        return new LetsEncrypt($email);
    }

    throw new Exception('You must set a valid security email address (_APP_EMAIL_CERTIFICATES) to issue a LetsEncrypt SSL certificate.');
});
// Factory resource: returns a callable that reports a Throwable.
// When a logger is registered, a structured Log entry is pushed to it; in every case
// the failure (and its previous exception, if any) is written to the console as warnings.
Server::setResource('logError', function (Registry $register, Document $project) {
    return function (Throwable $error, string $namespace, string $action, ?array $extras = null) use ($register, $project) {
        $logger = $register->get('logger');
        $previous = $error->getPrevious();
        // Loose comparison is kept on purpose to match the original check.
        $previousAddsInfo = $previous !== null && $previous->getMessage() != $error->getMessage();

        if ($logger) {
            $entry = new Log();
            $entry->setNamespace($namespace);
            $entry->setServer(System::getEnv('_APP_LOGGING_SERVICE_IDENTIFIER', \gethostname()));
            $entry->setVersion(System::getEnv('_APP_VERSION', 'UNKNOWN'));
            $entry->setType(Log::TYPE_ERROR);
            $entry->setMessage($error->getMessage());

            $entry->addTag('code', $error->getCode());
            $entry->addTag('verboseType', get_class($error));
            $entry->addTag('projectId', $project->getId() ?? '');

            $entry->addExtra('file', $error->getFile());
            $entry->addExtra('line', $error->getLine());
            $entry->addExtra('trace', $error->getTraceAsString());

            if ($previous !== null) {
                // The previous message is only recorded when it differs from the main one.
                if ($previousAddsInfo) {
                    $entry->addExtra('previousMessage', $previous->getMessage());
                }
                $entry->addExtra('previousFile', $previous->getFile());
                $entry->addExtra('previousLine', $previous->getLine());
            }

            // Caller-supplied extras are added last.
            foreach (($extras ?? []) as $extraKey => $extraValue) {
                $entry->addExtra($extraKey, $extraValue);
            }

            $entry->setAction($action);

            $environment = System::getEnv('_APP_ENV', 'development') === 'production'
                ? Log::ENVIRONMENT_PRODUCTION
                : Log::ENVIRONMENT_STAGING;
            $entry->setEnvironment($environment);

            // A failure to push the log is reported on the console and does not propagate.
            try {
                $status = $logger->addLog($entry);
                Console::info('Error log pushed with status code: ' . $status);
            } catch (Throwable $pushFailure) {
                Console::error('Error pushing log: ' . $pushFailure->getMessage());
            }
        }

        Console::warning('Failed: ' . $error->getMessage());
        Console::warning($error->getTraceAsString());

        if ($previous === null) {
            return;
        }

        if ($previousAddsInfo) {
            Console::warning('Previous Failed: ' . $previous->getMessage());
        }
        Console::warning('Previous File: ' . $previous->getFile() . ' Line: ' . $previous->getLine());
    };
}, ['register', 'project']);
// Executor client.
Server::setResource('executor', function () {
    return new Executor();
});

// Factory resource: returns a callable that builds an audit service for a project.
// Empty or console projects are backed by the platform database; all others by the
// project's own database obtained through getProjectDB.
Server::setResource('getAudit', function (Database $dbForPlatform, callable $getProjectDB) {
    return function (Document $project) use ($dbForPlatform, $getProjectDB) {
        $usePlatformDb = $project->isEmpty() || $project->getId() === 'console';
        $backingDb = $usePlatformDb ? $dbForPlatform : $getProjectDB($project);

        return new UtopiaAudit(new AdapterDatabase($backingDb));
    };
}, ['dbForPlatform', 'getProjectDB']);
// Retention count for executions: 0 for the console project or when no plan is set,
// otherwise the plan's 'executionsRetentionCount' (100 when the key is absent).
Server::setResource('executionsRetentionCount', function (Document $project, array $plan) {
    if ($project->getId() !== 'console' && !empty($plan)) {
        return (int) ($plan['executionsRetentionCount'] ?? 100);
    }

    return 0;
}, ['project', 'plan']);
$pools = $register->get('pools');
$platform = new Appwrite();

// The worker name is expected as the first command-line argument after the script name.
$args = $platform->getEnv('argv');
if (!isset($args[1])) {
    Console::error('Missing worker name');
    Console::exit(1);
}

// Drop the script name so the worker name becomes the first element.
\array_shift($args);
$workerName = $args[0];

// _APP_QUEUE_NAME takes precedence when set; otherwise workers whose name starts with
// 'databases' default to 'database_db_main' and all others to 'v1-<lowercased worker name>'.
$queueName = System::getEnv(
    '_APP_QUEUE_NAME',
    \str_starts_with($workerName, 'databases')
        ? 'database_db_main'
        : 'v1-' . strtolower($workerName)
);
try {
    /**
     * Any worker can be configured with the following env vars:
     * - _APP_WORKERS_NUM The total number of worker processes
     * - _APP_WORKER_PER_CORE The number of worker processes per core (ignored if _APP_WORKERS_NUM is set)
     * - _APP_QUEUE_NAME The name of the queue to read for database events
     */
    $platform->init(Service::TYPE_WORKER, [
        'workersNum' => System::getEnv('_APP_WORKERS_NUM', 1),
        'connection' => $pools->get('consumer')->pop()->getResource(),
        // strtolower() always returns a string, so no null fallback is needed here.
        'workerName' => strtolower($workerName),
        'queueName' => $queueName
    ]);
} catch (\Throwable $e) {
    Console::error($e->getMessage() . ', File: ' . $e->getFile() . ', Line: ' . $e->getLine());
    // Fail fast: the code that follows fetches the worker from the platform and starts it,
    // which cannot work reliably once initialisation has failed. Exit with a non-zero
    // status, as is already done for a missing worker name.
    Console::exit(1);
}
$worker = $platform->getWorker();

// Bus resource: the registry's bus, configured to resolve names through the worker's resources.
Server::setResource('bus', function ($register) use ($worker) {
    $resolveFromWorker = function (string $name) use ($worker) {
        return $worker->getResource($name);
    };

    return $register->get('bus')->setResolver($resolveFromWorker);
}, ['register']);
// Worker-level error hook: pushes a structured log entry when a logger is configured,
// then prints the error's type, message, file and line to the console.
$worker
    ->error()
    ->inject('error')
    ->inject('logger')
    ->inject('log')
    ->inject('pools')
    ->inject('project')
    ->inject('authorization')
    ->action(function (Throwable $error, ?Logger $logger, Log $log, Group $pools, Document $project, Authorization $authorization) use ($worker, $queueName) {
        $version = System::getEnv('_APP_VERSION', 'UNKNOWN');
        $errorType = get_class($error);
        $errorMessage = $error->getMessage();
        $errorFile = $error->getFile();
        $errorLine = $error->getLine();

        if ($logger) {
            $log->setNamespace('appwrite-worker');
            $log->setServer(System::getEnv('_APP_LOGGING_SERVICE_IDENTIFIER', \gethostname()));
            $log->setVersion($version);
            $log->setType(Log::TYPE_ERROR);
            $log->setMessage($errorMessage);
            $log->setAction('appwrite-queue-' . $queueName);

            $log->addTag('verboseType', $errorType);
            $log->addTag('code', $error->getCode());
            $log->addTag('projectId', $project->getId() ?? 'n/a');

            $log->addExtra('file', $errorFile);
            $log->addExtra('line', $errorLine);
            $log->addExtra('trace', $error->getTraceAsString());
            $log->addExtra('roles', $authorization->getRoles());

            $environment = System::getEnv('_APP_ENV', 'development') === 'production'
                ? Log::ENVIRONMENT_PRODUCTION
                : Log::ENVIRONMENT_STAGING;
            $log->setEnvironment($environment);

            // A failure to push the log is reported on the console and does not propagate.
            try {
                $status = $logger->addLog($log);
                Console::info('Error log pushed with status code: ' . $status);
            } catch (Throwable $pushFailure) {
                Console::error('Error pushing log: ' . $pushFailure->getMessage());
            }
        }

        Console::error('[Error] Type: ' . $errorType);
        Console::error('[Error] Message: ' . $errorMessage);
        Console::error('[Error] File: ' . $errorFile);
        Console::error('[Error] Line: ' . $errorLine);
    });

// Start consuming the queue.
$worker->start();