Enhance plugin download functionality by adding file type detection and improving error handling for ZIP and TAR extraction

This commit is contained in:
Théophile Diot 2024-12-23 17:17:52 +01:00
parent 0924671330
commit 511e79aae8
No known key found for this signature in database
GPG key ID: FA995104A0BA376A
2 changed files with 124 additions and 75 deletions

View file

@ -2,6 +2,7 @@
from io import BytesIO
from itertools import chain
from mimetypes import guess_type
from os import getenv, sep
from os.path import join
from pathlib import Path
@ -10,8 +11,8 @@ from sys import exit as sys_exit, path as sys_path
from uuid import uuid4
from json import JSONDecodeError, load as json_load, loads
from shutil import copytree, rmtree
from tarfile import open as tar_open
from zipfile import ZipFile
from tarfile import TarError, open as tar_open
from zipfile import BadZipFile, ZipFile
for deps_path in [
join(sep, "usr", "share", "bunkerweb", *paths)
@ -95,60 +96,81 @@ try:
# Loop on URLs
LOGGER.info(f"Downloading external plugins from {plugin_urls}...")
for plugin_url in plugin_urls.split(" "):
# Download Plugin file
try:
if plugin_urls.startswith("file://"):
content = Path(plugin_urls[7:]).read_bytes()
else:
content = b""
resp = get(
plugin_url,
headers={"User-Agent": "BunkerWeb"},
stream=True,
timeout=30,
)
with BytesIO() as content:
# Download Plugin file
try:
if plugin_urls.startswith("file://"):
content = Path(plugin_urls[7:]).read_bytes()
else:
resp = get(plugin_url, headers={"User-Agent": "BunkerWeb"}, stream=True, timeout=30)
if resp.status_code != 200:
LOGGER.warning(f"Got status code {resp.status_code}, skipping...")
if resp.status_code != 200:
LOGGER.warning(f"Got status code {resp.status_code}, skipping...")
continue
# Iterate over the response content in chunks
for chunk in resp.iter_content(chunk_size=8192):
if chunk:
content.write(chunk)
content.seek(0)
except BaseException as e:
LOGGER.error(f"Exception while downloading plugin(s) from {plugin_url} :\n{e}")
status = 2
continue
# Extract it to tmp folder
temp_dir = TMP_DIR.joinpath(str(uuid4()))
try:
temp_dir.mkdir(parents=True, exist_ok=True)
# Detect file type
file_type = Magic(mime=True).from_buffer(content.getvalue())
LOGGER.debug(f"Detected file type: {file_type}")
# Fallback to file extension detection
if file_type == "application/octet-stream":
file_type = guess_type(plugin_url)[0] or "application/octet-stream"
LOGGER.debug(f"Guessed file type from URL: {file_type}")
content.seek(0)
# Handle ZIP files
if file_type == "application/zip" or plugin_url.endswith(".zip"):
try:
with ZipFile(content) as zf:
zf.extractall(path=temp_dir)
LOGGER.info(f"Successfully extracted ZIP file to {temp_dir}")
except BadZipFile as e:
LOGGER.error(f"Invalid ZIP file: {e}")
continue
# Handle TAR files (all compression types)
elif file_type.startswith("application/x-tar") or plugin_url.endswith((".tar", ".tar.gz", ".tgz", ".tar.bz2", ".tbz2", ".tar.xz", ".txz")):
try:
# Detect the appropriate tar mode
tar_mode = "r"
if plugin_url.endswith(".gz") or file_type == "application/gzip":
tar_mode = "r:gz"
elif plugin_url.endswith(".bz2"):
tar_mode = "r:bz2"
elif plugin_url.endswith(".xz"):
tar_mode = "r:xz"
with tar_open(fileobj=content, mode=tar_mode) as tar:
tar.extractall(path=temp_dir)
LOGGER.info(f"Successfully extracted TAR file to {temp_dir}")
except TarError as e:
LOGGER.error(f"Invalid TAR file: {e}")
continue
else:
LOGGER.error(f"Unknown file type for {plugin_url}, either ZIP or TAR is supported, skipping...")
continue
# Iterate over the response content in chunks
for chunk in resp.iter_content(chunk_size=8192):
if chunk:
content += chunk
except BaseException as e:
LOGGER.error(f"Exception while downloading plugin(s) from {plugin_url} :\n{e}")
status = 2
continue
# Extract it to tmp folder
temp_dir = TMP_DIR.joinpath(str(uuid4()))
try:
temp_dir.mkdir(parents=True, exist_ok=True)
file_type = Magic(mime=True).from_buffer(content)
if file_type == "application/zip":
with ZipFile(BytesIO(content)) as zf:
zf.extractall(path=temp_dir)
elif file_type == "application/gzip":
with tar_open(fileobj=BytesIO(content), mode="r:gz") as tar:
try:
tar.extractall(path=temp_dir, filter="data")
except TypeError:
tar.extractall(path=temp_dir)
elif file_type == "application/x-tar":
with tar_open(fileobj=BytesIO(content), mode="r") as tar:
try:
tar.extractall(path=temp_dir, filter="data")
except TypeError:
tar.extractall(path=temp_dir)
else:
LOGGER.error(f"Unknown file type for {plugin_url}, either zip or tar are supported, skipping...")
except Exception as e:
LOGGER.error(f"Exception while decompressing plugin(s) from {plugin_url}:\n{e}")
continue
except BaseException as e:
LOGGER.error(f"Exception while decompressing plugin(s) from {plugin_url} :\n{e}")
status = 2
continue
# Install plugins
try:

View file

@ -1,6 +1,7 @@
#!/usr/bin/env python3
from io import BytesIO
from mimetypes import guess_type
from os import getenv, sep
from os.path import join
from pathlib import Path
@ -11,8 +12,8 @@ from typing import Dict, Set
from uuid import uuid4
from json import dumps
from shutil import copy, copytree, move, rmtree
from tarfile import open as tar_open
from zipfile import ZipFile
from tarfile import TarError, open as tar_open
from zipfile import BadZipFile, ZipFile
for deps_path in [
join(sep, "usr", "share", "bunkerweb", *paths)
@ -113,49 +114,75 @@ try:
with BytesIO() as content:
try:
# Download the file
resp = get(crs_plugin_url, headers={"User-Agent": "BunkerWeb"}, stream=True, timeout=5)
if resp.status_code != 200:
LOGGER.warning(f"Got status code {resp.status_code}, skipping download of plugin(s) with URL {crs_plugin_url}...")
continue
# Iterate over the response content in chunks
# Write content to BytesIO
for chunk in resp.iter_content(chunk_size=8192):
if chunk:
content.write(chunk)
content.seek(0)
except BaseException as e:
LOGGER.error(f"Exception while downloading plugin(s) with URL {crs_plugin_url} :\n{e}")
except Exception as e:
LOGGER.error(f"Exception while downloading plugin(s) with URL {crs_plugin_url}:\n{e}")
continue
# Extract it to tmp folder
temp_dir = TMP_DIR.joinpath(str(uuid4()))
try:
temp_dir.mkdir(parents=True, exist_ok=True)
# Detect file type
file_type = Magic(mime=True).from_buffer(content.getvalue())
LOGGER.debug(f"Detected file type: {file_type}")
# Fallback to file extension detection
if file_type == "application/octet-stream":
file_type = guess_type(crs_plugin_url)[0] or "application/octet-stream"
LOGGER.debug(f"Guessed file type from URL: {file_type}")
content.seek(0)
if file_type == "application/zip":
with ZipFile(content) as zf:
zf.extractall(path=temp_dir)
elif file_type == "application/gzip":
with tar_open(fileobj=content, mode="r:gz") as tar:
try:
tar.extractall(path=temp_dir, filter="data")
except TypeError:
tar.extractall(path=temp_dir)
elif file_type == "application/x-tar":
with tar_open(fileobj=content, mode="r") as tar:
try:
tar.extractall(path=temp_dir, filter="data")
except TypeError:
# Handle ZIP files
if file_type == "application/zip" or crs_plugin_url.endswith(".zip"):
try:
with ZipFile(content) as zf:
zf.extractall(path=temp_dir)
LOGGER.info(f"Successfully extracted ZIP file to {temp_dir}")
except BadZipFile as e:
LOGGER.error(f"Invalid ZIP file: {e}")
continue
# Handle TAR files (all compression types)
elif file_type.startswith("application/x-tar") or crs_plugin_url.endswith(
(".tar", ".tar.gz", ".tgz", ".tar.bz2", ".tbz2", ".tar.xz", ".txz")
):
try:
# Detect the appropriate tar mode
tar_mode = "r"
if crs_plugin_url.endswith(".gz") or file_type == "application/gzip":
tar_mode = "r:gz"
elif crs_plugin_url.endswith(".bz2"):
tar_mode = "r:bz2"
elif crs_plugin_url.endswith(".xz"):
tar_mode = "r:xz"
with tar_open(fileobj=content, mode=tar_mode) as tar:
tar.extractall(path=temp_dir)
LOGGER.info(f"Successfully extracted TAR file to {temp_dir}")
except TarError as e:
LOGGER.error(f"Invalid TAR file: {e}")
continue
else:
LOGGER.error(f"Unknown file type for {crs_plugin_url}, either zip or tar are supported, skipping...")
LOGGER.error(f"Unknown file type for {crs_plugin_url}, either ZIP or TAR is supported, skipping...")
continue
except BaseException as e:
LOGGER.error(f"Exception while decompressing plugin(s) from {crs_plugin_url} :\n{e}")
except Exception as e:
LOGGER.error(f"Exception while decompressing plugin(s) from {crs_plugin_url}:\n{e}")
continue
plugin_name = ""