Merge pull request #2273 from VickyMerzOwn/develop

enhancement: Add from_data() method to MetaFile
This commit is contained in:
Lukas Pühringer 2023-08-16 17:21:41 +02:00 committed by GitHub
commit 00a6ac7f0a
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
2 changed files with 82 additions and 28 deletions

View file

@ -34,6 +34,7 @@
Delegations,
Key,
Metadata,
MetaFile,
Root,
Signature,
Snapshot,
@ -725,6 +726,29 @@ def test_targetfile_from_data(self) -> None:
targetfile_from_data = TargetFile.from_data(target_file_path, data)
targetfile_from_data.verify_length_and_hashes(data)
def test_metafile_from_data(self) -> None:
data = b"Inline test content"
# Test with a valid hash algorithm
metafile = MetaFile.from_data(1, data, ["sha256"])
metafile.verify_length_and_hashes(data)
# Test with an invalid hash algorithm
with self.assertRaises(ValueError):
metafile = MetaFile.from_data(1, data, ["invalid_algorithm"])
metafile.verify_length_and_hashes(data)
self.assertEqual(
metafile,
MetaFile(
1,
19,
{
"sha256": "fcee2e6d56ab08eab279016f7db7e4e1d172ccea78e15f4cf8bd939991a418fa"
},
),
)
def test_targetfile_get_prefixed_paths(self) -> None:
target = TargetFile(100, {"sha256": "abc", "md5": "def"}, "a/b/f.ext")
self.assertEqual(

View file

@ -900,6 +900,41 @@ def _validate_length(length: int) -> None:
if length < 0:
raise ValueError(f"Length must be >= 0, got {length}")
@staticmethod
def _get_length_and_hashes(
data: Union[bytes, IO[bytes]], hash_algorithms: Optional[List[str]]
) -> Tuple[int, Dict[str, str]]:
"""Calculate length and hashes of ``data``."""
if isinstance(data, bytes):
length = len(data)
else:
data.seek(0, io.SEEK_END)
length = data.tell()
hashes = {}
if hash_algorithms is None:
hash_algorithms = [sslib_hash.DEFAULT_HASH_ALGORITHM]
for algorithm in hash_algorithms:
try:
if isinstance(data, bytes):
digest_object = sslib_hash.digest(algorithm)
digest_object.update(data)
else:
digest_object = sslib_hash.digest_fileobject(
data, algorithm
)
except (
sslib_exceptions.UnsupportedAlgorithmError,
sslib_exceptions.FormatError,
) as e:
raise ValueError(f"Unsupported algorithm '{algorithm}'") from e
hashes[algorithm] = digest_object.hexdigest()
return (length, hashes)
class MetaFile(BaseFile):
"""A container with information about a particular metadata file.
@ -966,6 +1001,28 @@ def from_dict(cls, meta_dict: Dict[str, Any]) -> "MetaFile":
# All fields left in the meta_dict are unrecognized.
return cls(version, length, hashes, meta_dict)
@classmethod
def from_data(
cls,
version: int,
data: Union[bytes, IO[bytes]],
hash_algorithms: List[str],
) -> "MetaFile":
"""Creates MetaFile object from bytes.
This constructor should only be used if hashes are wanted.
By default, MetaFile(ver) should be used.
Args:
version: Version of the metadata file.
data: Metadata bytes that the metafile represents.
hash_algorithms: Hash algorithms to create the hashes with. If not
specified, the securesystemslib default hash algorithm is used.
Raises:
ValueError: The hash algorithms list contains an unsupported
algorithm.
"""
length, hashes = cls._get_length_and_hashes(data, hash_algorithms)
return cls(version, length, hashes)
def to_dict(self) -> Dict[str, Any]:
"""Return the dictionary representation of self."""
res_dict: Dict[str, Any] = {
@ -1693,34 +1750,7 @@ def from_data(
ValueError: The hash algorithms list contains an unsupported
algorithm.
"""
if isinstance(data, bytes):
length = len(data)
else:
data.seek(0, io.SEEK_END)
length = data.tell()
hashes = {}
if hash_algorithms is None:
hash_algorithms = [sslib_hash.DEFAULT_HASH_ALGORITHM]
for algorithm in hash_algorithms:
try:
if isinstance(data, bytes):
digest_object = sslib_hash.digest(algorithm)
digest_object.update(data)
else:
digest_object = sslib_hash.digest_fileobject(
data, algorithm
)
except (
sslib_exceptions.UnsupportedAlgorithmError,
sslib_exceptions.FormatError,
) as e:
raise ValueError(f"Unsupported algorithm '{algorithm}'") from e
hashes[algorithm] = digest_object.hexdigest()
length, hashes = cls._get_length_and_hashes(data, hash_algorithms)
return cls(length, hashes, target_file_path)
def verify_length_and_hashes(self, data: Union[bytes, IO[bytes]]) -> None: