From 866409ffe9f058cefa062d7b13f91bd84217f7de Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Tue, 18 Mar 2025 14:49:24 +0100 Subject: [PATCH] Port securesystemslib.hash module securesystemslib.hash is a small wrapper around hashlib, which serves two main purposes: * provide helper function to hash a file * translate custom hash algorithm name "blake2b-256" to "blake2b" with (digest_size=32). In preparation for the removal of securesystemslib.hash, this patch ports above behavior to tuf and uses the builtin hashlib directly where possible. related secure-systems-lab/securesystemslib#943 Signed-off-by: Lukas Puehringer --- tests/repository_simulator.py | 8 +++-- tests/test_api.py | 5 +-- tuf/api/_payload.py | 59 +++++++++++++++++++++++------------ 3 files changed, 45 insertions(+), 27 deletions(-) diff --git a/tests/repository_simulator.py b/tests/repository_simulator.py index 637ba42a..5e9ba189 100644 --- a/tests/repository_simulator.py +++ b/tests/repository_simulator.py @@ -45,6 +45,7 @@ from __future__ import annotations import datetime +import hashlib import logging import os import tempfile @@ -52,7 +53,6 @@ from typing import TYPE_CHECKING from urllib import parse -import securesystemslib.hash as sslib_hash from securesystemslib.signer import CryptoSigner, Signer from tuf.api.exceptions import DownloadHTTPError @@ -80,6 +80,8 @@ SPEC_VER = ".".join(SPECIFICATION_VERSION) +_DEFAULT_HASH_ALGORITHM = "sha256" + @dataclass class FetchTracker: @@ -292,9 +294,9 @@ def _compute_hashes_and_length( self, role: str ) -> tuple[dict[str, str], int]: data = self.fetch_metadata(role) - digest_object = sslib_hash.digest(sslib_hash.DEFAULT_HASH_ALGORITHM) + digest_object = hashlib.new(_DEFAULT_HASH_ALGORITHM) digest_object.update(data) - hashes = {sslib_hash.DEFAULT_HASH_ALGORITHM: digest_object.hexdigest()} + hashes = {_DEFAULT_HASH_ALGORITHM: digest_object.hexdigest()} return hashes, len(data) def update_timestamp(self) -> None: diff --git a/tests/test_api.py b/tests/test_api.py index 7b80d360..53a13f14 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -17,7 +17,6 @@ from typing import ClassVar from securesystemslib import exceptions as sslib_exceptions -from securesystemslib import hash as sslib_hash from securesystemslib.signer import ( CryptoSigner, Key, @@ -958,9 +957,7 @@ def test_targetfile_from_file(self) -> None: # Test with a non-existing file file_path = os.path.join(self.repo_dir, Targets.type, "file123.txt") with self.assertRaises(FileNotFoundError): - TargetFile.from_file( - file_path, file_path, [sslib_hash.DEFAULT_HASH_ALGORITHM] - ) + TargetFile.from_file(file_path, file_path, ["sha256"]) # Test with an unsupported algorithm file_path = os.path.join(self.repo_dir, Targets.type, "file1.txt") diff --git a/tuf/api/_payload.py b/tuf/api/_payload.py index 56852082..72c66785 100644 --- a/tuf/api/_payload.py +++ b/tuf/api/_payload.py @@ -8,8 +8,10 @@ import abc import fnmatch +import hashlib import io import logging +import sys from dataclasses import dataclass from datetime import datetime, timezone from typing import ( @@ -21,7 +23,6 @@ ) from securesystemslib import exceptions as sslib_exceptions -from securesystemslib import hash as sslib_hash from securesystemslib.signer import Key, Signature from tuf.api.exceptions import LengthOrHashMismatchError, UnsignedMetadataError @@ -34,6 +35,9 @@ _TARGETS = "targets" _TIMESTAMP = "timestamp" +_DEFAULT_HASH_ALGORITHM = "sha256" +_BLAKE_HASH_ALGORITHM = "blake2b-256" + # We aim to support SPECIFICATION_VERSION and require the input metadata # files to have the same major version (the first number) as ours. SPECIFICATION_VERSION = ["1", "0", "31"] @@ -45,6 +49,30 @@ T = TypeVar("T", "Root", "Timestamp", "Snapshot", "Targets") +def _hash(algo: str) -> Any: # noqa: ANN401 + """Returns new hash object, supporting custom "blake2b-256" algo name.""" + if algo == _BLAKE_HASH_ALGORITHM: + return hashlib.blake2b(digest_size=32) + + return hashlib.new(algo) + + +def _file_hash(f: IO[bytes], algo: str) -> Any: # noqa: ANN401 + """Returns hashed file.""" + f.seek(0) + if sys.version_info >= (3, 11): + digest = hashlib.file_digest(f, lambda: _hash(algo)) # type: ignore[arg-type] + + else: + # Fallback for older Pythons. Chunk size is taken from the previously + # used and now deprecated `securesystemslib.hash.digest_fileobject`. + digest = _hash(algo) + for chunk in iter(lambda: f.read(4096), b""): + digest.update(chunk) + + return digest + + class Signed(metaclass=abc.ABCMeta): """A base class for the signed part of TUF metadata. @@ -664,19 +692,15 @@ def _verify_hashes( data: bytes | IO[bytes], expected_hashes: dict[str, str] ) -> None: """Verify that the hash of ``data`` matches ``expected_hashes``.""" - is_bytes = isinstance(data, bytes) for algo, exp_hash in expected_hashes.items(): try: - if is_bytes: - digest_object = sslib_hash.digest(algo) + if isinstance(data, bytes): + digest_object = _hash(algo) digest_object.update(data) else: # if data is not bytes, assume it is a file object - digest_object = sslib_hash.digest_fileobject(data, algo) - except ( - sslib_exceptions.UnsupportedAlgorithmError, - sslib_exceptions.FormatError, - ) as e: + digest_object = _file_hash(data, algo) + except (ValueError, TypeError) as e: raise LengthOrHashMismatchError( f"Unsupported algorithm '{algo}'" ) from e @@ -731,21 +755,16 @@ def _get_length_and_hashes( hashes = {} if hash_algorithms is None: - hash_algorithms = [sslib_hash.DEFAULT_HASH_ALGORITHM] + hash_algorithms = [_DEFAULT_HASH_ALGORITHM] for algorithm in hash_algorithms: try: if isinstance(data, bytes): - digest_object = sslib_hash.digest(algorithm) + digest_object = _hash(algorithm) digest_object.update(data) else: - digest_object = sslib_hash.digest_fileobject( - data, algorithm - ) - except ( - sslib_exceptions.UnsupportedAlgorithmError, - sslib_exceptions.FormatError, - ) as e: + digest_object = _file_hash(data, algorithm) + except (ValueError, TypeError) as e: raise ValueError(f"Unsupported algorithm '{algorithm}'") from e hashes[algorithm] = digest_object.hexdigest() @@ -1150,7 +1169,7 @@ def is_delegated_path(self, target_filepath: str) -> bool: if self.path_hash_prefixes is not None: # Calculate the hash of the filepath # to determine in which bin to find the target. - digest_object = sslib_hash.digest(algorithm="sha256") + digest_object = hashlib.new(name="sha256") digest_object.update(target_filepath.encode("utf-8")) target_filepath_hash = digest_object.hexdigest() @@ -1269,7 +1288,7 @@ def get_role_for_target(self, target_filepath: str) -> str: target_filepath: URL path to a target file, relative to a base targets URL. """ - hasher = sslib_hash.digest(algorithm="sha256") + hasher = hashlib.new(name="sha256") hasher.update(target_filepath.encode("utf-8")) # We can't ever need more than 4 bytes (32 bits).