Implement MultiRepoUpdater

TODO: Unit testing still remains
This commit is contained in:
Vladimir Diaz 2017-02-10 15:19:03 -05:00
parent 40aaf93f5a
commit bbe899b75c

View file

@ -141,6 +141,239 @@
iso8601_logger.disabled = True
class MultiRepoUpdater(object):
"""
<Purpose>
Provide a way for clients to request a target file from multiple
repositories. Which repositories to query is determined by a map
file (i.e,. map.json).
See TAP 4 for more information on the map file and requesting updates from
multiple repositories.
<Arguments>
map_file:
The path of the map file. The map file is needed to determine which
repositories to query given a target file.
<Exceptions>
tuf.exceptions.Error, if the map file cannot be loaded.
<Side Effects>
None.
<Returns>
None.
"""
def __init__(self, map_file):
# Does 'map_file' have the correct format?
# Raise 'securesystemslib.exceptions.FormatError' if there is a mismatch.
securesystemslib.formats.PATH_SCHEMA.check_match(map_file)
# The map file dictionary that associates targets with repositories.
self.map_file = {}
# A dictionary mapping repositories to TUF updaters.
self.repository_names_to_updaters = {}
try:
self.map_file = securesystemslib.util.load_json_file(map_file)
except (securesystemslib.exceptions.Error) as e:
raise tuf.exceptions.Error('Cannot load the map file: ' + str(e))
def get_one_valid_targetinfo(target_filename):
"""
<Purpose>
Return the targetinfo, if any, for the given 'target_filename'.
<Arguments>
target_filename:
The relative path of the target file to update.
<Exceptions>
tuf.FormatError, if the argument is improperly formatted.
<Side Effects>
None.
<Returns>
The targetinfo (conformant with tuf.formats.TARGETINFO_SCHEMA) for
'target_filename', if available. Return None if no targetinfo is
available.
"""
# {"repository_name": [mirror URLs, ...], ...}
repository_names_to_mirrors = self.map_file['repositories']
repositories_directory = tuf.settings.repositories_directory
for repository_name in repository_names_to_mirrors:
# Each repository must cache its metadata in a separate location.
repository_directory = os.path.join(repositories_directory, repository_name)
if not os.path.isdir(repository_directory):
raise tuf.exceptions.Error('The metadata directory'
' for ' + repr(repository_name) + ' must exist at ' + repr(repository_directory))
else:
logger.debug('Found local directory for ' + repr(repository_name))
# The latest known root metadata file must already be on disk.
root_file = os.path.isfile(os.path.join(repository_directory,
'current', 'root.json'))
if not os.path.isfile(root_file):
raise tuf.exceptions.Error('The Root file must exist at ' + repr(root_file))
else:
logger.debug('Found Root file at ' + repr(root_file))
# Iterate over mappings.
# [{"paths": [], "repositories": [], "terminating": Boolean}, ...]
for mapping in self.map_file['mappings']:
# If this mapping is relevant to the target...
if paths_match_target(mapping['paths'], target_filename):
targetinfos = []
# Use the *unmodified* TUF updater for a single repository to fetch the
# targetinfo from each repository.
for repository_name in mapping['repositories']:
targetinfo = update_from_repository(repository_name,
repository_names_to_mirrors, target_filename)
targetinfos.append(targetinfo)
# If the targetinfo on each repository is equal to the others, and it
# is not empty, then return the targetinfo.
if targets_are_equal_and_not_empty(targetinfos):
return targetinfo
else:
continue
# If we are here, it means either the mapping is irrelevant to the
# target, or the targets were missing from all repositories in this
# mapping, or the targets on all repositories did not match. In that
# case, are we allowed to continue to the next mapping? Let's check
# the terminating entry.
if mapping['terminating']:
return None
# If we are here, it means either there were no mappings, or none of the
# mappings provided the target.
return None
def paths_match_target(paths, target_filename):
for path in paths:
if fnmatch.fnmatch(path, target_filename):
return True
else:
continue
# If we are here, then none of the paths are relevant to the target.
return False
def get_updater(repository_name, repository_names_to_mirrors):
# NOTE: Do not refresh metadata for a repository that has been visited.
updater = self.repository_names_to_updaters.get(repository_name)
if not updater:
# Unimportant: some massaging for unmodified TUF client.
mirrors = {}
for url in repository_names_to_mirrors[repository_name]:
mirrors[url] = {
'url_prefix': url,
'metadata_path': 'metadata',
'targets_path': 'targets',
'confined_target_dirs': ['']}
# NOTE: State (e.g., keys) should NOT be shared across different updater
# instances.
updater = tuf.client.updater.Updater(repository_name, mirrors)
try:
updater.refresh()
except:
return None
else:
self.repository_names_to_updaters[repository_name] = updater
return updater
def _update_from_repository(dirname, dirname_to_mirrors, target_filename):
# Set the repository directory containing the metadata.
tuf.settings.repository_directory = dirname
updater = get_updater(dirname, dirname_to_mirrors)
try:
return updater.get_one_valid_targetinfo(target_filename)
except:
return None
def _targets_are_equal_and_not_empty(targetinfos):
"""
If not empty, check only that length and hashes are equal; ignore custom
targets metadata.
"""
# Target is empty.
if len(targetinfos) == 0:
return False
else:
prev_targetinfo = targetinfos[0]
# Target is empty.
if not prev_targetinfo: return False
else:
for curr_targetinfo in targetinfos[1:]:
# Target is empty.
if not curr_targetinfo: return False
else:
prev_length = prev_targetinfo['length']
curr_length = curr_targetinfo['length']
if prev_length != curr_length: return False
prev_hashes = prev_targetinfo['hashes']
curr_hashes = curr_targetinfo['hashes']
if prev_hashes.keys() != curr_hashes.keys(): return False
for function, prev_digest in prev_hashes.items():
if prev_digest != curr_hashes[function]: return False
prev_targetinfo = curr_targetinfo
# If we are here, then all the targets are equal.
return True
class Updater(object):
"""
<Purpose>
@ -2541,8 +2774,8 @@ def _visit_child_role(self, child_role, target_filepath, parent_delegations):
# 'role_name' should have been validated when it was downloaded.
# The 'paths' or 'path_hash_prefixes' fields should not be missing,
# so we raise a format error here in case they are both missing.
raise securesystemslib.exceptions.FormatError(repr(child_role_name) + ' has neither '
'"paths" nor "path_hash_prefixes".')
raise securesystemslib.exceptions.FormatError(repr(child_role_name) + ' has'
' neither "paths" nor "path_hash_prefixes".')
if child_role_is_relevant:
# Is the child role allowed by its parent role to specify this path