feat: Filter accepted custom configs in templates + add templates custom configs automatically when fetching them from the database

This commit is contained in:
Théophile Diot 2024-08-06 15:02:10 +01:00
parent 57a1e223a9
commit 1f0210bb4d
No known key found for this signature in database
GPG key ID: FA995104A0BA376A

View file

@ -77,6 +77,7 @@ class Database:
DB_STRING_RX = re_compile(r"^(?P<database>(mariadb|mysql)(\+pymysql)?|sqlite(\+pysqlite)?|postgresql(\+psycopg)?):/+(?P<path>/[^\s]+)")
READONLY_ERROR = ("readonly", "read-only", "command denied", "Access denied")
RESTRICTED_TEMPLATE_SETTINGS = ("USE_TEMPLATE", "IS_DRAFT")
MULTISITE_CUSTOM_CONFIG_TYPES = ("server-http", "modsec-crs", "modsec", "server-stream", "crs-plugins-before", "crs-plugins-after")
def __init__(
self, logger: Logger, sqlalchemy_string: Optional[str] = None, *, ui: bool = False, pool: Optional[bool] = None, log: bool = True, **kwargs
@ -1012,6 +1013,12 @@ class Database:
)
continue
if config_type not in self.MULTISITE_CUSTOM_CONFIG_TYPES:
self.logger.error(
f'{plugin.get("type", "core").title()} Plugin "{plugin["id"]}"\'s Template {template_id} has an invalid config type: {config_type}'
)
continue
if not templates_path.joinpath(template_id, "configs", config_type, config_name).is_file():
self.logger.error(
f'{plugin.get("type", "core").title()} Plugin "{plugin["id"]}"\'s Template {template_id} has a missing config: {config}'
@ -1854,14 +1861,17 @@ class Database:
def get_custom_configs(self) -> List[Dict[str, Any]]:
"""Get the custom configs from the database"""
db_config = self.get_non_default_settings(filtered_settings={"USE_TEMPLATE"})
with self._db_session() as session:
return [
custom_configs = [
{
"service_id": custom_config.service_id,
"type": custom_config.type,
"name": custom_config.name,
"data": custom_config.data,
"method": custom_config.method,
"template": None,
}
for custom_config in (
session.query(Custom_configs).with_entities(
@ -1874,6 +1884,36 @@ class Database:
)
]
if not db_config:
return custom_configs
for service in session.query(Services).with_entities(Services.id).all():
for key, value in db_config.items():
if key.startswith(f"{service.id}_"):
for template_config in (
session.query(Template_custom_configs)
.with_entities(Template_custom_configs.type, Template_custom_configs.name, Template_custom_configs.data)
.filter_by(template_id=value)
):
if not any(
custom_config["service_id"] == service.id
and custom_config["type"] == template_config.type
and custom_config["name"] == template_config.name
for custom_config in custom_configs
):
custom_configs.append(
{
"service_id": service.id,
"type": template_config.type,
"name": template_config.name,
"data": template_config.data,
"method": "default",
"template": value,
}
)
return custom_configs
def get_services_settings(self, methods: bool = False, with_drafts: bool = False) -> List[Dict[str, Any]]:
"""Get the services' configs from the database"""
services = []
@ -2490,6 +2530,12 @@ class Database:
)
continue
if config_type not in self.MULTISITE_CUSTOM_CONFIG_TYPES:
self.logger.error(
f'{plugin.get("type", "core").title()} Plugin "{plugin["id"]}"\'s Template "{template_id}"\'s Custom config "{config}" is not a valid type, skipping it'
)
continue
if not templates_path.joinpath(template_id, "configs", config_type, config_name).is_file():
self.logger.error(
f'{plugin.get("type", "core").title()} Plugin "{plugin["id"]}"\'s Template "{template_id}"\'s Custom config "{config}" does not exist, skipping it'
@ -2720,6 +2766,12 @@ class Database:
)
continue
if config_type not in self.MULTISITE_CUSTOM_CONFIG_TYPES:
self.logger.error(
f'{plugin.get("type", "core").title()} Plugin "{plugin["id"]}"\'s Template "{template_id}"\'s Custom config "{config}" is not a valid type, skipping it'
)
continue
if not templates_path.joinpath(template_id, "configs", config_type, config_name).is_file():
self.logger.error(
f'{plugin.get("type", "core").title()} Plugin "{plugin["id"]}"\'s Template "{template_id}"\'s Custom config "{config}" does not exist, skipping it'
@ -3093,6 +3145,16 @@ class Database:
return page.data
def get_templates(self, plugin: Optional[str] = None) -> List[Dict[str, Any]]:
"""Get templates."""
with self._db_session() as session:
query = session.query(Templates).with_entities(Templates.id, Templates.plugin_id, Templates.name)
if plugin:
query = query.filter_by(plugin_id=plugin)
return [{"id": template.id, "plugin_id": template.plugin_id, "name": template.name} for template in query]
def get_template_settings(self, template_id: str) -> Dict[str, Any]:
"""Get templates settings."""
with self._db_session() as session: