mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-17 22:18:34 +00:00
174 lines
5.9 KiB
Python
174 lines
5.9 KiB
Python
from abc import ABC, abstractmethod
|
||
from sys import stderr
|
||
from time import time, sleep
|
||
from requests import get
|
||
from traceback import format_exc
|
||
from shutil import copytree
|
||
from os.path import isdir, join
|
||
from os import mkdir, makedirs, walk, chmod, rename
|
||
from re import sub, search, MULTILINE
|
||
from datetime import datetime
|
||
from subprocess import run
|
||
from logger import log
|
||
|
||
|
||
class Test(ABC):
|
||
def __init__(self, name, kind, timeout, tests, no_copy_container=False, delay=0):
|
||
self._name = name
|
||
self.__kind = kind
|
||
self._timeout = timeout
|
||
self.__tests = tests
|
||
self._no_copy_container = no_copy_container
|
||
self.__delay = delay
|
||
log(
|
||
"TEST",
|
||
"ℹ️",
|
||
"instiantiated with "
|
||
+ str(len(tests))
|
||
+ " tests and timeout of "
|
||
+ str(timeout)
|
||
+ "s for "
|
||
+ self._name,
|
||
)
|
||
|
||
# Class method
|
||
# called once before running all the different tests for a given integration
|
||
def init():
|
||
try:
|
||
if not isdir("/tmp/bw-data"):
|
||
mkdir("/tmp/bw-data")
|
||
run("sudo chmod 777 /tmp/bw-data", shell=True)
|
||
rm_dirs = ["configs", "plugins", "www"]
|
||
for rm_dir in rm_dirs:
|
||
if isdir(rm_dir):
|
||
run("sudo rm -rf /tmp/bw-data/" + rm_dir, shell=True)
|
||
if not isdir("/tmp/tests"):
|
||
mkdir("/tmp/tests")
|
||
except:
|
||
log("TEST", "❌", "exception while running Test.init()\n" + format_exc())
|
||
return False
|
||
return True
|
||
|
||
# Class method
|
||
# called once all tests ended
|
||
def end():
|
||
return True
|
||
|
||
# helper to check domains
|
||
def _check_domains(self):
|
||
for k, v in self._domains.items():
|
||
if v is None:
|
||
log("TEST", "⚠️", "env " + k + " is None")
|
||
|
||
# called before starting the tests
|
||
# must be override if specific actions needs to be done
|
||
def _setup_test(self):
|
||
try:
|
||
rm_dirs = ["configs", "plugins", "www"]
|
||
for rm_dir in rm_dirs:
|
||
if isdir("/tmp/bw-data/" + rm_dir):
|
||
run(
|
||
"sudo bash -c 'rm -rf /tmp/bw-data/" + rm_dir + "/*'",
|
||
shell=True,
|
||
)
|
||
if isdir("/tmp/tests/" + self._name):
|
||
run("sudo rm -rf /tmp/tests/" + self._name, shell=True)
|
||
copytree("./examples/" + self._name, "/tmp/tests/" + self._name)
|
||
except:
|
||
log(
|
||
"TEST",
|
||
"❌",
|
||
"exception while running Test._setup_test()\n" + format_exc(),
|
||
)
|
||
return False
|
||
return True
|
||
|
||
# called after running the tests
|
||
def _cleanup_test(self):
|
||
try:
|
||
run("sudo rm -rf /tmp/tests/" + self._name, shell=True)
|
||
except:
|
||
log(
|
||
"TEST",
|
||
"❌",
|
||
"exception while running Test._cleanup_test()\n" + format_exc(),
|
||
)
|
||
return False
|
||
return True
|
||
|
||
# run all the tests
|
||
def run_tests(self):
|
||
if not self._setup_test():
|
||
self._debug_fail()
|
||
return False
|
||
if self.__delay != 0:
|
||
log("TEST", "ℹ️", "delay is set, sleeping " + str(self.__delay) + "s")
|
||
sleep(self.__delay)
|
||
start = time()
|
||
while time() < start + self._timeout:
|
||
all_ok = True
|
||
for test in self.__tests:
|
||
ok = self.__run_test(test)
|
||
sleep(1)
|
||
if not ok:
|
||
all_ok = False
|
||
break
|
||
if all_ok:
|
||
elapsed = str(int(time() - start))
|
||
log(
|
||
"TEST",
|
||
"ℹ️",
|
||
"success (" + elapsed + "/" + str(self._timeout) + "s)",
|
||
)
|
||
return self._cleanup_test()
|
||
log("TEST", "⚠️", "tests not ok, retrying in 1s ...")
|
||
self._debug_fail()
|
||
self._cleanup_test()
|
||
log("TEST", "❌", "failed (timeout = " + str(self._timeout) + "s)")
|
||
return False
|
||
|
||
# run a single test
|
||
def __run_test(self, test):
|
||
try:
|
||
ex_url = test["url"]
|
||
for ex_domain, test_domain in self._domains.items():
|
||
if search(ex_domain, ex_url):
|
||
ex_url = sub(ex_domain, test_domain, ex_url)
|
||
break
|
||
if test["type"] == "string":
|
||
r = get(ex_url, timeout=10, verify=False)
|
||
return test["string"].casefold() in r.text.casefold()
|
||
elif test["type"] == "status":
|
||
r = get(ex_url, timeout=10, verify=False)
|
||
return test["status"] == r.status_code
|
||
except:
|
||
# log("TEST", "❌", "exception while running test of type " + test["type"] + " on URL " + ex_url + "\n" + format_exc())
|
||
return False
|
||
raise (Exception("unknow test type " + test["type"]))
|
||
|
||
# called when tests fail : typical case is to show logs
|
||
def _debug_fail(self):
|
||
pass
|
||
|
||
def replace_in_file(path, old, new):
|
||
try:
|
||
with open(path, "r") as f:
|
||
content = f.read()
|
||
content = sub(old, new, content, flags=MULTILINE)
|
||
with open(path, "w") as f:
|
||
f.write(content)
|
||
except:
|
||
log("TEST", "⚠️", "can't replace file " + path + " : " + format_exc())
|
||
|
||
def replace_in_files(path, old, new):
|
||
for root, dirs, files in walk(path):
|
||
for name in files:
|
||
Test.replace_in_file(join(root, name), old, new)
|
||
|
||
def rename(path, old, new):
|
||
for root, dirs, files in walk(path):
|
||
for name in dirs + files:
|
||
full_path = join(root, name)
|
||
new_path = sub(old, new, full_path)
|
||
if full_path != new_path:
|
||
rename(full_path, new_path)
|