From 96f6152fbfabb767b1bb8642a14345bd3c5edbbf Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Wed, 22 Jan 2014 12:52:55 -0500 Subject: [PATCH] Update issue #137. Modify format of hash bin delegations. Test implementation. --- tuf/libtuf.py | 128 +++++++++++++++++++++++++++++++++++++------------- 1 file changed, 95 insertions(+), 33 deletions(-) mode change 100755 => 100644 tuf/libtuf.py diff --git a/tuf/libtuf.py b/tuf/libtuf.py old mode 100755 new mode 100644 index 1dee636a..032c7184 --- a/tuf/libtuf.py +++ b/tuf/libtuf.py @@ -1565,6 +1565,59 @@ def target_files(self): + def add_directory_paths(self, list_of_directory_paths): + """ + + + >>> + >>> + >>> + + + list_of_directory_paths: + + + tuf.Error, if a directory path in 'list_of_directory_paths' is not a + directory, or not under the repository's targets directory. + + + None. + + + None. + """ + + # Does 'filepath' have the correct format? + # Ensure the arguments have the appropriate number of objects and object + # types, and that all dict keys are properly named. + # Raise 'tuf.FormatError' if there is a mismatch. + tuf.formats.PATHS_SCHEMA.check_match(list_of_directory_paths) + + directory_paths = [] + + for directory_path in list_of_directory_paths: + directory_path = os.path.abspath(directory_path) + if not os.path.isdir(directory_path): + message = repr(directory_path)+ ' is not a directory.' + raise tuf.Error(message) + + targets_directory = os.path.join(self._targets_directory, '') + directory_path = os.path.join(directory_path, '') + if not directory_path.startswith(targets_directory): + message = repr(directory_path)+' is not under the Repository\'s '+\ + 'targets directory: '+repr(self._targets_directory) + raise tuf.Error(message) + + directory_paths.append(directory_path[len(self._targets_directory):]) + + roleinfo = tuf.roledb.get_roleinfo(self._rolename) + for directory_path in directory_paths: + if directory_path not in roleinfo['paths']: + roleinfo['paths'].append(directory_path) + tuf.roledb.update_roleinfo(self._rolename, roleinfo) + + + def add_target(self, filepath): """ @@ -1841,7 +1894,7 @@ def delegate(self, rolename, public_keys, list_of_targets, list_of_targets: A list of target filepaths that are added to the paths of 'rolename'. - 'targets' is a list of target filepaths, and can be empty. + 'list_of_targets' is a list of target filepaths, and can be empty. threshold: The threshold number of keys of 'rolename'. @@ -1851,9 +1904,9 @@ def delegate(self, rolename, public_keys, list_of_targets, files added to 'rolename' must fall under one of the restricted paths. path_hash_prefixes: - A list of hash prefixes in PATH_HASH_PREFIXES_SCHEMA format, used in - hashed bin delegations. Targets may be located and stored in hashed - bins by calculating the target path's hash prefix. + A list of hash prefixes in 'tuf.formats.PATH_HASH_PREFIXES_SCHEMA' + format, used in hashed bin delegations. Targets may be located and + stored in hashed bins by calculating the target path's hash prefix. tuf.FormatError, if any of the arguments are improperly formatted. @@ -1931,15 +1984,16 @@ def delegate(self, rolename, public_keys, list_of_targets, relative_restricted_paths = [] if restricted_paths is not None: - for target in restricted_paths: - target = os.path.abspath(target) - if not os.path.commonprefix([self._targets_directory, target]) == \ + for path in restricted_paths: + path = os.path.abspath(path) + if not os.path.commonprefix([self._targets_directory, path]) == \ self._targets_directory: - message = repr(target)+' is not under the Repository\'s targets '+\ + message = repr(path)+' is not under the Repository\'s targets '+\ 'directory: '+repr(self._targets_directory) raise tuf.Error(message) - - relative_restricted_paths.append(target[targets_directory_length+1:]) + + path = os.path.join(path, '') + relative_restricted_paths.append(path[targets_directory_length+1:]) # Create a new Targets object for the 'rolename' delegation. An initial # expiration is set (3 months from the current time). @@ -1968,6 +2022,9 @@ def delegate(self, rolename, public_keys, list_of_targets, roleinfo['paths'] = relative_restricted_paths if path_hash_prefixes is not None: roleinfo['path_hash_prefixes'] = path_hash_prefixes + # A role in a delegations must list either 'path_hash_prefixes' + # or 'paths'. + del roleinfo['paths'] current_roleinfo['delegations']['roles'].append(roleinfo) tuf.roledb.update_roleinfo(self.rolename, current_roleinfo) @@ -1977,7 +2034,7 @@ def delegate(self, rolename, public_keys, list_of_targets, new_targets_object.add_key(key) # Add the new delegation to this Targets object. For example, 'django' is - # added to 'repository.targets' (i.e., repository.targets('django'). + # added to 'repository.targets' (i.e., repository.targets('django')). self._delegated_roles[rolename] = new_targets_object @@ -2084,7 +2141,7 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins, None. """ - # Does 'rolename' have the correct format? + # Do the arguments have the correct format? # Ensure the arguments have the appropriate number of objects and object # types, and that all dict keys are properly named. # Raise 'tuf.FormatError' if there is a mismatch. @@ -2102,18 +2159,19 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins, message = 'The number of bins argument must be a multiple of 16.' raise tuf.FormatError(message) - logger.info('There are '+len(list_of_targets)+' total targets.') + logger.info('There are '+str(len(list_of_targets))+' total targets.') # Store the target paths that fall into each bin. target_paths_in_bin = {} for bin_index in xrange(max_number_of_bins): target_paths_in_bin[bin_index] = [] - + # Assign every path to its bin. Ensure every target is located under the # repository's targets directory. for target_path in list_of_targets: + target_path = os.path.abspath(target_path) if not target_path.startswith(self._targets_directory+'/'): - message = 'A path in the list of targets arguments is not '+\ + message = 'A path in the list of targets argument is not '+\ 'under the repository\'s targets directory: '+repr(target_path) raise tuf.FormatError(message) @@ -2123,7 +2181,7 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins, relative_path = target_path[len(self._targets_directory)+1:] digest_object = tuf.hash.digest(algorithm=HASH_FUNCTION) digest_object.update(relative_path) - relative_path_hash = digest.hexdigest() + relative_path_hash = digest_object.hexdigest() relative_path_hash_prefix = relative_path_hash[:prefix_length] # 'target_paths_in_bin' store bin indices in base-10, so convert the @@ -2132,8 +2190,7 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins, bin_index = int(relative_path_hash_prefix, 16) # Add the 'target_path' (absolute) to the bin. - target_paths_in_bin[bin_index] = \ - target_paths_in_bin[bin_index].append(target_path) + target_paths_in_bin[bin_index].append(target_path) # Calculate the path hash prefixes of each bin_offset stored in the parent # role. For example: 'targets/unclaimed/004' may list the path hash @@ -2146,26 +2203,33 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins, # are listed in 'path_hash_prefixes' of 'outer_bin_index. for outer_bin_index in xrange(0, max_number_of_bins, bin_offset): # The bin index in hex padded from the left with zeroes for up to the - # 'prefix_lengthn'. - bin_rolename = hex(outer_bin_index)[2:].zfill(prefix_length) - + # 'prefix_length'. + # 'targets/unclaimed/000-003' + start_bin = hex(outer_bin_index)[2:].zfill(prefix_length) + end_bin = hex(outer_bin_index+bin_offset-1)[2:].zfill(prefix_length) + if start_bin == end_bin: + bin_rolename = start_bin + else: + bin_rolename = start_bin + '-' + end_bin + # The hash prefixes of the skipped bin roles, or the roles not directly # delegated from the parent role. path_hash_prefixes = [] + bin_rolename_targets = [] for inner_bin_index in xrange(outer_bin_index, outer_bin_index+bin_offset): # 'inner_bin_rolename' in padded hex. For example, "00b". inner_bin_rolename = hex(inner_bin_index)[2:].zfill(prefix_length) path_hash_prefixes.append(inner_bin_rolename) + bin_rolename_targets.extend(target_paths_in_bin[inner_bin_index]) # Delegate from the "unclaimed" targets role to each 'bin_rolename' # (i.e., outer_bin_index). - bin_rolename_targets = target_paths_in_bin[outer_bin_index] self.delegate(bin_rolename, keys_of_hashed_bins, list_of_targets=bin_rolename_targets, path_hash_prefixes=path_hash_prefixes) - message = 'Delegated from '+repr(self.rolename)+' to '+repr(binned_rolename) + message = 'Delegated from '+repr(self.rolename)+' to '+repr(bin_rolename) logger.debug(message) @@ -2191,12 +2255,10 @@ def delegations(self): None. - A dictionary containing the rolenames (as dict keys) and role Targets - objects of this Targets' delegations. - Example: {'targets/unclaimed-role/django': Targets(), ...} + A list containing the Targets objects of this Targets' delegations. """ - return self._delegated_roles + return self._delegated_roles.values() @@ -3752,8 +3814,8 @@ def generate_targets_metadata(targets_directory, target_files, version, digest_target = os.path.join(dirname, digest_filename) if not os.path.exists(digest_target): - logger.warn('Copying target file to ' + repr(digest_target)) - shutil.copy(target_path, digest_target) + logger.warn('Hard linking target file to ' + repr(digest_target)) + os.link(target_path, digest_target) # Generate the targets metadata object. targets_metadata = tuf.formats.TargetsFile.make_metadata(version, @@ -4148,8 +4210,8 @@ def write_metadata_file(metadata, filename, compressions, consistent_snapshots): file_object.move(written_filename) for consistent_filename in consistent_filenames: - logger.info('Saving ' + repr(consistent_filename)) - shutil.copy(written_filename, consistent_filename) + logger.info('Linking ' + repr(consistent_filename)) + os.link(written_filename, consistent_filename) # Generate the compressed versions of 'metadata', if necessary. A compressed @@ -4241,8 +4303,8 @@ def _write_compressed_metadata(file_object, compressed_filename, # Save any remaining compressed consistent snapshots. for consistent_filename in consistent_filenames: if not os.path.exists(consistent_filename): - logger.info('Saving ' + repr(consistent_filename)) - shutil.copy(compressed_filename, consistent_filename) + logger.info('Linking ' + repr(consistent_filename)) + os.link(compressed_filename, consistent_filename)