diff --git a/docs/json2md.py b/docs/json2md.py
index dba5c97bc..23c3d7e22 100755
--- a/docs/json2md.py
+++ b/docs/json2md.py
@@ -6,9 +6,9 @@ from pytablewriter import MarkdownTableWriter
def print_md_table(settings):
- values = []
- for setting, data in settings.items():
- values.append(
+ writer = MarkdownTableWriter(
+ headers=["Setting", "Default", "Context", "Multiple", "Description"],
+ value_matrix=[
[
f"`{setting}`",
"" if data["default"] == "" else f"`{data['default']}`",
@@ -16,13 +16,11 @@ def print_md_table(settings):
"no" if not "multiple" in data else "yes",
data["help"],
]
- )
- writer = MarkdownTableWriter(
- headers=["Setting", "Default", "Context", "Multiple", "Description"],
- value_matrix=values,
+ for setting, data in settings.items()
+ ],
)
writer.write_table()
- print("")
+ print()
print("# Settings\n")
diff --git a/src/autoconf/IngressController.py b/src/autoconf/IngressController.py
index df438c0da..d4f177da7 100644
--- a/src/autoconf/IngressController.py
+++ b/src/autoconf/IngressController.py
@@ -22,14 +22,14 @@ class IngressController(Controller, ConfigCaller):
self.__logger = setup_logger("Ingress-controller", getenv("LOG_LEVEL", "INFO"))
def _get_controller_instances(self):
- controller_instances = []
- for pod in self.__corev1.list_pod_for_all_namespaces(watch=False).items:
+ return [
+ pod
+ for pod in self.__corev1.list_pod_for_all_namespaces(watch=False).items
if (
pod.metadata.annotations != None
and "bunkerweb.io/INSTANCE" in pod.metadata.annotations
- ):
- controller_instances.append(pod)
- return controller_instances
+ )
+ ]
def _to_instances(self, controller_instance):
instance = {}
@@ -289,9 +289,10 @@ class IngressController(Controller, ConfigCaller):
def process_events(self):
watch_types = ["pod", "ingress", "configmap"]
- threads = []
- for watch_type in watch_types:
- threads.append(Thread(target=self.__watch, args=(watch_type,)))
+ threads = [
+ Thread(target=self.__watch, args=(watch_type,))
+ for watch_type in watch_types
+ ]
for thread in threads:
thread.start()
for thread in threads:
diff --git a/src/autoconf/SwarmController.py b/src/autoconf/SwarmController.py
index d36375b26..9e551ba71 100644
--- a/src/autoconf/SwarmController.py
+++ b/src/autoconf/SwarmController.py
@@ -143,9 +143,10 @@ class SwarmController(Controller, ConfigCaller):
def process_events(self):
event_types = ["service", "config"]
- threads = []
- for event_type in event_types:
- threads.append(Thread(target=self.__event, args=(event_type,)))
+ threads = [
+ Thread(target=self.__event, args=(event_type,))
+ for event_type in event_types
+ ]
for thread in threads:
thread.start()
for thread in threads:
diff --git a/src/autoconf/main.py b/src/autoconf/main.py
index 5b4e3fde1..afb659310 100644
--- a/src/autoconf/main.py
+++ b/src/autoconf/main.py
@@ -5,10 +5,14 @@ from signal import SIGINT, SIGTERM, signal
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
-sys_path.append("/usr/share/bunkerweb/deps/python")
-sys_path.append("/usr/share/bunkerweb/utils")
-sys_path.append("/usr/share/bunkerweb/api")
-sys_path.append("/usr/share/bunkerweb/db")
+sys_path.extend(
+ (
+ "/usr/share/bunkerweb/deps/python",
+ "/usr/share/bunkerweb/utils",
+ "/usr/share/bunkerweb/api",
+ "/usr/share/bunkerweb/db",
+ )
+)
from logger import setup_logger
from SwarmController import SwarmController
diff --git a/src/common/cli/CLI.py b/src/common/cli/CLI.py
index 314504349..736506ee2 100644
--- a/src/common/cli/CLI.py
+++ b/src/common/cli/CLI.py
@@ -42,7 +42,7 @@ class CLI(ApiCaller):
def __get_apis(self):
# Docker case
- if self.__integration == "docker" or self.__integration == "linux":
+ if self.__integration in ("docker", "linux"):
return [
API(
f"http://127.0.0.1:{self.__variables['API_HTTP_PORT']}",
diff --git a/src/common/cli/main.py b/src/common/cli/main.py
index 4328fbc1e..fcca7ee59 100644
--- a/src/common/cli/main.py
+++ b/src/common/cli/main.py
@@ -5,10 +5,14 @@ from os import _exit
from sys import exit as sys_exit, path
from traceback import format_exc
-path.append("/usr/share/bunkerweb/deps/python")
-path.append("/usr/share/bunkerweb/cli")
-path.append("/usr/share/bunkerweb/utils")
-path.append("/usr/share/bunkerweb/api")
+path.extend(
+ (
+ "/usr/share/bunkerweb/deps/python",
+ "/usr/share/bunkerweb/cli",
+ "/usr/share/bunkerweb/utils",
+ "/usr/share/bunkerweb/api",
+ )
+)
from logger import setup_logger
from CLI import CLI
diff --git a/src/common/core/blacklist/jobs/blacklist-download.py b/src/common/core/blacklist/jobs/blacklist-download.py
index 267408a17..fdaf6ced2 100755
--- a/src/common/core/blacklist/jobs/blacklist-download.py
+++ b/src/common/core/blacklist/jobs/blacklist-download.py
@@ -1,15 +1,21 @@
#!/usr/bin/python3
+from contextlib import suppress
from ipaddress import ip_address, ip_network
from os import _exit, getenv, makedirs
+from pathlib import Path
from re import IGNORECASE, compile as re_compile
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
from typing import Tuple
-sys_path.append("/usr/share/bunkerweb/deps/python")
-sys_path.append("/usr/share/bunkerweb/utils")
-sys_path.append("/usr/share/bunkerweb/db")
+sys_path.extend(
+ (
+ "/usr/share/bunkerweb/deps/python",
+ "/usr/share/bunkerweb/utils",
+ "/usr/share/bunkerweb/db",
+ )
+)
from requests import get
@@ -25,17 +31,13 @@ uri_rx = re_compile(rb"^/")
def check_line(kind: str, line: bytes) -> Tuple[bool, bytes]:
if kind == "IP":
if b"/" in line:
- try:
+ with suppress(ValueError):
ip_network(line.decode("utf-8"))
return True, line
- except ValueError:
- pass
else:
- try:
+ with suppress(ValueError):
ip_address(line.decode("utf-8"))
return True, line
- except ValueError:
- pass
elif kind == "RDNS":
if rdns_rx.match(line):
return True, line.lower()
@@ -163,8 +165,7 @@ try:
content += data + b"\n"
i += 1
- with open(f"/var/tmp/bunkerweb/blacklist/{kind}.list", "wb") as f:
- f.write(content)
+ Path(f"/var/tmp/bunkerweb/blacklist/{kind}.list").write_bytes(content)
logger.info(f"Downloaded {i} bad {kind}")
# Check if file has changed
diff --git a/src/common/core/bunkernet/jobs/bunkernet-data.py b/src/common/core/bunkernet/jobs/bunkernet-data.py
index 3c213eb68..65574b59b 100755
--- a/src/common/core/bunkernet/jobs/bunkernet-data.py
+++ b/src/common/core/bunkernet/jobs/bunkernet-data.py
@@ -2,13 +2,18 @@
from os import _exit, getenv, makedirs
from os.path import isfile
+from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
-sys_path.append("/usr/share/bunkerweb/deps/python")
-sys_path.append("/usr/share/bunkerweb/utils")
-sys_path.append("/usr/share/bunkerweb/db")
-sys_path.append("/usr/share/bunkerweb/core/bunkernet/jobs")
+sys_path.extend(
+ (
+ "/usr/share/bunkerweb/deps/python",
+ "/usr/share/bunkerweb/utils",
+ "/usr/share/bunkerweb/db",
+ "/usr/share/bunkerweb/core/bunkernet/jobs",
+ )
+)
from bunkernet import data
from Database import Database
@@ -83,8 +88,7 @@ try:
# Writing data to file
logger.info("Saving BunkerNet data ...")
content = "\n".join(data["data"]).encode("utf-8")
- with open("/var/tmp/bunkerweb/bunkernet-ip.list", "wb") as f:
- f.write(content)
+ Path("/var/tmp/bunkerweb/bunkernet-ip.list").write_bytes(content)
# Check if file has changed
new_hash = file_hash("/var/tmp/bunkerweb/bunkernet-ip.list")
diff --git a/src/common/core/bunkernet/jobs/bunkernet-register.py b/src/common/core/bunkernet/jobs/bunkernet-register.py
index 97ba00c1e..4095a4ccb 100755
--- a/src/common/core/bunkernet/jobs/bunkernet-register.py
+++ b/src/common/core/bunkernet/jobs/bunkernet-register.py
@@ -2,14 +2,19 @@
from os import _exit, getenv, makedirs, remove
from os.path import isfile
+from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from time import sleep
from traceback import format_exc
-sys_path.append("/usr/share/bunkerweb/deps/python")
-sys_path.append("/usr/share/bunkerweb/utils")
-sys_path.append("/usr/share/bunkerweb/db")
-sys_path.append("/usr/share/bunkerweb/core/bunkernet/jobs")
+sys_path.extend(
+ (
+ "/usr/share/bunkerweb/deps/python",
+ "/usr/share/bunkerweb/utils",
+ "/usr/share/bunkerweb/db",
+ "/usr/share/bunkerweb/core/bunkernet/jobs",
+ )
+)
from bunkernet import register, ping, get_id
from Database import Database
@@ -76,8 +81,7 @@ try:
f"Successfully registered on BunkerNet API with instance id {data['data']}"
)
else:
- with open("/var/cache/bunkerweb/bunkernet/instance.id", "r") as f:
- bunkernet_id = f.read()
+ bunkernet_id = Path("/var/cache/bunkerweb/bunkernet/instance.id").read_text()
logger.info(f"Already registered on BunkerNet API with instance id {get_id()}")
# Ping
@@ -115,8 +119,7 @@ try:
logger.info("Connectivity with BunkerWeb is successful !")
status = 1
if not isfile("/var/cache/bunkerweb/bunkernet/instance.id"):
- with open("/var/cache/bunkerweb/bunkernet/instance.id", "w") as f:
- f.write(bunkernet_id)
+ Path("/var/cache/bunkerweb/bunkernet/instance.id").write_text(bunkernet_id)
# Update db
err = db.update_job_cache(
diff --git a/src/common/core/customcert/jobs/custom-cert.py b/src/common/core/customcert/jobs/custom-cert.py
index a74a65bf4..8b0239c42 100644
--- a/src/common/core/customcert/jobs/custom-cert.py
+++ b/src/common/core/customcert/jobs/custom-cert.py
@@ -2,13 +2,18 @@
from os import getenv, makedirs, remove
from os.path import isfile
+from pathlib import Path
from shutil import copy
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
-sys_path.append("/usr/share/bunkerweb/deps/python")
-sys_path.append("/usr/share/bunkerweb/utils")
-sys_path.append("/usr/share/bunkerweb/db")
+sys_path.extend(
+ (
+ "/usr/share/bunkerweb/deps/python",
+ "/usr/share/bunkerweb/utils",
+ "/usr/share/bunkerweb/db",
+ )
+)
from Database import Database
from jobs import file_hash
@@ -45,16 +50,13 @@ def check_cert(cert_path, key_path, first_server: str = None) -> bool:
cert_hash = file_hash(cert_path)
if not isfile(cert_cache_path):
- with open(cert_cache_path, "w") as f:
- f.write(cert_hash)
+ Path(cert_cache_path).write_text(cert_hash)
old_hash = file_hash(cert_cache_path)
if old_hash == cert_hash:
return False
- with open(cert_cache_path, "w") as f:
- f.write(cert_hash)
-
+ Path(cert_cache_path).write_text(cert_hash)
copy(cert_path, cert_cache_path.replace(".hash", ""))
if not isfile(key_path):
@@ -71,8 +73,7 @@ def check_cert(cert_path, key_path, first_server: str = None) -> bool:
key_hash = file_hash(key_path)
if not isfile(key_cache_path):
- with open(key_cache_path, "w") as f:
- f.write(key_hash)
+ Path(key_cache_path).write_text(key_hash)
old_hash = file_hash(key_cache_path)
if old_hash != key_hash:
diff --git a/src/common/core/greylist/jobs/greylist-download.py b/src/common/core/greylist/jobs/greylist-download.py
index 8561312f0..8c10de358 100755
--- a/src/common/core/greylist/jobs/greylist-download.py
+++ b/src/common/core/greylist/jobs/greylist-download.py
@@ -1,15 +1,21 @@
#!/usr/bin/python3
+from contextlib import suppress
from ipaddress import ip_address, ip_network
from os import _exit, getenv, makedirs
+from pathlib import Path
from re import IGNORECASE, compile as re_compile
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
from typing import Tuple
-sys_path.append("/usr/share/bunkerweb/deps/python")
-sys_path.append("/usr/share/bunkerweb/utils")
-sys_path.append("/usr/share/bunkerweb/db")
+sys_path.extend(
+ (
+ "/usr/share/bunkerweb/deps/python",
+ "/usr/share/bunkerweb/utils",
+ "/usr/share/bunkerweb/db",
+ )
+)
from requests import get
@@ -25,17 +31,13 @@ uri_rx = re_compile(rb"^/")
def check_line(kind: str, line: bytes) -> Tuple[bool, bytes]:
if kind == "IP":
if b"/" in line:
- try:
+ with suppress(ValueError):
ip_network(line.decode("utf-8"))
return True, line
- except ValueError:
- pass
else:
- try:
+ with suppress(ValueError):
ip_address(line.decode("utf-8"))
return True, line
- except ValueError:
- pass
elif kind == "RDNS":
if rdns_rx.match(line):
return True, line.lower()
@@ -147,8 +149,7 @@ try:
content += data + b"\n"
i += 1
- with open(f"/var/tmp/bunkerweb/greylist/{kind}.list", "wb") as f:
- f.write(content)
+ Path(f"/var/tmp/bunkerweb/greylist/{kind}.list").write_bytes(content)
logger.info(f"Downloaded {i} grey {kind}")
# Check if file has changed
diff --git a/src/common/core/jobs/jobs/download-plugins.py b/src/common/core/jobs/jobs/download-plugins.py
index a90b5d4ef..27ca3744d 100644
--- a/src/common/core/jobs/jobs/download-plugins.py
+++ b/src/common/core/jobs/jobs/download-plugins.py
@@ -12,9 +12,13 @@ from shutil import copytree, rmtree
from traceback import format_exc
from zipfile import ZipFile
-sys_path.append("/usr/share/bunkerweb/deps/python")
-sys_path.append("/usr/share/bunkerweb/utils")
-sys_path.append("/usr/share/bunkerweb/db")
+sys_path.extend(
+ (
+ "/usr/share/bunkerweb/deps/python",
+ "/usr/share/bunkerweb/utils",
+ "/usr/share/bunkerweb/db",
+ )
+)
from requests import get
@@ -32,7 +36,6 @@ status = 0
def install_plugin(plugin_dir):
# Load plugin.json
- metadata = {}
with open(f"{plugin_dir}plugin.json", "rb") as f:
metadata = loads(f.read())
# Don't go further if plugin is already installed
diff --git a/src/common/core/jobs/jobs/mmdb-asn.py b/src/common/core/jobs/jobs/mmdb-asn.py
index 5570ec5ff..08d50a310 100755
--- a/src/common/core/jobs/jobs/mmdb-asn.py
+++ b/src/common/core/jobs/jobs/mmdb-asn.py
@@ -3,12 +3,17 @@
from datetime import date
from gzip import decompress
from os import _exit, getenv
+from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
-sys_path.append("/usr/share/bunkerweb/deps/python")
-sys_path.append("/usr/share/bunkerweb/utils")
-sys_path.append("/usr/share/bunkerweb/db")
+sys_path.extend(
+ (
+ "/usr/share/bunkerweb/deps/python",
+ "/usr/share/bunkerweb/utils",
+ "/usr/share/bunkerweb/db",
+ )
+)
from maxminddb import open_database
from requests import get
@@ -39,8 +44,7 @@ try:
# Save it to temp
logger.info("Saving mmdb file to tmp ...")
- with open("/var/tmp/bunkerweb/asn.mmdb", "wb") as f:
- f.write(decompress(resp.content))
+ Path(f"/var/tmp/bunkerweb/asn.mmdb").write_bytes(decompress(resp.content))
# Try to load it
logger.info("Checking if mmdb file is valid ...")
diff --git a/src/common/core/jobs/jobs/mmdb-country.py b/src/common/core/jobs/jobs/mmdb-country.py
index 40ab4b6f4..9c623c37b 100755
--- a/src/common/core/jobs/jobs/mmdb-country.py
+++ b/src/common/core/jobs/jobs/mmdb-country.py
@@ -3,12 +3,17 @@
from datetime import date
from gzip import decompress
from os import _exit, getenv
+from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
-sys_path.append("/usr/share/bunkerweb/deps/python")
-sys_path.append("/usr/share/bunkerweb/utils")
-sys_path.append("/usr/share/bunkerweb/db")
+sys_path.extend(
+ (
+ "/usr/share/bunkerweb/deps/python",
+ "/usr/share/bunkerweb/utils",
+ "/usr/share/bunkerweb/db",
+ )
+)
from requests import get
from maxminddb import open_database
@@ -39,8 +44,7 @@ try:
# Save it to temp
logger.info("Saving mmdb file to tmp ...")
- with open("/var/tmp/bunkerweb/country.mmdb", "wb") as f:
- f.write(decompress(resp.content))
+ Path(f"/var/tmp/bunkerweb/country.mmdb").write_bytes(decompress(resp.content))
# Try to load it
logger.info("Checking if mmdb file is valid ...")
diff --git a/src/common/core/letsencrypt/jobs/certbot-auth.py b/src/common/core/letsencrypt/jobs/certbot-auth.py
index 5de50a3cf..59be475ba 100755
--- a/src/common/core/letsencrypt/jobs/certbot-auth.py
+++ b/src/common/core/letsencrypt/jobs/certbot-auth.py
@@ -2,13 +2,18 @@
from os import getenv, makedirs
from os.path import exists
+from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
-sys_path.append("/usr/share/bunkerweb/deps/python")
-sys_path.append("/usr/share/bunkerweb/utils")
-sys_path.append("/usr/share/bunkerweb/api")
-sys_path.append("/usr/share/bunkerweb/db")
+sys_path.extend(
+ (
+ "/usr/share/bunkerweb/deps/python",
+ "/usr/share/bunkerweb/utils",
+ "/usr/share/bunkerweb/api",
+ "/usr/share/bunkerweb/db",
+ )
+)
from Database import Database
from logger import setup_logger
@@ -67,8 +72,7 @@ try:
else:
root_dir = "/var/tmp/bunkerweb/lets-encrypt/.well-known/acme-challenge/"
makedirs(root_dir, exist_ok=True)
- with open(f"{root_dir}{token}", "w") as f:
- f.write(validation)
+ Path(f"{root_dir}{token}").write_text(validation)
except:
status = 1
logger.error(f"Exception while running certbot-auth.py :\n{format_exc()}")
diff --git a/src/common/core/letsencrypt/jobs/certbot-cleanup.py b/src/common/core/letsencrypt/jobs/certbot-cleanup.py
index dbe7ea5e0..152d3e489 100755
--- a/src/common/core/letsencrypt/jobs/certbot-cleanup.py
+++ b/src/common/core/letsencrypt/jobs/certbot-cleanup.py
@@ -5,10 +5,14 @@ from os.path import exists, isfile
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
-sys_path.append("/usr/share/bunkerweb/deps/python")
-sys_path.append("/usr/share/bunkerweb/utils")
-sys_path.append("/usr/share/bunkerweb/api")
-sys_path.append("/usr/share/bunkerweb/db")
+sys_path.extend(
+ (
+ "/usr/share/bunkerweb/deps/python",
+ "/usr/share/bunkerweb/utils",
+ "/usr/share/bunkerweb/api",
+ "/usr/share/bunkerweb/db",
+ )
+)
from Database import Database
from logger import setup_logger
diff --git a/src/common/core/letsencrypt/jobs/certbot-deploy.py b/src/common/core/letsencrypt/jobs/certbot-deploy.py
index 2bcf8fcb4..268a9fd32 100755
--- a/src/common/core/letsencrypt/jobs/certbot-deploy.py
+++ b/src/common/core/letsencrypt/jobs/certbot-deploy.py
@@ -9,10 +9,14 @@ from sys import exit as sys_exit, path as sys_path
from tarfile import open as tar_open
from traceback import format_exc
-sys_path.append("/usr/share/bunkerweb/deps/python")
-sys_path.append("/usr/share/bunkerweb/utils")
-sys_path.append("/usr/share/bunkerweb/api")
-sys_path.append("/usr/share/bunkerweb/db")
+sys_path.extend(
+ (
+ "/usr/share/bunkerweb/deps/python",
+ "/usr/share/bunkerweb/utils",
+ "/usr/share/bunkerweb/api",
+ "/usr/share/bunkerweb/db",
+ )
+)
from Database import Database
from logger import setup_logger
diff --git a/src/common/core/letsencrypt/jobs/certbot-new.py b/src/common/core/letsencrypt/jobs/certbot-new.py
index 4cb820bec..6e818d6fc 100755
--- a/src/common/core/letsencrypt/jobs/certbot-new.py
+++ b/src/common/core/letsencrypt/jobs/certbot-new.py
@@ -2,13 +2,18 @@
from os import environ, getcwd, getenv
from os.path import exists
+from pathlib import Path
from subprocess import DEVNULL, STDOUT, run
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
-sys_path.append("/usr/share/bunkerweb/deps/python")
-sys_path.append("/usr/share/bunkerweb/utils")
-sys_path.append("/usr/share/bunkerweb/db")
+sys_path.extend(
+ (
+ "/usr/share/bunkerweb/deps/python",
+ "/usr/share/bunkerweb/utils",
+ "/usr/share/bunkerweb/db",
+ )
+)
from Database import Database
from logger import setup_logger
@@ -93,10 +98,9 @@ try:
)
if exists(f"/etc/letsencrypt/live/{first_server}/cert.pem"):
- with open(
- f"/etc/letsencrypt/live/{first_server}/cert.pem", "rb"
- ) as f:
- cert = f.read()
+ cert = Path(
+ f"/etc/letsencrypt/live/{first_server}/cert.pem"
+ ).read_bytes()
# Update db
err = db.update_job_cache(
@@ -132,10 +136,9 @@ try:
)
if exists(f"/etc/letsencrypt/live/{first_server}/cert.pem"):
- with open(
- f"/etc/letsencrypt/live/{first_server}/cert.pem", "rb"
- ) as f:
- cert = f.read()
+ cert = Path(
+ f"/etc/letsencrypt/live/{first_server}/cert.pem"
+ ).read_bytes()
# Update db
err = db.update_job_cache(
diff --git a/src/common/core/letsencrypt/jobs/certbot-renew.py b/src/common/core/letsencrypt/jobs/certbot-renew.py
index cf464c3b8..b8ad40bb0 100755
--- a/src/common/core/letsencrypt/jobs/certbot-renew.py
+++ b/src/common/core/letsencrypt/jobs/certbot-renew.py
@@ -6,8 +6,12 @@ from subprocess import DEVNULL, STDOUT, run
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
-sys_path.append("/usr/share/bunkerweb/deps/python")
-sys_path.append("/usr/share/bunkerweb/utils")
+sys_path.extend(
+ (
+ "/usr/share/bunkerweb/deps/python",
+ "/usr/share/bunkerweb/utils",
+ )
+)
from logger import setup_logger
@@ -21,7 +25,7 @@ def renew(domain):
"--cert-name",
domain,
"--deploy-hook",
- f"{getcwd()}/certbot-deploy.py",
+ "/usr/share/bunkerweb/core/letsencrypt/jobs/certbot-deploy.py",
],
stdin=DEVNULL,
stderr=STDOUT,
diff --git a/src/common/core/realip/jobs/realip-download.py b/src/common/core/realip/jobs/realip-download.py
index 0f9b4848a..8b8b36d31 100755
--- a/src/common/core/realip/jobs/realip-download.py
+++ b/src/common/core/realip/jobs/realip-download.py
@@ -1,13 +1,19 @@
#!/usr/bin/python3
+from contextlib import suppress
from ipaddress import ip_address, ip_network
from os import _exit, getenv, makedirs
+from pathlib import Path
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
-sys_path.append("/usr/share/bunkerweb/deps/python")
-sys_path.append("/usr/share/bunkerweb/utils")
-sys_path.append("/usr/share/bunkerweb/db")
+sys_path.extend(
+ (
+ "/usr/share/bunkerweb/deps/python",
+ "/usr/share/bunkerweb/utils",
+ "/usr/share/bunkerweb/db",
+ )
+)
from requests import get
@@ -18,17 +24,13 @@ from jobs import cache_file, cache_hash, file_hash, is_cached_file
def check_line(line):
if "/" in line:
- try:
+ with suppress(ValueError):
ip_network(line)
return True, line
- except ValueError:
- pass
else:
- try:
+ with suppress(ValueError):
ip_address(line)
return True, line
- except ValueError:
- pass
return False, ""
@@ -69,10 +71,7 @@ try:
_exit(0)
# Get URLs
- urls = []
- for url in getenv("REALIP_FROM_URLS", "").split(" "):
- if url != "" and url not in urls:
- urls.append(url)
+ urls = [url for url in getenv("REALIP_FROM_URLS", "").split(" ") if url]
# Download and write data to temp file
i = 0
@@ -101,8 +100,7 @@ try:
f"Exception while getting RealIP list from {url} :\n{format_exc()}"
)
- with open("/var/tmp/bunkerweb/realip-combined.list", "wb") as f:
- f.write(content)
+ Path("/var/tmp/bunkerweb/realip-combined.list").write_bytes(content)
# Check if file has changed
new_hash = file_hash("/var/tmp/bunkerweb/realip-combined.list")
diff --git a/src/common/core/selfsigned/jobs/self-signed.py b/src/common/core/selfsigned/jobs/self-signed.py
index 94626d4be..3cc9368f2 100755
--- a/src/common/core/selfsigned/jobs/self-signed.py
+++ b/src/common/core/selfsigned/jobs/self-signed.py
@@ -6,9 +6,13 @@ from subprocess import DEVNULL, STDOUT, run
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
-sys_path.append("/usr/share/bunkerweb/deps/python")
-sys_path.append("/usr/share/bunkerweb/utils")
-sys_path.append("/usr/share/bunkerweb/db")
+sys_path.extend(
+ (
+ "/usr/share/bunkerweb/deps/python",
+ "/usr/share/bunkerweb/utils",
+ "/usr/share/bunkerweb/db",
+ )
+)
from Database import Database
from logger import setup_logger
diff --git a/src/common/core/whitelist/jobs/whitelist-download.py b/src/common/core/whitelist/jobs/whitelist-download.py
index fb5eeb174..4a8d6dec8 100755
--- a/src/common/core/whitelist/jobs/whitelist-download.py
+++ b/src/common/core/whitelist/jobs/whitelist-download.py
@@ -1,15 +1,21 @@
#!/usr/bin/python3
+from contextlib import suppress
from ipaddress import ip_address, ip_network
from os import _exit, getenv, makedirs
+from pathlib import Path
from re import IGNORECASE, compile as re_compile
from sys import exit as sys_exit, path as sys_path
from traceback import format_exc
from typing import Tuple
-sys_path.append("/usr/share/bunkerweb/deps/python")
-sys_path.append("/usr/share/bunkerweb/utils")
-sys_path.append("/usr/share/bunkerweb/db")
+sys_path.extend(
+ (
+ "/usr/share/bunkerweb/deps/python",
+ "/usr/share/bunkerweb/utils",
+ "/usr/share/bunkerweb/db",
+ )
+)
from requests import get
@@ -25,17 +31,13 @@ uri_rx = re_compile(rb"^/")
def check_line(kind: str, line: bytes) -> Tuple[bool, bytes]:
if kind == "IP":
if b"/" in line:
- try:
+ with suppress(ValueError):
ip_network(line.decode("utf-8"))
return True, line
- except ValueError:
- pass
else:
- try:
+ with suppress(ValueError):
ip_address(line.decode("utf-8"))
return True, line
- except ValueError:
- pass
elif kind == "RDNS":
if rdns_rx.match(line):
return True, line.lower()
@@ -147,8 +149,7 @@ try:
content += data + b"\n"
i += 1
- with open(f"/var/tmp/bunkerweb/whitelist/{kind}.list", "wb") as f:
- f.write(content)
+ Path(f"/var/tmp/bunkerweb/whitelist/{kind}.list").write_bytes(content)
logger.info(f"Downloaded {i} good {kind}")
# Check if file has changed
diff --git a/src/common/db/Database.py b/src/common/db/Database.py
index 6ebcfe4f2..cef981223 100644
--- a/src/common/db/Database.py
+++ b/src/common/db/Database.py
@@ -1,4 +1,4 @@
-from contextlib import contextmanager
+from contextlib import contextmanager, suppress
from copy import deepcopy
from datetime import datetime
from hashlib import sha256
@@ -68,10 +68,8 @@ class Database:
)
if sqlalchemy_string.startswith("sqlite"):
- try:
+ with suppress(FileExistsError):
makedirs(dirname(sqlalchemy_string.split("///")[1]), exist_ok=True)
- except FileExistsError:
- pass
elif "+" in sqlalchemy_string and "+pymysql" not in sqlalchemy_string:
splitted = sqlalchemy_string.split("+")
sqlalchemy_string = f"{splitted[0]}:{':'.join(splitted[1].split(':')[1:])}"
@@ -567,12 +565,10 @@ class Database:
Global_values.suffix == suffix,
).update({Global_values.value: value})
- try:
+ with suppress(ProgrammingError, OperationalError):
metadata = session.query(Metadata).get(1)
if metadata is not None and not metadata.first_config_saved:
metadata.first_config_saved = True
- except (ProgrammingError, OperationalError):
- pass
try:
session.add_all(to_put)
@@ -645,8 +641,9 @@ class Database:
if custom_conf is None:
to_put.append(Custom_configs(**config))
- elif config["checksum"] != custom_conf.checksum and (
- method == custom_conf.method or method == "autoconf"
+ elif config["checksum"] != custom_conf.checksum and method in (
+ custom_conf.method,
+ "autoconf",
):
session.query(Custom_configs).filter(
Custom_configs.service_id == config.get("service_id", None),
@@ -810,10 +807,9 @@ class Database:
for key, value in deepcopy(tmp_config).items():
if key.startswith(f"{service}_"):
- tmp_config[key.replace(f"{service}_", "")] = value
- del tmp_config[key]
+ tmp_config[key.replace(f"{service}_", "")] = tmp_config.pop(key)
elif any(key.startswith(f"{s}_") for s in service_names):
- del tmp_config[key]
+ tmp_config.pop(key)
else:
tmp_config[key] = (
{"value": value["value"], "method": "default"}
@@ -1367,7 +1363,7 @@ class Database:
return "An instance with the same hostname already exists."
session.add(
- Instances(hostname=hostname, port=int(port), server_name=server_name)
+ Instances(hostname=hostname, port=port, server_name=server_name)
)
try:
diff --git a/src/common/gen/Templator.py b/src/common/gen/Templator.py
index e1eadb4a1..5aca93e0e 100644
--- a/src/common/gen/Templator.py
+++ b/src/common/gen/Templator.py
@@ -62,9 +62,9 @@ class Templator:
if config != None:
real_config = config
Path(dirname(real_path)).mkdir(parents=True, exist_ok=True)
- with open(real_path, "w") as f:
- for k, v in real_config.items():
- f.write(f"{k}={v}\n")
+ Path(real_path).write_text(
+ "\n".join(f"{k}={v}" for k, v in real_config.items())
+ )
def __render_global(self):
self.__write_config()
@@ -129,8 +129,7 @@ class Templator:
real_name = name
jinja_template = self.__jinja_env.get_template(template)
Path(dirname(f"{real_output}{real_name}")).mkdir(parents=True, exist_ok=True)
- with open(f"{real_output}{real_name}", "w") as f:
- f.write(jinja_template.render(real_config))
+ Path(f"{real_output}{real_name}").write_text(jinja_template.render(real_config))
def is_custom_conf(path):
return glob(f"{path}/*.conf")
diff --git a/src/common/gen/main.py b/src/common/gen/main.py
index ef397d3e4..6a07b32e3 100644
--- a/src/common/gen/main.py
+++ b/src/common/gen/main.py
@@ -11,9 +11,13 @@ from time import sleep
from traceback import format_exc
-sys_path.append("/usr/share/bunkerweb/deps/python")
-sys_path.append("/usr/share/bunkerweb/utils")
-sys_path.append("/usr/share/bunkerweb/api")
+sys_path.extend(
+ (
+ "/usr/share/bunkerweb/deps/python",
+ "/usr/share/bunkerweb/utils",
+ "/usr/share/bunkerweb/api",
+ )
+)
from logger import setup_logger
from Configurator import Configurator
diff --git a/src/common/gen/save_config.py b/src/common/gen/save_config.py
index f78196790..2b5fc8947 100644
--- a/src/common/gen/save_config.py
+++ b/src/common/gen/save_config.py
@@ -13,10 +13,14 @@ from traceback import format_exc
from typing import Any
-sys_path.append("/usr/share/bunkerweb/deps/python")
-sys_path.append("/usr/share/bunkerweb/utils")
-sys_path.append("/usr/share/bunkerweb/api")
-sys_path.append("/usr/share/bunkerweb/db")
+sys_path.extend(
+ (
+ "/usr/share/bunkerweb/deps/python",
+ "/usr/share/bunkerweb/utils",
+ "/usr/share/bunkerweb/api",
+ "/usr/share/bunkerweb/db",
+ )
+)
from docker import DockerClient
from kubernetes import client as kube_client
diff --git a/src/common/utils/jobs.py b/src/common/utils/jobs.py
index 25f87a792..02e3ebbf4 100644
--- a/src/common/utils/jobs.py
+++ b/src/common/utils/jobs.py
@@ -1,3 +1,4 @@
+from contextlib import suppress
from datetime import datetime
from hashlib import sha512
from json import dumps, loads
@@ -21,7 +22,6 @@ def is_cached_file(file, expire):
return False
if not path.isfile(f"{file}.md"):
return False
- cached_time = 0
with open(f"{file}.md", "r") as f:
cached_time = loads(f.read())["date"]
current_time = datetime.timestamp(datetime.now())
@@ -51,11 +51,9 @@ def file_hash(file):
def cache_hash(cache):
- try:
+ with suppress(BaseException):
with open(f"{cache}.md", "r") as f:
return loads(f.read())["checksum"]
- except:
- pass
return None
diff --git a/src/scheduler/JobScheduler.py b/src/scheduler/JobScheduler.py
index c63957178..2398825a4 100644
--- a/src/scheduler/JobScheduler.py
+++ b/src/scheduler/JobScheduler.py
@@ -13,8 +13,7 @@ from schedule import (
from sys import path as sys_path
from traceback import format_exc
-sys_path.append("/usr/share/bunkerweb/utils")
-sys_path.append("/usr/share/bunkerweb/db")
+sys_path.extend(("/usr/share/bunkerweb/utils", "/usr/share/bunkerweb/db"))
from Database import Database
from logger import setup_logger
diff --git a/src/scheduler/main.py b/src/scheduler/main.py
index ed968fba5..31e8f1c82 100644
--- a/src/scheduler/main.py
+++ b/src/scheduler/main.py
@@ -16,6 +16,7 @@ from os import (
walk,
)
from os.path import dirname, exists, isdir, isfile, islink, join
+from pathlib import Path
from shutil import chown, copy, rmtree
from signal import SIGINT, SIGTERM, signal, SIGHUP
from subprocess import run as subprocess_run, DEVNULL, STDOUT
@@ -24,10 +25,14 @@ from time import sleep
from traceback import format_exc
from typing import Any, Dict, List
-sys_path.append("/usr/share/bunkerweb/deps/python")
-sys_path.append("/usr/share/bunkerweb/utils")
-sys_path.append("/usr/share/bunkerweb/api")
-sys_path.append("/usr/share/bunkerweb/db")
+sys_path.extend(
+ (
+ "/usr/share/bunkerweb/deps/python",
+ "/usr/share/bunkerweb/utils",
+ "/usr/share/bunkerweb/api",
+ "/usr/share/bunkerweb/db",
+ )
+)
from dotenv import dotenv_values
@@ -97,9 +102,8 @@ def generate_custom_configs(
if custom_config["service_id"]:
tmp_path += f"/{custom_config['service_id']}"
tmp_path += f"/{custom_config['name']}.conf"
- makedirs(dirname(tmp_path), exist_ok=True)
- with open(tmp_path, "wb") as f:
- f.write(custom_config["data"])
+ Path(dirname(tmp_path)).mkdir(parents=True, exist_ok=True)
+ Path(tmp_path).write_bytes(custom_config["data"])
# Fix permissions for the custom configs folder
for root, dirs, files in walk("/data/configs", topdown=False):
@@ -127,8 +131,7 @@ if __name__ == "__main__":
_exit(1)
# Write pid to file
- with open("/var/tmp/bunkerweb/scheduler.pid", "w") as f:
- f.write(str(getpid()))
+ Path("/var/tmp/bunkerweb/scheduler.pid").write_text(str(getpid()))
# Parse arguments
parser = ArgumentParser(description="Job scheduler for BunkerWeb")
diff --git a/src/ui/main.py b/src/ui/main.py
index d64f97c12..12f3c1cdc 100755
--- a/src/ui/main.py
+++ b/src/ui/main.py
@@ -1,4 +1,6 @@
+from contextlib import suppress
from io import BytesIO
+from pathlib import Path
from signal import SIGINT, signal, SIGTERM
from bs4 import BeautifulSoup
from copy import deepcopy
@@ -41,9 +43,13 @@ from typing import Optional
from uuid import uuid4
from zipfile import BadZipFile, ZipFile
-sys_path.append("/usr/share/bunkerweb/utils")
-sys_path.append("/usr/share/bunkerweb/api")
-sys_path.append("/usr/share/bunkerweb/db")
+sys_path.extend(
+ (
+ "/usr/share/bunkerweb/utils",
+ "/usr/share/bunkerweb/api",
+ "/usr/share/bunkerweb/db",
+ )
+)
from src.Instances import Instances
from src.ConfigFiles import ConfigFiles
@@ -112,8 +118,7 @@ if not vars["FLASK_ENV"] == "development" and vars["ABSOLUTE_URI"].endswith(
logger.error("Please change the default URL.")
sys_exit(1)
-with open("/var/tmp/bunkerweb/ui.pid", "w") as f:
- f.write(str(getpid()))
+Path("/var/tmp/bunkerweb/ui.pid").write_text(str(getpid()))
login_manager = LoginManager()
login_manager.init_app(app)
@@ -334,12 +339,12 @@ def instances():
# Manage instances
if request.method == "POST":
# Check operation
- if not "operation" in request.form or not request.form["operation"] in [
+ if not "operation" in request.form or not request.form["operation"] in (
"reload",
"start",
"stop",
"restart",
- ]:
+ ):
flash("Missing operation parameter on /instances.", "error")
return redirect(url_for("loading", next=url_for("instances")))
@@ -384,11 +389,11 @@ def services():
if request.method == "POST":
# Check operation
- if not "operation" in request.form or not request.form["operation"] in [
+ if not "operation" in request.form or not request.form["operation"] in (
"new",
"edit",
"delete",
- ]:
+ ):
flash("Missing operation parameter on /services.", "error")
return redirect(url_for("loading", next=url_for("services")))
@@ -574,11 +579,11 @@ def configs():
operation = ""
# Check operation
- if not "operation" in request.form or not request.form["operation"] in [
+ if not "operation" in request.form or not request.form["operation"] in (
"new",
"edit",
"delete",
- ]:
+ ):
flash("Missing operation parameter on /configs.", "error")
return redirect(url_for("loading", next=url_for("configs")))
@@ -811,7 +816,7 @@ def plugins():
except BadZipFile:
error = 1
flash(
- f"{file} is not a valid zip file. ({folder_name if folder_name else temp_folder_name})",
+ f"{file} is not a valid zip file. ({folder_name or temp_folder_name})",
"error",
)
else:
@@ -911,37 +916,37 @@ def plugins():
except ReadError:
error = 1
flash(
- f"Couldn't read file {file} ({folder_name if folder_name else temp_folder_name})",
+ f"Couldn't read file {file} ({folder_name or temp_folder_name})",
"error",
)
except CompressionError:
error = 1
flash(
- f"{file} is not a valid tar file ({folder_name if folder_name else temp_folder_name})",
+ f"{file} is not a valid tar file ({folder_name or temp_folder_name})",
"error",
)
except HeaderError:
error = 1
flash(
- f"The file plugin.json in {file} is not valid ({folder_name if folder_name else temp_folder_name})",
+ f"The file plugin.json in {file} is not valid ({folder_name or temp_folder_name})",
"error",
)
except KeyError:
error = 1
flash(
- f"{file} is not a valid plugin (plugin.json file is missing) ({folder_name if folder_name else temp_folder_name})",
+ f"{file} is not a valid plugin (plugin.json file is missing) ({folder_name or temp_folder_name})",
"error",
)
except JSONDecodeError as e:
error = 1
flash(
- f"The file plugin.json in {file} is not valid ({e.msg}: line {e.lineno} column {e.colno} (char {e.pos})) ({folder_name if folder_name else temp_folder_name})",
+ f"The file plugin.json in {file} is not valid ({e.msg}: line {e.lineno} column {e.colno} (char {e.pos})) ({folder_name or temp_folder_name})",
"error",
)
except ValueError:
error = 1
flash(
- f"The file plugin.json is missing one or more of the following keys: {', '.join(PLUGIN_KEYS)} ({folder_name if folder_name else temp_folder_name})",
+ f"The file plugin.json is missing one or more of the following keys: {', '.join(PLUGIN_KEYS)} ({folder_name or temp_folder_name})",
"error",
)
except FileExistsError:
@@ -985,10 +990,8 @@ def plugins():
# Remove tmp folder
if exists("/var/tmp/bunkerweb/ui"):
- try:
+ with suppress(OSError):
rmtree("/var/tmp/bunkerweb/ui")
- except OSError:
- pass
return redirect(
url_for("loading", next=url_for("plugins"), message="Reloading plugins")
@@ -1057,11 +1060,9 @@ def upload_plugin():
if not file.filename.endswith((".zip", ".tar.gz", ".tar.xz")):
return {"status": "ko"}, 422
- with open(
- f"/var/tmp/bunkerweb/ui/{uuid4()}{file.filename[file.filename.index('.'):]}",
- "wb",
- ) as f:
- f.write(file.read())
+ Path(
+ f"/var/tmp/bunkerweb/ui/{uuid4()}{file.filename[file.filename.index('.'):]}"
+ ).write_bytes(file.read())
return {"status": "ok"}, 201
@@ -1127,7 +1128,7 @@ def custom_plugin(plugin):
finally:
# Remove the custom plugin from the shared library
sys_path.pop()
- del sys_modules["actions"]
+ sys_modules.pop("actions")
del actions
if (
@@ -1304,7 +1305,7 @@ def logs_container(container_id):
if from_date is not None:
last_update = from_date
- if any(arg and not arg.isdigit() for arg in [last_update, from_date, to_date]):
+ if any(arg and not arg.isdigit() for arg in (last_update, from_date, to_date)):
return (
jsonify(
{
diff --git a/src/ui/src/Config.py b/src/ui/src/Config.py
index 38164982e..aa716a03a 100644
--- a/src/ui/src/Config.py
+++ b/src/ui/src/Config.py
@@ -1,5 +1,6 @@
from copy import deepcopy
from os import listdir, remove
+from pathlib import Path
from time import sleep
from flask import flash
from os.path import exists, isfile
@@ -98,11 +99,8 @@ class Config:
if not isfile(filename):
return {}
- with open(filename, "r") as f:
- env = f.read()
-
data = {}
- for line in env.split("\n"):
+ for line in Path(filename).read_text().split("\n"):
if not "=" in line:
continue
var = line.split("=")[0]
@@ -121,8 +119,9 @@ class Config:
variables : dict
The dict to convert to env file
"""
- with open(filename, "w") as f:
- f.write("\n".join(f"{k}={variables[k]}" for k in sorted(variables)))
+ Path(filename).write_text(
+ "\n".join(f"{k}={variables[k]}" for k in sorted(variables))
+ )
def __gen_conf(self, global_conf: dict, services_conf: list[dict]) -> None:
"""Generates the nginx configuration file from the given configuration
@@ -306,7 +305,7 @@ class Config:
if edit is False:
return f"Service {service['SERVER_NAME']} already exists.", 1
- del services[i]
+ services.pop(i)
services.append(variables)
self.__gen_conf(self.get_config(methods=False), services)
@@ -397,11 +396,11 @@ class Config:
for k in full_env:
if k.startswith(service_name):
- del new_env[k]
+ new_env.pop(k)
for service in new_services:
if k in service:
- del service[k]
+ service.pop(k)
self.__gen_conf(new_env, new_services)
return f"Configuration for {service_name} has been deleted.", 0
diff --git a/src/ui/src/ConfigFiles.py b/src/ui/src/ConfigFiles.py
index 4a5ab6c3d..2f16832ad 100644
--- a/src/ui/src/ConfigFiles.py
+++ b/src/ui/src/ConfigFiles.py
@@ -1,5 +1,6 @@
from os import listdir, mkdir, remove, replace, walk
from os.path import dirname, exists, join, isfile
+from pathlib import Path
from re import compile as re_compile
from shutil import rmtree, move as shutil_move
from typing import Tuple
@@ -95,7 +96,7 @@ class ConfigFiles:
def create_folder(self, path: str, name: str) -> Tuple[str, int]:
folder_path = join(path, name) if not path.endswith(name) else path
try:
- mkdir(folder_path)
+ Path(folder_path).mkdir()
except OSError:
return f"Could not create {folder_path}", 1
@@ -103,11 +104,8 @@ class ConfigFiles:
def create_file(self, path: str, name: str, content: str) -> Tuple[str, int]:
file_path = join(path, name)
- mkdir(path, exist_ok=True)
-
- with open(file_path, "w") as f:
- f.write(content)
-
+ Path(path).mkdir(exist_ok=True)
+ Path(file_path).write_text(content)
return f"The file {file_path} was successfully created", 0
def edit_folder(self, path: str, name: str, old_name: str) -> Tuple[str, int]:
@@ -137,8 +135,7 @@ class ConfigFiles:
old_path = dirname(join(path, old_name))
try:
- with open(old_path, "r") as f:
- file_content = f.read()
+ file_content = Path(old_path).read_text()
except FileNotFoundError:
return f"Could not find {old_path}", 1
@@ -161,7 +158,6 @@ class ConfigFiles:
except OSError:
return f"Could not remove {old_path}", 1
- with open(new_path, "w") as f:
- f.write(content)
+ Path(new_path).write_text(content)
return f"The file {old_path} was successfully edited", 0
diff --git a/src/ui/src/__init__.py b/src/ui/src/__init__.py
new file mode 100644
index 000000000..e69de29bb
diff --git a/tests/LinuxTest.py b/tests/LinuxTest.py
index 0c904911a..6bdb16285 100644
--- a/tests/LinuxTest.py
+++ b/tests/LinuxTest.py
@@ -18,7 +18,7 @@ class LinuxTest(Test):
r"app2\.example\.com": getenv("TEST_DOMAIN1_2"),
r"app3\.example\.com": getenv("TEST_DOMAIN1_3"),
}
- if not distro in ["ubuntu", "debian", "fedora", "centos"]:
+ if not distro in ("ubuntu", "debian", "fedora", "centos"):
raise Exception(f"unknown distro {distro}")
self.__distro = distro
self.__logger = setup_logger("Linux_test", getenv("LOGLEVEL", "INFO"))
@@ -40,9 +40,9 @@ class LinuxTest(Test):
proc = run(cmd, shell=True)
if proc.returncode != 0:
raise Exception("docker run failed (linux stack)")
- if distro in ["ubuntu", "debian"]:
+ if distro in ("ubuntu", "debian"):
cmd = "apt install -y /opt/\$(ls /opt | grep deb)"
- elif distro in ["centos", "fedora"]:
+ elif distro in ("centos", "fedora"):
cmd = "dnf install -y /opt/\$(ls /opt | grep rpm)"
proc = LinuxTest.docker_exec(distro, cmd)
if proc.returncode != 0:
@@ -64,7 +64,7 @@ class LinuxTest(Test):
f"docker exec failed for directory {src} (linux stack)"
)
- if distro in ["ubuntu", "debian"]:
+ if distro in ("ubuntu", "debian"):
LinuxTest.docker_exec(
distro,
"DEBIAN_FRONTEND=noninteractive apt-get install -y php-fpm unzip",
@@ -87,7 +87,7 @@ class LinuxTest(Test):
LinuxTest.docker_exec(
distro, "systemctl stop php7.4-fpm ; systemctl start php7.4-fpm"
)
- elif distro in ["centos", "fedora"]:
+ elif distro in ("centos", "fedora"):
LinuxTest.docker_exec(distro, "dnf install -y php-fpm unzip")
LinuxTest.docker_cp(
distro, "./tests/www-rpm.conf", "/etc/php-fpm.d/www.conf"
diff --git a/tests/Test.py b/tests/Test.py
index 573cb176b..664e270cc 100644
--- a/tests/Test.py
+++ b/tests/Test.py
@@ -1,4 +1,5 @@
from abc import ABC
+from pathlib import Path
from time import time, sleep
from requests import get
from traceback import format_exc
@@ -134,11 +135,9 @@ class Test(ABC):
@staticmethod
def replace_in_file(path, old, new):
try:
- with open(path, "r") as f:
- content = f.read()
- content = sub(old, new, content, flags=MULTILINE)
- with open(path, "w") as f:
- f.write(content)
+ Path(path).write_text(
+ sub(old, new, Path(path).read_text(), flags=MULTILINE)
+ )
except:
setup_logger("Test", getenv("LOG_LEVEL", "INFO")).warning(
f"Can't replace file {path}"
diff --git a/tests/main.py b/tests/main.py
index d870612c6..1eae2260f 100755
--- a/tests/main.py
+++ b/tests/main.py
@@ -1,15 +1,15 @@
#!/usr/bin/env python3
+from pathlib import Path
from sys import path, argv, exit
from glob import glob
-from os import getcwd, getenv, _exit
+from os import getenv, _exit
from os.path import isfile
from traceback import format_exc
from json import loads
from subprocess import run
-path.append(f"{getcwd()}/utils")
-path.append(f"{getcwd()}/tests")
+path.extend((f"{Path.cwd()}/utils", f"{Path.cwd()}/tests"))
from Test import Test
from DockerTest import DockerTest
@@ -26,7 +26,7 @@ if len(argv) <= 1:
exit(1)
test_type = argv[1]
-if not test_type in ["linux", "docker", "autoconf", "swarm", "kubernetes", "ansible"]:
+if not test_type in ("linux", "docker", "autoconf", "swarm", "kubernetes", "ansible"):
logger.error(f"Wrong type argument {test_type}")
exit(1)