Start adding new bans data for builder + add a few comments + make optimizations

This commit is contained in:
Théophile Diot 2024-08-12 16:11:27 +01:00
parent 0c84bba4fc
commit 128ca47de0
No known key found for this signature in database
GPG key ID: FA995104A0BA376A

View file

@ -1,6 +1,5 @@
#!/usr/bin/env python3
import json
import base64
from base64 import b64encode
from contextlib import suppress
from math import floor
from os import _exit, getenv, listdir, sep
@ -48,15 +47,17 @@ from src.reverse_proxied import ReverseProxied
from src.totp import Totp
from src.ui_data import UIData
from builder.home import home_builder
from builder.instances import instances_builder
from builder.global_config import global_config_builder
from builder.jobs import jobs_builder
from builder.services import services_builder
from builder.raw_mode import raw_mode_builder
from builder.advanced_mode import advanced_mode_builder
from builder.easy_mode import easy_mode_builder
from builder.logs import logs_builder
# TODO: rename to bans
from builder.bans2 import bans_builder # type: ignore
from builder.home import home_builder # type: ignore
from builder.instances import instances_builder # type: ignore
from builder.global_config import global_config_builder # type: ignore
from builder.jobs import jobs_builder # type: ignore
from builder.services import services_builder # type: ignore
from builder.raw_mode import raw_mode_builder # type: ignore
from builder.advanced_mode import advanced_mode_builder # type: ignore
from builder.easy_mode import easy_mode_builder # type: ignore
from builder.logs import logs_builder # type: ignore
from common_utils import get_version # type: ignore
from logger import setup_logger # type: ignore
@ -175,6 +176,7 @@ LOG_RX = re_compile(r"^(?P<date>\d+/\d+/\d+\s\d+:\d+:\d+)\s\[(?P<level>[a-z]+)\]
REVERSE_PROXY_PATH = re_compile(r"^(?P<host>https?://.{1,255}(:((6553[0-5])|(655[0-2]\d)|(65[0-4]\d{2})|(6[0-4]\d{3})|([1-5]\d{4})|([0-5]{0,5})|(\d{1,4})))?)$")
# TODO: Move this to a utils file
def wait_applying():
current_time = datetime.now()
ready = False
@ -197,6 +199,7 @@ def wait_applying():
app.logger.error("Too many retries while waiting for scheduler to apply configuration...")
# TODO: Find a more elegant way to handle this
def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: bool = False, was_draft: bool = False, threaded: bool = False) -> int:
# Do the operation
error = 0
@ -265,6 +268,7 @@ def manage_bunkerweb(method: str, *args, operation: str = "reloads", is_draft: b
# UTILS
# TODO: move this to a utils file
def run_action(plugin: str, function_name: str = "", *, tmp_dir: Optional[Path] = None) -> Union[dict, Response]:
message = ""
if not tmp_dir:
@ -352,10 +356,7 @@ def run_action(plugin: str, function_name: str = "", *, tmp_dir: Optional[Path]
return {"status": "ok", "code": 200, "data": res}
def get_user_info():
return current_user.get_id(), current_user.password.encode("utf-8"), bool(current_user.totp_secret), current_user.totp_secret
# TODO: move this to a utils file
def verify_data_in_form(data: dict[str, Union[tuple, any]] = {}, err_message: str = "", redirect_url: str = "", next: bool = False) -> Union[bool, Response]:
# Loop on each key in data
for key, values in data.items():
@ -372,6 +373,7 @@ def verify_data_in_form(data: dict[str, Union[tuple, any]] = {}, err_message: st
return True
# TODO: move this to a utils file
def handle_error(err_message: str = "", redirect_url: str = "", next: bool = False, log: Union[bool, str] = False) -> Union[bool, Response]:
"""Handle error message, flash it, log it if needed and redirect to redirect_url if provided or return False."""
flash(err_message, "error")
@ -391,6 +393,7 @@ def handle_error(err_message: str = "", redirect_url: str = "", next: bool = Fal
return redirect(url_for(redirect_url))
# TODO: move this to a utils file
def error_message(msg: str):
app.logger.error(msg)
return {"status": "ko", "message": msg}
@ -424,7 +427,7 @@ def inject_variables():
# check that is value is in tuple
return dict(
data_server_global=json.dumps({"username": current_user.get_id() if current_user.is_authenticated else "", "plugins_page": plugins_page}),
data_server_global=dumps({"username": current_user.get_id() if current_user.is_authenticated else "", "plugins_page": plugins_page}),
script_nonce=app.config["SCRIPT_NONCE"],
is_pro_version=metadata["is_pro"],
pro_status=metadata["pro_status"],
@ -598,6 +601,9 @@ def before_request():
return logout()
### * ROUTES * ###
@app.route("/", strict_slashes=False)
def index():
if app.db.get_ui_user():
@ -816,10 +822,8 @@ def home():
"plugins_errors": app.db.get_plugins_errors(),
}
data_server_builder = home_builder(data)
data_server_builder = base64.b64encode(bytes(json.dumps(data_server_builder), "utf-8")).decode("ascii")
return render_template("home.html", data_server_builder=data_server_builder)
builder = home_builder(data)
return render_template("home.html", data_server_builder=b64encode(dumps(builder).encode("utf-8")).decode("ascii"))
@app.route("/account", methods=["GET", "POST"])
@ -1019,10 +1023,8 @@ def instances():
# Display instances
instances = app.bw_instances_utils.get_instances()
data_server_builder = instances_builder(instances)
data_server_builder = base64.b64encode(bytes(json.dumps(data_server_builder), "utf-8")).decode("ascii")
return render_template("instances.html", title="Instances", data_server_builder=data_server_builder)
builder = instances_builder(instances)
return render_template("instances.html", title="Instances", data_server_builder=b64encode(dumps(builder).encode("utf-8")).decode("ascii"))
def get_service_data(page_name: str):
@ -1213,21 +1215,17 @@ def services_modes():
global_config = app.bw_config.get_config(global_only=True, methods=True)
plugins = app.bw_config.get_plugins()
data_server_builder = None
builder = None
templates_db = app.db.get_templates()
if mode == "raw":
data_server_builder = raw_mode_builder(templates_db, plugins, global_config, total_config, service_name or "new", False if service_name else True)
builder = raw_mode_builder(templates_db, plugins, global_config, total_config, service_name or "new", not service_name)
elif mode == "advanced":
builder = advanced_mode_builder(templates_db, plugins, global_config, total_config, service_name or "new", not service_name)
elif mode == "easy":
builder = easy_mode_builder(templates_db, plugins, global_config, total_config, service_name or "new", not service_name)
if mode == "advanced":
data_server_builder = advanced_mode_builder(templates_db, plugins, global_config, total_config, service_name or "new", False if service_name else True)
if mode == "easy":
data_server_builder = easy_mode_builder(templates_db, plugins, global_config, total_config, service_name or "new", False if service_name else True)
data_server_builder = base64.b64encode(bytes(json.dumps(data_server_builder), "utf-8")).decode("ascii")
return render_template("modes.html", data_server_builder=data_server_builder)
return render_template("modes.html", data_server_builder=b64encode(dumps(builder).encode("utf-8")).decode("ascii"))
@app.route("/services", methods=["GET", "POST"])
@ -1276,10 +1274,8 @@ def services():
services.sort(key=lambda x: x["SERVER_NAME"]["value"])
data_server_builder = services_builder(services)
data_server_builder = base64.b64encode(bytes(json.dumps(data_server_builder), "utf-8")).decode("ascii")
return render_template("services.html", data_server_builder=data_server_builder)
builder = services_builder(services)
return render_template("services.html", data_server_builder=b64encode(dumps(builder).encode("utf-8")).decode("ascii"))
@app.route("/global-config", methods=["GET", "POST"])
@ -1350,10 +1346,8 @@ def global_config():
global_config = app.bw_config.get_config(global_only=True, methods=True)
plugins = app.bw_config.get_plugins()
data_server_builder = global_config_builder({}, plugins, global_config)
data_server_builder = base64.b64encode(bytes(json.dumps(data_server_builder), "utf-8")).decode("ascii")
return render_template("global-config.html", data_server_builder=data_server_builder)
builder = global_config_builder({}, plugins, global_config)
return render_template("global-config.html", data_server_builder=b64encode(dumps(builder).encode("utf-8")).decode("ascii"))
@app.route("/configs", methods=["GET", "POST"])
@ -2032,9 +2026,8 @@ def logs():
with logs_path.joinpath(current_file).open(encoding="utf-8") as f:
raw_logs = f.read()
data_server_builder = logs_builder(files, current_file, raw_logs)
data_server_builder = base64.b64encode(bytes(json.dumps(data_server_builder), "utf-8")).decode("ascii")
return render_template("logs.html", data_server_builder=data_server_builder)
builder = logs_builder(files, current_file, raw_logs)
return render_template("logs.html", data_server_builder=b64encode(dumps(builder).encode("utf-8")).decode("ascii"))
@app.route("/reports", methods=["GET"])
@ -2250,38 +2243,38 @@ def bans():
instance_bans = app.bw_instances_utils.get_bans()
# Prepare data
reasons = {}
timestamp_now = time()
for ban in instance_bans:
if not any(b["ip"] == ban["ip"] for b in bans):
bans.append(ban)
# Get the last 100 bans
bans = bans[:100]
reasons = {"all"}
remains = {"all"}
for ban in bans:
exp = ban.pop("exp", 0)
# Add remain
ban["remain"], ban["term"] = ("unknown", "unknown") if exp <= 0 else get_remain(exp)
remain = ("unknown", "unknown") if exp <= 0 else get_remain(exp)
ban["remain"] = remain[0]
if remain[1] != "unknown":
remains.add(remain[1])
# Convert stamp to date
ban["ban_start"] = datetime.fromtimestamp(floor(ban["date"])).strftime("%d/%m/%Y %H:%M:%S")
ban["ban_end"] = datetime.fromtimestamp(floor(timestamp_now + exp)).strftime("%d/%m/%Y %H:%M:%S")
# Get top reason
if not ban["reason"] in reasons:
reasons[ban["reason"]] = 0
reasons[ban["reason"]] = reasons[ban["reason"]] + 1
reasons.add(["reason"])
top_reason = ([k for k, v in reasons.items() if v == max(reasons.values())] or [""])[0]
return render_template("bans.html", bans=bans, top_reason=top_reason, username=current_user.get_id())
builder = bans_builder(bans, list(reasons), list(remains))
return render_template("bans.html", data_server_builder=b64encode(dumps(builder).encode("utf-8")).decode("ascii"))
@app.route("/jobs", methods=["GET"])
@login_required
def jobs():
data_server_builder = jobs_builder(app.db.get_jobs())
data_server_builder = base64.b64encode(bytes(json.dumps(data_server_builder), "utf-8")).decode("ascii")
return render_template("jobs.html", data_server_builder=data_server_builder)
builder = jobs_builder(app.db.get_jobs())
return render_template("jobs.html", data_server_builder=b64encode(dumps(builder).encode("utf-8")).decode("ascii"))
@app.route("/jobs/download", methods=["GET"])