mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Merge pull request #203 from vladimir-v-diaz/develop
Add test_repository_tool.py
This commit is contained in:
commit
973e8d1297
6 changed files with 1910 additions and 83 deletions
Binary file not shown.
|
Before Width: | Height: | Size: 88 KiB After Width: | Height: | Size: 90 KiB |
1820
tests/unit/test_repository_tool.py
Executable file
1820
tests/unit/test_repository_tool.py
Executable file
File diff suppressed because it is too large
Load diff
|
|
@ -5,7 +5,7 @@
|
|||
test_updater.py
|
||||
|
||||
<Author>
|
||||
Konstantin Andrianov
|
||||
Konstantin Andrianov.
|
||||
|
||||
<Started>
|
||||
October 15, 2012.
|
||||
|
|
|
|||
|
|
@ -894,9 +894,15 @@ def _decrypt(file_contents, password):
|
|||
# from 'file_contents'. These five values are delimited by
|
||||
# '_ENCRYPTION_DELIMITER'. This delimiter is arbitrarily chosen and should
|
||||
# not occur in the hexadecimal representations of the fields it is separating.
|
||||
salt, iterations, hmac, iv, ciphertext = \
|
||||
file_contents.split(_ENCRYPTION_DELIMITER)
|
||||
# Raise 'tuf.CryptoError', if 'file_contents' does not contains the expected
|
||||
# data layout.
|
||||
try:
|
||||
salt, iterations, hmac, iv, ciphertext = \
|
||||
file_contents.split(_ENCRYPTION_DELIMITER)
|
||||
|
||||
except ValueError:
|
||||
raise tuf.CryptoError('Invalid encrypted file.')
|
||||
|
||||
# Ensure we have the expected raw data for the delimited cryptographic data.
|
||||
salt = binascii.unhexlify(salt)
|
||||
iterations = int(binascii.unhexlify(iterations))
|
||||
|
|
|
|||
|
|
@ -188,7 +188,7 @@ def write(self, write_partial=False, consistent_snapshot=False):
|
|||
private keys, etc.
|
||||
|
||||
<Arguments>
|
||||
write_partial:
|
||||
mrite_partial:
|
||||
A boolean indicating whether partial metadata should be written to
|
||||
disk. Partial metadata may be written to allow multiple maintainters
|
||||
to independently sign and update role metadata. write() raises an
|
||||
|
|
@ -878,7 +878,7 @@ def keys(self):
|
|||
<Purpose>
|
||||
A getter method that returns the role's keyids of the keys. The role
|
||||
is expected to eventually contain a threshold of signatures generated
|
||||
by the private keys of each of the role's keys (returned here as a keyid).
|
||||
by the private keys of each of the role's keys (returned here as a keyid.)
|
||||
|
||||
<Arguments>
|
||||
None.
|
||||
|
|
@ -1117,7 +1117,8 @@ def expiration(self, expiration_datetime_utc):
|
|||
'tuf.formats.DATETIME_SCHEMA'.
|
||||
|
||||
<Exceptions>
|
||||
tuf.FormatError, if 'expiration_datetime_utc' is improperly formatted.
|
||||
tuf.FormatError, if 'expiration_datetime_utc' is improperly formatted,
|
||||
or invalid (e.g., already expired).
|
||||
|
||||
<Side Effects>
|
||||
Modifies the expiration attribute of the Repository object.
|
||||
|
|
@ -1135,7 +1136,7 @@ def expiration(self, expiration_datetime_utc):
|
|||
# Further validate the datetime, such as a correct date, time, expiration.
|
||||
# Convert 'expiration_datetime_utc' to a unix timestamp so that it can be
|
||||
# compared with time.time().
|
||||
expiration_datetime_utc = expiration_datetime_utc+' UTC'
|
||||
expiration_datetime_utc = expiration_datetime_utc + ' UTC'
|
||||
try:
|
||||
unix_timestamp = tuf.formats.parse_time(expiration_datetime_utc)
|
||||
|
||||
|
|
@ -1479,13 +1480,14 @@ class Targets(Metadata):
|
|||
tuf.FormatError, if the arguments are improperly formatted.
|
||||
|
||||
<Side Effects>
|
||||
Modifies the roleinfo of the targets role in 'tuf.roledb'.
|
||||
Modifies the roleinfo of the targets role in 'tuf.roledb', or creates
|
||||
a default one named 'targets'.
|
||||
|
||||
<Returns>
|
||||
None.
|
||||
"""
|
||||
|
||||
def __init__(self, targets_directory, rolename, roleinfo=None):
|
||||
def __init__(self, targets_directory, rolename='targets', roleinfo=None):
|
||||
|
||||
# Do the arguments have the correct format?
|
||||
# Ensure the arguments have the appropriate number of objects and object
|
||||
|
|
@ -1609,7 +1611,8 @@ def add_restricted_paths(self, list_of_directory_paths, child_rolename):
|
|||
|
||||
child_rolename:
|
||||
The child delegation that requires an update to its restricted paths,
|
||||
as listed in the parent role's delegations.
|
||||
as listed in the parent role's delegations (e.g., 'Django' in
|
||||
'targets/unclaimed/Django').
|
||||
|
||||
<Exceptions>
|
||||
tuf.Error, if a directory path in 'list_of_directory_paths' is not a
|
||||
|
|
@ -2190,8 +2193,9 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins,
|
|||
considered the hash prefix).
|
||||
|
||||
<Exceptions>
|
||||
tuf.FormatError, if the arguments are improperly formatted,
|
||||
'number_of_bins' is not a power of 2, or one of the targets
|
||||
tuf.FormatError, if the arguments are improperly formatted.
|
||||
|
||||
tuf.Error, if 'number_of_bins' is not a power of 2, or one of the targets
|
||||
in 'list_of_targets' is not located under the repository's targets
|
||||
directory.
|
||||
|
||||
|
|
@ -2227,7 +2231,7 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins,
|
|||
# (total_hash_prefixes / number_of_bins) hash prefixes.
|
||||
if total_hash_prefixes % number_of_bins != 0:
|
||||
message = 'The "number_of_bins" argument must be a power of 2.'
|
||||
raise tuf.FormatError(message)
|
||||
raise tuf.Error(message)
|
||||
|
||||
logger.info('Creating hashed bin delegations.')
|
||||
logger.info(repr(len(list_of_targets)) + ' total targets.')
|
||||
|
|
@ -2248,7 +2252,7 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins,
|
|||
if not target_path.startswith(self._targets_directory+os.sep):
|
||||
message = 'A path in the list of targets argument is not '+\
|
||||
'under the repository\'s targets directory: '+repr(target_path)
|
||||
raise tuf.FormatError(message)
|
||||
raise tuf.Error(message)
|
||||
|
||||
# Determine the hash prefix of 'target_path' by computing the digest of
|
||||
# its path relative to the targets directory. Example:
|
||||
|
|
@ -2767,29 +2771,32 @@ def _delete_obsolete_metadata(metadata_directory, snapshot_metadata,
|
|||
|
||||
# Strip the digest if 'consistent_snapshot' is True.
|
||||
# Example: 'targets/unclaimed/13df98ab0.django.json' -->
|
||||
# 'targets/unclaimed/django.json'
|
||||
metadata_name, embeded_digest = \
|
||||
_strip_consistent_snapshot_digest(metadata_name, consistent_snapshot)
|
||||
|
||||
# 'targets/unclaimed/django.json'. Consistent and non-consistent
|
||||
# metadata might co-exist if write() and write(consistent_snapshot=True)
|
||||
# are mixed, so ensure only 'digest.filename' metadata is stripped.
|
||||
embeded_digest = None
|
||||
if metadata_name not in snapshot_metadata['meta']:
|
||||
metadata_name, embeded_digest = \
|
||||
_strip_consistent_snapshot_digest(metadata_name, consistent_snapshot)
|
||||
|
||||
# Strip filename extensions. The role database does not include the
|
||||
# metadata extension.
|
||||
metadata_name_extension = metadata_name
|
||||
for metadata_extension in METADATA_EXTENSIONS:
|
||||
if metadata_name.endswith(metadata_extension):
|
||||
metadata_name_without_extension = \
|
||||
metadata_name[:-len(metadata_extension)]
|
||||
metadata_name = metadata_name[:-len(metadata_extension)]
|
||||
|
||||
# Delete the metadata file if it does not exist in 'tuf.roledb'.
|
||||
# repository_tool.py might have marked 'metadata_name' as removed, but its
|
||||
# metadata file is not actually deleted yet. Do it now.
|
||||
if not tuf.roledb.role_exists(metadata_name_without_extension):
|
||||
# 'repository_tool.py' might have marked 'metadata_name' as removed, but
|
||||
# its metadata file is not actually deleted yet. Do it now.
|
||||
if not tuf.roledb.role_exists(metadata_name):
|
||||
logger.info('Removing outdated metadata: ' + repr(metadata_path))
|
||||
os.remove(metadata_path)
|
||||
|
||||
# Delete outdated consistent snapshots. snapshot metadata includes
|
||||
# the file extension of roles.
|
||||
if consistent_snapshot:
|
||||
#metadata_name_extension = metadata_name + METADATA_EXTENSION
|
||||
file_hashes = snapshot_metadata['meta'][metadata_name] \
|
||||
if consistent_snapshot and embeded_digest is not None:
|
||||
file_hashes = snapshot_metadata['meta'][metadata_name_extension] \
|
||||
['hashes'].values()
|
||||
if embeded_digest not in file_hashes:
|
||||
logger.info('Removing outdated metadata: ' + repr(metadata_path))
|
||||
|
|
@ -2956,9 +2963,11 @@ def load_repository(repository_directory):
|
|||
|
||||
<Exceptions>
|
||||
tuf.FormatError, if 'repository_directory' or any of the metadata files
|
||||
are improperly formatted. Also raised if, at a minimum, the Root role
|
||||
cannot be found.
|
||||
are improperly formatted.
|
||||
|
||||
tuf.RepositoryError, if the Root role cannot be found. At a minimum,
|
||||
a repository must contain 'root.json'
|
||||
|
||||
<Side Effects>
|
||||
All the metadata files found in the repository are loaded and their contents
|
||||
stored in a repository_tool.Repository object.
|
||||
|
|
@ -3379,6 +3388,8 @@ def import_rsa_privatekey_from_file(filepath, password=None):
|
|||
<Exceptions>
|
||||
tuf.FormatError, if the arguments are improperly formatted.
|
||||
|
||||
tuf.CryptoError, if 'filepath' is not a valid encrypted key file.
|
||||
|
||||
<Side Effects>
|
||||
The contents of 'filepath' is read, decrypted, and the key stored.
|
||||
|
||||
|
|
@ -3408,7 +3419,8 @@ def import_rsa_privatekey_from_file(filepath, password=None):
|
|||
with open(filepath, 'rb') as file_object:
|
||||
encrypted_pem = file_object.read()
|
||||
|
||||
# Convert 'encrypted_pem' to 'tuf.formats.RSAKEY_SCHEMA' format.
|
||||
# Convert 'encrypted_pem' to 'tuf.formats.RSAKEY_SCHEMA' format. Raise
|
||||
# 'tuf.CryptoError' if 'encrypted_pem' is invalid.
|
||||
rsa_key = tuf.keys.import_rsakey_from_encrypted_pem(encrypted_pem, password)
|
||||
|
||||
return rsa_key
|
||||
|
|
@ -3691,13 +3703,13 @@ def get_metadata_filenames(metadata_directory=None):
|
|||
If 'metadata_directory' is set to 'metadata', the dictionary
|
||||
returned would contain:
|
||||
|
||||
filenames = {'root': 'metadata/root.json',
|
||||
'targets': 'metadata/targets.json',
|
||||
'snapshot': 'metadata/snapshot.json',
|
||||
'timestamp': 'metadata/timestamp.json'}
|
||||
filenames = {'root.json': 'metadata/root.json',
|
||||
'targets.json': 'metadata/targets.json',
|
||||
'snapshot.json': 'metadata/snapshot.json',
|
||||
'timestamp.json': 'metadata/timestamp.json'}
|
||||
|
||||
If the metadata directory is not set by the caller, the current
|
||||
directory is used.
|
||||
If 'metadata_directory' is not set by the caller, the current directory is
|
||||
used.
|
||||
|
||||
<Arguments>
|
||||
metadata_directory:
|
||||
|
|
@ -3714,19 +3726,15 @@ def get_metadata_filenames(metadata_directory=None):
|
|||
metadata files, such as 'root.json' and 'snapshot.json'.
|
||||
"""
|
||||
|
||||
if metadata_directory is None:
|
||||
metadata_directory = os.getcwd()
|
||||
|
||||
# Does 'metadata_directory' have the correct format?
|
||||
# Ensure the arguments have the appropriate number of objects and object
|
||||
# types, and that all dict keys are properly named.
|
||||
# Raise 'tuf.FormatError' if there is a mismatch.
|
||||
tuf.formats.PATH_SCHEMA.check_match(metadata_directory)
|
||||
|
||||
if metadata_directory is None:
|
||||
metadata_directory = '.'
|
||||
|
||||
# Does 'metadata_directory' have the correct format?
|
||||
# Raise 'tuf.FormatError' if there is a mismatch.
|
||||
tuf.formats.PATH_SCHEMA.check_match(metadata_directory)
|
||||
|
||||
# Store the filepaths of the top-level roles, including the
|
||||
# 'metadata_directory' for each one.
|
||||
filenames = {}
|
||||
|
|
@ -3749,13 +3757,14 @@ def get_metadata_filenames(metadata_directory=None):
|
|||
|
||||
|
||||
|
||||
def get_metadata_file_info(filename):
|
||||
def get_metadata_fileinfo(filename):
|
||||
"""
|
||||
<Purpose>
|
||||
Retrieve the file information of 'filename'. The object returned
|
||||
conforms to 'tuf.formats.FILEINFO_SCHEMA'. The information
|
||||
generated for 'filename' is stored in metadata files like 'targets.json'.
|
||||
The fileinfo object returned has the form:
|
||||
|
||||
fileinfo = {'length': 1024,
|
||||
'hashes': {'sha256': 1233dfba312, ...},
|
||||
'custom': {...}}
|
||||
|
|
@ -3776,7 +3785,7 @@ def get_metadata_file_info(filename):
|
|||
<Returns>
|
||||
A dictionary conformant to 'tuf.formats.FILEINFO_SCHEMA'. This
|
||||
dictionary contains the length, hashes, and custom data about the
|
||||
'filename' metadata file.
|
||||
'filename' metadata file. SHA256 hashes are generated by default.
|
||||
"""
|
||||
|
||||
# Does 'filename' have the correct format?
|
||||
|
|
@ -3831,30 +3840,7 @@ def get_target_hash(target_filepath):
|
|||
The hash of 'target_filepath'.
|
||||
"""
|
||||
|
||||
# Does 'target_filepath' have the correct format?
|
||||
# Ensure the arguments have the appropriate number of objects and object
|
||||
# types, and that all dict keys are properly named.
|
||||
# Raise 'tuf.FormatError' if there is a mismatch.
|
||||
tuf.formats.RELPATH_SCHEMA.check_match(target_filepath)
|
||||
|
||||
# Calculate the hash of the filepath to determine which bin to find the
|
||||
# target. The client currently assumes the repository uses
|
||||
# 'HASH_FUNCTION' to generate hashes.
|
||||
digest_object = tuf.hash.digest(HASH_FUNCTION)
|
||||
|
||||
try:
|
||||
digest_object.update(target_filepath)
|
||||
|
||||
except UnicodeEncodeError:
|
||||
# Sometimes, there are Unicode characters in target paths. We assume a
|
||||
# UTF-8 encoding and try to hash that.
|
||||
digest_object = tuf.hash.digest(HASH_FUNCTION)
|
||||
encoded_target_filepath = target_filepath.encode('utf-8')
|
||||
digest_object.update(encoded_target_filepath)
|
||||
|
||||
target_filepath_hash = digest_object.hexdigest()
|
||||
|
||||
return target_filepath_hash
|
||||
return tuf.util.get_target_hash(target_filepath)
|
||||
|
||||
|
||||
|
||||
|
|
@ -3878,13 +3864,16 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot):
|
|||
'tuf.formats.TIME_SCHEMA'.
|
||||
|
||||
consistent_snapshot:
|
||||
Boolean. If True, a file digest is expected to be prepended to the
|
||||
filename of any target file located in the targets directory. Each digest
|
||||
is stripped from the target filename and listed in the snapshot metadata.
|
||||
|
||||
<Exceptions>
|
||||
tuf.FormatError, if the generated root metadata object could not
|
||||
be generated with the correct format.
|
||||
|
||||
tuf.Error, if an error is encountered while generating the root
|
||||
metadata object.
|
||||
metadata object (e.g., a required top-level role not found in 'tuf.roledb'.)
|
||||
|
||||
<Side Effects>
|
||||
The contents of 'tuf.keydb.py' and 'tuf.roledb.py' are read.
|
||||
|
|
@ -4056,7 +4045,7 @@ def generate_targets_metadata(targets_directory, target_files, version,
|
|||
'targets metadata.'
|
||||
raise tuf.Error(message)
|
||||
|
||||
filedict[relative_targetpath] = get_metadata_file_info(target_path)
|
||||
filedict[relative_targetpath] = get_metadata_fileinfo(target_path)
|
||||
|
||||
if write_consistent_targets:
|
||||
for target_digest in filedict[relative_targetpath]['hashes'].values():
|
||||
|
|
@ -4082,7 +4071,7 @@ def generate_targets_metadata(targets_directory, target_files, version,
|
|||
|
||||
def generate_snapshot_metadata(metadata_directory, version, expiration_date,
|
||||
root_filename, targets_filename,
|
||||
consistent_snapshot):
|
||||
consistent_snapshot=False):
|
||||
"""
|
||||
<Purpose>
|
||||
Create the snapshot metadata. The minimum metadata must exist
|
||||
|
|
@ -4118,7 +4107,7 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date,
|
|||
is stripped from the target filename and listed in the snapshot metadata.
|
||||
|
||||
<Exceptions>
|
||||
tuf.FormatError, if 'metadata_directory' is improperly formatted.
|
||||
tuf.FormatError, if the arguments are improperly formatted.
|
||||
|
||||
tuf.Error, if an error occurred trying to generate the snapshot metadata
|
||||
object.
|
||||
|
|
@ -4139,14 +4128,15 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date,
|
|||
tuf.formats.TIME_SCHEMA.check_match(expiration_date)
|
||||
tuf.formats.PATH_SCHEMA.check_match(root_filename)
|
||||
tuf.formats.PATH_SCHEMA.check_match(targets_filename)
|
||||
tuf.formats.BOOLEAN_SCHEMA.check_match(consistent_snapshot)
|
||||
|
||||
metadata_directory = _check_directory(metadata_directory)
|
||||
|
||||
# Retrieve the fileinfo of 'root.json' and 'targets.json'. This file
|
||||
# information includes data such as file length, hashes of the file, etc.
|
||||
filedict = {}
|
||||
filedict[ROOT_FILENAME] = get_metadata_file_info(root_filename)
|
||||
filedict[TARGETS_FILENAME] = get_metadata_file_info(targets_filename)
|
||||
filedict[ROOT_FILENAME] = get_metadata_fileinfo(root_filename)
|
||||
filedict[TARGETS_FILENAME] = get_metadata_fileinfo(targets_filename)
|
||||
|
||||
# Add compressed versions of the 'targets.json' and 'root.json' metadata,
|
||||
# if they exist.
|
||||
|
|
@ -4158,10 +4148,10 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date,
|
|||
# add their file attributes to 'filedict'.
|
||||
if os.path.exists(compressed_root_filename):
|
||||
filedict[ROOT_FILENAME+extension] = \
|
||||
get_metadata_file_info(compressed_root_filename)
|
||||
get_metadata_fileinfo(compressed_root_filename)
|
||||
if os.path.exists(compressed_targets_filename):
|
||||
filedict[TARGETS_FILENAME+extension] = \
|
||||
get_metadata_file_info(compressed_targets_filename)
|
||||
get_metadata_fileinfo(compressed_targets_filename)
|
||||
|
||||
# Walk the 'targets/' directory and generate the fileinfo of all the role
|
||||
# files found. This information is stored in the 'meta' field of the snapshot
|
||||
|
|
@ -4191,7 +4181,7 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date,
|
|||
# Obsolete role files may still be found. Ensure only roles loaded
|
||||
# in the roledb are included in the snapshot metadata.
|
||||
if tuf.roledb.role_exists(rolename):
|
||||
filedict[metadata_name] = get_metadata_file_info(metadata_path)
|
||||
filedict[metadata_name] = get_metadata_fileinfo(metadata_path)
|
||||
|
||||
# Generate the snapshot metadata object.
|
||||
snapshot_metadata = tuf.formats.SnapshotFile.make_metadata(version,
|
||||
|
|
@ -4254,7 +4244,7 @@ def generate_timestamp_metadata(snapshot_filename, version,
|
|||
# Retrieve the fileinfo of the snapshot metadata file.
|
||||
# This file information contains hashes, file length, custom data, etc.
|
||||
fileinfo = {}
|
||||
fileinfo[SNAPSHOT_FILENAME] = get_metadata_file_info(snapshot_filename)
|
||||
fileinfo[SNAPSHOT_FILENAME] = get_metadata_fileinfo(snapshot_filename)
|
||||
|
||||
# Save the fileinfo of the compressed versions of 'timestamp.json'
|
||||
# in 'fileinfo'. Log the files included in 'fileinfo'.
|
||||
|
|
@ -4264,14 +4254,14 @@ def generate_timestamp_metadata(snapshot_filename, version,
|
|||
|
||||
compressed_filename = snapshot_filename + '.' + file_extension
|
||||
try:
|
||||
compressed_fileinfo = get_metadata_file_info(compressed_filename)
|
||||
compressed_fileinfo = get_metadata_fileinfo(compressed_filename)
|
||||
|
||||
except:
|
||||
logger.warn('Cannot get fileinfo about '+str(compressed_filename))
|
||||
logger.warn('Cannot get fileinfo about '+repr(compressed_filename))
|
||||
|
||||
else:
|
||||
logger.info('Including fileinfo about '+str(compressed_filename))
|
||||
fileinfo[SNAPSHOT_FILENAME+'.'+file_extension] = compressed_fileinfo
|
||||
logger.info('Including fileinfo about '+repr(compressed_filename))
|
||||
fileinfo[SNAPSHOT_FILENAME + '.' + file_extension] = compressed_fileinfo
|
||||
|
||||
# Generate the timestamp metadata object.
|
||||
timestamp_metadata = tuf.formats.TimestampFile.make_metadata(version,
|
||||
|
|
|
|||
13
tuf/util.py
13
tuf/util.py
|
|
@ -879,6 +879,8 @@ def load_json_file(filepath):
|
|||
<Exceptions>
|
||||
tuf.FormatError: If 'filepath' is improperly formatted.
|
||||
|
||||
tuf.Error: If 'filepath' cannot be deserialized to a Python object.
|
||||
|
||||
IOError in case of runtime IO exceptions.
|
||||
|
||||
<Side Effects>
|
||||
|
|
@ -892,6 +894,8 @@ def load_json_file(filepath):
|
|||
# tuf.FormatError is raised on incorrect format.
|
||||
tuf.formats.PATH_SCHEMA.check_match(filepath)
|
||||
|
||||
deserialized_object = None
|
||||
|
||||
# The file is mostly likely gzipped.
|
||||
if filepath.endswith('.gz'):
|
||||
logger.debug('gzip.open('+str(filepath)+')')
|
||||
|
|
@ -902,7 +906,14 @@ def load_json_file(filepath):
|
|||
fileobject = open(filepath)
|
||||
|
||||
try:
|
||||
return json.load(fileobject)
|
||||
deserialized_object = json.load(fileobject)
|
||||
|
||||
except ValueError, TypeError:
|
||||
message = 'Cannot deserialize to a Python object: '+repr(filepath)
|
||||
raise tuf.Error(message)
|
||||
|
||||
else:
|
||||
return deserialized_object
|
||||
|
||||
finally:
|
||||
fileobject.close()
|
||||
|
|
|
|||
Loading…
Reference in a new issue