Fix form action URLs in bans_modal.html and bans.html + Add back-end logic for ban page

This commit is contained in:
Théophile Diot 2024-01-23 18:16:53 +01:00
parent a737bad334
commit 232b55142e
No known key found for this signature in database
GPG key ID: 248FEA4BAE400D06
5 changed files with 132 additions and 110 deletions

View file

@ -1,6 +1,7 @@
#!/usr/bin/env python3
from contextlib import suppress
from math import floor
from os import _exit, getenv, listdir, sep, urandom
from os.path import basename, dirname, join
from secrets import choice
@ -48,7 +49,7 @@ from src.Config import Config
from src.ReverseProxied import ReverseProxied
from src.User import AnonymousUser, User
from utils import check_settings, get_b64encoded_qr_image, path_to_dict, get_remain, get_range_from_remain
from utils import check_settings, get_b64encoded_qr_image, path_to_dict, get_remain
from Database import Database # type: ignore
from logging import getLogger
@ -1678,56 +1679,68 @@ def bans():
flash("No data to proceed", "error")
return redirect(url_for("bans"))
# Multiple operations : add ban or unban
operation = request.form["operation"]
# data = request.form["data"]
try:
data = json_loads(request.form["data"])
assert isinstance(data, list)
except BaseException:
flash("Data must be a list of dict", "error")
return redirect(url_for("bans"))
# TODO : unban logic
# data format for unban is the same as bans send on client
if operation == "unban":
pass
if request.form["operation"] == "unban":
for unban in data:
try:
unban = json_loads(unban.replace('"', '"').replace("'", '"'))
except BaseException:
continue
if "ip" not in unban:
continue
resp = app.config["INSTANCES"].unban(unban["ip"])
if resp:
flash(f"Couldn't unban {unban['ip']} on the following instances: {', '.join(resp)}", "error")
else:
flash(f"Successfully unbanned {unban['ip']}")
elif request.form["operation"] == "ban":
for ban in data:
try:
ban = json_loads(ban.replace('"', '"').replace("'", '"'))
except BaseException:
continue
if "ip" not in ban:
continue
try:
ban_end = float(ban.get("ban_end", 86400))
except BaseException:
continue
resp = app.config["INSTANCES"].ban(ban["ip"], ban_end, ban.get("reason", "manual"))
if resp:
flash(f"Couldn't ban {ban['ip']} on the following instances: {', '.join(resp)}", "error")
else:
flash(f"Successfully banned {ban['ip']}")
else:
flash("Operation unknown", "error")
return redirect(url_for("bans"))
# TODO : add ban logic
# data format : [{"ip": string, "ban_start": timestamp, "ban_end": timestamp, "reason": string}]
# "ban_start" is optional : default is time.time()
# "ban_end" is optional : default is one month
if operation == "ban":
pass
return redirect(url_for("loading", next=url_for("bans"), message="Update bans"))
return redirect(
url_for(
"loading",
next=url_for("bans"),
message="Update bans",
)
)
# TODO : Get bans list from database and send it
# Need to limit the number of bans around 100 last ones
bans = [
{"ip": "124.0.0.1", "ban_start": 1705663430, "ban_end": 1705758948, "reason": "antibot"},
{"ip": "124.0.0.2", "ban_start": 1705663430, "ban_end": 1708437348, "reason": "test"},
{"ip": "124.0.0.3", "ban_start": 1705663430, "ban_end": 1740059748, "reason": "unknown"},
]
bans = app.config["INSTANCES"].get_bans()[:100]
# Prepare data
reasons = {}
now_stamp = int(time()) # in seconds
timestamp_now = time()
for ban in bans:
exp = ban.pop("exp")
# Add remain
remain = "unknown" if ban["ban_end"] - now_stamp < 0 else get_remain(ban["ban_end"] - now_stamp)
ban["remain"] = remain
ban["term"] = get_range_from_remain(remain)
ban["remain"], ban["term"] = ("unknown", "unknown") if exp <= 0 else get_remain(exp)
# Convert stamp to date
ban["ban_start"] = datetime.fromtimestamp(ban["ban_start"])
ban["ban_end"] = datetime.fromtimestamp(ban["ban_end"])
ban["ban_start"] = datetime.fromtimestamp(floor(ban["date"])).strftime("%d/%m/%Y %H:%M:%S")
ban["ban_end"] = datetime.fromtimestamp(floor(timestamp_now + exp)).strftime("%d/%m/%Y %H:%M:%S")
# Get top reason
if not ban["reason"] in reasons:
reasons[ban["reason"]] = 0
reasons[ban["reason"]] = reasons[ban["reason"]] + 1
top_reason = [k for k, v in reasons.items() if v == max(reasons.values())][0]
top_reason = ([k for k, v in reasons.items() if v == max(reasons.values())] or [""])[0]
return render_template(
"bans.html",

View file

@ -4,7 +4,7 @@ from os import sep
from os.path import join
from pathlib import Path
from subprocess import DEVNULL, STDOUT, run
from typing import Any, Optional, Union
from typing import Any, List, Optional, Tuple, Union
from API import API # type: ignore
from ApiCaller import ApiCaller # type: ignore
@ -106,6 +106,15 @@ class Instance:
return self.apiCaller.send_to_apis("POST", "/restart")
def bans(self) -> Tuple[bool, dict[str, Any]]:
return self.apiCaller.send_to_apis("GET", "/bans", response=True)
def ban(self, ip: str, exp: float, reason: str) -> bool:
return self.apiCaller.send_to_apis("POST", "/ban", data={"ip": ip, "exp": exp, "reason": reason})
def unban(self, ip: str) -> bool:
return self.apiCaller.send_to_apis("POST", "/unban", data={"ip": ip})
class Instances:
def __init__(self, docker_client, kubernetes_client, integration: str):
@ -255,7 +264,7 @@ class Instances:
return not_reloaded or "Successfully reloaded instances"
def reload_instance(self, _id: Optional[int] = None, instance: Optional[Instance] = None) -> str:
if instance is None:
if not instance:
instance = self.__instance_from_id(_id)
result = instance.reload()
@ -294,3 +303,42 @@ class Instances:
return f"Instance {instance.name} has been restarted."
return f"Can't restart {instance.name}"
def get_bans(self, _id: Optional[int] = None) -> List[dict[str, Any]]:
if _id:
instance = self.__instance_from_id(_id)
resp, instance_bans = instance.bans()
if not resp:
return []
return instance_bans[instance.name].get("data", [])
bans: List[dict[str, Any]] = []
for instance in self.get_instances():
resp, instance_bans = instance.bans()
if not resp:
continue
bans.extend(instance_bans[instance.name].get("data", []))
bans.sort(key=lambda x: x["exp"])
unique_bans = {}
return [unique_bans.setdefault(item["ip"], item) for item in bans if item["ip"] not in unique_bans]
def ban(self, ip: str, exp: float, reason: str, _id: Optional[int] = None) -> Union[str, list[str]]:
if _id:
instance = self.__instance_from_id(_id)
if instance.ban(ip, exp, reason):
return ""
return f"Can't ban {ip} on {instance.name}"
return [instance.name for instance in self.get_instances() if not instance.ban(ip, exp, reason)]
def unban(self, ip: str, _id: Optional[int] = None) -> Union[str, list[str]]:
if _id:
instance = self.__instance_from_id(_id)
if instance.unban(ip):
return ""
return f"Can't unban {ip} on {instance.name}"
return [instance.name for instance in self.get_instances() if not instance.unban(ip)]

View file

@ -349,7 +349,7 @@ url_for(request.endpoint)[1:].split("/")[-1].strip() %}
</div>
</div>
<form id="unban-items" action="/bans" method="post" class="w-full col-span-12 justify-center flex mt-6 mb-3">
<form id="unban-items" action="bans" method="post" class="w-full col-span-12 justify-center flex mt-6 mb-3">
<input type="hidden" name="csrf_token" value="{{csrf_token()}}">
<input type="hidden" name="operation" value="unban">
<input data-unban-inp type="hidden" name="data" value="">

View file

@ -174,7 +174,7 @@
data-ban-add-form
class="w-full flex flex-col justify-between"
id="form-new"
action="/bans"
action="bans"
method="POST"
>
<input type="hidden" name="csrf_token" value="{{csrf_token()}}" />

View file

@ -7,79 +7,40 @@ from typing import List, Optional
from qrcode.main import QRCode
import math
def get_remain(seconds):
term = "minute(s)"
years, seconds = divmod(seconds, 60 * 60 * 24 * 365)
months, seconds = divmod(seconds, 60 * 60 * 24 * 30)
while months >= 12:
years += 1
months -= 12
days, seconds = divmod(seconds, 60 * 60 * 24)
hours, seconds = divmod(seconds, 60 * 60)
minutes, seconds = divmod(seconds, 60)
time_parts = []
if years > 0:
term = "year(s)"
time_parts.append(f"{int(years)} year{'' if years == 1 else 's'}")
if months > 0:
if term == "minute(s)":
term = "month(s)"
time_parts.append(f"{int(months)} month{'' if months == 1 else 's'}")
if days > 0:
if term == "minute(s)":
term = "day(s)"
time_parts.append(f"{int(days)} day{'' if days == 1 else 's'}")
if hours > 0:
if term == "minute(s)":
term = "hour(s)"
time_parts.append(f"{int(hours)} hour{'' if hours == 1 else 's'}")
if minutes > 0:
time_parts.append(f"{int(minutes)} minute{'' if minutes == 1 else 's'}")
def get_remain(remain_time):
# Convert s to ms
ms = int(str(remain_time) + "000")
if len(time_parts) > 1:
time_parts[-1] = f"and {time_parts[-1]}"
seconds = math.floor(ms / 1000)
minutes = math.floor(seconds / 60)
hours = math.floor(minutes / 60)
days = math.floor(hours / 24)
months = math.floor(days / 30)
years = math.floor(days / 365)
seconds %= 60
minutes %= 60
hours %= 24
days %= 30
months %= 12
remain = f'{format_remain(years, "year")} {format_remain(months, "month")} {format_remain(days, "day")} {format_remain(hours, "hour")} {format_remain(minutes, "minute")} {format_remain(seconds, "second")}'
return remain
def format_remain(num, singular):
if not num:
return ""
if num == 1:
return f"{num} {singular}"
if num > 1:
return f"{num} {singular}s"
def get_range_from_remain(remain):
# Not handle
if remain == "unknown":
return remain
# Data, need format <n>y <n>m <n>d <n>h <n>min <n>s
split_remain = remain.split(" ")
terms = [num for num in split_remain if num.isdigit()]
term = ""
formats = ["year(s)", "month(s)", "day(s)", "hour(s)", "minute(s)", "second(s)"]
chars = ["year", "month", "day", "hour", "second", "s"]
# start from seconds to years, stop when first 0 occurrence
# The remain term is first 0 occurrence - 1
for i in range(len(terms)):
# remove letter
num = terms[len(terms) - 1 - i]
for char in chars:
num = num.replace(char, "")
num = "0" if not num else num
num = int(num)
# Case seconds or less
if not num and i == 0:
term = formats[len(formats) - 1]
break
# Case last element
if num and i == (len(terms) - 1):
term = formats[len(formats) - 1 - i]
break
# Case between seconds and years
if not num:
term = formats[len(formats) - i]
break
return term
return " ".join(time_parts), term
def path_to_dict(