Add an option to create TargetFile from data/file

This is a repository tooling use case but also helpful when testing.
It could be useful when we need to update the targets object.

Signed-off-by: Velichka Atanasova <avelichka@vmware.com>
This commit is contained in:
Velichka Atanasova 2021-08-10 09:13:33 +03:00
parent ea9acf2bfd
commit 65fd1aaf8a
2 changed files with 116 additions and 1 deletions

View file

@ -47,6 +47,7 @@
import_ed25519_privatekey_from_file
)
from securesystemslib import hash as sslib_hash
from securesystemslib.signer import (
SSlibSigner,
Signature
@ -621,7 +622,49 @@ def test_length_and_hash_validation(self):
file1_targetfile.length = expected_length
file1_targetfile.hashes = {'sha256': 'incorrecthash'}
self.assertRaises(exceptions.LengthOrHashMismatchError,
file1_targetfile.verify_length_and_hashes, file1)
file1_targetfile.verify_length_and_hashes, file1)
def test_targetfile_from_file(self):
# Test with an existing file and valid hash algorithm
file_path = os.path.join(self.repo_dir, 'targets', 'file1.txt')
targetfile_from_file = TargetFile.from_file(
file_path, file_path, ['sha256']
)
with open(file_path, "rb") as file:
targetfile_from_file.verify_length_and_hashes(file)
# Test with a non-existing file
file_path = os.path.join(self.repo_dir, 'targets', 'file123.txt')
self.assertRaises(
FileNotFoundError,
TargetFile.from_file,
file_path,
file_path,
[sslib_hash.DEFAULT_HASH_ALGORITHM]
)
# Test with an unsupported algorithm
file_path = os.path.join(self.repo_dir, 'targets', 'file1.txt')
self.assertRaises(
exceptions.UnsupportedAlgorithmError,
TargetFile.from_file,
file_path,
file_path,
['123']
)
def test_targetfile_from_data(self):
data = b"Inline test content"
target_file_path = os.path.join(self.repo_dir, 'targets', 'file1.txt')
# Test with a valid hash algorithm
targetfile_from_data = TargetFile.from_data(target_file_path, data, ['sha256'])
targetfile_from_data.verify_length_and_hashes(data)
# Test with no algorithms specified
targetfile_from_data = TargetFile.from_data(target_file_path, data)
targetfile_from_data.verify_length_and_hashes(data)
def test_is_delegated_role(self):
# test path matches

View file

@ -1219,6 +1219,78 @@ def to_dict(self) -> Dict[str, Any]:
**self.unrecognized_fields,
}
@classmethod
def from_file(
cls,
target_file_path: str,
local_path: str,
hash_algorithms: Optional[List[str]] = None,
) -> "TargetFile":
"""Creates TargetFile object from a file.
Arguments:
target_file_path: The TargetFile path.
local_path: The local path to the file to create TargetFile from.
hash_algorithms: An optional list of hash algorithms to create
the hashes with. If not specified the securesystemslib default
hash algorithm is used.
Raises:
FileNotFoundError: The file doesn't exist.
UnsupportedAlgorithmError: The hash algorithms list
contains an unsupported algorithm.
"""
with open(local_path, "rb") as file:
return cls.from_data(target_file_path, file, hash_algorithms)
@classmethod
def from_data(
cls,
target_file_path: str,
data: Union[bytes, IO[bytes]],
hash_algorithms: Optional[List[str]] = None,
) -> "TargetFile":
"""Creates TargetFile object from bytes.
Arguments:
target_file_path: The TargetFile path.
data: The data to create TargetFile from.
hash_algorithms: An optional list of hash algorithms to create
the hashes with. If not specified the securesystemslib default
hash algorithm is used.
Raises:
UnsupportedAlgorithmError: The hash algorithms list
contains an unsupported algorithm.
"""
if isinstance(data, bytes):
length = len(data)
else:
data.seek(0, io.SEEK_END)
length = data.tell()
hashes = {}
if hash_algorithms is None:
hash_algorithms = [sslib_hash.DEFAULT_HASH_ALGORITHM]
for algorithm in hash_algorithms:
try:
if isinstance(data, bytes):
digest_object = sslib_hash.digest(algorithm)
digest_object.update(data)
else:
digest_object = sslib_hash.digest_fileobject(
data, algorithm
)
except (
sslib_exceptions.UnsupportedAlgorithmError,
sslib_exceptions.FormatError,
) as e:
raise exceptions.UnsupportedAlgorithmError(
f"Unsupported algorithm '{algorithm}'"
) from e
hashes[algorithm] = digest_object.hexdigest()
return cls(length, hashes, target_file_path)
def verify_length_and_hashes(self, data: Union[bytes, IO[bytes]]) -> None:
"""Verifies that length and hashes of "data" match expected values.