Port securesystemslib.hash module

securesystemslib.hash is a small wrapper around hashlib, which serves
two main purposes:
* provide a helper function to hash a file
* translate the custom hash algorithm name "blake2b-256" to "blake2b" with
  digest_size=32.

In preparation for the removal of securesystemslib.hash, this patch ports
the above behavior to tuf and uses the builtin hashlib directly where
possible.

related secure-systems-lab/securesystemslib#943

Signed-off-by: Lukas Puehringer <lukas.puehringer@nyu.edu>
This commit is contained in:
Lukas Puehringer 2025-03-18 14:49:24 +01:00
parent 9f873cb9d5
commit 866409ffe9
3 changed files with 45 additions and 27 deletions

View file

@ -45,6 +45,7 @@
from __future__ import annotations
import datetime
import hashlib
import logging
import os
import tempfile
@ -52,7 +53,6 @@
from typing import TYPE_CHECKING
from urllib import parse
import securesystemslib.hash as sslib_hash
from securesystemslib.signer import CryptoSigner, Signer
from tuf.api.exceptions import DownloadHTTPError
@ -80,6 +80,8 @@
SPEC_VER = ".".join(SPECIFICATION_VERSION)
_DEFAULT_HASH_ALGORITHM = "sha256"
@dataclass
class FetchTracker:
@ -292,9 +294,9 @@ def _compute_hashes_and_length(
self, role: str
) -> tuple[dict[str, str], int]:
data = self.fetch_metadata(role)
digest_object = sslib_hash.digest(sslib_hash.DEFAULT_HASH_ALGORITHM)
digest_object = hashlib.new(_DEFAULT_HASH_ALGORITHM)
digest_object.update(data)
hashes = {sslib_hash.DEFAULT_HASH_ALGORITHM: digest_object.hexdigest()}
hashes = {_DEFAULT_HASH_ALGORITHM: digest_object.hexdigest()}
return hashes, len(data)
def update_timestamp(self) -> None:

View file

@ -17,7 +17,6 @@
from typing import ClassVar
from securesystemslib import exceptions as sslib_exceptions
from securesystemslib import hash as sslib_hash
from securesystemslib.signer import (
CryptoSigner,
Key,
@ -958,9 +957,7 @@ def test_targetfile_from_file(self) -> None:
# Test with a non-existing file
file_path = os.path.join(self.repo_dir, Targets.type, "file123.txt")
with self.assertRaises(FileNotFoundError):
TargetFile.from_file(
file_path, file_path, [sslib_hash.DEFAULT_HASH_ALGORITHM]
)
TargetFile.from_file(file_path, file_path, ["sha256"])
# Test with an unsupported algorithm
file_path = os.path.join(self.repo_dir, Targets.type, "file1.txt")

View file

@ -8,8 +8,10 @@
import abc
import fnmatch
import hashlib
import io
import logging
import sys
from dataclasses import dataclass
from datetime import datetime, timezone
from typing import (
@ -21,7 +23,6 @@
)
from securesystemslib import exceptions as sslib_exceptions
from securesystemslib import hash as sslib_hash
from securesystemslib.signer import Key, Signature
from tuf.api.exceptions import LengthOrHashMismatchError, UnsignedMetadataError
@ -34,6 +35,9 @@
_TARGETS = "targets"
_TIMESTAMP = "timestamp"
# Hash algorithm used when a caller does not request specific algorithms.
_DEFAULT_HASH_ALGORITHM = "sha256"
# Custom algorithm name; `_hash` maps it to `hashlib.blake2b(digest_size=32)`.
_BLAKE_HASH_ALGORITHM = "blake2b-256"
# We aim to support SPECIFICATION_VERSION and require the input metadata
# files to have the same major version (the first number) as ours.
SPECIFICATION_VERSION = ["1", "0", "31"]
@ -45,6 +49,30 @@
T = TypeVar("T", "Root", "Timestamp", "Snapshot", "Targets")
def _hash(algo: str) -> Any:  # noqa: ANN401
    """Create a fresh hash object for the algorithm named ``algo``.

    The custom name "blake2b-256" is not passed to ``hashlib.new``; it is
    translated to ``hashlib.blake2b`` with a 32-byte digest. Every other name
    is handed to ``hashlib.new`` unchanged.

    Args:
        algo: Hash algorithm name.

    Returns:
        A new, empty hashlib hash object.
    """
    if algo != _BLAKE_HASH_ALGORITHM:
        # Regular names are resolved by hashlib itself.
        return hashlib.new(algo)

    return hashlib.blake2b(digest_size=32)
def _file_hash(f: IO[bytes], algo: str) -> Any:  # noqa: ANN401
    """Hash the content of a binary file object, read from its start.

    The stream is rewound with ``f.seek(0)`` before any data is read.

    Args:
        f: Binary file object; it must support ``seek`` and ``read``.
        algo: Hash algorithm name, resolved through ``_hash``.

    Returns:
        The hash object after it has consumed the file content.
    """
    f.seek(0)

    if sys.version_info < (3, 11):
        # `hashlib.file_digest` is only used on Python 3.11 and later, so
        # older interpreters read the file manually. The chunk size is taken
        # from the previously used and now deprecated
        # `securesystemslib.hash.digest_fileobject`.
        hasher = _hash(algo)
        while True:
            block = f.read(4096)
            if block == b"":
                break
            hasher.update(block)
    else:
        hasher = hashlib.file_digest(f, lambda: _hash(algo))  # type: ignore[arg-type]

    return hasher
class Signed(metaclass=abc.ABCMeta):
"""A base class for the signed part of TUF metadata.
@ -664,19 +692,15 @@ def _verify_hashes(
data: bytes | IO[bytes], expected_hashes: dict[str, str]
) -> None:
"""Verify that the hash of ``data`` matches ``expected_hashes``."""
is_bytes = isinstance(data, bytes)
for algo, exp_hash in expected_hashes.items():
try:
if is_bytes:
digest_object = sslib_hash.digest(algo)
if isinstance(data, bytes):
digest_object = _hash(algo)
digest_object.update(data)
else:
# if data is not bytes, assume it is a file object
digest_object = sslib_hash.digest_fileobject(data, algo)
except (
sslib_exceptions.UnsupportedAlgorithmError,
sslib_exceptions.FormatError,
) as e:
digest_object = _file_hash(data, algo)
except (ValueError, TypeError) as e:
raise LengthOrHashMismatchError(
f"Unsupported algorithm '{algo}'"
) from e
@ -731,21 +755,16 @@ def _get_length_and_hashes(
hashes = {}
if hash_algorithms is None:
hash_algorithms = [sslib_hash.DEFAULT_HASH_ALGORITHM]
hash_algorithms = [_DEFAULT_HASH_ALGORITHM]
for algorithm in hash_algorithms:
try:
if isinstance(data, bytes):
digest_object = sslib_hash.digest(algorithm)
digest_object = _hash(algorithm)
digest_object.update(data)
else:
digest_object = sslib_hash.digest_fileobject(
data, algorithm
)
except (
sslib_exceptions.UnsupportedAlgorithmError,
sslib_exceptions.FormatError,
) as e:
digest_object = _file_hash(data, algorithm)
except (ValueError, TypeError) as e:
raise ValueError(f"Unsupported algorithm '{algorithm}'") from e
hashes[algorithm] = digest_object.hexdigest()
@ -1150,7 +1169,7 @@ def is_delegated_path(self, target_filepath: str) -> bool:
if self.path_hash_prefixes is not None:
# Calculate the hash of the filepath
# to determine in which bin to find the target.
digest_object = sslib_hash.digest(algorithm="sha256")
digest_object = hashlib.new(name="sha256")
digest_object.update(target_filepath.encode("utf-8"))
target_filepath_hash = digest_object.hexdigest()
@ -1269,7 +1288,7 @@ def get_role_for_target(self, target_filepath: str) -> str:
target_filepath: URL path to a target file, relative to a base
targets URL.
"""
hasher = sslib_hash.digest(algorithm="sha256")
hasher = hashlib.new(name="sha256")
hasher.update(target_filepath.encode("utf-8"))
# We can't ever need more than 4 bytes (32 bits).