Refactor certbot-new job to optimize the certbot requests

This commit is contained in:
Théophile Diot 2024-02-28 11:44:03 +01:00
parent 149cffe0bd
commit f74a7bb192
No known key found for this signature in database
GPG key ID: 248FEA4BAE400D06

View file

@ -10,6 +10,7 @@ from tarfile import open as tar_open
from io import BytesIO
from shutil import rmtree
from re import findall, MULTILINE
from typing import List, LiteralString, Tuple, Union
for deps_path in [
join(sep, "usr", "share", "bunkerweb", *paths)
@ -62,7 +63,7 @@ def certbot_new(domains: str, email: str, letsencrypt_path: Path, letsencrypt_jo
).returncode
def certbot_check_domains(domains: list[str], letsencrypt_path: Path) -> int:
def certbot_check_domains(domains: Union[List[str], List[LiteralString]], letsencrypt_path: Path) -> Tuple[int, str]:
proc = run(
[
join(sep, "usr", "share", "bunkerweb", "deps", "python", "bin", "certbot"),
@ -82,14 +83,14 @@ def certbot_check_domains(domains: list[str], letsencrypt_path: Path) -> int:
)
if proc.returncode != 0:
logger.error(f"Error while checking certificates :\n{proc.stdout}")
return 2
return 2, proc.stdout
first_needed_domain = domains[0]
needed_domains = set(domains)
for raw_domains in findall(r"^ Domains: (.*)$", proc.stdout, MULTILINE):
current_domains = raw_domains.split(" ")
if current_domains[0] == first_needed_domain and set(current_domains) == needed_domains:
return 1
return 0
return 1, proc.stdout
return 0, proc.stdout
status = 0
@ -97,10 +98,14 @@ status = 0
try:
# Check if we're using let's encrypt
use_letsencrypt = False
is_multisite = getenv("MULTISITE", "no") == "yes"
all_domains = getenv("SERVER_NAME", "")
server_names = getenv("SERVER_NAME", "").split(" ")
if getenv("AUTO_LETS_ENCRYPT", "no") == "yes":
use_letsencrypt = True
elif getenv("MULTISITE", "no") == "yes":
for first_server in getenv("SERVER_NAME", "").split(" "):
elif is_multisite:
for first_server in server_names:
if first_server and getenv(f"{first_server}_AUTO_LETS_ENCRYPT", "no") == "yes":
use_letsencrypt = True
break
@ -108,6 +113,9 @@ try:
if not use_letsencrypt:
logger.info("Let's Encrypt is not activated, skipping generation...")
_exit(0)
elif not getenv("SERVER_NAME"):
logger.warning("There are no server names, skipping generation...")
_exit(0)
# Create directory if it doesn't exist
letsencrypt_path = Path(sep, "var", "cache", "bunkerweb", "letsencrypt")
@ -148,84 +156,63 @@ try:
logger.info("No Let's Encrypt data found in db cache")
# Multisite case
if getenv("MULTISITE", "no") == "yes" and getenv("SERVER_NAME"):
for first_server in getenv("SERVER_NAME", "").split(" "):
if (
not first_server
or getenv(
f"{first_server}_AUTO_LETS_ENCRYPT",
getenv("AUTO_LETS_ENCRYPT", "no"),
)
!= "yes"
):
continue
if is_multisite:
domains_sever_names = {}
domains = getenv(f"{first_server}_SERVER_NAME", first_server)
if certbot_check_domains(domains.split(" "), letsencrypt_path) == 1:
logger.info(
f"Certificates already exists for domain(s) {domains}",
)
for first_server in server_names:
if not first_server or getenv(f"{first_server}_AUTO_LETS_ENCRYPT", getenv("AUTO_LETS_ENCRYPT", "no")) != "yes":
continue
domains_sever_names[first_server] = getenv(f"{first_server}_SERVER_NAME", first_server)
real_email = getenv(
f"{first_server}_EMAIL_LETS_ENCRYPT",
getenv("EMAIL_LETS_ENCRYPT", f"contact@{first_server}"),
)
if not real_email:
real_email = f"contact@{first_server}"
ret, stdout = certbot_check_domains(" ".join(domains_sever_names.values()).split(" "), letsencrypt_path)
logger.info(
f"Asking certificates for domains : {domains} (email = {real_email}) ...",
)
if (
certbot_new(
domains.replace(" ", ","),
real_email,
letsencrypt_path,
letsencrypt_job_path,
)
!= 0
):
status = 2
logger.error(
f"Certificate generation failed for domain(s) {domains} ...",
)
continue
if ret == 1:
logger.info(f"Certificates already exists for all domain(s) {all_domains}")
else:
domains_to_ask = []
if ret == 2:
domains_to_ask = server_names
else:
status = 1 if status == 0 else status
logger.info(f"Certificate generation succeeded for domain(s) : {domains}")
valid_domains = findall(r"Domains: (.*)$", stdout, MULTILINE)
for first_server, domains in domains_sever_names.items():
if all(domain in valid_domains for domain in domains.split(" ")):
logger.info(f"Certificates already exists for domain(s) {domains}")
continue
domains_to_ask.append(first_server)
for first_server in domains_to_ask:
domains = getenv(f"{first_server}_SERVER_NAME", first_server)
real_email = getenv(f"{first_server}_EMAIL_LETS_ENCRYPT", getenv("EMAIL_LETS_ENCRYPT", f"contact@{first_server}"))
if not real_email:
real_email = f"contact@{first_server}"
logger.info(f"Asking certificates for domains : {domains} (email = {real_email}) ...")
if certbot_new(domains.replace(" ", ","), real_email, letsencrypt_path, letsencrypt_job_path) != 0:
status = 2
logger.error(f"Certificate generation failed for domain(s) {domains} ...")
continue
else:
status = 1 if status == 0 else status
logger.info(f"Certificate generation succeeded for domain(s) : {domains}")
# Singlesite case
elif getenv("AUTO_LETS_ENCRYPT", "no") == "yes" and getenv("SERVER_NAME"):
first_server = getenv("SERVER_NAME", "").split(" ")[0]
domains = getenv("SERVER_NAME", "")
else:
first_server = server_names[0]
if certbot_check_domains(domains.split(" "), letsencrypt_path) == 1:
logger.info(
f"Certificates already exists for domain(s) {domains}",
)
if certbot_check_domains(all_domains.split(" "), letsencrypt_path)[0] == 1:
logger.info(f"Certificates already exists for domain(s) {all_domains}")
else:
real_email = getenv("EMAIL_LETS_ENCRYPT", f"contact@{first_server}")
if not real_email:
real_email = f"contact@{first_server}"
logger.info(
f"Asking certificates for domain(s) : {domains} (email = {real_email}) ...",
)
if (
certbot_new(
domains.replace(" ", ","),
real_email,
letsencrypt_path,
letsencrypt_job_path,
)
!= 0
):
logger.info(f"Asking certificates for domain(s) : {all_domains} (email = {real_email}) ...")
if certbot_new(all_domains.replace(" ", ","), real_email, letsencrypt_path, letsencrypt_job_path) != 0:
status = 2
logger.error(f"Certificate generation failed for domain(s) : {domains}")
logger.error(f"Certificate generation failed for domain(s) : {all_domains}")
else:
status = 1
logger.info(f"Certificate generation succeeded for domain(s) : {domains}")
logger.info(f"Certificate generation succeeded for domain(s) : {all_domains}")
# Put new folder in cache
bio = BytesIO()