mirror of
https://github.com/bunkerity/bunkerweb
synced 2026-05-24 09:28:37 +00:00
Refactor Let's Encrypt job scripts
This commit is contained in:
parent
6a00df269f
commit
cf282a3a42
6 changed files with 102 additions and 247 deletions
|
|
@ -20,6 +20,7 @@ for deps_path in [
|
|||
sys_path.append(deps_path)
|
||||
|
||||
from Database import Database # type: ignore
|
||||
from jobs import get_integration # type: ignore
|
||||
from logger import setup_logger # type: ignore
|
||||
from API import API # type: ignore
|
||||
|
||||
|
|
@ -28,25 +29,11 @@ status = 0
|
|||
|
||||
try:
|
||||
# Get env vars
|
||||
bw_integration = "Linux"
|
||||
integration_path = Path(sep, "usr", "share", "bunkerweb", "INTEGRATION")
|
||||
os_release_path = Path(sep, "etc", "os-release")
|
||||
if getenv("KUBERNETES_MODE", "no") == "yes":
|
||||
bw_integration = "Kubernetes"
|
||||
elif getenv("SWARM_MODE", "no") == "yes":
|
||||
bw_integration = "Swarm"
|
||||
elif getenv("AUTOCONF_MODE", "no") == "yes":
|
||||
bw_integration = "Autoconf"
|
||||
elif integration_path.is_file():
|
||||
bw_integration = integration_path.read_text(encoding="utf-8").strip()
|
||||
elif os_release_path.is_file() and "Alpine" in os_release_path.read_text(encoding="utf-8"):
|
||||
bw_integration = "Docker"
|
||||
|
||||
token = getenv("CERTBOT_TOKEN", "")
|
||||
validation = getenv("CERTBOT_VALIDATION", "")
|
||||
|
||||
# Cluster case
|
||||
if bw_integration in ("Docker", "Swarm", "Kubernetes", "Autoconf"):
|
||||
if get_integration() in ("Docker", "Swarm", "Kubernetes", "Autoconf"):
|
||||
db = Database(logger, sqlalchemy_string=getenv("DATABASE_URI", None), pool=False)
|
||||
lock = Lock()
|
||||
|
||||
|
|
@ -54,39 +41,20 @@ try:
|
|||
instances = db.get_instances()
|
||||
|
||||
for instance in instances:
|
||||
api = API(
|
||||
f"http://{instance['hostname']}:{instance['port']}",
|
||||
host=instance["server_name"],
|
||||
)
|
||||
sent, err, status, resp = api.request(
|
||||
"POST",
|
||||
"/lets-encrypt/challenge",
|
||||
data={"token": token, "validation": validation},
|
||||
)
|
||||
api = API(f"http://{instance['hostname']}:{instance['port']}", host=instance["server_name"])
|
||||
sent, err, status, resp = api.request("POST", "/lets-encrypt/challenge", data={"token": token, "validation": validation})
|
||||
if not sent:
|
||||
status = 1
|
||||
logger.error(f"Can't send API request to {api.endpoint}/lets-encrypt/challenge : {err}")
|
||||
elif status != 200:
|
||||
status = 1
|
||||
logger.error(
|
||||
f"Error while sending API request to {api.endpoint}/lets-encrypt/challenge : status = {resp['status']}, msg = {resp['msg']}",
|
||||
)
|
||||
logger.error(f"Error while sending API request to {api.endpoint}/lets-encrypt/challenge : status = {resp['status']}, msg = {resp['msg']}")
|
||||
else:
|
||||
logger.info(
|
||||
f"Successfully sent API request to {api.endpoint}/lets-encrypt/challenge",
|
||||
)
|
||||
logger.info(f"Successfully sent API request to {api.endpoint}/lets-encrypt/challenge")
|
||||
|
||||
# Linux case
|
||||
else:
|
||||
root_dir = Path(
|
||||
sep,
|
||||
"var",
|
||||
"tmp",
|
||||
"bunkerweb",
|
||||
"lets-encrypt",
|
||||
".well-known",
|
||||
"acme-challenge",
|
||||
)
|
||||
root_dir = Path(sep, "var", "tmp", "bunkerweb", "lets-encrypt", ".well-known", "acme-challenge")
|
||||
root_dir.mkdir(parents=True, exist_ok=True)
|
||||
root_dir.joinpath(token).write_text(validation, encoding="utf-8")
|
||||
except:
|
||||
|
|
|
|||
|
|
@ -20,6 +20,7 @@ for deps_path in [
|
|||
sys_path.append(deps_path)
|
||||
|
||||
from Database import Database # type: ignore
|
||||
from jobs import get_integration # type: ignore
|
||||
from logger import setup_logger # type: ignore
|
||||
from API import API # type: ignore
|
||||
|
||||
|
|
@ -28,60 +29,29 @@ status = 0
|
|||
|
||||
try:
|
||||
# Get env vars
|
||||
bw_integration = "Linux"
|
||||
integration_path = Path(sep, "usr", "share", "bunkerweb", "INTEGRATION")
|
||||
os_release_path = Path(sep, "etc", "os-release")
|
||||
if getenv("KUBERNETES_MODE", "no") == "yes":
|
||||
bw_integration = "Kubernetes"
|
||||
elif getenv("SWARM_MODE", "no") == "yes":
|
||||
bw_integration = "Swarm"
|
||||
elif getenv("AUTOCONF_MODE", "no") == "yes":
|
||||
bw_integration = "Autoconf"
|
||||
elif integration_path.is_file():
|
||||
bw_integration = integration_path.read_text(encoding="utf-8").strip()
|
||||
elif os_release_path.is_file() and "Alpine" in os_release_path.read_text(encoding="utf-8"):
|
||||
bw_integration = "Docker"
|
||||
|
||||
token = getenv("CERTBOT_TOKEN", "")
|
||||
|
||||
# Cluster case
|
||||
if bw_integration in ("Docker", "Swarm", "Kubernetes", "Autoconf"):
|
||||
if get_integration() in ("Docker", "Swarm", "Kubernetes", "Autoconf"):
|
||||
db = Database(logger, sqlalchemy_string=getenv("DATABASE_URI", None), pool=False)
|
||||
lock = Lock()
|
||||
with lock:
|
||||
instances = db.get_instances()
|
||||
|
||||
for instance in instances:
|
||||
api = API(
|
||||
f"http://{instance['hostname']}:{instance['port']}",
|
||||
host=instance["server_name"],
|
||||
)
|
||||
api = API(f"http://{instance['hostname']}:{instance['port']}", host=instance["server_name"])
|
||||
sent, err, status, resp = api.request("DELETE", "/lets-encrypt/challenge", data={"token": token})
|
||||
if not sent:
|
||||
status = 1
|
||||
logger.error(f"Can't send API request to {api.endpoint}/lets-encrypt/challenge : {err}")
|
||||
elif status != 200:
|
||||
status = 1
|
||||
logger.error(
|
||||
f"Error while sending API request to {api.endpoint}/lets-encrypt/challenge : status = {resp['status']}, msg = {resp['msg']}",
|
||||
)
|
||||
logger.error(f"Error while sending API request to {api.endpoint}/lets-encrypt/challenge : status = {resp['status']}, msg = {resp['msg']}")
|
||||
else:
|
||||
logger.info(
|
||||
f"Successfully sent API request to {api.endpoint}/lets-encrypt/challenge",
|
||||
)
|
||||
logger.info(f"Successfully sent API request to {api.endpoint}/lets-encrypt/challenge")
|
||||
# Linux case
|
||||
else:
|
||||
challenge_path = Path(
|
||||
sep,
|
||||
"var",
|
||||
"tmp",
|
||||
"bunkerweb",
|
||||
"lets-encrypt",
|
||||
".well-known",
|
||||
"acme-challenge",
|
||||
token,
|
||||
)
|
||||
challenge_path.unlink(missing_ok=True)
|
||||
Path(sep, "var", "tmp", "bunkerweb", "lets-encrypt", ".well-known", "acme-challenge", token).unlink(missing_ok=True)
|
||||
except:
|
||||
status = 1
|
||||
logger.error(f"Exception while running certbot-cleanup.py :\n{format_exc()}")
|
||||
|
|
|
|||
|
|
@ -3,7 +3,6 @@
|
|||
from io import BytesIO
|
||||
from os import getenv, sep
|
||||
from os.path import join
|
||||
from pathlib import Path
|
||||
from subprocess import DEVNULL, STDOUT, run
|
||||
from sys import exit as sys_exit, path as sys_path
|
||||
from tarfile import open as tar_open
|
||||
|
|
@ -23,6 +22,7 @@ for deps_path in [
|
|||
sys_path.append(deps_path)
|
||||
|
||||
from Database import Database # type: ignore
|
||||
from jobs import get_integration # type: ignore
|
||||
from logger import setup_logger # type: ignore
|
||||
from API import API # type: ignore
|
||||
|
||||
|
|
@ -31,34 +31,17 @@ status = 0
|
|||
|
||||
try:
|
||||
# Get env vars
|
||||
bw_integration = "Linux"
|
||||
integration_path = Path(sep, "usr", "share", "bunkerweb", "INTEGRATION")
|
||||
os_release_path = Path(sep, "etc", "os-release")
|
||||
if getenv("KUBERNETES_MODE", "no") == "yes":
|
||||
bw_integration = "Kubernetes"
|
||||
elif getenv("SWARM_MODE", "no") == "yes":
|
||||
bw_integration = "Swarm"
|
||||
elif getenv("AUTOCONF_MODE", "no") == "yes":
|
||||
bw_integration = "Autoconf"
|
||||
elif integration_path.is_file():
|
||||
bw_integration = integration_path.read_text(encoding="utf-8").strip()
|
||||
elif os_release_path.is_file() and "Alpine" in os_release_path.read_text(encoding="utf-8"):
|
||||
bw_integration = "Docker"
|
||||
|
||||
token = getenv("CERTBOT_TOKEN", "")
|
||||
|
||||
logger.info(f"Certificates renewal for {getenv('RENEWED_DOMAINS')} successful")
|
||||
|
||||
# Cluster case
|
||||
if bw_integration in ("Docker", "Swarm", "Kubernetes", "Autoconf"):
|
||||
if get_integration() in ("Docker", "Swarm", "Kubernetes", "Autoconf"):
|
||||
# Create tarball of /var/cache/bunkerweb/letsencrypt
|
||||
tgz = BytesIO()
|
||||
|
||||
with tar_open(mode="w:gz", fileobj=tgz, compresslevel=3) as tf:
|
||||
tf.add(
|
||||
join(sep, "var", "cache", "bunkerweb", "letsencrypt", "etc"),
|
||||
arcname="etc",
|
||||
)
|
||||
tf.add(join(sep, "var", "cache", "bunkerweb", "letsencrypt", "etc"), arcname="etc")
|
||||
tgz.seek(0, 0)
|
||||
files = {"archive.tar.gz": tgz}
|
||||
|
||||
|
|
@ -95,15 +78,7 @@ try:
|
|||
logger.info(f"Successfully sent API request to {api.endpoint}/reload")
|
||||
# Linux case
|
||||
else:
|
||||
if (
|
||||
run(
|
||||
[join(sep, "usr", "sbin", "nginx"), "-s", "reload"],
|
||||
stdin=DEVNULL,
|
||||
stderr=STDOUT,
|
||||
check=False,
|
||||
).returncode
|
||||
!= 0
|
||||
):
|
||||
if run([join(sep, "usr", "sbin", "nginx"), "-s", "reload"], stdin=DEVNULL, stderr=STDOUT, check=False).returncode != 0:
|
||||
status = 1
|
||||
logger.error("Error while reloading nginx")
|
||||
else:
|
||||
|
|
|
|||
|
|
@ -9,8 +9,7 @@ from traceback import format_exc
|
|||
from tarfile import open as tar_open
|
||||
from io import BytesIO
|
||||
from shutil import rmtree
|
||||
from re import findall, MULTILINE
|
||||
from typing import List, LiteralString, Tuple, Union
|
||||
from re import MULTILINE, search
|
||||
|
||||
for deps_path in [
|
||||
join(sep, "usr", "share", "bunkerweb", *paths)
|
||||
|
|
@ -30,24 +29,31 @@ from jobs import get_file_in_db, set_file_in_db # type: ignore
|
|||
logger = setup_logger("LETS-ENCRYPT.new", getenv("LOG_LEVEL", "INFO"))
|
||||
status = 0
|
||||
|
||||
CERTBOT_BIN = join(sep, "usr", "share", "bunkerweb", "deps", "python", "bin", "certbot")
|
||||
|
||||
def certbot_new(domains: str, email: str, letsencrypt_path: Path, letsencrypt_job_path: Path) -> int:
|
||||
LETS_ENCRYPT_PATH = Path(sep, "var", "cache", "bunkerweb", "letsencrypt")
|
||||
LETS_ENCRYPT_JOBS_PATH = Path(sep, "usr", "share", "bunkerweb", "core", "letsencrypt", "jobs")
|
||||
LETS_ENCRYPT_WORK_DIR = join(sep, "var", "lib", "bunkerweb", "letsencrypt")
|
||||
LETS_ENCRYPT_LOGS_DIR = join(sep, "var", "log", "bunkerweb")
|
||||
|
||||
|
||||
def certbot_new(domains: str, email: str, use_letsencrypt_staging: bool = False) -> int:
|
||||
return run(
|
||||
[
|
||||
join(sep, "usr", "share", "bunkerweb", "deps", "python", "bin", "certbot"),
|
||||
CERTBOT_BIN,
|
||||
"certonly",
|
||||
"--config-dir",
|
||||
str(letsencrypt_path.joinpath("etc")),
|
||||
LETS_ENCRYPT_PATH.joinpath("etc").as_posix(),
|
||||
"--work-dir",
|
||||
join(sep, "var", "lib", "bunkerweb", "letsencrypt"),
|
||||
LETS_ENCRYPT_WORK_DIR,
|
||||
"--logs-dir",
|
||||
join(sep, "var", "log", "bunkerweb"),
|
||||
LETS_ENCRYPT_LOGS_DIR,
|
||||
"--manual",
|
||||
"--preferred-challenges=http",
|
||||
"--manual-auth-hook",
|
||||
str(letsencrypt_job_path.joinpath("certbot-auth.py")),
|
||||
LETS_ENCRYPT_JOBS_PATH.joinpath("certbot-auth.py").as_posix(),
|
||||
"--manual-cleanup-hook",
|
||||
str(letsencrypt_job_path.joinpath("certbot-cleanup.py")),
|
||||
LETS_ENCRYPT_JOBS_PATH.joinpath("certbot-cleanup.py").as_posix(),
|
||||
"-n",
|
||||
"-d",
|
||||
domains,
|
||||
|
|
@ -56,43 +62,13 @@ def certbot_new(domains: str, email: str, letsencrypt_path: Path, letsencrypt_jo
|
|||
"--agree-tos",
|
||||
"--expand",
|
||||
]
|
||||
+ (["--staging"] if getenv("USE_LETS_ENCRYPT_STAGING", "no") == "yes" else []),
|
||||
+ (["--staging"] if use_letsencrypt_staging else []),
|
||||
stdin=DEVNULL,
|
||||
stderr=STDOUT,
|
||||
env=environ.copy() | {"PYTHONPATH": join(sep, "usr", "share", "bunkerweb", "deps", "python")},
|
||||
).returncode
|
||||
|
||||
|
||||
def certbot_check_domains(domains: Union[List[str], List[LiteralString]], letsencrypt_path: Path) -> Tuple[int, str]:
|
||||
proc = run(
|
||||
[
|
||||
join(sep, "usr", "share", "bunkerweb", "deps", "python", "bin", "certbot"),
|
||||
"certificates",
|
||||
"--config-dir",
|
||||
str(letsencrypt_path.joinpath("etc")),
|
||||
"--work-dir",
|
||||
join(sep, "var", "lib", "bunkerweb", "letsencrypt"),
|
||||
"--logs-dir",
|
||||
join(sep, "var", "log", "bunkerweb"),
|
||||
],
|
||||
stdin=DEVNULL,
|
||||
stdout=PIPE,
|
||||
stderr=STDOUT,
|
||||
text=True,
|
||||
env=environ.copy() | {"PYTHONPATH": join(sep, "usr", "share", "bunkerweb", "deps", "python")},
|
||||
)
|
||||
if proc.returncode != 0:
|
||||
logger.error(f"Error while checking certificates :\n{proc.stdout}")
|
||||
return 2, proc.stdout
|
||||
first_needed_domain = domains[0]
|
||||
needed_domains = set(domains)
|
||||
for raw_domains in findall(r"^ Domains: (.*)$", proc.stdout, MULTILINE):
|
||||
current_domains = raw_domains.split(" ")
|
||||
if current_domains[0] == first_needed_domain and set(current_domains) == needed_domains:
|
||||
return 1, proc.stdout
|
||||
return 0, proc.stdout
|
||||
|
||||
|
||||
status = 0
|
||||
|
||||
try:
|
||||
|
|
@ -100,7 +76,7 @@ try:
|
|||
use_letsencrypt = False
|
||||
is_multisite = getenv("MULTISITE", "no") == "yes"
|
||||
all_domains = getenv("SERVER_NAME", "")
|
||||
server_names = getenv("SERVER_NAME", "").split(" ")
|
||||
server_names = all_domains.split(" ")
|
||||
|
||||
if getenv("AUTO_LETS_ENCRYPT", "no") == "yes":
|
||||
use_letsencrypt = True
|
||||
|
|
@ -117,44 +93,27 @@ try:
|
|||
logger.warning("There are no server names, skipping generation...")
|
||||
_exit(0)
|
||||
|
||||
# Create directory if it doesn't exist
|
||||
letsencrypt_path = Path(sep, "var", "cache", "bunkerweb", "letsencrypt")
|
||||
letsencrypt_path.mkdir(parents=True, exist_ok=True)
|
||||
|
||||
letsencrypt_job_path = Path(sep, "usr", "share", "bunkerweb", "core", "letsencrypt", "jobs")
|
||||
# Create directories if they doesn't exist
|
||||
LETS_ENCRYPT_PATH.mkdir(parents=True, exist_ok=True)
|
||||
Path(sep, "var", "lib", "bunkerweb", "letsencrypt").mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Get env vars
|
||||
bw_integration = "Linux"
|
||||
integration_path = Path(sep, "usr", "share", "bunkerweb", "INTEGRATION")
|
||||
os_release_path = Path(sep, "etc", "os-release")
|
||||
if getenv("KUBERNETES_MODE", "no") == "yes":
|
||||
bw_integration = "Kubernetes"
|
||||
elif getenv("SWARM_MODE", "no") == "yes":
|
||||
bw_integration = "Swarm"
|
||||
elif getenv("AUTOCONF_MODE", "no") == "yes":
|
||||
bw_integration = "Autoconf"
|
||||
elif integration_path.is_file():
|
||||
bw_integration = integration_path.read_text(encoding="utf-8").strip()
|
||||
elif os_release_path.is_file() and "Alpine" in os_release_path.read_text(encoding="utf-8"):
|
||||
bw_integration = "Docker"
|
||||
|
||||
# Extract letsencrypt folder if it exists in db
|
||||
db = Database(logger, sqlalchemy_string=getenv("DATABASE_URI", None), pool=False)
|
||||
db = Database(logger, sqlalchemy_string=getenv("DATABASE_URI"), pool=False)
|
||||
|
||||
tgz = get_file_in_db("folder.tgz", db, job_name="certbot-renew")
|
||||
if tgz:
|
||||
# Delete folder if needed
|
||||
if letsencrypt_path.exists():
|
||||
rmtree(str(letsencrypt_path), ignore_errors=True)
|
||||
letsencrypt_path.mkdir(parents=True, exist_ok=True)
|
||||
if LETS_ENCRYPT_PATH.exists():
|
||||
rmtree(LETS_ENCRYPT_PATH, ignore_errors=True)
|
||||
LETS_ENCRYPT_PATH.mkdir(parents=True, exist_ok=True)
|
||||
# Extract it
|
||||
with tar_open(name="folder.tgz", mode="r:gz", fileobj=BytesIO(tgz)) as tf:
|
||||
tf.extractall(str(letsencrypt_path))
|
||||
tf.extractall(LETS_ENCRYPT_PATH)
|
||||
logger.info("Successfully retrieved Let's Encrypt data from db cache")
|
||||
else:
|
||||
logger.info("No Let's Encrypt data found in db cache")
|
||||
|
||||
domains_to_ask = []
|
||||
# Multisite case
|
||||
if is_multisite:
|
||||
domains_sever_names = {}
|
||||
|
|
@ -163,61 +122,58 @@ try:
|
|||
if not first_server or getenv(f"{first_server}_AUTO_LETS_ENCRYPT", getenv("AUTO_LETS_ENCRYPT", "no")) != "yes":
|
||||
continue
|
||||
domains_sever_names[first_server] = getenv(f"{first_server}_SERVER_NAME", first_server)
|
||||
|
||||
ret, stdout = certbot_check_domains(" ".join(domains_sever_names.values()).split(" "), letsencrypt_path)
|
||||
|
||||
if ret == 1:
|
||||
logger.info(f"Certificates already exists for all domain(s) {all_domains}")
|
||||
else:
|
||||
domains_to_ask = []
|
||||
|
||||
if ret == 2:
|
||||
domains_to_ask = server_names
|
||||
else:
|
||||
valid_domains = findall(r"Domains: (.*)$", stdout, MULTILINE)
|
||||
for first_server, domains in domains_sever_names.items():
|
||||
if all(domain in valid_domains for domain in domains.split(" ")):
|
||||
logger.info(f"Certificates already exists for domain(s) {domains}")
|
||||
continue
|
||||
domains_to_ask.append(first_server)
|
||||
|
||||
for first_server in domains_to_ask:
|
||||
domains = getenv(f"{first_server}_SERVER_NAME", first_server)
|
||||
real_email = getenv(f"{first_server}_EMAIL_LETS_ENCRYPT", getenv("EMAIL_LETS_ENCRYPT", f"contact@{first_server}"))
|
||||
if not real_email:
|
||||
real_email = f"contact@{first_server}"
|
||||
|
||||
logger.info(f"Asking certificates for domains : {domains} (email = {real_email}) ...")
|
||||
if certbot_new(domains.replace(" ", ","), real_email, letsencrypt_path, letsencrypt_job_path) != 0:
|
||||
status = 2
|
||||
logger.error(f"Certificate generation failed for domain(s) {domains} ...")
|
||||
continue
|
||||
else:
|
||||
status = 1 if status == 0 else status
|
||||
logger.info(f"Certificate generation succeeded for domain(s) : {domains}")
|
||||
# Singlesite case
|
||||
else:
|
||||
first_server = server_names[0]
|
||||
domains_sever_names = {server_names[0]: all_domains}
|
||||
|
||||
if certbot_check_domains(all_domains.split(" "), letsencrypt_path)[0] == 1:
|
||||
logger.info(f"Certificates already exists for domain(s) {all_domains}")
|
||||
proc = run(
|
||||
[CERTBOT_BIN, "certificates", "--config-dir", LETS_ENCRYPT_PATH.joinpath("etc").as_posix(), "--work-dir", LETS_ENCRYPT_WORK_DIR, "--logs-dir", LETS_ENCRYPT_LOGS_DIR],
|
||||
stdin=DEVNULL,
|
||||
stdout=PIPE,
|
||||
stderr=STDOUT,
|
||||
text=True,
|
||||
env=environ.copy() | {"PYTHONPATH": join(sep, "usr", "share", "bunkerweb", "deps", "python")},
|
||||
)
|
||||
stdout = proc.stdout
|
||||
|
||||
if proc.returncode != 0:
|
||||
logger.error(f"Error while checking certificates :\n{proc.stdout}")
|
||||
domains_to_ask = server_names
|
||||
else:
|
||||
for first_server, domains in domains_sever_names.items():
|
||||
current_domains = search(rf"Domains: {first_server}(?P<domains>.*)$", stdout, MULTILINE)
|
||||
if not current_domains:
|
||||
domains_to_ask.append(first_server)
|
||||
continue
|
||||
elif set(f"{first_server}{current_domains.groupdict()['domains']}".strip().split(" ")) != set(domains.split(" ")):
|
||||
logger.warning(f"Domains for {first_server} are not the same as in the certificate, asking new certificate...")
|
||||
domains_to_ask.append(first_server)
|
||||
continue
|
||||
logger.info(f"Certificates already exists for domain(s) {domains}")
|
||||
|
||||
for first_server, domains in domains_sever_names.items():
|
||||
if first_server not in domains_to_ask:
|
||||
continue
|
||||
|
||||
real_email = getenv(f"{first_server}_EMAIL_LETS_ENCRYPT", getenv("EMAIL_LETS_ENCRYPT", f"contact@{first_server}"))
|
||||
if not real_email:
|
||||
real_email = f"contact@{first_server}"
|
||||
|
||||
use_letsencrypt_staging = getenv(f"{first_server}_USE_LETS_ENCRYPT_STAGING", getenv("USE_LETS_ENCRYPT_STAGING", "no")) == "yes"
|
||||
|
||||
logger.info(f"Asking certificates for domain(s) : {domains} (email = {real_email}) to Let's Encrypt {'staging ' if use_letsencrypt_staging else ''}...")
|
||||
if certbot_new(domains.replace(" ", ","), real_email, use_letsencrypt_staging) != 0:
|
||||
status = 2
|
||||
logger.error(f"Certificate generation failed for domain(s) {domains} ...")
|
||||
continue
|
||||
else:
|
||||
real_email = getenv("EMAIL_LETS_ENCRYPT", f"contact@{first_server}")
|
||||
if not real_email:
|
||||
real_email = f"contact@{first_server}"
|
||||
|
||||
logger.info(f"Asking certificates for domain(s) : {all_domains} (email = {real_email}) ...")
|
||||
if certbot_new(all_domains.replace(" ", ","), real_email, letsencrypt_path, letsencrypt_job_path) != 0:
|
||||
status = 2
|
||||
logger.error(f"Certificate generation failed for domain(s) : {all_domains}")
|
||||
else:
|
||||
status = 1
|
||||
logger.info(f"Certificate generation succeeded for domain(s) : {all_domains}")
|
||||
status = 1 if status == 0 else status
|
||||
logger.info(f"Certificate generation succeeded for domain(s) : {domains}")
|
||||
|
||||
# Put new folder in cache
|
||||
bio = BytesIO()
|
||||
with tar_open("folder.tgz", mode="w:gz", fileobj=bio, compresslevel=9) as tgz:
|
||||
tgz.add(str(letsencrypt_path), arcname=".")
|
||||
tgz.add(LETS_ENCRYPT_PATH, arcname=".")
|
||||
bio.seek(0, 0)
|
||||
|
||||
# Put tgz in cache
|
||||
|
|
|
|||
|
|
@ -28,6 +28,8 @@ from jobs import get_file_in_db, set_file_in_db # type: ignore
|
|||
logger = setup_logger("LETS-ENCRYPT.renew", getenv("LOG_LEVEL", "INFO"))
|
||||
status = 0
|
||||
|
||||
LETS_ENCRYPT_PATH = Path(sep, "var", "cache", "bunkerweb", "letsencrypt")
|
||||
|
||||
try:
|
||||
# Check if we're using let's encrypt
|
||||
use_letsencrypt = False
|
||||
|
|
@ -44,37 +46,21 @@ try:
|
|||
_exit(0)
|
||||
|
||||
# Create directory if it doesn't exist
|
||||
letsencrypt_path = Path(sep, "var", "cache", "bunkerweb", "letsencrypt")
|
||||
letsencrypt_path.mkdir(parents=True, exist_ok=True)
|
||||
LETS_ENCRYPT_PATH.mkdir(parents=True, exist_ok=True)
|
||||
Path(sep, "var", "lib", "bunkerweb", "letsencrypt").mkdir(parents=True, exist_ok=True)
|
||||
|
||||
# Get env vars
|
||||
bw_integration = "Linux"
|
||||
integration_path = Path(sep, "usr", "share", "bunkerweb", "INTEGRATION")
|
||||
os_release_path = Path(sep, "etc", "os-release")
|
||||
if getenv("KUBERNETES_MODE", "no") == "yes":
|
||||
bw_integration = "Kubernetes"
|
||||
elif getenv("SWARM_MODE", "no") == "yes":
|
||||
bw_integration = "Swarm"
|
||||
elif getenv("AUTOCONF_MODE", "no") == "yes":
|
||||
bw_integration = "Autoconf"
|
||||
elif integration_path.is_file():
|
||||
bw_integration = integration_path.read_text(encoding="utf-8").strip()
|
||||
elif os_release_path.is_file() and "Alpine" in os_release_path.read_text(encoding="utf-8"):
|
||||
bw_integration = "Docker"
|
||||
|
||||
# Extract letsencrypt folder if it exists in db
|
||||
db = Database(logger, sqlalchemy_string=getenv("DATABASE_URI", None), pool=False)
|
||||
db = Database(logger, sqlalchemy_string=getenv("DATABASE_URI"), pool=False)
|
||||
|
||||
tgz = get_file_in_db("folder.tgz", db)
|
||||
if tgz:
|
||||
# Delete folder if needed
|
||||
if letsencrypt_path.exists():
|
||||
rmtree(str(letsencrypt_path), ignore_errors=True)
|
||||
letsencrypt_path.mkdir(parents=True, exist_ok=True)
|
||||
if LETS_ENCRYPT_PATH.exists():
|
||||
rmtree(LETS_ENCRYPT_PATH, ignore_errors=True)
|
||||
LETS_ENCRYPT_PATH.mkdir(parents=True, exist_ok=True)
|
||||
# Extract it
|
||||
with tar_open(name="folder.tgz", mode="r:gz", fileobj=BytesIO(tgz)) as tf:
|
||||
tf.extractall(str(letsencrypt_path))
|
||||
tf.extractall(LETS_ENCRYPT_PATH)
|
||||
logger.info("Successfully retrieved Let's Encrypt data from db cache")
|
||||
else:
|
||||
logger.info("No Let's Encrypt data found in db cache")
|
||||
|
|
@ -86,7 +72,7 @@ try:
|
|||
"renew",
|
||||
"--no-random-sleep-on-renew",
|
||||
"--config-dir",
|
||||
str(letsencrypt_path.joinpath("etc")),
|
||||
LETS_ENCRYPT_PATH.joinpath("etc").as_posix(),
|
||||
"--work-dir",
|
||||
join(sep, "var", "lib", "bunkerweb", "letsencrypt"),
|
||||
"--logs-dir",
|
||||
|
|
@ -105,7 +91,7 @@ try:
|
|||
# Put new folder in cache
|
||||
bio = BytesIO()
|
||||
with tar_open("folder.tgz", mode="w:gz", fileobj=bio, compresslevel=9) as tgz:
|
||||
tgz.add(str(letsencrypt_path), arcname=".")
|
||||
tgz.add(LETS_ENCRYPT_PATH, arcname=".")
|
||||
bio.seek(0, 0)
|
||||
|
||||
# Put tgz in cache
|
||||
|
|
|
|||
|
|
@ -217,19 +217,19 @@ def get_integration() -> str:
|
|||
integration_path = Path(sep, "usr", "share", "bunkerweb", "INTEGRATION")
|
||||
os_release_path = Path(sep, "etc", "os-release")
|
||||
if getenv("KUBERNETES_MODE", "no").lower() == "yes":
|
||||
return "kubernetes"
|
||||
return "Kubernetes"
|
||||
elif getenv("SWARM_MODE", "no").lower() == "yes":
|
||||
return "swarm"
|
||||
return "Swarm"
|
||||
elif getenv("AUTOCONF_MODE", "no").lower() == "yes":
|
||||
return "autoconf"
|
||||
return "Autoconf"
|
||||
elif integration_path.is_file():
|
||||
return integration_path.read_text(encoding="utf-8").strip().lower()
|
||||
elif os_release_path.is_file() and "Alpine" in os_release_path.read_text(encoding="utf-8"):
|
||||
return "docker"
|
||||
return "Docker"
|
||||
|
||||
return "linux"
|
||||
return "Linux"
|
||||
except:
|
||||
return "unknown"
|
||||
return "Unknown"
|
||||
|
||||
|
||||
def get_os_info() -> Dict[str, str]:
|
||||
|
|
|
|||
Loading…
Reference in a new issue