Add obfuscation_file and obfuscation_checksum columns to Plugin_pages table in model.py to support obfuscated actions in web UI

This commit is contained in:
Théophile Diot 2024-04-18 11:48:35 +02:00
parent 2ce01d14b2
commit aa944188c9
No known key found for this signature in database
GPG key ID: 248FEA4BAE400D06
3 changed files with 139 additions and 38 deletions

View file

@ -3,17 +3,17 @@
from contextlib import contextmanager, suppress
from copy import deepcopy
from datetime import datetime
from hashlib import sha256
from inspect import getsourcefile
from io import BytesIO
from logging import Logger
from os import _exit, getenv, listdir, sep
from os.path import basename, join
from os.path import join
from pathlib import Path
from re import compile as re_compile
from sys import _getframe, path as sys_path
from sys import argv, path as sys_path
from typing import Any, Dict, List, Literal, Optional, Tuple, Union
from time import sleep
from traceback import format_exc
from zipfile import ZIP_DEFLATED, ZipFile
from model import (
Base,
@ -37,7 +37,7 @@ for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in ((
if deps_path not in sys_path:
sys_path.append(deps_path)
from common_utils import file_hash # type: ignore
from common_utils import bytes_hash # type: ignore
from pymysql import install_as_MySQLdb
from sqlalchemy import create_engine, event, MetaData as sql_metadata, text, inspect
@ -717,6 +717,7 @@ class Database:
.with_entities(
Plugin_pages.template_checksum,
Plugin_pages.actions_checksum,
Plugin_pages.obfuscation_checksum,
)
.filter_by(plugin_id=plugin["id"])
.first()
@ -728,8 +729,21 @@ class Database:
if {"template.html", "actions.py"}.issubset(listdir(str(path_ui))):
template = path_ui.joinpath("template.html").read_bytes()
actions = path_ui.joinpath("actions.py").read_bytes()
template_checksum = sha256(template).hexdigest()
actions_checksum = sha256(actions).hexdigest()
template_checksum = bytes_hash(template, algorithm="sha256")
actions_checksum = bytes_hash(actions, algorithm="sha256")
obfuscation_file = None
obfuscation_checksum = None
obfuscation_dir = path_ui.joinpath("pyarmor_runtime_000000")
if obfuscation_dir.is_dir():
obfuscation_file = BytesIO()
with ZipFile(obfuscation_file, "w", ZIP_DEFLATED) as zip_file:
for path in obfuscation_dir.rglob("*"):
if path.is_file():
zip_file.write(path, path.relative_to(path_ui))
obfuscation_file.seek(0, 0)
obfuscation_file = obfuscation_file.getvalue()
obfuscation_checksum = bytes_hash(obfuscation_file, algorithm="sha256")
if db_plugin_page:
updates = {}
@ -749,6 +763,14 @@ class Database:
}
)
if obfuscation_checksum != db_plugin_page.obfuscation_checksum:
updates.update(
{
Plugin_pages.obfuscation_file: obfuscation_file,
Plugin_pages.obfuscation_checksum: obfuscation_checksum,
}
)
if updates:
self.logger.warning(f'Page for plugin "{plugin["id"]}" already exists, updating it with the new values')
session.query(Plugin_pages).filter(Plugin_pages.plugin_id == plugin["id"]).update(updates)
@ -764,6 +786,8 @@ class Database:
template_checksum=template_checksum,
actions_file=actions,
actions_checksum=actions_checksum,
obfuscation_file=obfuscation_file,
obfuscation_checksum=obfuscation_checksum,
)
)
remove = False
@ -1103,7 +1127,7 @@ class Database:
custom_config["type"] = custom_config["type"].replace("-", "_").lower() # type: ignore
custom_config["data"] = custom_config["data"].encode("utf-8") if isinstance(custom_config["data"], str) else custom_config["data"]
custom_config["checksum"] = sha256(custom_config["data"]).hexdigest() # type: ignore
custom_config["checksum"] = bytes_hash(custom_config["data"], algorithm="sha256") # type: ignore
service_id = custom_config.get("service_id", None) or None
filters = {
@ -1292,7 +1316,7 @@ class Database:
return ""
def delete_job_cache(self, file_name: str, *, job_name: Optional[str] = None, service_id: Optional[str] = None):
job_name = job_name or basename(getsourcefile(_getframe(1))).replace(".py", "")
job_name = job_name or argv[0].replace(".py", "")
filters = {"file_name": file_name}
if job_name:
filters["job_name"] = job_name
@ -1312,7 +1336,7 @@ class Database:
checksum: Optional[str] = None,
) -> str:
"""Update the plugin cache in the database"""
job_name = job_name or basename(getsourcefile(_getframe(1))).replace(".py", "")
job_name = job_name or argv[0].replace(".py", "")
service_id = service_id or None
with self.__db_session() as session:
cache = session.query(Jobs_cache).filter_by(job_name=job_name, service_id=service_id, file_name=file_name).first()
@ -1587,32 +1611,46 @@ class Database:
if path_ui.is_dir():
remove = True
if {"template.html", "actions.py"}.issubset(listdir(str(path_ui))):
template = path_ui.joinpath("template.html").read_bytes()
actions = path_ui.joinpath("actions.py").read_bytes()
template_checksum = bytes_hash(template, algorithm="sha256")
actions_checksum = bytes_hash(actions, algorithm="sha256")
obfuscation_file = None
obfuscation_checksum = None
obfuscation_dir = path_ui.joinpath("pyarmor_runtime_000000")
if obfuscation_dir.is_dir():
obfuscation_file = BytesIO()
with ZipFile(obfuscation_file, "w", ZIP_DEFLATED) as zip_file:
for path in obfuscation_dir.rglob("*"):
if path.is_file():
zip_file.write(path, path.relative_to(path_ui))
obfuscation_file.seek(0, 0)
obfuscation_file = obfuscation_file.getvalue()
obfuscation_checksum = bytes_hash(obfuscation_file, algorithm="sha256")
if not db_plugin_page:
changes = True
template = path_ui.joinpath("template.html").read_bytes()
actions = path_ui.joinpath("actions.py").read_bytes()
to_put.append(
Plugin_pages(
plugin_id=plugin["id"],
template_file=template,
template_checksum=sha256(template).hexdigest(),
template_checksum=template_checksum,
actions_file=actions,
actions_checksum=sha256(actions).hexdigest(),
actions_checksum=actions_checksum,
obfuscation_file=obfuscation_file,
obfuscation_checksum=obfuscation_checksum,
)
)
remove = False
else:
updates = {}
template_path = path_ui.joinpath("template.html")
actions_path = path_ui.joinpath("actions.py")
template_checksum = file_hash(str(template_path))
actions_checksum = file_hash(str(actions_path))
if template_checksum != db_plugin_page.template_checksum:
updates.update(
{
Plugin_pages.template_file: template_path.read_bytes(),
Plugin_pages.template_file: template,
Plugin_pages.template_checksum: template_checksum,
}
)
@ -1620,11 +1658,19 @@ class Database:
if actions_checksum != db_plugin_page.actions_checksum:
updates.update(
{
Plugin_pages.actions_file: actions_path.read_bytes(),
Plugin_pages.actions_file: actions,
Plugin_pages.actions_checksum: actions_checksum,
}
)
if obfuscation_checksum != db_plugin_page.obfuscation_checksum:
updates.update(
{
Plugin_pages.obfuscation_file: obfuscation_file,
Plugin_pages.obfuscation_checksum: obfuscation_checksum,
}
)
if updates:
changes = True
session.query(Plugin_pages).filter(Plugin_pages.plugin_id == plugin["id"]).update(updates)
@ -1736,31 +1782,44 @@ class Database:
.filter_by(plugin_id=plugin["id"])
.first()
)
template = path_ui.joinpath("template.html").read_bytes()
actions = path_ui.joinpath("actions.py").read_bytes()
template_checksum = bytes_hash(template, algorithm="sha256")
actions_checksum = bytes_hash(actions, algorithm="sha256")
obfuscation_file = None
obfuscation_checksum = None
obfuscation_dir = path_ui.joinpath("pyarmor_runtime_000000")
if obfuscation_dir.is_dir():
obfuscation_file = BytesIO()
with ZipFile(obfuscation_file, "w", ZIP_DEFLATED) as zip_file:
for path in obfuscation_dir.rglob("*"):
if path.is_file():
zip_file.write(path, path.relative_to(path_ui))
obfuscation_file.seek(0, 0)
obfuscation_file = obfuscation_file.getvalue()
obfuscation_checksum = bytes_hash(obfuscation_file, algorithm="sha256")
if not db_plugin_page:
template = path_ui.joinpath("template.html").read_bytes()
actions = path_ui.joinpath("actions.py").read_bytes()
to_put.append(
Plugin_pages(
plugin_id=plugin["id"],
template_file=template,
template_checksum=sha256(template).hexdigest(),
template_checksum=template_checksum,
actions_file=actions,
actions_checksum=sha256(actions).hexdigest(),
actions_checksum=actions_checksum,
obfuscation_file=obfuscation_file,
obfuscation_checksum=obfuscation_checksum,
)
)
else:
updates = {}
template_path = path_ui.joinpath("template.html")
actions_path = path_ui.joinpath("actions.py")
template_checksum = file_hash(str(template_path))
actions_checksum = file_hash(str(actions_path))
if template_checksum != db_plugin_page.template_checksum:
updates.update(
{
Plugin_pages.template_file: template_path.read_bytes(),
Plugin_pages.template_file: template,
Plugin_pages.template_checksum: template_checksum,
}
)
@ -1768,11 +1827,19 @@ class Database:
if actions_checksum != db_plugin_page.actions_checksum:
updates.update(
{
Plugin_pages.actions_file: actions_path.read_bytes(),
Plugin_pages.actions_file: actions,
Plugin_pages.actions_checksum: actions_checksum,
}
)
if obfuscation_checksum != db_plugin_page.obfuscation_checksum:
updates.update(
{
Plugin_pages.obfuscation_file: obfuscation_file,
Plugin_pages.obfuscation_checksum: obfuscation_checksum,
}
)
if updates:
session.query(Plugin_pages).filter(Plugin_pages.plugin_id == plugin["id"]).update(updates)
@ -1806,7 +1873,7 @@ class Database:
with self.__db_session() as session:
entities = [Plugins.id, Plugins.stream, Plugins.name, Plugins.description, Plugins.version, Plugins.type, Plugins.method, Plugins.checksum]
if with_data:
entities.append(Plugins.data)
entities.append(Plugins.data) # type: ignore
db_plugins = session.query(Plugins).with_entities(*entities)
if _type != "all":
@ -2071,6 +2138,16 @@ class Database:
return page.template_file
def get_plugin_obfuscation(self, plugin: str) -> Optional[Any]:
"""get obfuscation file for the plugin"""
with self.__db_session() as session:
page = session.query(Plugin_pages).with_entities(Plugin_pages.obfuscation_file).filter_by(plugin_id=plugin).first()
if not page:
return None
return page.obfuscation_file
def get_ui_user(self) -> Optional[dict]:
"""Get ui user."""
with self.__db_session() as session:

View file

@ -150,6 +150,8 @@ class Plugin_pages(Base):
template_checksum = Column(String(128), nullable=False)
actions_file = Column(LargeBinary(length=(2**32) - 1), nullable=False)
actions_checksum = Column(String(128), nullable=False)
obfuscation_file = Column(LargeBinary(length=(2**32) - 1), default=None, nullable=True)
obfuscation_checksum = Column(String(128), default=None, nullable=True)
plugin = relationship("Plugins", back_populates="pages")

View file

@ -9,6 +9,7 @@ from string import ascii_letters, digits
from sys import path as sys_path, modules as sys_modules
from pathlib import Path
from typing import Union
from uuid import uuid4
for deps_path in [join(sep, "usr", "share", "bunkerweb", *paths) for paths in (("deps", "python"), ("utils",), ("api",), ("db",))]:
if deps_path not in sys_path:
@ -302,15 +303,34 @@ def run_action(plugin: str, function_name: str = ""):
if module is None:
return {"status": "ko", "code": 404, "message": "The actions.py file for the plugin does not exist"}
obfuscation = db.get_plugin_obfuscation(plugin)
tmp_dir = None
try:
# Try to import the custom plugin
with NamedTemporaryFile(mode="wb", suffix=".py", delete=True) as temp:
temp.write(module)
temp.flush()
temp.seek(0)
loader = SourceFileLoader("actions", temp.name)
if obfuscation:
tmp_dir = Path(sep, "var", "tmp", "bunkerweb", "ui", "action", str(uuid4()))
tmp_dir.mkdir(parents=True, exist_ok=True)
action_file = tmp_dir.joinpath("actions.py")
with ZipFile(BytesIO(obfuscation), "r") as zip_ref:
zip_ref.extractall(tmp_dir)
action_file.write_bytes(module)
sys_path.append(tmp_dir.as_posix())
loader = SourceFileLoader("actions", action_file.as_posix())
actions = loader.load_module()
else:
with NamedTemporaryFile(mode="wb", suffix=".py", delete=True) as temp:
temp.write(module)
temp.flush()
temp.seek(0)
loader = SourceFileLoader("actions", temp.name)
actions = loader.load_module()
except:
if tmp_dir:
sys_path.pop()
rmtree(tmp_dir, ignore_errors=True)
app.logger.exception("An error occurred while importing the plugin")
return {"status": "ko", "code": 500, "message": "An error occurred while importing the plugin, see logs for more details"}
res = None
@ -332,11 +352,13 @@ def run_action(plugin: str, function_name: str = ""):
finally:
if sbin_nginx_path.is_file():
# Remove the custom plugin from the shared library
if sys_path:
sys_path.pop()
sys_modules.pop("actions", None)
del actions
if tmp_dir:
sys_path.pop()
rmtree(tmp_dir, ignore_errors=True)
if message or not isinstance(res, dict) and not res:
return {"status": "ko", "code": 500, "message": message or "The plugin did not return a valid response"}