updater: simplify Updater.download_target() call stack

The call stack and code for download_target() is more complex than
required:
* download_target() : builds target destination filepath, gets length
  and hashes
* _get_target_file() : fixes filenames if consistent snapshots enabled,
  defines verification callback
* _get_file() : iterates mirrors, tries to download files, verifies them

Remove the verification callback and collapse the call stack by a single
level to make the code easier to follow.

Signed-off-by: Joshua Lock <jlock@vmware.com>
This commit is contained in:
Joshua Lock 2020-11-05 10:45:06 +00:00
parent b3ada5bb0b
commit ad1335b6ed
2 changed files with 34 additions and 124 deletions

View file

@ -1616,13 +1616,13 @@ def test_9__get_target_hash(self):
def test_10__hard_check_file_length(self):
def test_10__check_file_length(self):
# Test for exception if file object is not equal to trusted file length.
with tempfile.TemporaryFile() as temp_file_object:
temp_file_object.write(b'X')
temp_file_object.seek(0)
self.assertRaises(tuf.exceptions.DownloadLengthMismatchError,
self.repository_updater._hard_check_file_length,
self.repository_updater._check_file_length,
temp_file_object, 10)
@ -1744,26 +1744,6 @@ def test_11__verify_metadata_file(self):
metadata_file_object, 'root')
def test_12__get_file(self):
# Test for an "unsafe" download, where the file is downloaded up to
# a required length (and no more). The "safe" download approach
# downloads an exact required length.
targets_path = os.path.join(self.repository_directory, 'metadata', 'targets.json')
file_size, file_hashes = securesystemslib.util.get_file_details(targets_path)
file_type = 'meta'
def verify_target_file(targets_path):
# Every target file must have its length and hashes inspected.
self.repository_updater._hard_check_file_length(targets_path, file_size)
self.repository_updater._check_hashes(targets_path, file_hashes)
self.repository_updater._get_file('targets.json', verify_target_file,
file_type, file_size, download_safely=True).close()
self.repository_updater._get_file('targets.json', verify_target_file,
file_type, file_size, download_safely=False).close()
def test_13__targets_of_role(self):
# Test case where a list of targets is given. By default, the 'targets'
# parameter is None.

View file

@ -1217,7 +1217,7 @@ def _check_hashes(self, file_object, trusted_hashes):
def _hard_check_file_length(self, file_object, trusted_file_length):
def _check_file_length(self, file_object, trusted_file_length):
"""
<Purpose>
Non-public method that ensures the length of 'file_object' is strictly
@ -1304,15 +1304,6 @@ def _get_target_file(self, target_filepath, file_length, file_hashes,
A file object containing the target.
"""
# Define a callable function that is passed as an argument to _get_file()
# and called. The 'verify_target_file' function ensures the file length
# and hashes of 'target_filepath' are strictly equal to the trusted values.
def verify_target_file(target_file_object):
# Every target file must have its length and hashes inspected.
self._hard_check_file_length(target_file_object, file_length)
self._check_hashes(target_file_object, file_hashes)
if self.consistent_snapshot and prefix_filename_with_hash:
# Note: values() does not return a list in Python 3. Use list()
# on values() for Python 2+3 compatibility.
@ -1320,8 +1311,37 @@ def verify_target_file(target_file_object):
dirname, basename = os.path.split(target_filepath)
target_filepath = os.path.join(dirname, target_digest + '.' + basename)
return self._get_file(target_filepath, verify_target_file,
'target', file_length, download_safely=True)
file_mirrors = tuf.mirrors.get_list_of_mirrors('target', target_filepath,
self.mirrors)
# file_mirror (URL): error (Exception)
file_mirror_errors = {}
file_object = None
for file_mirror in file_mirrors:
try:
file_object = tuf.download.safe_download(file_mirror, file_length)
# Verify 'file_object' against the expected length and hashes.
self._check_file_length(file_object, file_length)
self._check_hashes(file_object, file_hashes)
except Exception as exception:
# Remember the error from this mirror, and "reset" the target file.
logger.debug('Update failed from ' + file_mirror + '.')
file_mirror_errors[file_mirror] = exception
file_object = None
else:
break
if file_object:
return file_object
else:
logger.debug('Failed to update ' + repr(target_filepath) + ' from'
' all mirrors: ' + repr(file_mirror_errors))
raise tuf.exceptions.NoWorkingMirrorError(file_mirror_errors)
@ -1605,96 +1625,6 @@ def _get_metadata_file(self, metadata_role, remote_filename,
def _get_file(self, filepath, verify_file_function, file_type, file_length,
download_safely=True):
"""
<Purpose>
Non-public method that tries downloading, up to a certain length, a
metadata or target file from a list of known mirrors. As soon as the first
valid copy of the file is found, the rest of the mirrors will be skipped.
<Arguments>
filepath:
The relative metadata or target filepath.
verify_file_function:
A callable function that expects a file object and raises an exception
if the file is invalid.
Target files and uncompressed versions of metadata may be verified with
'verify_file_function'.
file_type:
Type of data needed for download, must correspond to one of the strings
in the list ['meta', 'target']. 'meta' for metadata file type or
'target' for target file type. It should correspond to the
'securesystemslib.formats.NAME_SCHEMA' format.
file_length:
The expected length, or upper bound, of the target or metadata file to
be downloaded.
download_safely:
A boolean switch to toggle safe or unsafe download of the file.
<Exceptions>
tuf.exceptions.NoWorkingMirrorError:
The metadata could not be fetched. This is raised only when all known
mirrors failed to provide a valid copy of the desired metadata file.
<Side Effects>
The file is downloaded from all known repository mirrors in the worst
case. If a valid copy of the file is found, it is stored in a temporary
file and returned.
<Returns>
A file object containing the metadata or target.
"""
file_mirrors = tuf.mirrors.get_list_of_mirrors(file_type, filepath,
self.mirrors)
# file_mirror (URL): error (Exception)
file_mirror_errors = {}
file_object = None
for file_mirror in file_mirrors:
try:
# TODO: Instead of the more fragile 'download_safely' switch, unroll
# the function into two separate ones: one for "safe" download, and the
# other one for "unsafe" download? This should induce safer and more
# readable code.
if download_safely:
file_object = tuf.download.safe_download(file_mirror, file_length)
else:
file_object = tuf.download.unsafe_download(file_mirror, file_length)
# Verify 'file_object' according to the callable function.
# 'file_object' is also verified if decompressed above (i.e., the
# uncompressed version).
verify_file_function(file_object)
except Exception as exception:
# Remember the error from this mirror, and "reset" the target file.
logger.debug('Update failed from ' + file_mirror + '.')
file_mirror_errors[file_mirror] = exception
file_object = None
else:
break
if file_object:
return file_object
else:
logger.debug('Failed to update ' + repr(filepath) + ' from'
' all mirrors: ' + repr(file_mirror_errors))
raise tuf.exceptions.NoWorkingMirrorError(file_mirror_errors)
def _update_metadata(self, metadata_role, upperbound_filelength, version=None):
"""
<Purpose>