mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
Refactor certbot-new job to optimize the certbot requests
This commit is contained in:
parent
149cffe0bd
commit
f74a7bb192
1 changed files with 55 additions and 68 deletions
|
|
@ -10,6 +10,7 @@ from tarfile import open as tar_open
|
|||
from io import BytesIO
|
||||
from shutil import rmtree
|
||||
from re import findall, MULTILINE
|
||||
from typing import List, LiteralString, Tuple, Union
|
||||
|
||||
for deps_path in [
|
||||
join(sep, "usr", "share", "bunkerweb", *paths)
|
||||
|
|
@ -62,7 +63,7 @@ def certbot_new(domains: str, email: str, letsencrypt_path: Path, letsencrypt_jo
|
|||
).returncode
|
||||
|
||||
|
||||
def certbot_check_domains(domains: list[str], letsencrypt_path: Path) -> int:
|
||||
def certbot_check_domains(domains: Union[List[str], List[LiteralString]], letsencrypt_path: Path) -> Tuple[int, str]:
|
||||
proc = run(
|
||||
[
|
||||
join(sep, "usr", "share", "bunkerweb", "deps", "python", "bin", "certbot"),
|
||||
|
|
@ -82,14 +83,14 @@ def certbot_check_domains(domains: list[str], letsencrypt_path: Path) -> int:
|
|||
)
|
||||
if proc.returncode != 0:
|
||||
logger.error(f"Error while checking certificates :\n{proc.stdout}")
|
||||
return 2
|
||||
return 2, proc.stdout
|
||||
first_needed_domain = domains[0]
|
||||
needed_domains = set(domains)
|
||||
for raw_domains in findall(r"^ Domains: (.*)$", proc.stdout, MULTILINE):
|
||||
current_domains = raw_domains.split(" ")
|
||||
if current_domains[0] == first_needed_domain and set(current_domains) == needed_domains:
|
||||
return 1
|
||||
return 0
|
||||
return 1, proc.stdout
|
||||
return 0, proc.stdout
|
||||
|
||||
|
||||
status = 0
|
||||
|
|
@ -97,10 +98,14 @@ status = 0
|
|||
try:
|
||||
# Check if we're using let's encrypt
|
||||
use_letsencrypt = False
|
||||
is_multisite = getenv("MULTISITE", "no") == "yes"
|
||||
all_domains = getenv("SERVER_NAME", "")
|
||||
server_names = getenv("SERVER_NAME", "").split(" ")
|
||||
|
||||
if getenv("AUTO_LETS_ENCRYPT", "no") == "yes":
|
||||
use_letsencrypt = True
|
||||
elif getenv("MULTISITE", "no") == "yes":
|
||||
for first_server in getenv("SERVER_NAME", "").split(" "):
|
||||
elif is_multisite:
|
||||
for first_server in server_names:
|
||||
if first_server and getenv(f"{first_server}_AUTO_LETS_ENCRYPT", "no") == "yes":
|
||||
use_letsencrypt = True
|
||||
break
|
||||
|
|
@ -108,6 +113,9 @@ try:
|
|||
if not use_letsencrypt:
|
||||
logger.info("Let's Encrypt is not activated, skipping generation...")
|
||||
_exit(0)
|
||||
elif not getenv("SERVER_NAME"):
|
||||
logger.warning("There are no server names, skipping generation...")
|
||||
_exit(0)
|
||||
|
||||
# Create directory if it doesn't exist
|
||||
letsencrypt_path = Path(sep, "var", "cache", "bunkerweb", "letsencrypt")
|
||||
|
|
@ -148,84 +156,63 @@ try:
|
|||
logger.info("No Let's Encrypt data found in db cache")
|
||||
|
||||
# Multisite case
|
||||
if getenv("MULTISITE", "no") == "yes" and getenv("SERVER_NAME"):
|
||||
for first_server in getenv("SERVER_NAME", "").split(" "):
|
||||
if (
|
||||
not first_server
|
||||
or getenv(
|
||||
f"{first_server}_AUTO_LETS_ENCRYPT",
|
||||
getenv("AUTO_LETS_ENCRYPT", "no"),
|
||||
)
|
||||
!= "yes"
|
||||
):
|
||||
continue
|
||||
if is_multisite:
|
||||
domains_sever_names = {}
|
||||
|
||||
domains = getenv(f"{first_server}_SERVER_NAME", first_server)
|
||||
if certbot_check_domains(domains.split(" "), letsencrypt_path) == 1:
|
||||
logger.info(
|
||||
f"Certificates already exists for domain(s) {domains}",
|
||||
)
|
||||
for first_server in server_names:
|
||||
if not first_server or getenv(f"{first_server}_AUTO_LETS_ENCRYPT", getenv("AUTO_LETS_ENCRYPT", "no")) != "yes":
|
||||
continue
|
||||
domains_sever_names[first_server] = getenv(f"{first_server}_SERVER_NAME", first_server)
|
||||
|
||||
real_email = getenv(
|
||||
f"{first_server}_EMAIL_LETS_ENCRYPT",
|
||||
getenv("EMAIL_LETS_ENCRYPT", f"contact@{first_server}"),
|
||||
)
|
||||
if not real_email:
|
||||
real_email = f"contact@{first_server}"
|
||||
ret, stdout = certbot_check_domains(" ".join(domains_sever_names.values()).split(" "), letsencrypt_path)
|
||||
|
||||
logger.info(
|
||||
f"Asking certificates for domains : {domains} (email = {real_email}) ...",
|
||||
)
|
||||
if (
|
||||
certbot_new(
|
||||
domains.replace(" ", ","),
|
||||
real_email,
|
||||
letsencrypt_path,
|
||||
letsencrypt_job_path,
|
||||
)
|
||||
!= 0
|
||||
):
|
||||
status = 2
|
||||
logger.error(
|
||||
f"Certificate generation failed for domain(s) {domains} ...",
|
||||
)
|
||||
continue
|
||||
if ret == 1:
|
||||
logger.info(f"Certificates already exists for all domain(s) {all_domains}")
|
||||
else:
|
||||
domains_to_ask = []
|
||||
|
||||
if ret == 2:
|
||||
domains_to_ask = server_names
|
||||
else:
|
||||
status = 1 if status == 0 else status
|
||||
logger.info(f"Certificate generation succeeded for domain(s) : {domains}")
|
||||
valid_domains = findall(r"Domains: (.*)$", stdout, MULTILINE)
|
||||
for first_server, domains in domains_sever_names.items():
|
||||
if all(domain in valid_domains for domain in domains.split(" ")):
|
||||
logger.info(f"Certificates already exists for domain(s) {domains}")
|
||||
continue
|
||||
domains_to_ask.append(first_server)
|
||||
|
||||
for first_server in domains_to_ask:
|
||||
domains = getenv(f"{first_server}_SERVER_NAME", first_server)
|
||||
real_email = getenv(f"{first_server}_EMAIL_LETS_ENCRYPT", getenv("EMAIL_LETS_ENCRYPT", f"contact@{first_server}"))
|
||||
if not real_email:
|
||||
real_email = f"contact@{first_server}"
|
||||
|
||||
logger.info(f"Asking certificates for domains : {domains} (email = {real_email}) ...")
|
||||
if certbot_new(domains.replace(" ", ","), real_email, letsencrypt_path, letsencrypt_job_path) != 0:
|
||||
status = 2
|
||||
logger.error(f"Certificate generation failed for domain(s) {domains} ...")
|
||||
continue
|
||||
else:
|
||||
status = 1 if status == 0 else status
|
||||
logger.info(f"Certificate generation succeeded for domain(s) : {domains}")
|
||||
# Singlesite case
|
||||
elif getenv("AUTO_LETS_ENCRYPT", "no") == "yes" and getenv("SERVER_NAME"):
|
||||
first_server = getenv("SERVER_NAME", "").split(" ")[0]
|
||||
domains = getenv("SERVER_NAME", "")
|
||||
else:
|
||||
first_server = server_names[0]
|
||||
|
||||
if certbot_check_domains(domains.split(" "), letsencrypt_path) == 1:
|
||||
logger.info(
|
||||
f"Certificates already exists for domain(s) {domains}",
|
||||
)
|
||||
if certbot_check_domains(all_domains.split(" "), letsencrypt_path)[0] == 1:
|
||||
logger.info(f"Certificates already exists for domain(s) {all_domains}")
|
||||
else:
|
||||
real_email = getenv("EMAIL_LETS_ENCRYPT", f"contact@{first_server}")
|
||||
if not real_email:
|
||||
real_email = f"contact@{first_server}"
|
||||
|
||||
logger.info(
|
||||
f"Asking certificates for domain(s) : {domains} (email = {real_email}) ...",
|
||||
)
|
||||
if (
|
||||
certbot_new(
|
||||
domains.replace(" ", ","),
|
||||
real_email,
|
||||
letsencrypt_path,
|
||||
letsencrypt_job_path,
|
||||
)
|
||||
!= 0
|
||||
):
|
||||
logger.info(f"Asking certificates for domain(s) : {all_domains} (email = {real_email}) ...")
|
||||
if certbot_new(all_domains.replace(" ", ","), real_email, letsencrypt_path, letsencrypt_job_path) != 0:
|
||||
status = 2
|
||||
logger.error(f"Certificate generation failed for domain(s) : {domains}")
|
||||
logger.error(f"Certificate generation failed for domain(s) : {all_domains}")
|
||||
else:
|
||||
status = 1
|
||||
logger.info(f"Certificate generation succeeded for domain(s) : {domains}")
|
||||
logger.info(f"Certificate generation succeeded for domain(s) : {all_domains}")
|
||||
|
||||
# Put new folder in cache
|
||||
bio = BytesIO()
|
||||
|
|
|
|||
Loading…
Reference in a new issue