mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Update issue #137.
Modify format of hash bin delegations. Test implementation.
This commit is contained in:
parent
5d1906a239
commit
96f6152fbf
1 changed files with 95 additions and 33 deletions
128
tuf/libtuf.py
Executable file → Normal file
128
tuf/libtuf.py
Executable file → Normal file
|
|
@ -1565,6 +1565,59 @@ def target_files(self):
|
|||
|
||||
|
||||
|
||||
def add_directory_paths(self, list_of_directory_paths):
|
||||
"""
|
||||
<Purpose>
|
||||
|
||||
>>>
|
||||
>>>
|
||||
>>>
|
||||
|
||||
<Arguments>
|
||||
list_of_directory_paths:
|
||||
|
||||
<Exceptions>
|
||||
tuf.Error, if a directory path in 'list_of_directory_paths' is not a
|
||||
directory, or not under the repository's targets directory.
|
||||
|
||||
<Side Effects>
|
||||
None.
|
||||
|
||||
<Returns>
|
||||
None.
|
||||
"""
|
||||
|
||||
# Does 'filepath' have the correct format?
|
||||
# Ensure the arguments have the appropriate number of objects and object
|
||||
# types, and that all dict keys are properly named.
|
||||
# Raise 'tuf.FormatError' if there is a mismatch.
|
||||
tuf.formats.PATHS_SCHEMA.check_match(list_of_directory_paths)
|
||||
|
||||
directory_paths = []
|
||||
|
||||
for directory_path in list_of_directory_paths:
|
||||
directory_path = os.path.abspath(directory_path)
|
||||
if not os.path.isdir(directory_path):
|
||||
message = repr(directory_path)+ ' is not a directory.'
|
||||
raise tuf.Error(message)
|
||||
|
||||
targets_directory = os.path.join(self._targets_directory, '')
|
||||
directory_path = os.path.join(directory_path, '')
|
||||
if not directory_path.startswith(targets_directory):
|
||||
message = repr(directory_path)+' is not under the Repository\'s '+\
|
||||
'targets directory: '+repr(self._targets_directory)
|
||||
raise tuf.Error(message)
|
||||
|
||||
directory_paths.append(directory_path[len(self._targets_directory):])
|
||||
|
||||
roleinfo = tuf.roledb.get_roleinfo(self._rolename)
|
||||
for directory_path in directory_paths:
|
||||
if directory_path not in roleinfo['paths']:
|
||||
roleinfo['paths'].append(directory_path)
|
||||
tuf.roledb.update_roleinfo(self._rolename, roleinfo)
|
||||
|
||||
|
||||
|
||||
def add_target(self, filepath):
|
||||
"""
|
||||
<Purpose>
|
||||
|
|
@ -1841,7 +1894,7 @@ def delegate(self, rolename, public_keys, list_of_targets,
|
|||
|
||||
list_of_targets:
|
||||
A list of target filepaths that are added to the paths of 'rolename'.
|
||||
'targets' is a list of target filepaths, and can be empty.
|
||||
'list_of_targets' is a list of target filepaths, and can be empty.
|
||||
|
||||
threshold:
|
||||
The threshold number of keys of 'rolename'.
|
||||
|
|
@ -1851,9 +1904,9 @@ def delegate(self, rolename, public_keys, list_of_targets,
|
|||
files added to 'rolename' must fall under one of the restricted paths.
|
||||
|
||||
path_hash_prefixes:
|
||||
A list of hash prefixes in PATH_HASH_PREFIXES_SCHEMA format, used in
|
||||
hashed bin delegations. Targets may be located and stored in hashed
|
||||
bins by calculating the target path's hash prefix.
|
||||
A list of hash prefixes in 'tuf.formats.PATH_HASH_PREFIXES_SCHEMA'
|
||||
format, used in hashed bin delegations. Targets may be located and
|
||||
stored in hashed bins by calculating the target path's hash prefix.
|
||||
|
||||
<Exceptions>
|
||||
tuf.FormatError, if any of the arguments are improperly formatted.
|
||||
|
|
@ -1931,15 +1984,16 @@ def delegate(self, rolename, public_keys, list_of_targets,
|
|||
relative_restricted_paths = []
|
||||
|
||||
if restricted_paths is not None:
|
||||
for target in restricted_paths:
|
||||
target = os.path.abspath(target)
|
||||
if not os.path.commonprefix([self._targets_directory, target]) == \
|
||||
for path in restricted_paths:
|
||||
path = os.path.abspath(path)
|
||||
if not os.path.commonprefix([self._targets_directory, path]) == \
|
||||
self._targets_directory:
|
||||
message = repr(target)+' is not under the Repository\'s targets '+\
|
||||
message = repr(path)+' is not under the Repository\'s targets '+\
|
||||
'directory: '+repr(self._targets_directory)
|
||||
raise tuf.Error(message)
|
||||
|
||||
relative_restricted_paths.append(target[targets_directory_length+1:])
|
||||
|
||||
path = os.path.join(path, '')
|
||||
relative_restricted_paths.append(path[targets_directory_length+1:])
|
||||
|
||||
# Create a new Targets object for the 'rolename' delegation. An initial
|
||||
# expiration is set (3 months from the current time).
|
||||
|
|
@ -1968,6 +2022,9 @@ def delegate(self, rolename, public_keys, list_of_targets,
|
|||
roleinfo['paths'] = relative_restricted_paths
|
||||
if path_hash_prefixes is not None:
|
||||
roleinfo['path_hash_prefixes'] = path_hash_prefixes
|
||||
# A role in a delegations must list either 'path_hash_prefixes'
|
||||
# or 'paths'.
|
||||
del roleinfo['paths']
|
||||
|
||||
current_roleinfo['delegations']['roles'].append(roleinfo)
|
||||
tuf.roledb.update_roleinfo(self.rolename, current_roleinfo)
|
||||
|
|
@ -1977,7 +2034,7 @@ def delegate(self, rolename, public_keys, list_of_targets,
|
|||
new_targets_object.add_key(key)
|
||||
|
||||
# Add the new delegation to this Targets object. For example, 'django' is
|
||||
# added to 'repository.targets' (i.e., repository.targets('django').
|
||||
# added to 'repository.targets' (i.e., repository.targets('django')).
|
||||
self._delegated_roles[rolename] = new_targets_object
|
||||
|
||||
|
||||
|
|
@ -2084,7 +2141,7 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins,
|
|||
None.
|
||||
"""
|
||||
|
||||
# Does 'rolename' have the correct format?
|
||||
# Do the arguments have the correct format?
|
||||
# Ensure the arguments have the appropriate number of objects and object
|
||||
# types, and that all dict keys are properly named.
|
||||
# Raise 'tuf.FormatError' if there is a mismatch.
|
||||
|
|
@ -2102,18 +2159,19 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins,
|
|||
message = 'The number of bins argument must be a multiple of 16.'
|
||||
raise tuf.FormatError(message)
|
||||
|
||||
logger.info('There are '+len(list_of_targets)+' total targets.')
|
||||
logger.info('There are '+str(len(list_of_targets))+' total targets.')
|
||||
|
||||
# Store the target paths that fall into each bin.
|
||||
target_paths_in_bin = {}
|
||||
for bin_index in xrange(max_number_of_bins):
|
||||
target_paths_in_bin[bin_index] = []
|
||||
|
||||
|
||||
# Assign every path to its bin. Ensure every target is located under the
|
||||
# repository's targets directory.
|
||||
for target_path in list_of_targets:
|
||||
target_path = os.path.abspath(target_path)
|
||||
if not target_path.startswith(self._targets_directory+'/'):
|
||||
message = 'A path in the list of targets arguments is not '+\
|
||||
message = 'A path in the list of targets argument is not '+\
|
||||
'under the repository\'s targets directory: '+repr(target_path)
|
||||
raise tuf.FormatError(message)
|
||||
|
||||
|
|
@ -2123,7 +2181,7 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins,
|
|||
relative_path = target_path[len(self._targets_directory)+1:]
|
||||
digest_object = tuf.hash.digest(algorithm=HASH_FUNCTION)
|
||||
digest_object.update(relative_path)
|
||||
relative_path_hash = digest.hexdigest()
|
||||
relative_path_hash = digest_object.hexdigest()
|
||||
relative_path_hash_prefix = relative_path_hash[:prefix_length]
|
||||
|
||||
# 'target_paths_in_bin' store bin indices in base-10, so convert the
|
||||
|
|
@ -2132,8 +2190,7 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins,
|
|||
bin_index = int(relative_path_hash_prefix, 16)
|
||||
|
||||
# Add the 'target_path' (absolute) to the bin.
|
||||
target_paths_in_bin[bin_index] = \
|
||||
target_paths_in_bin[bin_index].append(target_path)
|
||||
target_paths_in_bin[bin_index].append(target_path)
|
||||
|
||||
# Calculate the path hash prefixes of each bin_offset stored in the parent
|
||||
# role. For example: 'targets/unclaimed/004' may list the path hash
|
||||
|
|
@ -2146,26 +2203,33 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins,
|
|||
# are listed in 'path_hash_prefixes' of 'outer_bin_index.
|
||||
for outer_bin_index in xrange(0, max_number_of_bins, bin_offset):
|
||||
# The bin index in hex padded from the left with zeroes for up to the
|
||||
# 'prefix_lengthn'.
|
||||
bin_rolename = hex(outer_bin_index)[2:].zfill(prefix_length)
|
||||
|
||||
# 'prefix_length'.
|
||||
# 'targets/unclaimed/000-003'
|
||||
start_bin = hex(outer_bin_index)[2:].zfill(prefix_length)
|
||||
end_bin = hex(outer_bin_index+bin_offset-1)[2:].zfill(prefix_length)
|
||||
if start_bin == end_bin:
|
||||
bin_rolename = start_bin
|
||||
else:
|
||||
bin_rolename = start_bin + '-' + end_bin
|
||||
|
||||
# The hash prefixes of the skipped bin roles, or the roles not directly
|
||||
# delegated from the parent role.
|
||||
path_hash_prefixes = []
|
||||
bin_rolename_targets = []
|
||||
|
||||
for inner_bin_index in xrange(outer_bin_index, outer_bin_index+bin_offset):
|
||||
# 'inner_bin_rolename' in padded hex. For example, "00b".
|
||||
inner_bin_rolename = hex(inner_bin_index)[2:].zfill(prefix_length)
|
||||
path_hash_prefixes.append(inner_bin_rolename)
|
||||
bin_rolename_targets.extend(target_paths_in_bin[inner_bin_index])
|
||||
|
||||
# Delegate from the "unclaimed" targets role to each 'bin_rolename'
|
||||
# (i.e., outer_bin_index).
|
||||
bin_rolename_targets = target_paths_in_bin[outer_bin_index]
|
||||
self.delegate(bin_rolename, keys_of_hashed_bins,
|
||||
list_of_targets=bin_rolename_targets,
|
||||
path_hash_prefixes=path_hash_prefixes)
|
||||
|
||||
message = 'Delegated from '+repr(self.rolename)+' to '+repr(binned_rolename)
|
||||
message = 'Delegated from '+repr(self.rolename)+' to '+repr(bin_rolename)
|
||||
logger.debug(message)
|
||||
|
||||
|
||||
|
|
@ -2191,12 +2255,10 @@ def delegations(self):
|
|||
None.
|
||||
|
||||
<Returns>
|
||||
A dictionary containing the rolenames (as dict keys) and role Targets
|
||||
objects of this Targets' delegations.
|
||||
Example: {'targets/unclaimed-role/django': Targets(), ...}
|
||||
A list containing the Targets objects of this Targets' delegations.
|
||||
"""
|
||||
|
||||
return self._delegated_roles
|
||||
return self._delegated_roles.values()
|
||||
|
||||
|
||||
|
||||
|
|
@ -3752,8 +3814,8 @@ def generate_targets_metadata(targets_directory, target_files, version,
|
|||
digest_target = os.path.join(dirname, digest_filename)
|
||||
|
||||
if not os.path.exists(digest_target):
|
||||
logger.warn('Copying target file to ' + repr(digest_target))
|
||||
shutil.copy(target_path, digest_target)
|
||||
logger.warn('Hard linking target file to ' + repr(digest_target))
|
||||
os.link(target_path, digest_target)
|
||||
|
||||
# Generate the targets metadata object.
|
||||
targets_metadata = tuf.formats.TargetsFile.make_metadata(version,
|
||||
|
|
@ -4148,8 +4210,8 @@ def write_metadata_file(metadata, filename, compressions, consistent_snapshots):
|
|||
file_object.move(written_filename)
|
||||
|
||||
for consistent_filename in consistent_filenames:
|
||||
logger.info('Saving ' + repr(consistent_filename))
|
||||
shutil.copy(written_filename, consistent_filename)
|
||||
logger.info('Linking ' + repr(consistent_filename))
|
||||
os.link(written_filename, consistent_filename)
|
||||
|
||||
|
||||
# Generate the compressed versions of 'metadata', if necessary. A compressed
|
||||
|
|
@ -4241,8 +4303,8 @@ def _write_compressed_metadata(file_object, compressed_filename,
|
|||
# Save any remaining compressed consistent snapshots.
|
||||
for consistent_filename in consistent_filenames:
|
||||
if not os.path.exists(consistent_filename):
|
||||
logger.info('Saving ' + repr(consistent_filename))
|
||||
shutil.copy(compressed_filename, consistent_filename)
|
||||
logger.info('Linking ' + repr(consistent_filename))
|
||||
os.link(compressed_filename, consistent_filename)
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue