bunkerweb/tests/Test.py

206 lines
7.2 KiB
Python
Raw Normal View History

2023-10-03 10:01:24 +00:00
from abc import ABC
2022-07-11 10:17:33 +00:00
from time import time, sleep
from requests import get
from traceback import format_exc
2022-07-13 19:23:58 +00:00
from shutil import copytree
2022-07-11 10:17:33 +00:00
from os.path import isdir, join
2023-10-03 10:01:24 +00:00
from os import mkdir, walk, rename
2022-07-13 19:43:04 +00:00
from re import sub, search, MULTILINE
2022-07-13 19:23:58 +00:00
from subprocess import run
2023-03-05 20:51:04 +00:00
from logger import log
from string import ascii_lowercase, digits
from random import choice
from ssl import SSLContext, create_connection
import OpenSSL.crypto as crypto
from urllib.parse import urlparse
2023-12-20 15:11:06 +00:00
2023-03-09 09:04:59 +00:00
class Test(ABC):
    """Base class for running HTTP/TLS checks against a deployed example.

    An example named ``name`` is copied from ``./examples/<name>`` to
    ``/tmp/tests/<name>``, then every entry of ``tests`` is checked
    repeatedly until all of them pass or ``timeout`` seconds have elapsed.

    ``self._domains`` maps a regular expression matching an example domain
    to the replacement domain to test against.  It starts empty here and is
    presumably filled in by subclasses -- verify against the callers.
    """

    def __init__(self, name, kind, timeout, tests, no_copy_container=False, delay=0):
        """Store the test configuration and log a one-line summary.

        Args:
            name: Example name, used to build ``./examples/<name>`` and
                ``/tmp/tests/<name>``.
            kind: Integration kind; stored but not read by this class.
            timeout: Number of seconds ``run_tests`` keeps retrying.
            tests: Sized iterable of test dicts (see ``__run_test`` for the
                keys that are read).
            no_copy_container: Stored as-is; not read by this class.
            delay: Seconds to sleep before the first attempt (0 disables).
        """
        self._name = name
        self.__kind = kind
        self._timeout = timeout
        self.__tests = tests
        self._no_copy_container = no_copy_container
        self.__delay = delay
        self._domains = {}
        log(
            "TEST",
            "",
            f"instiantiated with {len(tests)} tests and timeout of {timeout}s for {self._name}",
        )

    @staticmethod
    def init():
        """Prepare the shared directories once, before all tests of an integration.

        Creates ``/tmp/bw-data`` and ``/tmp/tests`` when missing and removes
        the ``configs``, ``plugins`` and ``www`` sub-directories of
        ``/tmp/bw-data`` when they exist.

        Returns:
            True on success, False if an exception was raised (it is logged).
        """
        try:
            if not isdir("/tmp/bw-data"):
                mkdir("/tmp/bw-data")
            run("sudo chmod 777 /tmp/bw-data", shell=True)
            rm_dirs = ["configs", "plugins", "www"]
            for rm_dir in rm_dirs:
                # Check the directory that is actually removed, not a path
                # relative to the current working directory.
                if isdir(join("/tmp/bw-data", rm_dir)):
                    run(f"sudo rm -rf /tmp/bw-data/{rm_dir}", shell=True)
            if not isdir("/tmp/tests"):
                mkdir("/tmp/tests")
        except Exception:
            log("TEST", "", f"exception while running Test.init()\n{format_exc()}")
            return False
        return True

    @staticmethod
    def end():
        """Static hook run at the end; this base implementation only returns True."""
        return True

    def _check_domains(self):
        """Log a warning for every entry of ``self._domains`` whose value is None."""
        for k, v in self._domains.items():
            if v is None:
                log("TEST", "⚠️", f"env {k} is None")

    def _setup_test(self):
        """Reset the working directories before the tests start.

        Empties ``/tmp/bw-data/{configs,plugins,www}`` when they exist,
        removes any previous ``/tmp/tests/<name>`` and copies
        ``./examples/<name>`` there.  Must be overridden if specific actions
        need to be done.

        Returns:
            True on success, False if an exception was raised (it is logged).
        """
        try:
            rm_dirs = ["configs", "plugins", "www"]
            for rm_dir in rm_dirs:
                if isdir(f"/tmp/bw-data/{rm_dir}"):
                    # bash is needed so that the glob is expanded under sudo.
                    run(
                        f"sudo bash -c 'rm -rf /tmp/bw-data/{rm_dir}/*'",
                        shell=True,
                    )
            if isdir(f"/tmp/tests/{self._name}"):
                run(f"sudo rm -rf /tmp/tests/{self._name}", shell=True)
            copytree(f"./examples/{self._name}", f"/tmp/tests/{self._name}")
        except Exception:
            log(
                "TEST",
                "",
                f"exception while running Test._setup_test()\n{format_exc()}",
            )
            return False
        return True

    def _cleanup_test(self):
        """Remove ``/tmp/tests/<name>`` after the tests have run.

        Returns:
            True on success, False if an exception was raised (it is logged).
        """
        try:
            run(f"sudo rm -rf /tmp/tests/{self._name}", shell=True)
        except Exception:
            log(
                "TEST",
                "",
                f"exception while running Test._cleanup_test()\n{format_exc()}",
            )
            return False
        return True

    def run_tests(self):
        """Run all the tests, retrying until they all pass or the timeout expires.

        Returns:
            The result of ``_cleanup_test()`` once every test passed in a
            single round; False if setup failed or the timeout was reached.
        """
        if not self._setup_test():
            self._debug_fail()
            return False
        if self.__delay != 0:
            log("TEST", "", f"delay is set, sleeping {self.__delay}s")
            sleep(self.__delay)
        start = time()
        while time() < start + self._timeout:
            all_ok = True
            for test in self.__tests:
                ok = self.__run_test(test)
                # One second pause after every single test, passed or not.
                sleep(1)
                if not ok:
                    all_ok = False
                    break
            if all_ok:
                elapsed = str(int(time() - start))
                log(
                    "TEST",
                    "",
                    f"success ({elapsed}/{self._timeout}s)",
                )
                return self._cleanup_test()
            log("TEST", "⚠️", "tests not ok, retrying in 1s ...")
        self._debug_fail()
        self._cleanup_test()
        log("TEST", "", f"failed (timeout = {self._timeout}s)")
        return False

    def __run_test(self, test):
        """Run a single test and report whether it passed.

        Keys read from ``test``:
            url: URL to request; the first pattern of ``self._domains`` that
                matches it is substituted by its replacement.
            type: ``"string"`` (``test["string"]`` must appear in the
                response body, compared case-insensitively) or ``"status"``
                (``test["status"]`` must equal the response status code).
            tls: Optional expected certificate common name, checked only
                when the HTTP check passed.
            tls_edit: Optional; when true (the default) the ``self._domains``
                substitutions are applied to ``tls`` as well.

        Returns:
            True if every requested check passed.  False otherwise, including
            for an unknown ``type`` or when any exception is raised (both are
            logged).
        """
        try:
            ok = False
            ex_url = test["url"]
            for ex_domain, test_domain in self._domains.items():
                if search(ex_domain, ex_url):
                    ex_url = sub(ex_domain, test_domain, ex_url)
                    break
            if test["type"] == "string":
                r = get(ex_url, timeout=10, verify=False)
                ok = test["string"].casefold() in r.text.casefold()
                if not ok:
                    log("TEST", "⚠️", f"String not found : {test['string'].casefold()}")
            elif test["type"] == "status":
                r = get(ex_url, timeout=10, verify=False)
                ok = test["status"] == r.status_code
                if not ok:
                    log("TEST", "⚠️", f"Wrong status code : {str(r.status_code)}")
            else:
                # Report a misconfigured test explicitly instead of failing silently.
                log("TEST", "⚠️", f"unknown test type {test['type']}")
                return False
            if ok and "tls" in test:
                ex_tls = test["tls"]
                tls_edit = test["tls_edit"] if "tls_edit" in test else True
                if tls_edit:
                    for ex_domain, test_domain in self._domains.items():
                        if search(ex_domain, ex_tls):
                            ex_tls = sub(ex_domain, test_domain, ex_tls)
                # .hostname drops any ":port" suffix that .netloc would keep;
                # the certificate is always fetched from port 443.
                host = urlparse(ex_url).hostname
                # Context managers close both sockets even if the handshake
                # fails; the timeout mirrors the one used for the HTTP calls.
                with create_connection((host, 443), timeout=10) as connection:
                    context = SSLContext()
                    with context.wrap_socket(connection, server_hostname=host) as sock:
                        cert = sock.getpeercert(True)
                x509 = crypto.load_certificate(crypto.FILETYPE_ASN1, cert)
                common_name = x509.get_subject().CN
                if common_name != ex_tls:
                    ok = False
                    log("TEST", "⚠️", f"wrong cert CN : {common_name} != {ex_tls}")
            return ok
        except Exception:
            log("TEST", "⚠️", f"Exception : {format_exc()}")
            return False

    def _debug_fail(self):
        """Hook called when tests fail (typical case is to show logs); no-op here."""
        pass

    @staticmethod
    def replace_in_file(path, old, new):
        """Apply ``re.sub(old, new, ..., flags=MULTILINE)`` to the content of a file.

        Args:
            path: File to rewrite in place.
            old: Regular expression to search for.
            new: Replacement string.

        Any exception (unreadable file, undecodable content, ...) is logged
        and swallowed so that one bad file does not abort a bulk replacement.
        """
        try:
            with open(path, "r") as f:
                content = f.read()
            content = sub(old, new, content, flags=MULTILINE)
            with open(path, "w") as f:
                f.write(content)
        except Exception:
            log("TEST", "⚠️", f"can't replace file {path} : {format_exc()}")

    @staticmethod
    def replace_in_files(path, old, new):
        """Call ``replace_in_file`` on every file found under ``path``."""
        for root, _dirs, files in walk(path):
            for name in files:
                Test.replace_in_file(join(root, name), old, new)

    @staticmethod
    def rename(path, old, new):
        """Rename every file and directory under ``path`` whose name matches ``old``.

        Args:
            path: Root directory to walk; the root itself is never renamed.
            old: Regular expression applied to each entry name.
            new: Replacement string.

        The tree is walked bottom-up and the substitution is applied to the
        entry name only, so renaming a directory never invalidates the path
        of an entry that still has to be visited.
        """
        for root, dirs, files in walk(path, topdown=False):
            for name in dirs + files:
                new_name = sub(old, new, name)
                if new_name != name:
                    # ``rename`` here is os.rename imported at module level.
                    rename(join(root, name), join(root, new_name))

    @staticmethod
    def random_string(length):
        """Return ``length`` random characters drawn from lowercase letters and digits."""
        charset = ascii_lowercase + digits
        return "".join(choice(charset) for _ in range(length))