Refactor Let's Encrypt job scripts

This commit is contained in:
Théophile Diot 2024-03-01 10:12:48 +01:00
parent 6a00df269f
commit cf282a3a42
No known key found for this signature in database
GPG key ID: 248FEA4BAE400D06
6 changed files with 102 additions and 247 deletions

View file

@ -20,6 +20,7 @@ for deps_path in [
sys_path.append(deps_path)
from Database import Database # type: ignore
from jobs import get_integration # type: ignore
from logger import setup_logger # type: ignore
from API import API # type: ignore
@ -28,25 +29,11 @@ status = 0
try:
# Get env vars
bw_integration = "Linux"
integration_path = Path(sep, "usr", "share", "bunkerweb", "INTEGRATION")
os_release_path = Path(sep, "etc", "os-release")
if getenv("KUBERNETES_MODE", "no") == "yes":
bw_integration = "Kubernetes"
elif getenv("SWARM_MODE", "no") == "yes":
bw_integration = "Swarm"
elif getenv("AUTOCONF_MODE", "no") == "yes":
bw_integration = "Autoconf"
elif integration_path.is_file():
bw_integration = integration_path.read_text(encoding="utf-8").strip()
elif os_release_path.is_file() and "Alpine" in os_release_path.read_text(encoding="utf-8"):
bw_integration = "Docker"
token = getenv("CERTBOT_TOKEN", "")
validation = getenv("CERTBOT_VALIDATION", "")
# Cluster case
if bw_integration in ("Docker", "Swarm", "Kubernetes", "Autoconf"):
if get_integration() in ("Docker", "Swarm", "Kubernetes", "Autoconf"):
db = Database(logger, sqlalchemy_string=getenv("DATABASE_URI", None), pool=False)
lock = Lock()
@ -54,39 +41,20 @@ try:
instances = db.get_instances()
for instance in instances:
api = API(
f"http://{instance['hostname']}:{instance['port']}",
host=instance["server_name"],
)
sent, err, status, resp = api.request(
"POST",
"/lets-encrypt/challenge",
data={"token": token, "validation": validation},
)
api = API(f"http://{instance['hostname']}:{instance['port']}", host=instance["server_name"])
sent, err, status, resp = api.request("POST", "/lets-encrypt/challenge", data={"token": token, "validation": validation})
if not sent:
status = 1
logger.error(f"Can't send API request to {api.endpoint}/lets-encrypt/challenge : {err}")
elif status != 200:
status = 1
logger.error(
f"Error while sending API request to {api.endpoint}/lets-encrypt/challenge : status = {resp['status']}, msg = {resp['msg']}",
)
logger.error(f"Error while sending API request to {api.endpoint}/lets-encrypt/challenge : status = {resp['status']}, msg = {resp['msg']}")
else:
logger.info(
f"Successfully sent API request to {api.endpoint}/lets-encrypt/challenge",
)
logger.info(f"Successfully sent API request to {api.endpoint}/lets-encrypt/challenge")
# Linux case
else:
root_dir = Path(
sep,
"var",
"tmp",
"bunkerweb",
"lets-encrypt",
".well-known",
"acme-challenge",
)
root_dir = Path(sep, "var", "tmp", "bunkerweb", "lets-encrypt", ".well-known", "acme-challenge")
root_dir.mkdir(parents=True, exist_ok=True)
root_dir.joinpath(token).write_text(validation, encoding="utf-8")
except:

View file

@ -20,6 +20,7 @@ for deps_path in [
sys_path.append(deps_path)
from Database import Database # type: ignore
from jobs import get_integration # type: ignore
from logger import setup_logger # type: ignore
from API import API # type: ignore
@ -28,60 +29,29 @@ status = 0
try:
# Get env vars
bw_integration = "Linux"
integration_path = Path(sep, "usr", "share", "bunkerweb", "INTEGRATION")
os_release_path = Path(sep, "etc", "os-release")
if getenv("KUBERNETES_MODE", "no") == "yes":
bw_integration = "Kubernetes"
elif getenv("SWARM_MODE", "no") == "yes":
bw_integration = "Swarm"
elif getenv("AUTOCONF_MODE", "no") == "yes":
bw_integration = "Autoconf"
elif integration_path.is_file():
bw_integration = integration_path.read_text(encoding="utf-8").strip()
elif os_release_path.is_file() and "Alpine" in os_release_path.read_text(encoding="utf-8"):
bw_integration = "Docker"
token = getenv("CERTBOT_TOKEN", "")
# Cluster case
if bw_integration in ("Docker", "Swarm", "Kubernetes", "Autoconf"):
if get_integration() in ("Docker", "Swarm", "Kubernetes", "Autoconf"):
db = Database(logger, sqlalchemy_string=getenv("DATABASE_URI", None), pool=False)
lock = Lock()
with lock:
instances = db.get_instances()
for instance in instances:
api = API(
f"http://{instance['hostname']}:{instance['port']}",
host=instance["server_name"],
)
api = API(f"http://{instance['hostname']}:{instance['port']}", host=instance["server_name"])
sent, err, status, resp = api.request("DELETE", "/lets-encrypt/challenge", data={"token": token})
if not sent:
status = 1
logger.error(f"Can't send API request to {api.endpoint}/lets-encrypt/challenge : {err}")
elif status != 200:
status = 1
logger.error(
f"Error while sending API request to {api.endpoint}/lets-encrypt/challenge : status = {resp['status']}, msg = {resp['msg']}",
)
logger.error(f"Error while sending API request to {api.endpoint}/lets-encrypt/challenge : status = {resp['status']}, msg = {resp['msg']}")
else:
logger.info(
f"Successfully sent API request to {api.endpoint}/lets-encrypt/challenge",
)
logger.info(f"Successfully sent API request to {api.endpoint}/lets-encrypt/challenge")
# Linux case
else:
challenge_path = Path(
sep,
"var",
"tmp",
"bunkerweb",
"lets-encrypt",
".well-known",
"acme-challenge",
token,
)
challenge_path.unlink(missing_ok=True)
Path(sep, "var", "tmp", "bunkerweb", "lets-encrypt", ".well-known", "acme-challenge", token).unlink(missing_ok=True)
except:
status = 1
logger.error(f"Exception while running certbot-cleanup.py :\n{format_exc()}")

View file

@ -3,7 +3,6 @@
from io import BytesIO
from os import getenv, sep
from os.path import join
from pathlib import Path
from subprocess import DEVNULL, STDOUT, run
from sys import exit as sys_exit, path as sys_path
from tarfile import open as tar_open
@ -23,6 +22,7 @@ for deps_path in [
sys_path.append(deps_path)
from Database import Database # type: ignore
from jobs import get_integration # type: ignore
from logger import setup_logger # type: ignore
from API import API # type: ignore
@ -31,34 +31,17 @@ status = 0
try:
# Get env vars
bw_integration = "Linux"
integration_path = Path(sep, "usr", "share", "bunkerweb", "INTEGRATION")
os_release_path = Path(sep, "etc", "os-release")
if getenv("KUBERNETES_MODE", "no") == "yes":
bw_integration = "Kubernetes"
elif getenv("SWARM_MODE", "no") == "yes":
bw_integration = "Swarm"
elif getenv("AUTOCONF_MODE", "no") == "yes":
bw_integration = "Autoconf"
elif integration_path.is_file():
bw_integration = integration_path.read_text(encoding="utf-8").strip()
elif os_release_path.is_file() and "Alpine" in os_release_path.read_text(encoding="utf-8"):
bw_integration = "Docker"
token = getenv("CERTBOT_TOKEN", "")
logger.info(f"Certificates renewal for {getenv('RENEWED_DOMAINS')} successful")
# Cluster case
if bw_integration in ("Docker", "Swarm", "Kubernetes", "Autoconf"):
if get_integration() in ("Docker", "Swarm", "Kubernetes", "Autoconf"):
# Create tarball of /var/cache/bunkerweb/letsencrypt
tgz = BytesIO()
with tar_open(mode="w:gz", fileobj=tgz, compresslevel=3) as tf:
tf.add(
join(sep, "var", "cache", "bunkerweb", "letsencrypt", "etc"),
arcname="etc",
)
tf.add(join(sep, "var", "cache", "bunkerweb", "letsencrypt", "etc"), arcname="etc")
tgz.seek(0, 0)
files = {"archive.tar.gz": tgz}
@ -95,15 +78,7 @@ try:
logger.info(f"Successfully sent API request to {api.endpoint}/reload")
# Linux case
else:
if (
run(
[join(sep, "usr", "sbin", "nginx"), "-s", "reload"],
stdin=DEVNULL,
stderr=STDOUT,
check=False,
).returncode
!= 0
):
if run([join(sep, "usr", "sbin", "nginx"), "-s", "reload"], stdin=DEVNULL, stderr=STDOUT, check=False).returncode != 0:
status = 1
logger.error("Error while reloading nginx")
else:

View file

@ -9,8 +9,7 @@ from traceback import format_exc
from tarfile import open as tar_open
from io import BytesIO
from shutil import rmtree
from re import findall, MULTILINE
from typing import List, LiteralString, Tuple, Union
from re import MULTILINE, search
for deps_path in [
join(sep, "usr", "share", "bunkerweb", *paths)
@ -30,24 +29,31 @@ from jobs import get_file_in_db, set_file_in_db # type: ignore
logger = setup_logger("LETS-ENCRYPT.new", getenv("LOG_LEVEL", "INFO"))
status = 0
CERTBOT_BIN = join(sep, "usr", "share", "bunkerweb", "deps", "python", "bin", "certbot")
def certbot_new(domains: str, email: str, letsencrypt_path: Path, letsencrypt_job_path: Path) -> int:
LETS_ENCRYPT_PATH = Path(sep, "var", "cache", "bunkerweb", "letsencrypt")
LETS_ENCRYPT_JOBS_PATH = Path(sep, "usr", "share", "bunkerweb", "core", "letsencrypt", "jobs")
LETS_ENCRYPT_WORK_DIR = join(sep, "var", "lib", "bunkerweb", "letsencrypt")
LETS_ENCRYPT_LOGS_DIR = join(sep, "var", "log", "bunkerweb")
def certbot_new(domains: str, email: str, use_letsencrypt_staging: bool = False) -> int:
return run(
[
join(sep, "usr", "share", "bunkerweb", "deps", "python", "bin", "certbot"),
CERTBOT_BIN,
"certonly",
"--config-dir",
str(letsencrypt_path.joinpath("etc")),
LETS_ENCRYPT_PATH.joinpath("etc").as_posix(),
"--work-dir",
join(sep, "var", "lib", "bunkerweb", "letsencrypt"),
LETS_ENCRYPT_WORK_DIR,
"--logs-dir",
join(sep, "var", "log", "bunkerweb"),
LETS_ENCRYPT_LOGS_DIR,
"--manual",
"--preferred-challenges=http",
"--manual-auth-hook",
str(letsencrypt_job_path.joinpath("certbot-auth.py")),
LETS_ENCRYPT_JOBS_PATH.joinpath("certbot-auth.py").as_posix(),
"--manual-cleanup-hook",
str(letsencrypt_job_path.joinpath("certbot-cleanup.py")),
LETS_ENCRYPT_JOBS_PATH.joinpath("certbot-cleanup.py").as_posix(),
"-n",
"-d",
domains,
@ -56,43 +62,13 @@ def certbot_new(domains: str, email: str, letsencrypt_path: Path, letsencrypt_jo
"--agree-tos",
"--expand",
]
+ (["--staging"] if getenv("USE_LETS_ENCRYPT_STAGING", "no") == "yes" else []),
+ (["--staging"] if use_letsencrypt_staging else []),
stdin=DEVNULL,
stderr=STDOUT,
env=environ.copy() | {"PYTHONPATH": join(sep, "usr", "share", "bunkerweb", "deps", "python")},
).returncode
def certbot_check_domains(domains: Union[List[str], List[LiteralString]], letsencrypt_path: Path) -> Tuple[int, str]:
proc = run(
[
join(sep, "usr", "share", "bunkerweb", "deps", "python", "bin", "certbot"),
"certificates",
"--config-dir",
str(letsencrypt_path.joinpath("etc")),
"--work-dir",
join(sep, "var", "lib", "bunkerweb", "letsencrypt"),
"--logs-dir",
join(sep, "var", "log", "bunkerweb"),
],
stdin=DEVNULL,
stdout=PIPE,
stderr=STDOUT,
text=True,
env=environ.copy() | {"PYTHONPATH": join(sep, "usr", "share", "bunkerweb", "deps", "python")},
)
if proc.returncode != 0:
logger.error(f"Error while checking certificates :\n{proc.stdout}")
return 2, proc.stdout
first_needed_domain = domains[0]
needed_domains = set(domains)
for raw_domains in findall(r"^ Domains: (.*)$", proc.stdout, MULTILINE):
current_domains = raw_domains.split(" ")
if current_domains[0] == first_needed_domain and set(current_domains) == needed_domains:
return 1, proc.stdout
return 0, proc.stdout
status = 0
try:
@ -100,7 +76,7 @@ try:
use_letsencrypt = False
is_multisite = getenv("MULTISITE", "no") == "yes"
all_domains = getenv("SERVER_NAME", "")
server_names = getenv("SERVER_NAME", "").split(" ")
server_names = all_domains.split(" ")
if getenv("AUTO_LETS_ENCRYPT", "no") == "yes":
use_letsencrypt = True
@ -117,44 +93,27 @@ try:
logger.warning("There are no server names, skipping generation...")
_exit(0)
# Create directory if it doesn't exist
letsencrypt_path = Path(sep, "var", "cache", "bunkerweb", "letsencrypt")
letsencrypt_path.mkdir(parents=True, exist_ok=True)
letsencrypt_job_path = Path(sep, "usr", "share", "bunkerweb", "core", "letsencrypt", "jobs")
# Create directories if they doesn't exist
LETS_ENCRYPT_PATH.mkdir(parents=True, exist_ok=True)
Path(sep, "var", "lib", "bunkerweb", "letsencrypt").mkdir(parents=True, exist_ok=True)
# Get env vars
bw_integration = "Linux"
integration_path = Path(sep, "usr", "share", "bunkerweb", "INTEGRATION")
os_release_path = Path(sep, "etc", "os-release")
if getenv("KUBERNETES_MODE", "no") == "yes":
bw_integration = "Kubernetes"
elif getenv("SWARM_MODE", "no") == "yes":
bw_integration = "Swarm"
elif getenv("AUTOCONF_MODE", "no") == "yes":
bw_integration = "Autoconf"
elif integration_path.is_file():
bw_integration = integration_path.read_text(encoding="utf-8").strip()
elif os_release_path.is_file() and "Alpine" in os_release_path.read_text(encoding="utf-8"):
bw_integration = "Docker"
# Extract letsencrypt folder if it exists in db
db = Database(logger, sqlalchemy_string=getenv("DATABASE_URI", None), pool=False)
db = Database(logger, sqlalchemy_string=getenv("DATABASE_URI"), pool=False)
tgz = get_file_in_db("folder.tgz", db, job_name="certbot-renew")
if tgz:
# Delete folder if needed
if letsencrypt_path.exists():
rmtree(str(letsencrypt_path), ignore_errors=True)
letsencrypt_path.mkdir(parents=True, exist_ok=True)
if LETS_ENCRYPT_PATH.exists():
rmtree(LETS_ENCRYPT_PATH, ignore_errors=True)
LETS_ENCRYPT_PATH.mkdir(parents=True, exist_ok=True)
# Extract it
with tar_open(name="folder.tgz", mode="r:gz", fileobj=BytesIO(tgz)) as tf:
tf.extractall(str(letsencrypt_path))
tf.extractall(LETS_ENCRYPT_PATH)
logger.info("Successfully retrieved Let's Encrypt data from db cache")
else:
logger.info("No Let's Encrypt data found in db cache")
domains_to_ask = []
# Multisite case
if is_multisite:
domains_sever_names = {}
@ -163,61 +122,58 @@ try:
if not first_server or getenv(f"{first_server}_AUTO_LETS_ENCRYPT", getenv("AUTO_LETS_ENCRYPT", "no")) != "yes":
continue
domains_sever_names[first_server] = getenv(f"{first_server}_SERVER_NAME", first_server)
ret, stdout = certbot_check_domains(" ".join(domains_sever_names.values()).split(" "), letsencrypt_path)
if ret == 1:
logger.info(f"Certificates already exists for all domain(s) {all_domains}")
else:
domains_to_ask = []
if ret == 2:
domains_to_ask = server_names
else:
valid_domains = findall(r"Domains: (.*)$", stdout, MULTILINE)
for first_server, domains in domains_sever_names.items():
if all(domain in valid_domains for domain in domains.split(" ")):
logger.info(f"Certificates already exists for domain(s) {domains}")
continue
domains_to_ask.append(first_server)
for first_server in domains_to_ask:
domains = getenv(f"{first_server}_SERVER_NAME", first_server)
real_email = getenv(f"{first_server}_EMAIL_LETS_ENCRYPT", getenv("EMAIL_LETS_ENCRYPT", f"contact@{first_server}"))
if not real_email:
real_email = f"contact@{first_server}"
logger.info(f"Asking certificates for domains : {domains} (email = {real_email}) ...")
if certbot_new(domains.replace(" ", ","), real_email, letsencrypt_path, letsencrypt_job_path) != 0:
status = 2
logger.error(f"Certificate generation failed for domain(s) {domains} ...")
continue
else:
status = 1 if status == 0 else status
logger.info(f"Certificate generation succeeded for domain(s) : {domains}")
# Singlesite case
else:
first_server = server_names[0]
domains_sever_names = {server_names[0]: all_domains}
if certbot_check_domains(all_domains.split(" "), letsencrypt_path)[0] == 1:
logger.info(f"Certificates already exists for domain(s) {all_domains}")
proc = run(
[CERTBOT_BIN, "certificates", "--config-dir", LETS_ENCRYPT_PATH.joinpath("etc").as_posix(), "--work-dir", LETS_ENCRYPT_WORK_DIR, "--logs-dir", LETS_ENCRYPT_LOGS_DIR],
stdin=DEVNULL,
stdout=PIPE,
stderr=STDOUT,
text=True,
env=environ.copy() | {"PYTHONPATH": join(sep, "usr", "share", "bunkerweb", "deps", "python")},
)
stdout = proc.stdout
if proc.returncode != 0:
logger.error(f"Error while checking certificates :\n{proc.stdout}")
domains_to_ask = server_names
else:
for first_server, domains in domains_sever_names.items():
current_domains = search(rf"Domains: {first_server}(?P<domains>.*)$", stdout, MULTILINE)
if not current_domains:
domains_to_ask.append(first_server)
continue
elif set(f"{first_server}{current_domains.groupdict()['domains']}".strip().split(" ")) != set(domains.split(" ")):
logger.warning(f"Domains for {first_server} are not the same as in the certificate, asking new certificate...")
domains_to_ask.append(first_server)
continue
logger.info(f"Certificates already exists for domain(s) {domains}")
for first_server, domains in domains_sever_names.items():
if first_server not in domains_to_ask:
continue
real_email = getenv(f"{first_server}_EMAIL_LETS_ENCRYPT", getenv("EMAIL_LETS_ENCRYPT", f"contact@{first_server}"))
if not real_email:
real_email = f"contact@{first_server}"
use_letsencrypt_staging = getenv(f"{first_server}_USE_LETS_ENCRYPT_STAGING", getenv("USE_LETS_ENCRYPT_STAGING", "no")) == "yes"
logger.info(f"Asking certificates for domain(s) : {domains} (email = {real_email}) to Let's Encrypt {'staging ' if use_letsencrypt_staging else ''}...")
if certbot_new(domains.replace(" ", ","), real_email, use_letsencrypt_staging) != 0:
status = 2
logger.error(f"Certificate generation failed for domain(s) {domains} ...")
continue
else:
real_email = getenv("EMAIL_LETS_ENCRYPT", f"contact@{first_server}")
if not real_email:
real_email = f"contact@{first_server}"
logger.info(f"Asking certificates for domain(s) : {all_domains} (email = {real_email}) ...")
if certbot_new(all_domains.replace(" ", ","), real_email, letsencrypt_path, letsencrypt_job_path) != 0:
status = 2
logger.error(f"Certificate generation failed for domain(s) : {all_domains}")
else:
status = 1
logger.info(f"Certificate generation succeeded for domain(s) : {all_domains}")
status = 1 if status == 0 else status
logger.info(f"Certificate generation succeeded for domain(s) : {domains}")
# Put new folder in cache
bio = BytesIO()
with tar_open("folder.tgz", mode="w:gz", fileobj=bio, compresslevel=9) as tgz:
tgz.add(str(letsencrypt_path), arcname=".")
tgz.add(LETS_ENCRYPT_PATH, arcname=".")
bio.seek(0, 0)
# Put tgz in cache

View file

@ -28,6 +28,8 @@ from jobs import get_file_in_db, set_file_in_db # type: ignore
logger = setup_logger("LETS-ENCRYPT.renew", getenv("LOG_LEVEL", "INFO"))
status = 0
LETS_ENCRYPT_PATH = Path(sep, "var", "cache", "bunkerweb", "letsencrypt")
try:
# Check if we're using let's encrypt
use_letsencrypt = False
@ -44,37 +46,21 @@ try:
_exit(0)
# Create directory if it doesn't exist
letsencrypt_path = Path(sep, "var", "cache", "bunkerweb", "letsencrypt")
letsencrypt_path.mkdir(parents=True, exist_ok=True)
LETS_ENCRYPT_PATH.mkdir(parents=True, exist_ok=True)
Path(sep, "var", "lib", "bunkerweb", "letsencrypt").mkdir(parents=True, exist_ok=True)
# Get env vars
bw_integration = "Linux"
integration_path = Path(sep, "usr", "share", "bunkerweb", "INTEGRATION")
os_release_path = Path(sep, "etc", "os-release")
if getenv("KUBERNETES_MODE", "no") == "yes":
bw_integration = "Kubernetes"
elif getenv("SWARM_MODE", "no") == "yes":
bw_integration = "Swarm"
elif getenv("AUTOCONF_MODE", "no") == "yes":
bw_integration = "Autoconf"
elif integration_path.is_file():
bw_integration = integration_path.read_text(encoding="utf-8").strip()
elif os_release_path.is_file() and "Alpine" in os_release_path.read_text(encoding="utf-8"):
bw_integration = "Docker"
# Extract letsencrypt folder if it exists in db
db = Database(logger, sqlalchemy_string=getenv("DATABASE_URI", None), pool=False)
db = Database(logger, sqlalchemy_string=getenv("DATABASE_URI"), pool=False)
tgz = get_file_in_db("folder.tgz", db)
if tgz:
# Delete folder if needed
if letsencrypt_path.exists():
rmtree(str(letsencrypt_path), ignore_errors=True)
letsencrypt_path.mkdir(parents=True, exist_ok=True)
if LETS_ENCRYPT_PATH.exists():
rmtree(LETS_ENCRYPT_PATH, ignore_errors=True)
LETS_ENCRYPT_PATH.mkdir(parents=True, exist_ok=True)
# Extract it
with tar_open(name="folder.tgz", mode="r:gz", fileobj=BytesIO(tgz)) as tf:
tf.extractall(str(letsencrypt_path))
tf.extractall(LETS_ENCRYPT_PATH)
logger.info("Successfully retrieved Let's Encrypt data from db cache")
else:
logger.info("No Let's Encrypt data found in db cache")
@ -86,7 +72,7 @@ try:
"renew",
"--no-random-sleep-on-renew",
"--config-dir",
str(letsencrypt_path.joinpath("etc")),
LETS_ENCRYPT_PATH.joinpath("etc").as_posix(),
"--work-dir",
join(sep, "var", "lib", "bunkerweb", "letsencrypt"),
"--logs-dir",
@ -105,7 +91,7 @@ try:
# Put new folder in cache
bio = BytesIO()
with tar_open("folder.tgz", mode="w:gz", fileobj=bio, compresslevel=9) as tgz:
tgz.add(str(letsencrypt_path), arcname=".")
tgz.add(LETS_ENCRYPT_PATH, arcname=".")
bio.seek(0, 0)
# Put tgz in cache

View file

@ -217,19 +217,19 @@ def get_integration() -> str:
integration_path = Path(sep, "usr", "share", "bunkerweb", "INTEGRATION")
os_release_path = Path(sep, "etc", "os-release")
if getenv("KUBERNETES_MODE", "no").lower() == "yes":
return "kubernetes"
return "Kubernetes"
elif getenv("SWARM_MODE", "no").lower() == "yes":
return "swarm"
return "Swarm"
elif getenv("AUTOCONF_MODE", "no").lower() == "yes":
return "autoconf"
return "Autoconf"
elif integration_path.is_file():
return integration_path.read_text(encoding="utf-8").strip().lower()
elif os_release_path.is_file() and "Alpine" in os_release_path.read_text(encoding="utf-8"):
return "docker"
return "Docker"
return "linux"
return "Linux"
except:
return "unknown"
return "Unknown"
def get_os_info() -> Dict[str, str]: