Fix snyk vulnerabilities in web UI main.py

This commit is contained in:
Théophile Diot 2024-04-02 09:22:50 +01:00
parent edc0bf26c2
commit 8f253c3f2a
No known key found for this signature in database
GPG key ID: 248FEA4BAE400D06

View file

@ -3,7 +3,7 @@
from contextlib import suppress
from math import floor
from os import _exit, getenv, getpid, listdir, sep, urandom
from os.path import basename, dirname, join
from os.path import basename, dirname, isabs, join
from secrets import choice
from string import ascii_letters, digits
from sys import path as sys_path, modules as sys_modules
@ -28,7 +28,7 @@ from hashlib import sha256
from importlib.machinery import SourceFileLoader
from io import BytesIO
from json import JSONDecodeError, dumps, loads as json_loads
from jinja2 import Environment, FileSystemLoader
from jinja2 import Environment, FileSystemLoader, select_autoescape
from kubernetes import client as kube_client
from kubernetes import config as kube_config
from kubernetes.client.exceptions import ApiException as kube_ApiException
@ -42,6 +42,7 @@ from tarfile import CompressionError, HeaderError, ReadError, TarError, open as
from threading import Thread
from tempfile import NamedTemporaryFile
from time import sleep, time
from werkzeug.utils import secure_filename
from zipfile import BadZipFile, ZipFile
from src.Instances import Instances
@ -492,6 +493,7 @@ def loading():
@app.route("/check", methods=["GET"])
def check():
# deepcode ignore TooPermissiveCors: We need to allow all origins for the wizard
return Response(status=200, headers={"Access-Control-Allow-Origin": "*"}, response=dumps({"message": "ok"}), content_type="application/json")
@ -548,6 +550,7 @@ def setup():
app.config["RELOADING"] = True
app.config["LAST_RELOAD"] = time()
# deepcode ignore MissingAPI: We don't need to check to wait for the thread to finish
Thread(
target=manage_bunkerweb,
name="Reloading instances",
@ -696,6 +699,7 @@ def account():
# Reload instances
app.config["RELOADING"] = True
app.config["LAST_RELOAD"] = time()
# deepcode ignore MissingAPI: We don't need to check to wait for the thread to finish
Thread(
target=manage_bunkerweb,
name="Reloading instances",
@ -819,6 +823,7 @@ def instances():
return redirect_flash_error("Missing operation parameter on /instances.", "instances")
app.config["RELOADING"] = True
app.config["LAST_RELOAD"] = time()
# deepcode ignore MissingAPI: We don't need to check to wait for the thread to finish
Thread(
target=manage_bunkerweb,
name="Reloading instances",
@ -922,6 +927,7 @@ def services():
# Reload instances
app.config["RELOADING"] = True
app.config["LAST_RELOAD"] = time()
# deepcode ignore MissingAPI: We don't need to check to wait for the thread to finish
Thread(
target=manage_bunkerweb,
name="Reloading instances",
@ -1030,6 +1036,7 @@ def global_config():
# Reload instances
app.config["RELOADING"] = True
app.config["LAST_RELOAD"] = time()
# deepcode ignore MissingAPI: We don't need to check to wait for the thread to finish
Thread(
target=manage_bunkerweb,
name="Reloading instances",
@ -1250,8 +1257,10 @@ def plugins():
except KeyError:
is_dir = True
try:
# deepcode ignore TarSlip: We don't need to check for tar slip as we are checking the files when they are uploaded
tar_file.extractall(str(temp_folder_path), filter="data")
except TypeError:
# deepcode ignore TarSlip: We don't need to check for tar slip as we are checking the files when they are uploaded
tar_file.extractall(str(temp_folder_path))
except ReadError:
errors += 1
@ -1392,6 +1401,7 @@ def plugins():
# Reload instances
app.config["RELOADING"] = True
app.config["LAST_RELOAD"] = time()
# deepcode ignore MissingAPI: We don't need to check to wait for the thread to finish
Thread(
target=manage_bunkerweb,
name="Reloading instances",
@ -1436,9 +1446,14 @@ def upload_plugin():
tmp_ui_path.mkdir(parents=True, exist_ok=True)
for uploaded_file in request.files.values():
if not uploaded_file.filename:
continue
if not uploaded_file.filename.endswith((".zip", ".tar.gz", ".tar.xz")):
return {"status": "ko"}, 422
folder_name = Path(secure_filename(uploaded_file.filename)).stem
with BytesIO(uploaded_file.read()) as io:
io.seek(0, 0)
plugins = []
@ -1448,23 +1463,32 @@ def upload_plugin():
if file.endswith("plugin.json"):
plugins.append(basename(dirname(file)))
if len(plugins) > 1:
for file in zip_file.namelist():
if isabs(file) or ".." in file:
return {"status": "ko"}, 422
zip_file.extractall(str(tmp_ui_path) + "/")
folder_name = uploaded_file.filename.replace(".zip", "")
else:
with tar_open(fileobj=io) as tar_file:
for file in tar_file.getnames():
if file.endswith("plugin.json"):
plugins.append(basename(dirname(file)))
if len(plugins) > 1:
for member in tar_file.getmembers():
if isabs(member.name) or ".." in member.name:
return {"status": "ko"}, 422
try:
# deepcode ignore TarSlip: The files in the tar are being inspected before extraction
tar_file.extractall(str(tmp_ui_path) + "/", filter="data")
except TypeError:
# deepcode ignore TarSlip: The files in the tar are being inspected before extraction
tar_file.extractall(str(tmp_ui_path) + "/")
folder_name = uploaded_file.filename.replace(".tar.gz", "").replace(".tar.xz", "")
if len(plugins) <= 1:
io.seek(0, 0)
tmp_ui_path.joinpath(uploaded_file.filename).write_bytes(io.read())
# deepcode ignore PT: The folder name is being sanitized before
tmp_ui_path.joinpath(folder_name).write_bytes(io.read())
return {"status": "ok"}, 201
for plugin in plugins:
@ -1474,7 +1498,8 @@ def upload_plugin():
tgz.seek(0, 0)
tmp_ui_path.joinpath(f"{plugin}.tar.gz").write_bytes(tgz.read())
rmtree(str(tmp_ui_path.joinpath(folder_name)), ignore_errors=True)
# deepcode ignore PT: The folder name is being sanitized before
rmtree(tmp_ui_path.joinpath(folder_name), ignore_errors=True)
return {"status": "ok"}, 201
@ -1584,7 +1609,10 @@ def custom_plugin(plugin: str):
# Get prerender from action.py
pre_render = run_action(plugin, "pre_render")
return render_template(
Environment(loader=FileSystemLoader(join(sep, "usr", "share", "bunkerweb", "ui", "templates") + "/")).from_string(page.decode("utf-8")),
# deepcode ignore Ssti: We trust the plugin template
Environment(
loader=FileSystemLoader(join(sep, "usr", "share", "bunkerweb", "ui", "templates") + "/"), autoescape=select_autoescape(["html"])
).from_string(page.decode("utf-8")),
username=current_user.get_id(),
current_endpoint=plugin,
plugin=curr_plugin,
@ -1778,14 +1806,14 @@ def logs_container(container_id):
if docker_client:
try:
if INTEGRATION != "Swarm":
docker_logs = docker_client.containers.get(container_id).logs(
docker_logs = docker_client.containers.get(container_id).logs( # type: ignore
stdout=True,
stderr=True,
since=datetime.fromtimestamp(last_update),
timestamps=True,
)
else:
docker_logs = docker_client.services.get(container_id).logs(
docker_logs = docker_client.services.get(container_id).logs( # type: ignore
stdout=True,
stderr=True,
since=datetime.fromtimestamp(last_update),
@ -2079,12 +2107,15 @@ def jobs_download():
if not plugin_id or not job_name or not file_name:
return jsonify({"status": "ko", "message": "plugin_id, job_name and file_name are required"}), 422
file_name = secure_filename(file_name)
cache_file = db.get_job_cache_file(job_name, file_name, service_id=service_id, plugin_id=plugin_id)
if not cache_file:
return jsonify({"status": "ko", "message": "file not found"}), 404
file = BytesIO(cache_file)
# deepcode ignore PT: We sanitize the file name
return send_file(file, as_attachment=True, download_name=file_name)