mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Remove compression from updater.py and its unit test
This commit is contained in:
parent
a602d449f3
commit
bd754d595a
2 changed files with 20 additions and 125 deletions
|
|
@ -688,18 +688,16 @@ def test_3__update_metadata(self):
|
|||
# version is installed if the compressed one is downloaded.
|
||||
self.assertFalse('targets' in self.repository_updater.metadata['current'])
|
||||
self.repository_updater._update_metadata('targets',
|
||||
DEFAULT_TARGETS_FILELENGTH,
|
||||
targets_versioninfo['version'],
|
||||
'gzip')
|
||||
DEFAULT_TARGETS_FILELENGTH, targets_versioninfo['version'])
|
||||
self.assertTrue('targets' in self.repository_updater.metadata['current'])
|
||||
self.assertEqual(targets_versioninfo['version'],
|
||||
self.repository_updater.metadata['current']['targets']['version'])
|
||||
self.repository_updater.metadata['current']['targets']['version'])
|
||||
|
||||
# Test: Invalid / untrusted version numbers.
|
||||
# Invalid version number for the uncompressed version of 'targets.json'.
|
||||
self.assertRaises(tuf.exceptions.NoWorkingMirrorError,
|
||||
self.repository_updater._update_metadata,
|
||||
'targets', DEFAULT_TARGETS_FILELENGTH, 88)
|
||||
self.repository_updater._update_metadata,
|
||||
'targets', DEFAULT_TARGETS_FILELENGTH, 88)
|
||||
|
||||
# Verify that the specific exception raised is correct for the previous
|
||||
# case.
|
||||
|
|
@ -711,19 +709,13 @@ def test_3__update_metadata(self):
|
|||
for mirror_error in six.itervalues(e.mirror_errors):
|
||||
assert isinstance(mirror_error, securesystemslib.exceptions.BadVersionNumberError)
|
||||
|
||||
# Invalid version number for the compressed version of 'targets.json'
|
||||
self.assertRaises(tuf.exceptions.NoWorkingMirrorError,
|
||||
self.repository_updater._update_metadata,
|
||||
'targets', DEFAULT_TARGETS_FILELENGTH, 88,
|
||||
'gzip')
|
||||
|
||||
# Verify that the specific exception raised is correct for the previous
|
||||
# case. The version number is checked, so the specific error in
|
||||
# this case should be 'securesystemslib.exceptions.BadVersionNumberError'.
|
||||
try:
|
||||
self.repository_updater._update_metadata('targets',
|
||||
DEFAULT_TARGETS_FILELENGTH,
|
||||
88, 'gzip')
|
||||
88)
|
||||
|
||||
except tuf.exceptions.NoWorkingMirrorError as e:
|
||||
for mirror_error in six.itervalues(e.mirror_errors):
|
||||
|
|
|
|||
|
|
@ -687,7 +687,7 @@ def refresh(self, unsafely_update_root_if_necessary=True):
|
|||
|
||||
|
||||
|
||||
def _update_root_metadata(self, current_root_metadata, compression_algorithm=None):
|
||||
def _update_root_metadata(self, current_root_metadata):
|
||||
"""
|
||||
<Purpose>
|
||||
The root file must be signed by the current root threshold and keys as
|
||||
|
|
@ -704,9 +704,6 @@ def _update_root_metadata(self, current_root_metadata, compression_algorithm=Non
|
|||
current_root_metadata:
|
||||
The currently held version of root.
|
||||
|
||||
compresison_algorithm:
|
||||
The compression algorithm used to compress remote metadata.
|
||||
|
||||
<Side Effects>
|
||||
Updates the root metadata files with the latest information.
|
||||
|
||||
|
|
@ -717,8 +714,7 @@ def _update_root_metadata(self, current_root_metadata, compression_algorithm=Non
|
|||
# Retrieve the latest, remote root.json.
|
||||
latest_root_metadata_file = \
|
||||
self._get_metadata_file('root', 'root.json',
|
||||
tuf.settings.DEFAULT_ROOT_REQUIRED_LENGTH, None,
|
||||
compression_algorithm=compression_algorithm)
|
||||
tuf.settings.DEFAULT_ROOT_REQUIRED_LENGTH, None)
|
||||
latest_root_metadata = \
|
||||
securesystemslib.util.load_json_string(latest_root_metadata_file.read().decode('utf-8'))
|
||||
|
||||
|
|
@ -736,8 +732,8 @@ def _update_root_metadata(self, current_root_metadata, compression_algorithm=Non
|
|||
# in the latest root.json after running through the intermediates with
|
||||
# _update_metadata().
|
||||
self.consistent_snapshot = True
|
||||
self._update_metadata('root', tuf.settings.DEFAULT_ROOT_REQUIRED_LENGTH, version=version,
|
||||
compression_algorithm=compression_algorithm)
|
||||
self._update_metadata('root', tuf.settings.DEFAULT_ROOT_REQUIRED_LENGTH,
|
||||
version=version)
|
||||
|
||||
|
||||
|
||||
|
|
@ -925,18 +921,13 @@ def verify_target_file(target_file_object):
|
|||
self._hard_check_file_length(target_file_object, file_length)
|
||||
self._check_hashes(target_file_object, file_hashes)
|
||||
|
||||
# Target files, unlike metadata files, are not decompressed; the
|
||||
# 'compression' argument to _get_file() is needed only for decompression of
|
||||
# metadata. Target files may be compressed or uncompressed.
|
||||
if self.consistent_snapshot:
|
||||
target_digest = random.choice(list(file_hashes.values()))
|
||||
dirname, basename = os.path.split(target_filepath)
|
||||
target_filepath = os.path.join(dirname, target_digest + '.' + basename)
|
||||
|
||||
return self._get_file(target_filepath, verify_target_file,
|
||||
'target', file_length, compression=None,
|
||||
verify_compressed_file_function=None,
|
||||
download_safely=True)
|
||||
'target', file_length, download_safely=True)
|
||||
|
||||
|
||||
|
||||
|
|
@ -1015,8 +1006,7 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object,
|
|||
|
||||
|
||||
def _get_metadata_file(self, metadata_role, remote_filename,
|
||||
upperbound_filelength, expected_version,
|
||||
compression_algorithm):
|
||||
upperbound_filelength, expected_version):
|
||||
"""
|
||||
<Purpose>
|
||||
Non-public method that tries downloading, up to a certain length, a
|
||||
|
|
@ -1039,10 +1029,6 @@ def _get_metadata_file(self, metadata_role, remote_filename,
|
|||
The expected and required version number of the 'metadata_role' file
|
||||
downloaded. 'expected_version' is an integer.
|
||||
|
||||
compression_algorithm:
|
||||
The name of the compression algorithm (e.g., 'gzip'). The algorithm is
|
||||
needed if the remote metadata file is compressed.
|
||||
|
||||
<Exceptions>
|
||||
tuf.exceptions.NoWorkingMirrorError:
|
||||
The metadata could not be fetched. This is raised only when all known
|
||||
|
|
@ -1068,13 +1054,6 @@ def _get_metadata_file(self, metadata_role, remote_filename,
|
|||
file_object = tuf.download.unsafe_download(file_mirror,
|
||||
upperbound_filelength)
|
||||
|
||||
if compression_algorithm is not None:
|
||||
logger.info('Decompressing ' + str(file_mirror))
|
||||
file_object.decompress_temp_file_object(compression_algorithm)
|
||||
|
||||
else:
|
||||
logger.info('Not decompressing ' + str(file_mirror))
|
||||
|
||||
# Verify 'file_object' according to the callable function.
|
||||
# 'file_object' is also verified if decompressed above (i.e., the
|
||||
# uncompressed version).
|
||||
|
|
@ -1153,8 +1132,7 @@ def _verify_root_chain_link(self, role, current, next):
|
|||
|
||||
|
||||
def _get_file(self, filepath, verify_file_function, file_type,
|
||||
file_length, compression=None,
|
||||
verify_compressed_file_function=None, download_safely=True):
|
||||
file_length, download_safely=True):
|
||||
"""
|
||||
<Purpose>
|
||||
Non-public method that tries downloading, up to a certain length, a
|
||||
|
|
@ -1181,15 +1159,6 @@ def _get_file(self, filepath, verify_file_function, file_type,
|
|||
The expected length, or upper bound, of the target or metadata file to
|
||||
be downloaded.
|
||||
|
||||
compression:
|
||||
The name of the compression algorithm (e.g., 'gzip'), if the metadata
|
||||
file is compressed.
|
||||
|
||||
verify_compressed_file_function:
|
||||
If compression is specified, in the case of metadata files, this
|
||||
callable function may be set to perform verification of the compressed
|
||||
version of the metadata file. Decompressed metadata is also verified.
|
||||
|
||||
download_safely:
|
||||
A boolean switch to toggle safe or unsafe download of the file.
|
||||
|
||||
|
|
@ -1216,9 +1185,10 @@ def _get_file(self, filepath, verify_file_function, file_type,
|
|||
|
||||
for file_mirror in file_mirrors:
|
||||
try:
|
||||
# TODO: Instead of the more fragile 'download_safely' switch, unroll the
|
||||
# function into two separate ones: one for "safe" download, and the other one
|
||||
# for "unsafe" download? This should induce safer and more readable code.
|
||||
# TODO: Instead of the more fragile 'download_safely' switch, unroll
|
||||
# the function into two separate ones: one for "safe" download, and the
|
||||
# other one for "unsafe" download? This should induce safer and more
|
||||
# readable code.
|
||||
if download_safely:
|
||||
file_object = tuf.download.safe_download(file_mirror,
|
||||
file_length)
|
||||
|
|
@ -1226,15 +1196,6 @@ def _get_file(self, filepath, verify_file_function, file_type,
|
|||
file_object = tuf.download.unsafe_download(file_mirror,
|
||||
file_length)
|
||||
|
||||
if compression is not None:
|
||||
if verify_compressed_file_function is not None:
|
||||
verify_compressed_file_function(file_object)
|
||||
logger.info('Decompressing ' + str(file_mirror))
|
||||
file_object.decompress_temp_file_object(compression)
|
||||
|
||||
else:
|
||||
logger.info('Not decompressing ' + str(file_mirror))
|
||||
|
||||
# Verify 'file_object' according to the callable function.
|
||||
# 'file_object' is also verified if decompressed above (i.e., the
|
||||
# uncompressed version).
|
||||
|
|
@ -1261,8 +1222,7 @@ def _get_file(self, filepath, verify_file_function, file_type,
|
|||
|
||||
|
||||
|
||||
def _update_metadata(self, metadata_role, upperbound_filelength, version=None,
|
||||
compression_algorithm=None):
|
||||
def _update_metadata(self, metadata_role, upperbound_filelength, version=None):
|
||||
"""
|
||||
<Purpose>
|
||||
Non-public method that downloads, verifies, and 'installs' the metadata
|
||||
|
|
@ -1284,12 +1244,6 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None,
|
|||
The expected and required version number of the 'metadata_role' file
|
||||
downloaded. 'expected_version' is an integer.
|
||||
|
||||
compression_algorithm:
|
||||
A string designating the compression type of 'metadata_role'.
|
||||
The 'snapshot' metadata file may be optionally downloaded and stored in
|
||||
compressed form. Currently, only metadata files compressed with 'gzip'
|
||||
are considered. Any other string is ignored.
|
||||
|
||||
<Exceptions>
|
||||
tuf.exceptions.NoWorkingMirrorError:
|
||||
The metadata cannot be updated. This is not specific to a single
|
||||
|
|
@ -1309,11 +1263,6 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None,
|
|||
metadata_filename = metadata_role + '.json'
|
||||
uncompressed_metadata_filename = metadata_filename
|
||||
|
||||
# The 'snapshot' or Targets metadata may be compressed. Add the appropriate
|
||||
# extension to 'metadata_filename'.
|
||||
if compression_algorithm == 'gzip':
|
||||
metadata_filename = metadata_filename + '.gz'
|
||||
|
||||
# Attempt a file download from each mirror until the file is downloaded and
|
||||
# verified. If the signature of the downloaded file is valid, proceed,
|
||||
# otherwise log a warning and try the next mirror. 'metadata_file_object'
|
||||
|
|
@ -1329,10 +1278,6 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None,
|
|||
# for each other. In this case, we will download the metadata up to the
|
||||
# best length we can get for it, not request a specific version, but
|
||||
# perform the rest of the checks (e.g., signature verification).
|
||||
#
|
||||
# Note also that we presently support decompression of only "safe"
|
||||
# metadata, but this is easily extend to "unsafe" metadata as well as
|
||||
# "safe" targets.
|
||||
|
||||
remote_filename = metadata_filename
|
||||
filename_version = ''
|
||||
|
|
@ -1344,8 +1289,7 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None,
|
|||
|
||||
metadata_file_object = \
|
||||
self._get_metadata_file(metadata_role, remote_filename,
|
||||
upperbound_filelength, version,
|
||||
compression_algorithm)
|
||||
upperbound_filelength, version)
|
||||
|
||||
# The metadata has been verified. Move the metadata file into place.
|
||||
# First, move the 'current' metadata file to the 'previous' directory
|
||||
|
|
@ -1370,16 +1314,7 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None,
|
|||
metadata_signable = \
|
||||
securesystemslib.util.load_json_string(metadata_file_object.read().decode('utf-8'))
|
||||
|
||||
if compression_algorithm == 'gzip':
|
||||
current_uncompressed_filepath = \
|
||||
os.path.join(self.metadata_directory['current'],
|
||||
uncompressed_metadata_filename)
|
||||
current_uncompressed_filepath = \
|
||||
os.path.abspath(current_uncompressed_filepath)
|
||||
metadata_file_object.move(current_uncompressed_filepath)
|
||||
|
||||
else:
|
||||
metadata_file_object.move(current_filepath)
|
||||
metadata_file_object.move(current_filepath)
|
||||
|
||||
# Extract the metadata object so we can store it to the metadata store.
|
||||
# 'current_metadata_object' set to 'None' if there is not an object
|
||||
|
|
@ -1512,38 +1447,6 @@ def _update_metadata_if_changed(self, metadata_role,
|
|||
|
||||
logger.debug('Metadata ' + repr(uncompressed_metadata_filename) + ' has changed.')
|
||||
|
||||
# There might be a compressed version of 'snapshot.json' or Targets
|
||||
# metadata available for download. Check the 'meta' field of
|
||||
# 'referenced_metadata' to see if it is listed when 'metadata_role'
|
||||
# is 'snapshot'. The full rolename for delegated Targets metadata
|
||||
# must begin with 'targets/'. The snapshot role lists all the Targets
|
||||
# metadata available on the repository, including any that may be in
|
||||
# compressed form.
|
||||
#
|
||||
# In addition to validating the fileinfo (i.e., file lengths and hashes)
|
||||
# of the uncompressed metadata, the compressed version is also verified to
|
||||
# match its respective fileinfo. Verifying the compressed fileinfo ensures
|
||||
# untrusted data is not decompressed prior to verifying hashes, or
|
||||
# decompressing a file that may be invalid or partially intact.
|
||||
compression = None
|
||||
|
||||
# Check for the availability of compressed versions of 'snapshot.json',
|
||||
# 'targets.json', and delegated Targets (that also start with 'targets').
|
||||
# For 'targets.json' and delegated metadata, 'referenced_metata'
|
||||
# should always be 'snapshot'. 'snapshot.json' specifies all roles
|
||||
# provided by a repository, including their version numbers.
|
||||
if metadata_role == 'snapshot' or metadata_role.startswith('targets'):
|
||||
if 'gzip' in self.metadata['current']['root']['compression_algorithms']:
|
||||
compression = 'gzip'
|
||||
gzip_metadata_filename = uncompressed_metadata_filename + '.gz'
|
||||
logger.debug('Compressed version of ' +
|
||||
repr(uncompressed_metadata_filename) + ' is available at ' +
|
||||
repr(gzip_metadata_filename) + '.')
|
||||
|
||||
else:
|
||||
logger.debug('Compressed version of ' +
|
||||
repr(uncompressed_metadata_filename) + ' not available.')
|
||||
|
||||
# The file lengths of metadata are unknown, only their version numbers are
|
||||
# known. Set an upper limit for the length of the downloaded file for each
|
||||
# expected role. Note: The Timestamp role is not updated via this
|
||||
|
|
@ -1560,7 +1463,7 @@ def _update_metadata_if_changed(self, metadata_role,
|
|||
|
||||
try:
|
||||
self._update_metadata(metadata_role, upperbound_filelength,
|
||||
expected_versioninfo['version'], compression)
|
||||
expected_versioninfo['version'])
|
||||
|
||||
except:
|
||||
# The current metadata we have is not current but we couldn't get new
|
||||
|
|
|
|||
Loading…
Reference in a new issue