mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Merge branch 'develop' of https://github.com/theupdateframework/tuf into develop
Conflicts: tuf/__init__.py tuf/client/updater.py
This commit is contained in:
commit
e23b4e5eae
5 changed files with 66 additions and 45 deletions
|
|
@ -236,8 +236,12 @@ def test_with_tuf(self):
|
|||
# continue the update process. Sleep for at least 2 seconds to ensure
|
||||
# 'repository.timestamp.expiration' is reached.
|
||||
time.sleep(2)
|
||||
self.assertRaises(tuf.ExpiredMetadataError,
|
||||
self.repository_updater.refresh)
|
||||
try:
|
||||
self.repository_updater.refresh()
|
||||
|
||||
except tuf.NoWorkingMirrorError as e:
|
||||
for mirror_url, mirror_error in e.mirror_errors.iteritems():
|
||||
self.assertTrue(isinstance(mirror_error, tuf.ExpiredMetadataError))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
|
|
|
|||
|
|
@ -461,20 +461,21 @@ def test_2__delete_metadata(self):
|
|||
def test_2__ensure_not_expired(self):
|
||||
# This test condition will verify that nothing is raised when a metadata
|
||||
# file has a future expiration date.
|
||||
self.repository_updater._ensure_not_expired('root')
|
||||
root_metadata = self.repository_updater.metadata['current']['root']
|
||||
self.repository_updater._ensure_not_expired(root_metadata, 'root')
|
||||
|
||||
# 'tuf.ExpiredMetadataError' should be raised in this next test condition,
|
||||
# because the expiration_date has expired by 10 seconds.
|
||||
expires = tuf.formats.unix_timestamp_to_datetime(int(time.time() - 10))
|
||||
expires = expires.isoformat() + 'Z'
|
||||
self.repository_updater.metadata['current']['root']['expires'] = expires
|
||||
root_metadata['expires'] = expires
|
||||
|
||||
# Ensure the 'expires' value of the root file is valid by checking the
|
||||
# the formats of the 'root.json' object.
|
||||
root_object = self.repository_updater.metadata['current']['root']
|
||||
self.assertTrue(tuf.formats.ROOT_SCHEMA.matches(root_object))
|
||||
self.assertTrue(tuf.formats.ROOT_SCHEMA.matches(root_metadata))
|
||||
self.assertRaises(tuf.ExpiredMetadataError,
|
||||
self.repository_updater._ensure_not_expired, 'root')
|
||||
self.repository_updater._ensure_not_expired,
|
||||
root_metadata, 'root')
|
||||
|
||||
|
||||
|
||||
|
|
@ -658,6 +659,12 @@ def test_4_refresh(self):
|
|||
# This unit test is based on adding an extra target file to the
|
||||
# server and rebuilding all server-side metadata. All top-level metadata
|
||||
# should be updated when the client calls refresh().
|
||||
|
||||
# First verify that an expired root metadata is updated.
|
||||
expired_date = '1960-01-01T12:00:00Z'
|
||||
self.repository_updater.metadata['current']['root']['expires'] = expired_date
|
||||
self.repository_updater.refresh()
|
||||
|
||||
repository = repo_tool.load_repository(self.repository_directory)
|
||||
target3 = os.path.join(self.repository_directory, 'targets', 'file3.txt')
|
||||
|
||||
|
|
|
|||
|
|
@ -20,8 +20,12 @@
|
|||
provide that reason in those cases.
|
||||
"""
|
||||
|
||||
import logging
|
||||
|
||||
import tuf.log
|
||||
import tuf._vendor.six as six
|
||||
|
||||
logging = logging.getLogger('tuf.__init__')
|
||||
|
||||
# Import 'tuf.formats' if a module tries to import the
|
||||
# entire tuf package (i.e., from tuf import *).
|
||||
|
|
|
|||
|
|
@ -608,6 +608,31 @@ def refresh(self, unsafely_update_root_if_necessary=True):
|
|||
# is insufficient trusted signatures for the specified metadata.
|
||||
# Raise 'tuf.NoWorkingMirrorError' if an update fails.
|
||||
|
||||
# Is the Root role expired? When the top-level roles are initially loaded
|
||||
# from disk, their expiration is not checked to allow their updating when
|
||||
# requested (and give the updater the chance to continue, rather than always
|
||||
# failing with an expired metadata error.) If
|
||||
# 'unsafely_update_root_if_necessary' is True, update an expired Root role
|
||||
# now. Updating the other top-level roles, regardless of their validity,
|
||||
# should only occur if the root of trust is up-to-date.
|
||||
root_metadata = self.metadata['current']['root']
|
||||
try:
|
||||
self._ensure_not_expired(root_metadata, 'root')
|
||||
|
||||
except tuf.ExpiredMetadataError as e:
|
||||
# Raise 'tuf.NoWorkingMirrorError' if a valid (not expired, properly
|
||||
# signed, and valid metadata) 'root' cannot be installed.
|
||||
if unsafely_update_root_if_necessary:
|
||||
message = \
|
||||
'Expired Root metadata was loaded from disk. Try to update it now.'
|
||||
logger.info(message)
|
||||
self._update_metadata('root', DEFAULT_ROOT_FILEINFO)
|
||||
|
||||
# The caller explicitly requested not to unsafely fetch an expired Root.
|
||||
else:
|
||||
logger.info('An expired Root metadata was loaded and must be updated.')
|
||||
raise
|
||||
|
||||
# Use default but sane information for timestamp metadata, and do not
|
||||
# require strict checks on its required length.
|
||||
try:
|
||||
|
|
@ -629,13 +654,6 @@ def refresh(self, unsafely_update_root_if_necessary=True):
|
|||
else:
|
||||
raise
|
||||
|
||||
else:
|
||||
# Updated the top-level metadata (which all had valid signatures),
|
||||
# however, have they expired? Raise 'tuf.ExpiredMetadataError' if any of
|
||||
# the metadata has expired.
|
||||
for metadata_role in ['timestamp', 'root', 'snapshot', 'targets']:
|
||||
self._ensure_not_expired(metadata_role)
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
@ -892,6 +910,9 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object,
|
|||
# 'tuf.FormatError' if not.
|
||||
tuf.formats.check_signable_object_format(metadata_signable)
|
||||
|
||||
# Is 'metadata_signable' expired?
|
||||
self._ensure_not_expired(metadata_signable['signed'], metadata_role)
|
||||
|
||||
# Is 'metadata_signable' newer than the currently installed
|
||||
# version?
|
||||
current_metadata_role = self.metadata['current'].get(metadata_role)
|
||||
|
|
@ -992,7 +1013,7 @@ def unsafely_verify_uncompressed_metadata_file(metadata_file_object):
|
|||
self._check_hashes(metadata_file_object, uncompressed_file_hashes)
|
||||
self._verify_uncompressed_metadata_file(metadata_file_object,
|
||||
metadata_role)
|
||||
|
||||
|
||||
def unsafely_verify_compressed_metadata_file(metadata_file_object):
|
||||
self._hard_check_file_length(metadata_file_object, compressed_file_length)
|
||||
self._check_hashes(metadata_file_object, compressed_file_hashes)
|
||||
|
|
@ -1071,7 +1092,7 @@ def safely_verify_uncompressed_metadata_file(metadata_file_object):
|
|||
uncompressed_file_length)
|
||||
self._check_hashes(metadata_file_object, uncompressed_file_hashes)
|
||||
self._verify_uncompressed_metadata_file(metadata_file_object,
|
||||
metadata_role)
|
||||
metadata_role)
|
||||
|
||||
def safely_verify_compressed_metadata_file(metadata_file_object):
|
||||
self._hard_check_file_length(metadata_file_object, compressed_file_length)
|
||||
|
|
@ -1356,7 +1377,7 @@ def _update_metadata(self, metadata_role, uncompressed_fileinfo,
|
|||
logger.debug('Updated '+repr(current_filepath)+'.')
|
||||
self.metadata['previous'][metadata_role] = current_metadata_object
|
||||
self.metadata['current'][metadata_role] = updated_metadata_object
|
||||
self._update_fileinfo(metadata_filename)
|
||||
self._update_fileinfo(uncompressed_metadata_filename)
|
||||
|
||||
# Ensure the role and key information of the top-level roles is also updated
|
||||
# according to the newly-installed Root metadata.
|
||||
|
|
@ -1726,20 +1747,24 @@ def _delete_metadata(self, metadata_role):
|
|||
|
||||
|
||||
|
||||
def _ensure_not_expired(self, metadata_role):
|
||||
def _ensure_not_expired(self, metadata_object, metadata_rolename):
|
||||
"""
|
||||
<Purpose>
|
||||
Non-public method that raises an exception if the current specified
|
||||
metadata has expired.
|
||||
|
||||
<Arguments>
|
||||
metadata_role:
|
||||
metadata_object:
|
||||
The metadata that should be expired, a 'tuf.formats.ANYROLE_SCHEMA'
|
||||
object.
|
||||
|
||||
metadata_rolename:
|
||||
The name of the metadata. This is a role name and should not end
|
||||
in '.json'. Examples: 'root', 'targets', 'targets/linux/x86'.
|
||||
|
||||
<Exceptions>
|
||||
tuf.ExpiredMetadataError:
|
||||
If 'metadata_role' has expired.
|
||||
If 'metadata_rolename' has expired.
|
||||
|
||||
<Side Effects>
|
||||
None.
|
||||
|
|
@ -1747,17 +1772,9 @@ def _ensure_not_expired(self, metadata_role):
|
|||
<Returns>
|
||||
None.
|
||||
"""
|
||||
|
||||
# Construct the full metadata filename and the location of its
|
||||
# current path. The current path of 'metadata_role' is needed
|
||||
# to log the exact filename of the expired metadata.
|
||||
metadata_filename = metadata_role + '.json'
|
||||
rolepath = os.path.join(self.metadata_directory['current'],
|
||||
metadata_filename)
|
||||
rolepath = os.path.abspath(rolepath)
|
||||
|
||||
|
||||
# Extract the expiration time.
|
||||
expires = self.metadata['current'][metadata_role]['expires']
|
||||
expires = metadata_object['expires']
|
||||
|
||||
# If the current time has surpassed the expiration date, raise
|
||||
# an exception. 'expires' is in 'tuf.formats.ISO8601_DATETIME_SCHEMA'
|
||||
|
|
@ -1772,7 +1789,7 @@ def _ensure_not_expired(self, metadata_role):
|
|||
expires_timestamp = tuf.formats.datetime_to_unix_timestamp(expires_datetime)
|
||||
|
||||
if expires_timestamp < current_time:
|
||||
message = 'Metadata '+repr(rolepath)+' expired on ' + \
|
||||
message = 'Metadata '+repr(metadata_rolename)+' expired on ' + \
|
||||
expires_datetime.ctime() + ' (UTC).'
|
||||
logger.error(message)
|
||||
|
||||
|
|
@ -1910,13 +1927,6 @@ def _refresh_targets_metadata(self, rolename='targets', include_delegations=Fals
|
|||
|
||||
self._update_metadata_if_changed(rolename)
|
||||
|
||||
# Remove the role if it has expired.
|
||||
try:
|
||||
self._ensure_not_expired(rolename)
|
||||
|
||||
except tuf.ExpiredMetadataError:
|
||||
tuf.roledb.remove_role(rolename)
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
@ -2035,13 +2045,6 @@ def refresh_targets_metadata_chain(self, rolename):
|
|||
|
||||
self._update_metadata_if_changed(rolename)
|
||||
|
||||
# Remove the role if it has expired.
|
||||
try:
|
||||
self._ensure_not_expired(rolename)
|
||||
refreshed_chain.append(rolename)
|
||||
except tuf.ExpiredMetadataError:
|
||||
tuf.roledb.remove_role(rolename)
|
||||
|
||||
return refreshed_chain
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -502,10 +502,13 @@ def _download_fixed_amount_of_data(connection, temp_file, required_length):
|
|||
# Data successfully read from the connection. Store it.
|
||||
temp_file.write(data)
|
||||
total_downloaded = total_downloaded + len(data)
|
||||
|
||||
except:
|
||||
raise
|
||||
|
||||
else:
|
||||
return total_downloaded
|
||||
|
||||
finally:
|
||||
# Whatever happens, make sure that we always close the connection.
|
||||
connection.close()
|
||||
|
|
|
|||
Loading…
Reference in a new issue