mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
add RUF and BLE rulesets; ignore some broad exceptions (BLE001) and RUF012
Signed-off-by: E3E <ntanzill@purdue.edu>
This commit is contained in:
parent
c6256875f0
commit
52601e2bd8
13 changed files with 33 additions and 30 deletions
|
|
@ -106,7 +106,7 @@ def add_target(self, role: str, targetpath: str) -> bool:
|
|||
with self.edit_targets(role) as delegated:
|
||||
delegated.targets[targetpath] = targetfile
|
||||
|
||||
except Exception as e:
|
||||
except Exception as e: # noqa: BLE001
|
||||
print(f"Failed to submit new {role} with added target: {e}")
|
||||
return False
|
||||
|
||||
|
|
|
|||
|
|
@ -84,6 +84,7 @@ line-length=80
|
|||
select = [
|
||||
"A", # flake8-builtins
|
||||
"B", # flake8-bugbear
|
||||
"BLE", # flake8-blind-except
|
||||
"C4", # flake8-comprehensions
|
||||
"D", # pydocstyle
|
||||
"DTZ", # flake8-datetimez
|
||||
|
|
@ -94,6 +95,7 @@ select = [
|
|||
"N", # pep8-naming
|
||||
"PL", # pylint
|
||||
"RET", # flake8-return
|
||||
"RUF", # ruff-specific rules
|
||||
"S", # flake8-bandit
|
||||
"SIM", # flake8-simplify
|
||||
"UP", # pyupgrade
|
||||
|
|
@ -107,9 +109,10 @@ ignore = [
|
|||
|
||||
[tool.ruff.lint.per-file-ignores]
|
||||
"tests/*" = [
|
||||
"D", # pydocstyle: no docstrings required for tests
|
||||
"E501", # line-too-long: embedded test data in "fmt: off" blocks is ok
|
||||
"S", # bandit: Not running bandit on tests
|
||||
"D", # pydocstyle: no docstrings required for tests
|
||||
"E501", # line-too-long: embedded test data in "fmt: off" blocks is ok
|
||||
"S", # bandit: Not running bandit on tests
|
||||
"RUF012", # ruff: mutable-class-default
|
||||
]
|
||||
"examples/*/*" = [
|
||||
"D", # pydocstyle: no docstrings required for examples
|
||||
|
|
|
|||
|
|
@ -46,7 +46,7 @@ def setUpClass(cls) -> None:
|
|||
|
||||
cls.url_prefix = (
|
||||
f"http://{utils.TEST_HOST_ADDRESS}:"
|
||||
f"{str(cls.server_process_handler.port)}"
|
||||
f"{cls.server_process_handler.port!s}"
|
||||
)
|
||||
target_filename = os.path.basename(cls.target_file.name)
|
||||
cls.url = f"{cls.url_prefix}/{target_filename}"
|
||||
|
|
|
|||
|
|
@ -447,7 +447,7 @@ def test_new_timestamp_fast_forward_recovery(self) -> None:
|
|||
self._assert_version_equals(Timestamp.type, 1)
|
||||
|
||||
def test_new_snapshot_hash_mismatch(self) -> None:
|
||||
# Check against timestamp role’s snapshot hash
|
||||
# Check against timestamp role's snapshot hash
|
||||
|
||||
# Update timestamp with snapshot's hashes
|
||||
self.sim.compute_metafile_hashes_length = True
|
||||
|
|
@ -477,7 +477,7 @@ def test_new_snapshot_unsigned(self) -> None:
|
|||
self._assert_files_exist([Root.type, Timestamp.type])
|
||||
|
||||
def test_new_snapshot_version_mismatch(self) -> None:
|
||||
# Check against timestamp role’s snapshot version
|
||||
# Check against timestamp role's snapshot version
|
||||
|
||||
# Increase snapshot version without updating timestamp
|
||||
self.sim.snapshot.version += 1
|
||||
|
|
@ -544,7 +544,7 @@ def test_new_snapshot_expired(self) -> None:
|
|||
self._assert_files_exist([Root.type, Timestamp.type])
|
||||
|
||||
def test_new_targets_hash_mismatch(self) -> None:
|
||||
# Check against snapshot role’s targets hashes
|
||||
# Check against snapshot role's targets hashes
|
||||
|
||||
# Update snapshot with target's hashes
|
||||
self.sim.compute_metafile_hashes_length = True
|
||||
|
|
@ -575,7 +575,7 @@ def test_new_targets_unsigned(self) -> None:
|
|||
self._assert_files_exist([Root.type, Timestamp.type, Snapshot.type])
|
||||
|
||||
def test_new_targets_version_mismatch(self) -> None:
|
||||
# Check against snapshot role’s targets version
|
||||
# Check against snapshot role's targets version
|
||||
|
||||
# Increase targets version without updating snapshot
|
||||
self.sim.targets.version += 1
|
||||
|
|
|
|||
|
|
@ -36,7 +36,7 @@ def can_connect(port: int) -> bool:
|
|||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||||
sock.connect(("localhost", port))
|
||||
return True
|
||||
except Exception:
|
||||
except Exception: # noqa: BLE001
|
||||
return False
|
||||
finally:
|
||||
# The process will always enter in finally even after return.
|
||||
|
|
|
|||
|
|
@ -226,7 +226,7 @@ def _start_process(self, extra_cmd_args: List[str], popen_cwd: str) -> None:
|
|||
"""Starts the process running the server."""
|
||||
|
||||
# The "-u" option forces stdin, stdout and stderr to be unbuffered.
|
||||
command = [sys.executable, "-u", self.server] + extra_cmd_args
|
||||
command = [sys.executable, "-u", self.server, *extra_cmd_args]
|
||||
|
||||
# Reusing one subprocess in multiple tests, but split up the logs
|
||||
# for each.
|
||||
|
|
@ -344,7 +344,7 @@ def flush_log(self) -> None:
|
|||
|
||||
if len(self.__logged_messages) > 0:
|
||||
title = "Test server (" + self.server + ") output:\n"
|
||||
message = [title] + self.__logged_messages
|
||||
message = [title, *self.__logged_messages]
|
||||
self.__logger.info("| ".join(message))
|
||||
self.__logged_messages = []
|
||||
|
||||
|
|
|
|||
|
|
@ -606,7 +606,7 @@ def get_delegated_role(self, delegated_role: str) -> Role:
|
|||
|
||||
return self.roles[delegated_role]
|
||||
|
||||
def get_key(self, keyid: str) -> Key: # noqa: D102
|
||||
def get_key(self, keyid: str) -> Key:
|
||||
if keyid not in self.keys:
|
||||
raise ValueError(f"Key {keyid} not found")
|
||||
|
||||
|
|
@ -1778,7 +1778,7 @@ def get_delegated_role(self, delegated_role: str) -> Role:
|
|||
|
||||
return role
|
||||
|
||||
def get_key(self, keyid: str) -> Key: # noqa: D102
|
||||
def get_key(self, keyid: str) -> Key:
|
||||
if self.delegations is None:
|
||||
raise ValueError("No delegations found")
|
||||
if keyid not in self.delegations.keys:
|
||||
|
|
|
|||
|
|
@ -83,7 +83,7 @@ def from_bytes(cls, data: bytes) -> "SimpleEnvelope[T]":
|
|||
envelope_dict = json.loads(data.decode())
|
||||
envelope = SimpleEnvelope.from_dict(envelope_dict)
|
||||
|
||||
except Exception as e:
|
||||
except Exception as e: # noqa: BLE001
|
||||
raise DeserializationError from e
|
||||
|
||||
return envelope
|
||||
|
|
@ -103,7 +103,7 @@ def to_bytes(self) -> bytes:
|
|||
envelope_dict = self.to_dict()
|
||||
json_bytes = json.dumps(envelope_dict).encode()
|
||||
|
||||
except Exception as e:
|
||||
except Exception as e: # noqa: BLE001
|
||||
raise SerializationError from e
|
||||
|
||||
return json_bytes
|
||||
|
|
@ -123,7 +123,7 @@ def from_signed(cls, signed: T) -> "SimpleEnvelope[T]":
|
|||
signed_dict = signed.to_dict()
|
||||
json_bytes = json.dumps(signed_dict).encode()
|
||||
|
||||
except Exception as e:
|
||||
except Exception as e: # noqa: BLE001
|
||||
raise SerializationError from e
|
||||
|
||||
return cls(json_bytes, cls._DEFAULT_PAYLOAD_TYPE, [])
|
||||
|
|
@ -152,7 +152,7 @@ def get_signed(self) -> T:
|
|||
else:
|
||||
raise ValueError(f'unrecognized role type "{_type}"')
|
||||
|
||||
except Exception as e:
|
||||
except Exception as e: # noqa: BLE001
|
||||
raise DeserializationError from e
|
||||
|
||||
return cast(T, inner_cls.from_dict(payload_dict))
|
||||
|
|
|
|||
|
|
@ -377,7 +377,7 @@ def sign(
|
|||
|
||||
try:
|
||||
signature = signer.sign(bytes_data)
|
||||
except Exception as e:
|
||||
except Exception as e: # noqa: BLE001
|
||||
raise UnsignedMetadataError(f"Failed to sign: {e}") from e
|
||||
|
||||
if not append:
|
||||
|
|
|
|||
|
|
@ -35,7 +35,7 @@ def deserialize(self, raw_data: bytes) -> Metadata:
|
|||
json_dict = json.loads(raw_data.decode("utf-8"))
|
||||
metadata_obj = Metadata.from_dict(json_dict)
|
||||
|
||||
except Exception as e:
|
||||
except Exception as e: # noqa: BLE001
|
||||
raise DeserializationError("Failed to deserialize JSON") from e
|
||||
|
||||
return metadata_obj
|
||||
|
|
@ -77,10 +77,10 @@ def serialize(self, metadata_obj: Metadata) -> bytes:
|
|||
raise ValueError(
|
||||
"Metadata changes if you serialize and deserialize."
|
||||
)
|
||||
except Exception as e:
|
||||
except Exception as e: # noqa: BLE001
|
||||
raise ValueError("Metadata cannot be validated!") from e
|
||||
|
||||
except Exception as e:
|
||||
except Exception as e: # noqa: BLE001
|
||||
raise SerializationError("Failed to serialize JSON") from e
|
||||
|
||||
return json_bytes
|
||||
|
|
@ -97,7 +97,7 @@ def serialize(self, signed_obj: Signed) -> bytes:
|
|||
signed_dict = signed_obj.to_dict()
|
||||
canonical_bytes = encode_canonical(signed_dict).encode("utf-8")
|
||||
|
||||
except Exception as e:
|
||||
except Exception as e: # noqa: BLE001
|
||||
raise SerializationError from e
|
||||
|
||||
return canonical_bytes
|
||||
|
|
|
|||
|
|
@ -454,7 +454,7 @@ def _load_from_metadata(
|
|||
data: bytes,
|
||||
delegator: Optional[Delegator] = None,
|
||||
role_name: Optional[str] = None,
|
||||
) -> Tuple[T, bytes, Dict[str, Signature]]: # noqa: D102
|
||||
) -> Tuple[T, bytes, Dict[str, Signature]]:
|
||||
"""Load traditional metadata bytes, and extract and verify payload.
|
||||
|
||||
If no delegator is passed, verification is skipped. Returns a tuple of
|
||||
|
|
@ -481,7 +481,7 @@ def _load_from_simple_envelope(
|
|||
data: bytes,
|
||||
delegator: Optional[Delegator] = None,
|
||||
role_name: Optional[str] = None,
|
||||
) -> Tuple[T, bytes, Dict[str, Signature]]: # noqa: D102
|
||||
) -> Tuple[T, bytes, Dict[str, Signature]]:
|
||||
"""Load simple envelope bytes, and extract and verify payload.
|
||||
|
||||
If no delegator is passed, verification is skipped. Returns a tuple of
|
||||
|
|
|
|||
|
|
@ -68,7 +68,7 @@ def fetch(self, url: str) -> Iterator[bytes]:
|
|||
return self._fetch(url)
|
||||
except exceptions.DownloadError as e:
|
||||
raise e
|
||||
except Exception as e:
|
||||
except Exception as e: # noqa: BLE001
|
||||
raise exceptions.DownloadError(f"Failed to download {url}") from e
|
||||
|
||||
@contextmanager
|
||||
|
|
|
|||
|
|
@ -87,7 +87,7 @@ def get_pypi_pip_version() -> str:
|
|||
# newest tarball and figure out the version from the filename
|
||||
with TemporaryDirectory() as pypi_dir:
|
||||
cmd = ["pip", "download", "--no-deps", "--dest", pypi_dir]
|
||||
source_download = cmd + ["--no-binary", PYPI_PROJECT, PYPI_PROJECT]
|
||||
source_download = [*cmd, "--no-binary", PYPI_PROJECT, PYPI_PROJECT]
|
||||
subprocess.run(source_download, stdout=subprocess.DEVNULL, check=True)
|
||||
for filename in os.listdir(pypi_dir):
|
||||
prefix, postfix = f"{PYPI_PROJECT}-", ".tar.gz"
|
||||
|
|
@ -130,8 +130,8 @@ def verify_pypi_release(version: str, compare_dir: str) -> bool:
|
|||
with TemporaryDirectory() as pypi_dir:
|
||||
cmd = ["pip", "download", "--no-deps", "--dest", pypi_dir]
|
||||
target = f"{PYPI_PROJECT}=={version}"
|
||||
binary_download = cmd + [target]
|
||||
source_download = cmd + ["--no-binary", PYPI_PROJECT, target]
|
||||
binary_download = [*cmd, target]
|
||||
source_download = [*cmd, "--no-binary", PYPI_PROJECT, target]
|
||||
|
||||
subprocess.run(binary_download, stdout=subprocess.DEVNULL, check=True)
|
||||
subprocess.run(source_download, stdout=subprocess.DEVNULL, check=True)
|
||||
|
|
@ -162,7 +162,7 @@ def sign_release_artifacts(
|
|||
artifact_path = os.path.join(build_dir, filename)
|
||||
signature_path = f"{filename}.asc"
|
||||
subprocess.run(
|
||||
cmd + ["--output", signature_path, artifact_path], check=True
|
||||
[*cmd, "--output", signature_path, artifact_path], check=True
|
||||
)
|
||||
|
||||
if not os.path.exists(signature_path):
|
||||
|
|
|
|||
Loading…
Reference in a new issue