From 61b9517a87156d53e050f6da5fd2239dc0448a0e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Th=C3=A9ophile=20Diot?= Date: Thu, 9 Mar 2023 10:05:35 +0100 Subject: [PATCH] Fix error when multiple jobs are trying to write in db at the same time --- .../core/blacklist/jobs/blacklist-download.py | 18 +++++----- .../core/bunkernet/jobs/bunkernet-data.py | 19 +++++----- .../core/bunkernet/jobs/bunkernet-register.py | 17 +++++---- .../core/customcert/jobs/custom-cert.py | 36 ++++++++++--------- .../core/greylist/jobs/greylist-download.py | 18 +++++----- src/common/core/jobs/jobs/download-plugins.py | 11 ++++-- src/common/core/jobs/jobs/mmdb-asn.py | 10 ++++-- src/common/core/jobs/jobs/mmdb-country.py | 10 ++++-- .../core/letsencrypt/jobs/certbot-auth.py | 7 +++- .../core/letsencrypt/jobs/certbot-cleanup.py | 7 +++- .../core/letsencrypt/jobs/certbot-deploy.py | 7 +++- .../core/letsencrypt/jobs/certbot-new.py | 30 +++++++++------- .../core/misc/jobs/default-server-cert.py | 1 - .../core/realip/jobs/realip-download.py | 18 ++++++---- .../core/selfsigned/jobs/self-signed.py | 22 ++++++------ .../core/whitelist/jobs/whitelist-download.py | 18 +++++----- 16 files changed, 151 insertions(+), 98 deletions(-) diff --git a/src/common/core/blacklist/jobs/blacklist-download.py b/src/common/core/blacklist/jobs/blacklist-download.py index a8d30b645..83c35f4d1 100755 --- a/src/common/core/blacklist/jobs/blacklist-download.py +++ b/src/common/core/blacklist/jobs/blacklist-download.py @@ -6,6 +6,7 @@ from os import _exit, getenv from pathlib import Path from re import IGNORECASE, compile as re_compile from sys import exit as sys_exit, path as sys_path +from threading import Lock from traceback import format_exc from typing import Tuple @@ -61,10 +62,10 @@ db = Database( logger, sqlalchemy_string=getenv("DATABASE_URI", None), ) +lock = Lock() status = 0 try: - # Check if at least a server has Blacklist activated blacklist_activated = False # Multisite case @@ -191,13 +192,14 @@ try: status = 2 else: # Update db - err = db.update_job_cache( - "blacklist-download", - None, - f"{kind}.list", - content, - checksum=new_hash, - ) + with lock: + err = db.update_job_cache( + "blacklist-download", + None, + f"{kind}.list", + content, + checksum=new_hash, + ) if err: logger.warning(f"Couldn't update db cache: {err}") diff --git a/src/common/core/bunkernet/jobs/bunkernet-data.py b/src/common/core/bunkernet/jobs/bunkernet-data.py index 22286aae2..69c628a10 100755 --- a/src/common/core/bunkernet/jobs/bunkernet-data.py +++ b/src/common/core/bunkernet/jobs/bunkernet-data.py @@ -3,6 +3,7 @@ from os import _exit, getenv from pathlib import Path from sys import exit as sys_exit, path as sys_path +from threading import Lock from traceback import format_exc sys_path.extend( @@ -24,10 +25,10 @@ db = Database( logger, sqlalchemy_string=getenv("DATABASE_URI", None), ) +lock = Lock() status = 0 try: - # Check if at least a server has BunkerNet activated bunkernet_activated = False # Multisite case @@ -123,13 +124,15 @@ try: _exit(2) # Update db - err = db.update_job_cache( - "bunkernet-data", - None, - "ip.list", - content, - checksum=new_hash, - ) + with lock: + err = db.update_job_cache( + "bunkernet-data", + None, + "ip.list", + content, + checksum=new_hash, + ) + if err: logger.warning(f"Couldn't update db ip.list cache: {err}") diff --git a/src/common/core/bunkernet/jobs/bunkernet-register.py b/src/common/core/bunkernet/jobs/bunkernet-register.py index e8dd72199..8ca04253a 100755 --- a/src/common/core/bunkernet/jobs/bunkernet-register.py +++ b/src/common/core/bunkernet/jobs/bunkernet-register.py @@ -3,6 +3,7 @@ from os import _exit, getenv from pathlib import Path from sys import exit as sys_exit, path as sys_path +from threading import Lock from time import sleep from traceback import format_exc @@ -24,10 +25,10 @@ db = Database( logger, sqlalchemy_string=getenv("DATABASE_URI", None), ) +lock = Lock() status = 0 try: - # Check if at least a server has BunkerNet activated bunkernet_activated = False # Multisite case @@ -153,12 +154,14 @@ try: Path("/var/cache/bunkerweb/bunkernet/instance.id").write_text(bunkernet_id) # Update db - err = db.update_job_cache( - "bunkernet-register", - None, - "instance.id", - bunkernet_id.encode("utf-8"), - ) + with lock: + err = db.update_job_cache( + "bunkernet-register", + None, + "instance.id", + bunkernet_id.encode("utf-8"), + ) + if err: logger.warning(f"Couldn't update db cache: {err}") else: diff --git a/src/common/core/customcert/jobs/custom-cert.py b/src/common/core/customcert/jobs/custom-cert.py index 322fd66fa..b720b9ffc 100644 --- a/src/common/core/customcert/jobs/custom-cert.py +++ b/src/common/core/customcert/jobs/custom-cert.py @@ -4,6 +4,7 @@ from os import getenv, makedirs from pathlib import Path from shutil import copy from sys import exit as sys_exit, path as sys_path +from threading import Lock from traceback import format_exc from typing import Optional @@ -24,6 +25,7 @@ db = Database( logger, sqlalchemy_string=getenv("DATABASE_URI", None), ) +lock = Lock() def check_cert(cert_path, key_path, first_server: Optional[str] = None) -> bool: @@ -80,13 +82,14 @@ def check_cert(cert_path, key_path, first_server: Optional[str] = None) -> bool: copy(key_path, key_cache_path.replace(".hash", "")) with open(key_path, "r") as f: - err = db.update_job_cache( - "custom-cert", - first_server, - key_cache_path.replace(".hash", "").split("/")[-1], - f.read().encode("utf-8"), - checksum=key_hash, - ) + with lock: + err = db.update_job_cache( + "custom-cert", + first_server, + key_cache_path.replace(".hash", "").split("/")[-1], + f.read().encode("utf-8"), + checksum=key_hash, + ) if err: logger.warning( @@ -94,13 +97,14 @@ def check_cert(cert_path, key_path, first_server: Optional[str] = None) -> bool: ) with open(cert_path, "r") as f: - err = db.update_job_cache( - "custom-cert", - first_server, - cert_cache_path.replace(".hash", "").split("/")[-1], - f.read().encode("utf-8"), - checksum=cert_hash, - ) + with lock: + err = db.update_job_cache( + "custom-cert", + first_server, + cert_cache_path.replace(".hash", "").split("/")[-1], + f.read().encode("utf-8"), + checksum=cert_hash, + ) if err: logger.warning( @@ -129,9 +133,7 @@ try: for first_server in servers: if not first_server or ( - getenv( - f"{first_server}_USE_CUSTOM_SSL", getenv("USE_CUSTOM_SSL", "no") - ) + getenv(f"{first_server}_USE_CUSTOM_SSL", getenv("USE_CUSTOM_SSL", "no")) != "yes" ): continue diff --git a/src/common/core/greylist/jobs/greylist-download.py b/src/common/core/greylist/jobs/greylist-download.py index 54d627dc5..6c1f7f2d2 100755 --- a/src/common/core/greylist/jobs/greylist-download.py +++ b/src/common/core/greylist/jobs/greylist-download.py @@ -6,6 +6,7 @@ from os import _exit, getenv from pathlib import Path from re import IGNORECASE, compile as re_compile from sys import exit as sys_exit, path as sys_path +from threading import Lock from traceback import format_exc from typing import Tuple @@ -61,10 +62,10 @@ db = Database( logger, sqlalchemy_string=getenv("DATABASE_URI", None), ) +lock = Lock() status = 0 try: - # Check if at least a server has Greylist activated greylist_activated = False # Multisite case @@ -175,13 +176,14 @@ try: status = 2 else: # Update db - err = db.update_job_cache( - "greylist-download", - None, - f"{kind}.list", - content, - checksum=new_hash, - ) + with lock: + err = db.update_job_cache( + "greylist-download", + None, + f"{kind}.list", + content, + checksum=new_hash, + ) if err: logger.warning(f"Couldn't update db cache: {err}") diff --git a/src/common/core/jobs/jobs/download-plugins.py b/src/common/core/jobs/jobs/download-plugins.py index 54266b0e7..4bab33994 100644 --- a/src/common/core/jobs/jobs/download-plugins.py +++ b/src/common/core/jobs/jobs/download-plugins.py @@ -6,6 +6,7 @@ from os.path import dirname, join from pathlib import Path from stat import S_IEXEC from sys import exit as sys_exit, path as sys_path +from threading import Lock from uuid import uuid4 from glob import glob from json import load, loads @@ -32,6 +33,7 @@ db = Database( logger, sqlalchemy_string=getenv("DATABASE_URI", None), ) +lock = Lock() status = 0 @@ -54,7 +56,6 @@ def install_plugin(plugin_dir): try: - # Check if we have plugins to download plugin_urls = getenv("EXTERNAL_PLUGIN_URLS", "") if not plugin_urls: @@ -109,7 +110,9 @@ try: external_plugins.append(plugin_file) external_plugins_ids.append(plugin_file["id"]) - db_plugins = db.get_plugins() + with lock: + db_plugins = db.get_plugins() + for plugin in db_plugins: if plugin["external"] is True and plugin["id"] not in external_plugins_ids: external_plugins.append(plugin) @@ -121,7 +124,9 @@ try: chmod(join(root, name), 0o770) if external_plugins: - err = db.update_external_plugins(external_plugins) + with lock: + err = db.update_external_plugins(external_plugins) + if err: logger.error( f"Couldn't update external plugins to database: {err}", diff --git a/src/common/core/jobs/jobs/mmdb-asn.py b/src/common/core/jobs/jobs/mmdb-asn.py index 08d50a310..65bbae31b 100755 --- a/src/common/core/jobs/jobs/mmdb-asn.py +++ b/src/common/core/jobs/jobs/mmdb-asn.py @@ -5,6 +5,7 @@ from gzip import decompress from os import _exit, getenv from pathlib import Path from sys import exit as sys_exit, path as sys_path +from threading import Lock from traceback import format_exc sys_path.extend( @@ -27,6 +28,7 @@ db = Database( logger, sqlalchemy_string=getenv("DATABASE_URI", None), ) +lock = Lock() status = 0 try: @@ -68,9 +70,11 @@ try: _exit(2) # Update db - err = db.update_job_cache( - "mmdb-asn", None, "asn.mmdb", resp.content, checksum=new_hash - ) + with lock: + err = db.update_job_cache( + "mmdb-asn", None, "asn.mmdb", resp.content, checksum=new_hash + ) + if err: logger.warning(f"Couldn't update db cache: {err}") diff --git a/src/common/core/jobs/jobs/mmdb-country.py b/src/common/core/jobs/jobs/mmdb-country.py index 9c623c37b..42956bc8d 100755 --- a/src/common/core/jobs/jobs/mmdb-country.py +++ b/src/common/core/jobs/jobs/mmdb-country.py @@ -5,6 +5,7 @@ from gzip import decompress from os import _exit, getenv from pathlib import Path from sys import exit as sys_exit, path as sys_path +from threading import Lock from traceback import format_exc sys_path.extend( @@ -27,6 +28,7 @@ db = Database( logger, sqlalchemy_string=getenv("DATABASE_URI", None), ) +lock = Lock() status = 0 try: @@ -70,9 +72,11 @@ try: _exit(2) # Update db - err = db.update_job_cache( - "mmdb-country", None, "country.mmdb", resp.content, checksum=new_hash - ) + with lock: + err = db.update_job_cache( + "mmdb-country", None, "country.mmdb", resp.content, checksum=new_hash + ) + if err: logger.warning(f"Couldn't update db cache: {err}") diff --git a/src/common/core/letsencrypt/jobs/certbot-auth.py b/src/common/core/letsencrypt/jobs/certbot-auth.py index 02065121d..97da4b8f8 100755 --- a/src/common/core/letsencrypt/jobs/certbot-auth.py +++ b/src/common/core/letsencrypt/jobs/certbot-auth.py @@ -3,6 +3,7 @@ from os import getenv, makedirs from pathlib import Path from sys import exit as sys_exit, path as sys_path +from threading import Lock from traceback import format_exc sys_path.extend( @@ -23,6 +24,7 @@ db = Database( logger, sqlalchemy_string=getenv("DATABASE_URI", None), ) +lock = Lock() status = 0 try: @@ -42,7 +44,10 @@ try: # Cluster case if bw_integration in ("Docker", "Swarm", "Kubernetes", "Autoconf"): - for instance in db.get_instances(): + with lock: + instances = db.get_instances() + + for instance in instances: endpoint = f"http://{instance['hostname']}:{instance['port']}" host = instance["server_name"] api = API(endpoint, host=host) diff --git a/src/common/core/letsencrypt/jobs/certbot-cleanup.py b/src/common/core/letsencrypt/jobs/certbot-cleanup.py index 10c5d099c..6f2fae3cd 100755 --- a/src/common/core/letsencrypt/jobs/certbot-cleanup.py +++ b/src/common/core/letsencrypt/jobs/certbot-cleanup.py @@ -4,6 +4,7 @@ from os import getenv from os.path import isfile from pathlib import Path from sys import exit as sys_exit, path as sys_path +from threading import Lock from traceback import format_exc sys_path.extend( @@ -24,6 +25,7 @@ db = Database( logger, sqlalchemy_string=getenv("DATABASE_URI", None), ) +lock = Lock() status = 0 try: @@ -42,7 +44,10 @@ try: # Cluster case if bw_integration in ("Docker", "Swarm", "Kubernetes", "Autoconf"): - for instance in db.get_instances(): + with lock: + instances = db.get_instances() + + for instance in instances: endpoint = f"http://{instance['hostname']}:{instance['port']}" host = instance["server_name"] api = API(endpoint, host=host) diff --git a/src/common/core/letsencrypt/jobs/certbot-deploy.py b/src/common/core/letsencrypt/jobs/certbot-deploy.py index f90319b04..0ec020352 100755 --- a/src/common/core/letsencrypt/jobs/certbot-deploy.py +++ b/src/common/core/letsencrypt/jobs/certbot-deploy.py @@ -8,6 +8,7 @@ from shutil import chown from subprocess import run, DEVNULL, STDOUT from sys import exit as sys_exit, path as sys_path from tarfile import open as tar_open +from threading import Lock from traceback import format_exc sys_path.extend( @@ -28,6 +29,7 @@ db = Database( logger, sqlalchemy_string=getenv("DATABASE_URI", None), ) +lock = Lock() status = 0 try: @@ -62,7 +64,10 @@ try: tgz.seek(0, 0) files = {"archive.tar.gz": tgz} - for instance in db.get_instances(): + with lock: + instances = db.get_instances() + + for instance in instances: endpoint = f"http://{instance['hostname']}:{instance['port']}" host = instance["server_name"] api = API(endpoint, host=host) diff --git a/src/common/core/letsencrypt/jobs/certbot-new.py b/src/common/core/letsencrypt/jobs/certbot-new.py index 7878d86b3..c9b205ce0 100755 --- a/src/common/core/letsencrypt/jobs/certbot-new.py +++ b/src/common/core/letsencrypt/jobs/certbot-new.py @@ -4,6 +4,7 @@ from os import environ, getenv from pathlib import Path from subprocess import DEVNULL, STDOUT, run from sys import exit as sys_exit, path as sys_path +from threading import Lock from traceback import format_exc sys_path.extend( @@ -22,6 +23,7 @@ db = Database( logger, sqlalchemy_string=getenv("DATABASE_URI", None), ) +lock = Lock() status = 0 @@ -102,12 +104,14 @@ try: ).read_bytes() # Update db - err = db.update_job_cache( - "certbot-new", - first_server, - "cert.pem", - cert, - ) + with lock: + err = db.update_job_cache( + "certbot-new", + first_server, + "cert.pem", + cert, + ) + if err: logger.warning(f"Couldn't update db cache: {err}") @@ -140,12 +144,14 @@ try: ).read_bytes() # Update db - err = db.update_job_cache( - "certbot-new", - first_server, - "cert.pem", - cert, - ) + with lock: + err = db.update_job_cache( + "certbot-new", + first_server, + "cert.pem", + cert, + ) + if err: logger.warning(f"Couldn't update db cache: {err}") except: diff --git a/src/common/core/misc/jobs/default-server-cert.py b/src/common/core/misc/jobs/default-server-cert.py index f23133b3c..c07613c6c 100644 --- a/src/common/core/misc/jobs/default-server-cert.py +++ b/src/common/core/misc/jobs/default-server-cert.py @@ -19,7 +19,6 @@ logger = setup_logger("DEFAULT-SERVER-CERT", getenv("LOG_LEVEL", "INFO")) status = 0 try: - # Check if we need to generate a self-signed default cert for non-SNI "clients" need_default_cert = False if getenv("MULTISITE", "no") == "yes": diff --git a/src/common/core/realip/jobs/realip-download.py b/src/common/core/realip/jobs/realip-download.py index d2d885c3f..fc69cb351 100755 --- a/src/common/core/realip/jobs/realip-download.py +++ b/src/common/core/realip/jobs/realip-download.py @@ -5,6 +5,7 @@ from ipaddress import ip_address, ip_network from os import _exit, getenv from pathlib import Path from sys import exit as sys_exit, path as sys_path +from threading import Lock from traceback import format_exc sys_path.extend( @@ -39,6 +40,7 @@ db = Database( logger, sqlalchemy_string=getenv("DATABASE_URI", None), ) +lock = Lock() status = 0 try: @@ -125,13 +127,15 @@ try: _exit(2) # Update db - err = db.update_job_cache( - "realip-download", - None, - "combined.list", - content, - checksum=new_hash, - ) + with lock: + err = db.update_job_cache( + "realip-download", + None, + "combined.list", + content, + checksum=new_hash, + ) + if err: logger.warning(f"Couldn't update db cache: {err}") diff --git a/src/common/core/selfsigned/jobs/self-signed.py b/src/common/core/selfsigned/jobs/self-signed.py index 38b9f239d..b2f6c1a3e 100755 --- a/src/common/core/selfsigned/jobs/self-signed.py +++ b/src/common/core/selfsigned/jobs/self-signed.py @@ -4,6 +4,7 @@ from os import getenv from pathlib import Path from subprocess import DEVNULL, STDOUT, run from sys import exit as sys_exit, path as sys_path +from threading import Lock from traceback import format_exc sys_path.extend( @@ -22,6 +23,7 @@ db = Database( logger, sqlalchemy_string=getenv("DATABASE_URI", None), ) +lock = Lock() def generate_cert(first_server, days, subj): @@ -40,22 +42,22 @@ def generate_cert(first_server, days, subj): return False, 2 # Update db - with open(f"/var/cache/bunkerweb/selfsigned/{first_server}.key", "r") as f: - key_data = f.read().encode("utf-8") + key_data = Path(f"/var/cache/bunkerweb/selfsigned/{first_server}.key").read_bytes() - err = db.update_job_cache( - "self-signed", first_server, f"{first_server}.key", key_data - ) + with lock: + err = db.update_job_cache( + "self-signed", first_server, f"{first_server}.key", key_data + ) if err: logger.warning(f"Couldn't update db cache for {first_server}.key file: {err}") - with open(f"/var/cache/bunkerweb/selfsigned/{first_server}.pem", "r") as f: - pem_data = f.read().encode("utf-8") + pem_data = Path(f"/var/cache/bunkerweb/selfsigned/{first_server}.pem").read_bytes() - err = db.update_job_cache( - "self-signed", first_server, f"{first_server}.pem", pem_data - ) + with lock: + err = db.update_job_cache( + "self-signed", first_server, f"{first_server}.pem", pem_data + ) if err: logger.warning(f"Couldn't update db cache for {first_server}.pem file: {err}") diff --git a/src/common/core/whitelist/jobs/whitelist-download.py b/src/common/core/whitelist/jobs/whitelist-download.py index 134943514..deb622521 100755 --- a/src/common/core/whitelist/jobs/whitelist-download.py +++ b/src/common/core/whitelist/jobs/whitelist-download.py @@ -6,6 +6,7 @@ from os import _exit, getenv from pathlib import Path from re import IGNORECASE, compile as re_compile from sys import exit as sys_exit, path as sys_path +from threading import Lock from traceback import format_exc from typing import Tuple @@ -61,10 +62,10 @@ db = Database( logger, sqlalchemy_string=getenv("DATABASE_URI", None), ) +lock = Lock() status = 0 try: - # Check if at least a server has Whitelist activated whitelist_activated = False # Multisite case @@ -175,13 +176,14 @@ try: status = 2 else: # Update db - err = db.update_job_cache( - "whitelist-download", - None, - f"{kind}.list", - content, - checksum=new_hash, - ) + with lock: + err = db.update_job_cache( + "whitelist-download", + None, + f"{kind}.list", + content, + checksum=new_hash, + ) if err: logger.warning(f"Couldn't update db cache: {err}")