mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
Fix form action URLs in bans_modal.html and bans.html + Add back-end logic for ban page
This commit is contained in:
parent
a737bad334
commit
232b55142e
5 changed files with 132 additions and 110 deletions
|
|
@ -1,6 +1,7 @@
|
|||
#!/usr/bin/env python3
|
||||
|
||||
from contextlib import suppress
|
||||
from math import floor
|
||||
from os import _exit, getenv, listdir, sep, urandom
|
||||
from os.path import basename, dirname, join
|
||||
from secrets import choice
|
||||
|
|
@ -48,7 +49,7 @@ from src.Config import Config
|
|||
from src.ReverseProxied import ReverseProxied
|
||||
from src.User import AnonymousUser, User
|
||||
|
||||
from utils import check_settings, get_b64encoded_qr_image, path_to_dict, get_remain, get_range_from_remain
|
||||
from utils import check_settings, get_b64encoded_qr_image, path_to_dict, get_remain
|
||||
from Database import Database # type: ignore
|
||||
from logging import getLogger
|
||||
|
||||
|
|
@ -1678,56 +1679,68 @@ def bans():
|
|||
flash("No data to proceed", "error")
|
||||
return redirect(url_for("bans"))
|
||||
|
||||
# Multiple operations : add ban or unban
|
||||
operation = request.form["operation"]
|
||||
# data = request.form["data"]
|
||||
try:
|
||||
data = json_loads(request.form["data"])
|
||||
assert isinstance(data, list)
|
||||
except BaseException:
|
||||
flash("Data must be a list of dict", "error")
|
||||
return redirect(url_for("bans"))
|
||||
|
||||
# TODO : unban logic
|
||||
# data format for unban is the same as bans send on client
|
||||
if operation == "unban":
|
||||
pass
|
||||
if request.form["operation"] == "unban":
|
||||
for unban in data:
|
||||
try:
|
||||
unban = json_loads(unban.replace('"', '"').replace("'", '"'))
|
||||
except BaseException:
|
||||
continue
|
||||
if "ip" not in unban:
|
||||
continue
|
||||
resp = app.config["INSTANCES"].unban(unban["ip"])
|
||||
if resp:
|
||||
flash(f"Couldn't unban {unban['ip']} on the following instances: {', '.join(resp)}", "error")
|
||||
else:
|
||||
flash(f"Successfully unbanned {unban['ip']}")
|
||||
elif request.form["operation"] == "ban":
|
||||
for ban in data:
|
||||
try:
|
||||
ban = json_loads(ban.replace('"', '"').replace("'", '"'))
|
||||
except BaseException:
|
||||
continue
|
||||
if "ip" not in ban:
|
||||
continue
|
||||
try:
|
||||
ban_end = float(ban.get("ban_end", 86400))
|
||||
except BaseException:
|
||||
continue
|
||||
resp = app.config["INSTANCES"].ban(ban["ip"], ban_end, ban.get("reason", "manual"))
|
||||
if resp:
|
||||
flash(f"Couldn't ban {ban['ip']} on the following instances: {', '.join(resp)}", "error")
|
||||
else:
|
||||
flash(f"Successfully banned {ban['ip']}")
|
||||
else:
|
||||
flash("Operation unknown", "error")
|
||||
return redirect(url_for("bans"))
|
||||
|
||||
# TODO : add ban logic
|
||||
# data format : [{"ip": string, "ban_start": timestamp, "ban_end": timestamp, "reason": string}]
|
||||
# "ban_start" is optional : default is time.time()
|
||||
# "ban_end" is optional : default is one month
|
||||
if operation == "ban":
|
||||
pass
|
||||
return redirect(url_for("loading", next=url_for("bans"), message="Update bans"))
|
||||
|
||||
return redirect(
|
||||
url_for(
|
||||
"loading",
|
||||
next=url_for("bans"),
|
||||
message="Update bans",
|
||||
)
|
||||
)
|
||||
|
||||
# TODO : Get bans list from database and send it
|
||||
# Need to limit the number of bans around 100 last ones
|
||||
bans = [
|
||||
{"ip": "124.0.0.1", "ban_start": 1705663430, "ban_end": 1705758948, "reason": "antibot"},
|
||||
{"ip": "124.0.0.2", "ban_start": 1705663430, "ban_end": 1708437348, "reason": "test"},
|
||||
{"ip": "124.0.0.3", "ban_start": 1705663430, "ban_end": 1740059748, "reason": "unknown"},
|
||||
]
|
||||
bans = app.config["INSTANCES"].get_bans()[:100]
|
||||
|
||||
# Prepare data
|
||||
reasons = {}
|
||||
now_stamp = int(time()) # in seconds
|
||||
timestamp_now = time()
|
||||
|
||||
for ban in bans:
|
||||
exp = ban.pop("exp")
|
||||
# Add remain
|
||||
remain = "unknown" if ban["ban_end"] - now_stamp < 0 else get_remain(ban["ban_end"] - now_stamp)
|
||||
ban["remain"] = remain
|
||||
ban["term"] = get_range_from_remain(remain)
|
||||
ban["remain"], ban["term"] = ("unknown", "unknown") if exp <= 0 else get_remain(exp)
|
||||
# Convert stamp to date
|
||||
ban["ban_start"] = datetime.fromtimestamp(ban["ban_start"])
|
||||
ban["ban_end"] = datetime.fromtimestamp(ban["ban_end"])
|
||||
ban["ban_start"] = datetime.fromtimestamp(floor(ban["date"])).strftime("%d/%m/%Y %H:%M:%S")
|
||||
ban["ban_end"] = datetime.fromtimestamp(floor(timestamp_now + exp)).strftime("%d/%m/%Y %H:%M:%S")
|
||||
# Get top reason
|
||||
if not ban["reason"] in reasons:
|
||||
reasons[ban["reason"]] = 0
|
||||
reasons[ban["reason"]] = reasons[ban["reason"]] + 1
|
||||
|
||||
top_reason = [k for k, v in reasons.items() if v == max(reasons.values())][0]
|
||||
top_reason = ([k for k, v in reasons.items() if v == max(reasons.values())] or [""])[0]
|
||||
|
||||
return render_template(
|
||||
"bans.html",
|
||||
|
|
|
|||
|
|
@ -4,7 +4,7 @@ from os import sep
|
|||
from os.path import join
|
||||
from pathlib import Path
|
||||
from subprocess import DEVNULL, STDOUT, run
|
||||
from typing import Any, Optional, Union
|
||||
from typing import Any, List, Optional, Tuple, Union
|
||||
|
||||
from API import API # type: ignore
|
||||
from ApiCaller import ApiCaller # type: ignore
|
||||
|
|
@ -106,6 +106,15 @@ class Instance:
|
|||
|
||||
return self.apiCaller.send_to_apis("POST", "/restart")
|
||||
|
||||
def bans(self) -> Tuple[bool, dict[str, Any]]:
|
||||
return self.apiCaller.send_to_apis("GET", "/bans", response=True)
|
||||
|
||||
def ban(self, ip: str, exp: float, reason: str) -> bool:
|
||||
return self.apiCaller.send_to_apis("POST", "/ban", data={"ip": ip, "exp": exp, "reason": reason})
|
||||
|
||||
def unban(self, ip: str) -> bool:
|
||||
return self.apiCaller.send_to_apis("POST", "/unban", data={"ip": ip})
|
||||
|
||||
|
||||
class Instances:
|
||||
def __init__(self, docker_client, kubernetes_client, integration: str):
|
||||
|
|
@ -255,7 +264,7 @@ class Instances:
|
|||
return not_reloaded or "Successfully reloaded instances"
|
||||
|
||||
def reload_instance(self, _id: Optional[int] = None, instance: Optional[Instance] = None) -> str:
|
||||
if instance is None:
|
||||
if not instance:
|
||||
instance = self.__instance_from_id(_id)
|
||||
|
||||
result = instance.reload()
|
||||
|
|
@ -294,3 +303,42 @@ class Instances:
|
|||
return f"Instance {instance.name} has been restarted."
|
||||
|
||||
return f"Can't restart {instance.name}"
|
||||
|
||||
def get_bans(self, _id: Optional[int] = None) -> List[dict[str, Any]]:
|
||||
if _id:
|
||||
instance = self.__instance_from_id(_id)
|
||||
resp, instance_bans = instance.bans()
|
||||
if not resp:
|
||||
return []
|
||||
return instance_bans[instance.name].get("data", [])
|
||||
|
||||
bans: List[dict[str, Any]] = []
|
||||
for instance in self.get_instances():
|
||||
resp, instance_bans = instance.bans()
|
||||
if not resp:
|
||||
continue
|
||||
bans.extend(instance_bans[instance.name].get("data", []))
|
||||
|
||||
bans.sort(key=lambda x: x["exp"])
|
||||
|
||||
unique_bans = {}
|
||||
|
||||
return [unique_bans.setdefault(item["ip"], item) for item in bans if item["ip"] not in unique_bans]
|
||||
|
||||
def ban(self, ip: str, exp: float, reason: str, _id: Optional[int] = None) -> Union[str, list[str]]:
|
||||
if _id:
|
||||
instance = self.__instance_from_id(_id)
|
||||
if instance.ban(ip, exp, reason):
|
||||
return ""
|
||||
return f"Can't ban {ip} on {instance.name}"
|
||||
|
||||
return [instance.name for instance in self.get_instances() if not instance.ban(ip, exp, reason)]
|
||||
|
||||
def unban(self, ip: str, _id: Optional[int] = None) -> Union[str, list[str]]:
|
||||
if _id:
|
||||
instance = self.__instance_from_id(_id)
|
||||
if instance.unban(ip):
|
||||
return ""
|
||||
return f"Can't unban {ip} on {instance.name}"
|
||||
|
||||
return [instance.name for instance in self.get_instances() if not instance.unban(ip)]
|
||||
|
|
|
|||
2
src/ui/templates/bans.html
vendored
2
src/ui/templates/bans.html
vendored
|
|
@ -349,7 +349,7 @@ url_for(request.endpoint)[1:].split("/")[-1].strip() %}
|
|||
</div>
|
||||
</div>
|
||||
|
||||
<form id="unban-items" action="/bans" method="post" class="w-full col-span-12 justify-center flex mt-6 mb-3">
|
||||
<form id="unban-items" action="bans" method="post" class="w-full col-span-12 justify-center flex mt-6 mb-3">
|
||||
<input type="hidden" name="csrf_token" value="{{csrf_token()}}">
|
||||
<input type="hidden" name="operation" value="unban">
|
||||
<input data-unban-inp type="hidden" name="data" value="">
|
||||
|
|
|
|||
2
src/ui/templates/bans_modal.html
vendored
2
src/ui/templates/bans_modal.html
vendored
|
|
@ -174,7 +174,7 @@
|
|||
data-ban-add-form
|
||||
class="w-full flex flex-col justify-between"
|
||||
id="form-new"
|
||||
action="/bans"
|
||||
action="bans"
|
||||
method="POST"
|
||||
>
|
||||
<input type="hidden" name="csrf_token" value="{{csrf_token()}}" />
|
||||
|
|
|
|||
101
src/ui/utils.py
101
src/ui/utils.py
|
|
@ -7,79 +7,40 @@ from typing import List, Optional
|
|||
|
||||
from qrcode.main import QRCode
|
||||
|
||||
import math
|
||||
|
||||
def get_remain(seconds):
|
||||
term = "minute(s)"
|
||||
years, seconds = divmod(seconds, 60 * 60 * 24 * 365)
|
||||
months, seconds = divmod(seconds, 60 * 60 * 24 * 30)
|
||||
while months >= 12:
|
||||
years += 1
|
||||
months -= 12
|
||||
days, seconds = divmod(seconds, 60 * 60 * 24)
|
||||
hours, seconds = divmod(seconds, 60 * 60)
|
||||
minutes, seconds = divmod(seconds, 60)
|
||||
time_parts = []
|
||||
if years > 0:
|
||||
term = "year(s)"
|
||||
time_parts.append(f"{int(years)} year{'' if years == 1 else 's'}")
|
||||
if months > 0:
|
||||
if term == "minute(s)":
|
||||
term = "month(s)"
|
||||
time_parts.append(f"{int(months)} month{'' if months == 1 else 's'}")
|
||||
if days > 0:
|
||||
if term == "minute(s)":
|
||||
term = "day(s)"
|
||||
time_parts.append(f"{int(days)} day{'' if days == 1 else 's'}")
|
||||
if hours > 0:
|
||||
if term == "minute(s)":
|
||||
term = "hour(s)"
|
||||
time_parts.append(f"{int(hours)} hour{'' if hours == 1 else 's'}")
|
||||
if minutes > 0:
|
||||
time_parts.append(f"{int(minutes)} minute{'' if minutes == 1 else 's'}")
|
||||
|
||||
def get_remain(remain_time):
|
||||
# Convert s to ms
|
||||
ms = int(str(remain_time) + "000")
|
||||
if len(time_parts) > 1:
|
||||
time_parts[-1] = f"and {time_parts[-1]}"
|
||||
|
||||
seconds = math.floor(ms / 1000)
|
||||
minutes = math.floor(seconds / 60)
|
||||
hours = math.floor(minutes / 60)
|
||||
days = math.floor(hours / 24)
|
||||
months = math.floor(days / 30)
|
||||
years = math.floor(days / 365)
|
||||
seconds %= 60
|
||||
minutes %= 60
|
||||
hours %= 24
|
||||
days %= 30
|
||||
months %= 12
|
||||
|
||||
remain = f'{format_remain(years, "year")} {format_remain(months, "month")} {format_remain(days, "day")} {format_remain(hours, "hour")} {format_remain(minutes, "minute")} {format_remain(seconds, "second")}'
|
||||
return remain
|
||||
|
||||
|
||||
def format_remain(num, singular):
|
||||
if not num:
|
||||
return ""
|
||||
|
||||
if num == 1:
|
||||
return f"{num} {singular}"
|
||||
|
||||
if num > 1:
|
||||
return f"{num} {singular}s"
|
||||
|
||||
|
||||
def get_range_from_remain(remain):
|
||||
# Not handle
|
||||
if remain == "unknown":
|
||||
return remain
|
||||
|
||||
# Data, need format <n>y <n>m <n>d <n>h <n>min <n>s
|
||||
split_remain = remain.split(" ")
|
||||
terms = [num for num in split_remain if num.isdigit()]
|
||||
term = ""
|
||||
formats = ["year(s)", "month(s)", "day(s)", "hour(s)", "minute(s)", "second(s)"]
|
||||
chars = ["year", "month", "day", "hour", "second", "s"]
|
||||
|
||||
# start from seconds to years, stop when first 0 occurrence
|
||||
# The remain term is first 0 occurrence - 1
|
||||
for i in range(len(terms)):
|
||||
# remove letter
|
||||
num = terms[len(terms) - 1 - i]
|
||||
for char in chars:
|
||||
num = num.replace(char, "")
|
||||
num = "0" if not num else num
|
||||
|
||||
num = int(num)
|
||||
|
||||
# Case seconds or less
|
||||
if not num and i == 0:
|
||||
term = formats[len(formats) - 1]
|
||||
break
|
||||
|
||||
# Case last element
|
||||
if num and i == (len(terms) - 1):
|
||||
term = formats[len(formats) - 1 - i]
|
||||
break
|
||||
|
||||
# Case between seconds and years
|
||||
if not num:
|
||||
term = formats[len(formats) - i]
|
||||
break
|
||||
|
||||
return term
|
||||
return " ".join(time_parts), term
|
||||
|
||||
|
||||
def path_to_dict(
|
||||
|
|
|
|||
Loading…
Reference in a new issue