Optimize Templator rendering process for faster performance

This commit is contained in:
Théophile Diot 2024-09-30 15:58:58 +02:00
parent a272448242
commit 5989988c93
No known key found for this signature in database
GPG key ID: FA995104A0BA376A

View file

@ -1,7 +1,9 @@
#!/usr/bin/env python3
from glob import glob
from concurrent.futures import ThreadPoolExecutor
from importlib import import_module
from glob import glob
from os import getenv
from os.path import basename, join
from pathlib import Path
from random import choice
@ -9,133 +11,281 @@ from string import ascii_letters, digits
from sys import path as sys_path
from typing import Any, Dict, List, Optional
if join("usr", "share", "bunkerweb", "deps", "python") in sys_path:
sys_path.append(join("usr", "share", "bunkerweb", "deps", "python"))
# Correct the sys.path modification logic
deps_path = join("usr", "share", "bunkerweb", "deps", "python")
if deps_path not in sys_path:
sys_path.append(deps_path)
from logger import setup_logger # type: ignore
from jinja2 import Environment, FileSystemLoader
# Configure logging
logger = setup_logger("Templator", getenv("CUSTOM_LOG_LEVEL", getenv("LOG_LEVEL", "INFO")))
class Templator:
def __init__(self, templates: str, core: str, plugins: str, pro_plugins: str, output: str, target: str, config: Dict[str, Any]):
self.__templates = templates
self.__global_templates = [basename(template) for template in glob(join(self.__templates, "*", "*.conf"))]
self.__core = core
self.__plugins = plugins
self.__pro_plugins = pro_plugins
self.__output = output
self.__target = target
self.__config = config
self.__jinja_env = self.__load_jinja_env()
"""A class to render configuration files using Jinja2 templates."""
def render(self):
self.__render_global()
servers = [self.__config.get("SERVER_NAME", "").strip()]
if self.__config.get("MULTISITE", "no") == "yes":
servers = self.__config.get("SERVER_NAME", "").strip().split(" ")
def __init__(
self,
templates: str,
core: str,
plugins: str,
pro_plugins: str,
output: str,
target: str,
config: Dict[str, Any],
):
"""Initialize the Templator with paths and configuration.
Args:
templates (str): Path to the templates directory.
core (str): Path to the core directory.
plugins (str): Path to the plugins directory.
pro_plugins (str): Path to the pro plugins directory.
output (str): Path to the output directory.
target (str): Target path.
config (Dict[str, Any]): Configuration dictionary.
"""
if not isinstance(templates, str):
raise TypeError("templates must be a string")
if not isinstance(core, str):
raise TypeError("core must be a string")
if not isinstance(plugins, str):
raise TypeError("plugins must be a string")
if not isinstance(pro_plugins, str):
raise TypeError("pro_plugins must be a string")
if not isinstance(output, str):
raise TypeError("output must be a string")
if not isinstance(target, str):
raise TypeError("target must be a string")
if not isinstance(config, dict):
raise TypeError("config must be a dictionary")
self._templates = templates
self._global_templates = [basename(template) for template in glob(join(self._templates, "*", "*.conf"))]
self._core = core
self._plugins = plugins
self._pro_plugins = pro_plugins
self._output = output
self._target = target
self._config = config
self._jinja_env = self._load_jinja_env()
def render(self) -> None:
"""Render the templates based on the provided configuration."""
self._render_global()
servers = [self._config.get("SERVER_NAME", "").strip()]
if self._config.get("MULTISITE", "no") == "yes":
servers = self._config.get("SERVER_NAME", "").strip().split()
for server in servers:
self.__render_server(server)
self._render_server(server)
def __load_jinja_env(self) -> Environment:
searchpath = [self.__templates]
for subpath in glob(join(self.__core, "*", "confs")) + glob(join(self.__plugins, "*", "confs")) + glob(join(self.__pro_plugins, "*", "confs")):
def _load_jinja_env(self) -> Environment:
"""Load the Jinja2 environment with the appropriate search paths.
Returns:
Environment: The Jinja2 environment.
"""
searchpath = [self._templates]
for subpath in glob(join(self._core, "*", "confs")) + glob(join(self._plugins, "*", "confs")) + glob(join(self._pro_plugins, "*", "confs")):
if Path(subpath).is_dir():
searchpath.append(subpath)
return Environment(loader=FileSystemLoader(searchpath=searchpath), lstrip_blocks=True, trim_blocks=True)
return Environment(
loader=FileSystemLoader(searchpath=searchpath),
lstrip_blocks=True,
trim_blocks=True,
)
def __find_templates(self, contexts) -> List[str]:
templates = []
for template in self.__jinja_env.list_templates():
if "global" in contexts and "/" not in template:
templates.append(template)
continue
for context in contexts:
if template.startswith(context):
templates.append(template)
def _find_templates(self, contexts: List[str]) -> List[str]:
"""Find templates matching the given contexts.
Args:
contexts (List[str]): List of context names.
Returns:
List[str]: List of template names.
"""
context_set = set(contexts)
templates = [
template
for template in self._jinja_env.list_templates()
if any(template.startswith(context + "/") or (context == "global" and "/" not in template) for context in context_set)
]
return templates
def __write_config(self, subpath: Optional[str] = None, config: Optional[Dict[str, Any]] = None):
real_path = Path(self.__output, subpath or "", "variables.env")
real_path.parent.mkdir(parents=True, exist_ok=True)
real_path.write_text("\n".join(f"{k}={v}" for k, v in (config or self.__config).items()))
def _write_config(self, subpath: Optional[str] = None, config: Optional[Dict[str, Any]] = None) -> None:
"""Write the configuration to a variables.env file.
def __render_global(self):
self.__write_config()
templates = self.__find_templates(["global", "http", "stream", "default-server-http"])
Args:
subpath (Optional[str], optional): Subpath under the output directory. Defaults to None.
config (Optional[Dict[str, Any]], optional): Configuration dictionary. Defaults to None.
"""
real_config = config if config is not None else self._config
real_path = Path(self._output, subpath or "", "variables.env")
try:
real_path.parent.mkdir(parents=True, exist_ok=True)
with real_path.open("w") as f:
f.write("\n".join(f"{k}={v}" for k, v in real_config.items()))
except IOError as e:
logger.error(f"Error writing configuration to {real_path}: {e}")
def _get_server_config(self, server: str) -> Dict[str, Any]:
"""Get the configuration for a specific server.
Args:
server (str): Server name.
Returns:
Dict[str, Any]: Configuration dictionary for the server.
"""
config = self._config.copy()
prefix = f"{server}_"
for variable, value in self._config.items():
if variable.startswith(prefix):
config[variable[len(prefix) :]] = value # noqa: E203
config["NGINX_PREFIX"] = join(self._target, server) + "/"
server_key = f"{server}_SERVER_NAME"
if server_key not in self._config:
config["SERVER_NAME"] = server
return config
def _render_global(self) -> None:
"""Render global templates."""
self._write_config()
templates = self._find_templates(["global", "http", "stream", "default-server-http"])
for template in templates:
self.__render_template(template)
self._render_template(template)
def __render_server(self, server: str):
templates = self.__find_templates(["modsec", "modsec-crs", "crs-plugins-before", "crs-plugins-after", "server-http", "server-stream"])
if self.__config.get("MULTISITE", "no") == "yes":
config = self.__config.copy()
for variable, value in self.__config.items():
if variable.startswith(f"{server}_"):
config[variable.replace(f"{server}_", "", 1)] = value
self.__write_config(subpath=server, config=config)
def _render_server(self, server: str) -> None:
"""Render templates for a specific server.
Args:
server (str): Server name.
"""
# Step 1: Find all relevant templates
templates = self._find_templates(["modsec", "modsec-crs", "crs-plugins-before", "crs-plugins-after", "server-http", "server-stream"])
# Step 2: Handle multisite configuration if applicable
subpath = None
config = None
if self._config.get("MULTISITE", "no") == "yes":
subpath = server
config = self._get_server_config(server)
self._write_config(subpath=subpath, config=config)
# Step 3: Precompute 'name' for each template
global_templates_set = set(self._global_templates) # Faster lookups
template_info = []
for template in templates:
subpath = None
config = None
name = None
if self.__config.get("MULTISITE", "no") == "yes":
subpath = server
config = self.__config.copy()
for variable, value in self.__config.items():
if variable.startswith(f"{server}_"):
config[variable.replace(f"{server}_", "", 1)] = value
config["NGINX_PREFIX"] = join(self.__target, server) + "/"
server_key = f"{server}_SERVER_NAME"
if server_key not in self.__config:
config["SERVER_NAME"] = server
# Check if the template ends with any of the global configurations
name = basename(template) if any(template.endswith(root_conf) for root_conf in global_templates_set) else None
template_info.append((template, name))
for root_conf in self.__global_templates:
if template.endswith(root_conf):
name = basename(template)
break
self.__render_template(template, subpath=subpath, config=config, name=name)
# Step 4: Define the rendering function
def render_template(info):
template, name = info
self._render_template(template, subpath=subpath, config=config, name=name)
def __render_template(
# Step 5: Use ThreadPoolExecutor with optimized settings
max_workers = min(32, len(template_info)) # Example: Limit to 32 or number of templates
with ThreadPoolExecutor(max_workers=max_workers) as executor:
executor.map(render_template, template_info)
def _render_template(
self,
template: str,
subpath: Optional[str] = None,
config: Optional[Dict[str, Any]] = None,
name: Optional[str] = None,
):
# Get real config and output folder in case it's a server config and we are in multisite mode
real_config = config.copy() if config else self.__config.copy()
) -> None:
"""Render a single template.
Args:
template (str): Template name.
subpath (Optional[str], optional): Subpath under the output directory. Defaults to None.
config (Optional[Dict[str, Any]], optional): Configuration dictionary. Defaults to None.
name (Optional[str], optional): Output file name. Defaults to None.
"""
real_config = config.copy() if config else self._config.copy()
real_config["all"] = real_config.copy()
real_config["import"] = import_module
real_config["is_custom_conf"] = Templator.is_custom_conf
real_config["has_variable"] = Templator.has_variable
real_config["random"] = Templator.random
real_config["read_lines"] = Templator.read_lines
real_path = Path(self.__output, subpath or "", name or template)
jinja_template = self.__jinja_env.get_template(template)
real_path.parent.mkdir(parents=True, exist_ok=True)
real_path.write_text(jinja_template.render(real_config))
real_config.update(
{
"is_custom_conf": Templator.is_custom_conf,
"has_variable": Templator.has_variable,
"random": Templator.random,
"read_lines": Templator.read_lines,
# TODO: Remove 'import' to avoid security risks
"import": import_module,
}
)
real_path = Path(self._output, subpath or "", name or template)
try:
jinja_template = self._jinja_env.get_template(template)
real_path.parent.mkdir(parents=True, exist_ok=True)
with real_path.open("w") as f:
f.write(jinja_template.render(real_config))
except Exception as e:
logger.error(f"Error rendering template {template}: {e}")
@staticmethod
def is_custom_conf(path: str) -> bool:
"""Check if the path contains any .conf files.
Args:
path (str): Path to check.
Returns:
bool: True if .conf files are found, False otherwise.
"""
return bool(glob(join(path, "*.conf")))
@staticmethod
def has_variable(all_vars: Dict[str, Any], variable: str, value: Any) -> bool:
"""Check if the variable has the specified value.
Args:
all_vars (Dict[str, Any]): Configuration variables.
variable (str): Variable name.
value (Any): Value to check against.
Returns:
bool: True if the variable has the specified value, False otherwise.
"""
if all_vars.get(variable) == value:
return True
elif all_vars.get("MULTISITE", "no") == "yes":
for server_name in all_vars["SERVER_NAME"].strip().split(" "):
server_names = all_vars.get("SERVER_NAME", "").strip().split()
for server_name in server_names:
if all_vars.get(f"{server_name}_{variable}") == value:
return True
return False
@staticmethod
def random(nb: int) -> str:
characters = ascii_letters + digits
def random(nb: int, characters: str = ascii_letters + digits) -> str:
"""Generate a random string of specified length.
Args:
nb (int): Length of the random string.
characters (str, optional): Characters to choose from. Defaults to ascii_letters + digits.
Returns:
str: Random string.
"""
return "".join(choice(characters) for _ in range(nb))
@staticmethod
def read_lines(file: str) -> List[str]:
"""Read lines from a file.
Args:
file (str): Path to the file.
Returns:
List[str]: List of lines, or empty list if file not found.
"""
try:
return Path(file).read_text().splitlines()
except:
except FileNotFoundError:
return []