mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Initial re-implementation of compressed metadata verification in updater.py
This commit is contained in:
parent
738fa4d4a7
commit
6de2fdca5c
4 changed files with 233 additions and 280 deletions
|
|
@ -211,7 +211,7 @@ def parse_options():
|
|||
# the current directory.
|
||||
try:
|
||||
update_client(repository_mirror)
|
||||
except (tuf.RepositoryError, tuf.ExpiredMetadataError), e:
|
||||
except (tuf.NoWorkingMirrorError, tuf.RepositoryError), e:
|
||||
sys.stderr.write('Error: '+str(e)+'\n')
|
||||
sys.exit(1)
|
||||
|
||||
|
|
|
|||
196
tuf/client/updater.py
Executable file → Normal file
196
tuf/client/updater.py
Executable file → Normal file
|
|
@ -556,7 +556,7 @@ def refresh(self):
|
|||
# would need an infinite regress of metadata. Therefore, we use some
|
||||
# default, sane metadata about it.
|
||||
DEFAULT_TIMESTAMP_FILEINFO = {
|
||||
'hashes':None,
|
||||
'hashes': {},
|
||||
'length': tuf.conf.DEFAULT_TIMESTAMP_REQUIRED_LENGTH
|
||||
}
|
||||
|
||||
|
|
@ -627,8 +627,7 @@ def __check_hashes(self, file_object, trusted_hashes):
|
|||
|
||||
|
||||
|
||||
def __hard_check_compressed_file_length(self, file_object,
|
||||
compressed_file_length):
|
||||
def __hard_check_file_length(self, file_object, trusted_file_length):
|
||||
"""
|
||||
<Purpose>
|
||||
A helper function that checks the expected compressed length of a
|
||||
|
|
@ -655,20 +654,19 @@ def __hard_check_compressed_file_length(self, file_object,
|
|||
|
||||
"""
|
||||
|
||||
observed_length = file_object.get_compressed_length()
|
||||
if observed_length != compressed_file_length:
|
||||
raise tuf.DownloadLengthMismatchError(compressed_file_length,
|
||||
observed_length = len(file_object.read())
|
||||
if observed_length != trusted_file_length:
|
||||
raise tuf.DownloadLengthMismatchError(trusted_file_length,
|
||||
observed_length)
|
||||
else:
|
||||
logger.debug('file length ('+str(observed_length)+\
|
||||
') == trusted length ('+str(compressed_file_length)+')')
|
||||
logger.debug('download length ('+str(observed_length)+\
|
||||
') == trusted length ('+str(trusted_file_length)+')')
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def __soft_check_compressed_file_length(self, file_object,
|
||||
compressed_file_length):
|
||||
def __soft_check_file_length(self, file_object, trusted_file_length):
|
||||
"""
|
||||
<Purpose>
|
||||
A helper function that checks the expected compressed length of a
|
||||
|
|
@ -694,20 +692,19 @@ def __soft_check_compressed_file_length(self, file_object,
|
|||
None.
|
||||
"""
|
||||
|
||||
observed_length = file_object.get_compressed_length()
|
||||
if observed_length > compressed_file_length:
|
||||
raise tuf.DownloadLengthMismatchError(compressed_file_length,
|
||||
observed_length = len(file_object.read())
|
||||
if observed_length > trusted_file_length:
|
||||
raise tuf.DownloadLengthMismatchError(trusted_file_length,
|
||||
observed_length)
|
||||
else:
|
||||
logger.debug('file length ('+str(observed_length)+\
|
||||
') <= trusted length ('+str(compressed_file_length)+')')
|
||||
logger.debug('download length ('+str(observed_length)+\
|
||||
') <= trusted length ('+str(trusted_file_length)+')')
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def get_target_file(self, target_filepath, compressed_file_length,
|
||||
uncompressed_file_hashes):
|
||||
def get_target_file(self, target_filepath, file_length, file_hashes):
|
||||
"""
|
||||
<Purpose>
|
||||
Safely download a target file up to a certain length, and check its
|
||||
|
|
@ -738,15 +735,15 @@ def get_target_file(self, target_filepath, compressed_file_length,
|
|||
A tuf.util.TempFile file-like object containing the target.
|
||||
"""
|
||||
|
||||
def verify_uncompressed_target_file(target_file_object):
|
||||
def verify_target_file(target_file_object):
|
||||
# Every target file must have its length and hashes inspected.
|
||||
self.__hard_check_compressed_file_length(target_file_object,
|
||||
compressed_file_length)
|
||||
self.__check_hashes(target_file_object, uncompressed_file_hashes)
|
||||
self.__hard_check_file_length(target_file_object, file_length)
|
||||
self.__check_hashes(target_file_object, file_hashes)
|
||||
|
||||
return self.__get_file(target_filepath, verify_uncompressed_target_file,
|
||||
'target', compressed_file_length,
|
||||
download_safely=True, compression=None)
|
||||
return self.__get_file(target_filepath, verify_target_file,
|
||||
'target', file_length, compression=None,
|
||||
verify_compressed_file_function=None,
|
||||
download_safely=True)
|
||||
|
||||
|
||||
|
||||
|
|
@ -830,7 +827,9 @@ def __verify_uncompressed_metadata_file(self, metadata_file_object,
|
|||
|
||||
|
||||
def unsafely_get_metadata_file(self, metadata_role, metadata_filepath,
|
||||
compressed_file_length):
|
||||
uncompressed_fileinfo,
|
||||
compression=None, compressed_fileinfo=None):
|
||||
|
||||
"""
|
||||
<Purpose>
|
||||
Unsafely download a metadata file up to a certain length. The actual file
|
||||
|
|
@ -861,25 +860,39 @@ def unsafely_get_metadata_file(self, metadata_role, metadata_filepath,
|
|||
<Returns>
|
||||
A tuf.util.TempFile file-like object containing the metadata.
|
||||
"""
|
||||
|
||||
uncompressed_file_length = uncompressed_fileinfo['length']
|
||||
uncompressed_file_hashes = uncompressed_fileinfo['hashes']
|
||||
|
||||
if compression and compressed_fileinfo:
|
||||
compressed_file_length = compressed_fileinfo['length']
|
||||
compressed_file_hashes = compressed_fileinfo['hashes']
|
||||
|
||||
def unsafely_verify_uncompressed_metadata_file(metadata_file_object):
|
||||
self.__soft_check_compressed_file_length(metadata_file_object,
|
||||
compressed_file_length)
|
||||
self.__soft_check_file_length(metadata_file_object,
|
||||
uncompressed_file_length)
|
||||
self.__check_hashes(metadata_file_object, uncompressed_file_hashes)
|
||||
self.__verify_uncompressed_metadata_file(metadata_file_object,
|
||||
metadata_role)
|
||||
|
||||
def unsafely_verify_compressed_metadata_file(metadata_file_object):
|
||||
self.__hard_check_file_length(metadata_file_object,
|
||||
compressed_file_length)
|
||||
self.__check_hashes(metadata_file_object, compressed_file_hashes)
|
||||
|
||||
return self.__get_file(metadata_filepath,
|
||||
unsafely_verify_uncompressed_metadata_file, 'meta',
|
||||
compressed_file_length, download_safely=False,
|
||||
compression=None)
|
||||
uncompressed_file_length, compression,
|
||||
unsafely_verify_compressed_metadata_file,
|
||||
download_safely=False)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def safely_get_metadata_file(self, metadata_role, metadata_filepath,
|
||||
compressed_file_length,
|
||||
uncompressed_file_hashes, compression):
|
||||
uncompressed_fileinfo,
|
||||
compression=None, compressed_fileinfo=None):
|
||||
"""
|
||||
<Purpose>
|
||||
Safely download a metadata file up to a certain length, and check its
|
||||
|
|
@ -915,18 +928,33 @@ def safely_get_metadata_file(self, metadata_role, metadata_filepath,
|
|||
<Returns>
|
||||
A tuf.util.TempFile file-like object containing the metadata.
|
||||
"""
|
||||
|
||||
uncompressed_file_length = uncompressed_fileinfo['length']
|
||||
download_file_length = uncompressed_file_length
|
||||
uncompressed_file_hashes = uncompressed_fileinfo['hashes']
|
||||
|
||||
if compression and compressed_fileinfo:
|
||||
compressed_file_length = compressed_fileinfo['length']
|
||||
download_file_length = compressed_file_length
|
||||
compressed_file_hashes = compressed_fileinfo['hashes']
|
||||
|
||||
def safely_verify_uncompressed_metadata_file(metadata_file_object):
|
||||
self.__hard_check_compressed_file_length(metadata_file_object,
|
||||
compressed_file_length)
|
||||
self.__hard_check_file_length(metadata_file_object,
|
||||
uncompressed_file_length)
|
||||
self.__check_hashes(metadata_file_object, uncompressed_file_hashes)
|
||||
self.__verify_uncompressed_metadata_file(metadata_file_object,
|
||||
metadata_role)
|
||||
|
||||
def safely_verify_compressed_metadata_file(metadata_file_object):
|
||||
self.__hard_check_file_length(metadata_file_object,
|
||||
compressed_file_length)
|
||||
self.__check_hashes(metadata_file_object, compressed_file_hashes)
|
||||
|
||||
return self.__get_file(metadata_filepath,
|
||||
safely_verify_uncompressed_metadata_file, 'meta',
|
||||
compressed_file_length, download_safely=True,
|
||||
compression=compression)
|
||||
download_file_length, compression,
|
||||
safely_verify_compressed_metadata_file,
|
||||
download_safely=True)
|
||||
|
||||
|
||||
|
||||
|
|
@ -935,8 +963,9 @@ def safely_verify_uncompressed_metadata_file(metadata_file_object):
|
|||
# TODO: Instead of the more fragile 'download_safely' switch, unroll the
|
||||
# function into two separate ones: one for "safe" download, and the other one
|
||||
# for "unsafe" download? This should induce safer and more readable code.
|
||||
def __get_file(self, filepath, verify_uncompressed_file, file_type,
|
||||
compressed_file_length, download_safely, compression):
|
||||
def __get_file(self, filepath, verify_file_function, file_type,
|
||||
file_length, compression=None,
|
||||
verify_compressed_file_function=None, download_safely=True):
|
||||
"""
|
||||
<Purpose>
|
||||
Try downloading, up to a certain length, a metadata or target file from a
|
||||
|
|
@ -948,8 +977,8 @@ def __get_file(self, filepath, verify_uncompressed_file, file_type,
|
|||
The relative metadata or target filepath.
|
||||
|
||||
verify_uncompressed_file:
|
||||
A function which expects an uncompressed file-like object and which
|
||||
will raise an exception in case the file is not valid for any reason.
|
||||
A callable function that expects an uncompressed file-like object and
|
||||
raises an exception if the file is invalid.
|
||||
|
||||
file_type:
|
||||
Type of data needed for download, must correspond to one of the strings
|
||||
|
|
@ -991,18 +1020,19 @@ def __get_file(self, filepath, verify_uncompressed_file, file_type,
|
|||
try:
|
||||
if download_safely:
|
||||
file_object = tuf.download.safe_download(file_mirror,
|
||||
compressed_file_length)
|
||||
file_length)
|
||||
else:
|
||||
file_object = tuf.download.unsafe_download(file_mirror,
|
||||
compressed_file_length)
|
||||
file_length)
|
||||
|
||||
if compression:
|
||||
if compression is not None:
|
||||
verify_compressed_file_function(file_object)
|
||||
logger.debug('Decompressing '+str(file_mirror))
|
||||
file_object.decompress_temp_file_object(compression)
|
||||
else:
|
||||
logger.debug('Not decompressing '+str(file_mirror))
|
||||
|
||||
verify_uncompressed_file(file_object)
|
||||
verify_file_function(file_object)
|
||||
|
||||
except Exception, exception:
|
||||
# Remember the error from this mirror, and "reset" the target file.
|
||||
|
|
@ -1023,7 +1053,8 @@ def __get_file(self, filepath, verify_uncompressed_file, file_type,
|
|||
|
||||
|
||||
|
||||
def _update_metadata(self, metadata_role, fileinfo, compression=None):
|
||||
def _update_metadata(self, metadata_role, uncompressed_fileinfo,
|
||||
compression=None, compressed_fileinfo=None):
|
||||
"""
|
||||
<Purpose>
|
||||
Download, verify, and 'install' the metadata belonging to 'metadata_role'.
|
||||
|
|
@ -1037,27 +1068,26 @@ def _update_metadata(self, metadata_role, fileinfo, compression=None):
|
|||
The name of the metadata. This is a role name and should not end
|
||||
in '.txt'. Examples: 'root', 'targets', 'targets/linux/x86'.
|
||||
|
||||
fileinfo:
|
||||
A dictionary containing length and hashes of the metadata file.
|
||||
uncompressed_fileinfo:
|
||||
A dictionary containing length and hashes of the uncompressed metadata
|
||||
file.
|
||||
|
||||
Ex: {"hashes": {"sha256": "3a5a6ec1f353...dedce36e0"},
|
||||
"length": 1340}
|
||||
The length must be that of the compressed metadata file if it is
|
||||
compressed, or uncompressed metadata file if it is uncompressed.
|
||||
The hashes must be that of the uncompressed metadata file.
|
||||
|
||||
STRICT_REQUIRED_LENGTH:
|
||||
A Boolean indicator used to signal whether we should perform strict
|
||||
checking of the required length in 'fileinfo'. True by default. True
|
||||
by default. We explicitly set this to False when we know that we want
|
||||
to turn this off for downloading the timestamp metadata, which has no
|
||||
signed required_length.
|
||||
|
||||
|
||||
compression:
|
||||
A string designating the compression type of 'metadata_role'.
|
||||
The 'release' metadata file may be optionally downloaded and stored in
|
||||
compressed form. Currently, only metadata files compressed with 'gzip'
|
||||
are considered. Any other string is ignored.
|
||||
|
||||
compressed_fileinfo:
|
||||
A dictionary containing length and hashes of the compressed metadata
|
||||
file.
|
||||
|
||||
Ex: {"hashes": {"sha256": "3a5a6ec1f353...dedce36e0"},
|
||||
"length": 1340}
|
||||
|
||||
<Exceptions>
|
||||
tuf.NoWorkingMirrorError:
|
||||
The metadata could not be updated. This is not specific to a single
|
||||
|
|
@ -1082,11 +1112,6 @@ def _update_metadata(self, metadata_role, fileinfo, compression=None):
|
|||
if compression == 'gzip':
|
||||
metadata_filename = metadata_filename + '.gz'
|
||||
|
||||
# Extract file length and file hashes. They will be passed as arguments
|
||||
# to 'download_file' function.
|
||||
compressed_file_length = fileinfo['length']
|
||||
uncompressed_file_hashes = fileinfo['hashes']
|
||||
|
||||
# Attempt a file download from each mirror until the file is downloaded and
|
||||
# verified. If the signature of the downloaded file is valid, proceed,
|
||||
# otherwise log a warning and try the next mirror. 'metadata_file_object'
|
||||
|
|
@ -1106,17 +1131,17 @@ def _update_metadata(self, metadata_role, fileinfo, compression=None):
|
|||
# Note also that we presently support decompression of only "safe"
|
||||
# metadata, but this is easily extend to "unsafe" metadata as well as
|
||||
# "safe" targets.
|
||||
|
||||
|
||||
if metadata_role == 'timestamp':
|
||||
metadata_file_object = \
|
||||
self.unsafely_get_metadata_file(metadata_role, metadata_filename,
|
||||
compressed_file_length)
|
||||
uncompressed_fileinfo,
|
||||
compression, compressed_fileinfo)
|
||||
else:
|
||||
metadata_file_object = \
|
||||
self.safely_get_metadata_file(metadata_role, metadata_filename,
|
||||
compressed_file_length,
|
||||
uncompressed_file_hashes,
|
||||
compression=compression)
|
||||
uncompressed_fileinfo,
|
||||
compression, compressed_fileinfo)
|
||||
|
||||
# The metadata has been verified. Move the metadata file into place.
|
||||
# First, move the 'current' metadata file to the 'previous' directory
|
||||
|
|
@ -1129,6 +1154,7 @@ def _update_metadata(self, metadata_role, fileinfo, compression=None):
|
|||
previous_filepath = os.path.join(self.metadata_directory['previous'],
|
||||
metadata_filename)
|
||||
previous_filepath = os.path.abspath(previous_filepath)
|
||||
|
||||
if os.path.exists(current_filepath):
|
||||
# Previous metadata might not exist, say when delegations are added.
|
||||
tuf.util.ensure_parent_dir(previous_filepath)
|
||||
|
|
@ -1145,6 +1171,7 @@ def _update_metadata(self, metadata_role, fileinfo, compression=None):
|
|||
current_uncompressed_filepath = \
|
||||
os.path.abspath(current_uncompressed_filepath)
|
||||
metadata_file_object.move(current_uncompressed_filepath)
|
||||
|
||||
else:
|
||||
metadata_file_object.move(current_filepath)
|
||||
|
||||
|
|
@ -1229,6 +1256,7 @@ def _update_metadata_if_changed(self, metadata_role, referenced_metadata='releas
|
|||
message = 'Cannot update '+repr(metadata_role)+' because ' \
|
||||
+referenced_metadata+' is missing.'
|
||||
raise tuf.RepositoryError(message)
|
||||
|
||||
# The referenced metadata has been loaded. Extract the new
|
||||
# fileinfo for 'metadata_role' from it.
|
||||
else:
|
||||
|
|
@ -1243,18 +1271,25 @@ def _update_metadata_if_changed(self, metadata_role, referenced_metadata='releas
|
|||
# must begin with 'targets/'. The Release role lists all the Targets
|
||||
# metadata available on the repository, including any that may be in
|
||||
# compressed form.
|
||||
#
|
||||
# In addition to validating the fileinfo (i.e., file lengths and hashes)
|
||||
# of the uncompressed metadata, the compressed version is also verified to
|
||||
# match its respective fileinfo. Verifying the compressed fileinfo ensures
|
||||
# untrusted data is not decompressed prior to verifying hashes, or
|
||||
# decompressing a file that may be invalid or partially intact.
|
||||
compression = None
|
||||
compressed_fileinfo = None
|
||||
|
||||
# Extract the fileinfo of the uncompressed version of 'metadata_role'.
|
||||
uncompressed_fileinfo = self.metadata['current'][referenced_metadata] \
|
||||
['meta'] \
|
||||
[uncompressed_metadata_filename]
|
||||
|
||||
# Check for availability of compressed versions of 'release.txt',
|
||||
# 'targets.txt', and delegated Targets, which also start with 'targets'.
|
||||
# Check for the availability of compressed versions of 'release.txt',
|
||||
# 'targets.txt', and delegated Targets (that also start with 'targets').
|
||||
# For 'targets.txt' and delegated metadata, 'referenced_metata'
|
||||
# should always be 'release'. 'release.txt' specifies all roles
|
||||
# provided by a repository, including their file sizes and hashes.
|
||||
# provided by a repository, including their file lengths and hashes.
|
||||
if metadata_role == 'release' or metadata_role.startswith('targets'):
|
||||
gzip_metadata_filename = uncompressed_metadata_filename + '.gz'
|
||||
if gzip_metadata_filename in self.metadata['current'] \
|
||||
|
|
@ -1262,20 +1297,13 @@ def _update_metadata_if_changed(self, metadata_role, referenced_metadata='releas
|
|||
compression = 'gzip'
|
||||
compressed_fileinfo = self.metadata['current'][referenced_metadata] \
|
||||
['meta'][gzip_metadata_filename]
|
||||
# NOTE: When we download the compressed file, we care about its
|
||||
# compressed length. However, we check the hash of the uncompressed
|
||||
# file; therefore we use the hashes of the uncompressed file.
|
||||
fileinfo = {'length': compressed_fileinfo['length'],
|
||||
'hashes': uncompressed_fileinfo['hashes']}
|
||||
|
||||
logger.debug('Compressed version of '+\
|
||||
repr(uncompressed_metadata_filename)+' is available at '+\
|
||||
repr(gzip_metadata_filename)+'.')
|
||||
else:
|
||||
logger.debug('Compressed version of '+\
|
||||
repr(uncompressed_metadata_filename)+' not available.')
|
||||
fileinfo = uncompressed_fileinfo
|
||||
else:
|
||||
fileinfo = uncompressed_fileinfo
|
||||
|
||||
# Simply return if the file has not changed, according to the metadata
|
||||
# about the uncompressed file provided by the referenced metadata.
|
||||
|
|
@ -1287,8 +1315,8 @@ def _update_metadata_if_changed(self, metadata_role, referenced_metadata='releas
|
|||
' has changed.')
|
||||
|
||||
try:
|
||||
self._update_metadata(metadata_role, fileinfo=fileinfo,
|
||||
compression=compression)
|
||||
self._update_metadata(metadata_role, uncompressed_fileinfo, compression,
|
||||
compressed_fileinfo)
|
||||
except:
|
||||
# The current metadata we have is not current but we couldn't
|
||||
# get new metadata. We shouldn't use the old metadata anymore.
|
||||
|
|
@ -1299,8 +1327,9 @@ def _update_metadata_if_changed(self, metadata_role, referenced_metadata='releas
|
|||
# We shouldn't need to, but we need to check the trust
|
||||
# implications of the current implementation.
|
||||
self._delete_metadata(metadata_role)
|
||||
logger.error('Metadata for '+str(metadata_role)+' could not be updated')
|
||||
logger.error('Metadata for '+repr(metadata_role)+' cannot be updated.')
|
||||
raise
|
||||
|
||||
else:
|
||||
# We need to remove delegated roles because the delegated roles
|
||||
# may not be trusted anymore.
|
||||
|
|
@ -2549,8 +2578,3 @@ def download_target(self, target, destination_directory):
|
|||
logger.warn(str(target_dirpath)+' does not exist.')
|
||||
|
||||
target_file_object.move(destination)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
313
tuf/libtuf.py
313
tuf/libtuf.py
|
|
@ -824,7 +824,6 @@ def expiration(self):
|
|||
def expiration(self, expiration_datetime_utc):
|
||||
"""
|
||||
<Purpose>
|
||||
TODO: return 'input_datetime_utc' in ISO 8601 format.
|
||||
|
||||
>>>
|
||||
>>>
|
||||
|
|
@ -932,8 +931,8 @@ def __init__(self):
|
|||
expiration = tuf.formats.format_time(time.time()+ROOT_EXPIRATION)
|
||||
|
||||
roleinfo = {'keyids': [], 'signing_keyids': [], 'threshold': 1,
|
||||
'signatures': [], 'version': 0, 'compressions': [''],
|
||||
'expires': expiration, 'partial_loaded': False}
|
||||
'signatures': [], 'version': 1, 'compressions': [''],
|
||||
'expires': expiration}
|
||||
try:
|
||||
tuf.roledb.add_role(self._rolename, roleinfo)
|
||||
except tuf.RoleAlreadyExistsError, e:
|
||||
|
|
@ -969,8 +968,8 @@ def __init__(self):
|
|||
expiration = tuf.formats.format_time(time.time()+TIMESTAMP_EXPIRATION)
|
||||
|
||||
roleinfo = {'keyids': [], 'signing_keyids': [], 'threshold': 1,
|
||||
'signatures': [], 'version': 0, 'compressions': [''],
|
||||
'expires': expiration, 'partial_loaded': False}
|
||||
'signatures': [], 'version': 1, 'compressions': [''],
|
||||
'expires': expiration}
|
||||
|
||||
try:
|
||||
tuf.roledb.add_role(self.rolename, roleinfo)
|
||||
|
|
@ -1007,8 +1006,8 @@ def __init__(self):
|
|||
expiration = tuf.formats.format_time(time.time()+RELEASE_EXPIRATION)
|
||||
|
||||
roleinfo = {'keyids': [], 'signing_keyids': [], 'threshold': 1,
|
||||
'signatures': [], 'version': 0, 'compressions': [''],
|
||||
'expires': expiration, 'partial_loaded': False}
|
||||
'signatures': [], 'version': 1, 'compressions': [''],
|
||||
'expires': expiration}
|
||||
|
||||
try:
|
||||
tuf.roledb.add_role(self._rolename, roleinfo)
|
||||
|
|
@ -1016,6 +1015,10 @@ def __init__(self):
|
|||
pass
|
||||
|
||||
|
||||
def write_partial(self):
|
||||
pass
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
@ -1044,11 +1047,10 @@ class Targets(Metadata):
|
|||
def __init__(self, targets_directory, rolename, roleinfo=None):
|
||||
|
||||
# Do the arguments have the correct format?
|
||||
# This check ensures arguments have the appropriate number of objects and
|
||||
# object types, and that all dict keys are properly named.
|
||||
# Raise 'tuf.FormatError' if the check fails.
|
||||
# Raise 'tuf.FormatError' if any are improperly formatted.
|
||||
tuf.formats.PATH_SCHEMA.check_match(targets_directory)
|
||||
tuf.formats.ROLENAME_SCHEMA.check_match(rolename)
|
||||
|
||||
if roleinfo is not None:
|
||||
tuf.formats.ROLEDB_SCHEMA.check_match(roleinfo)
|
||||
|
||||
|
|
@ -1060,10 +1062,15 @@ def __init__(self, targets_directory, rolename, roleinfo=None):
|
|||
expiration = tuf.formats.format_time(time.time()+TARGETS_EXPIRATION)
|
||||
|
||||
if roleinfo is None:
|
||||
roleinfo = {'keyids': [], 'signing_keyids': [], 'threshold': 1,
|
||||
'version': 0, 'compressions': [''], 'expires': expiration,
|
||||
'signatures': [], 'paths': [], 'path_hash_prefixes': [],
|
||||
'partial_loaded': False,
|
||||
roleinfo = {'keyids': [],
|
||||
'signing_keyids': [],
|
||||
'threshold': 1,
|
||||
'version': 1,
|
||||
'compressions': [''],
|
||||
'expires': expiration,
|
||||
'signatures': [],
|
||||
'paths': [],
|
||||
'path_hash_prefixes': [],
|
||||
'delegations': {'keys': {},
|
||||
'roles': []}}
|
||||
|
||||
|
|
@ -1407,7 +1414,7 @@ def delegate(self, rolename, public_keys, list_of_targets,
|
|||
# Add role to 'tuf.roledb.py'.
|
||||
expiration = tuf.formats.format_time(time.time()+TARGETS_EXPIRATION)
|
||||
roleinfo = {'name': full_rolename, 'keyids': keyids, 'signing_keyids': [],
|
||||
'threshold': threshold, 'version': 0, 'compressions': [''],
|
||||
'threshold': threshold, 'version': 1, 'compressions': [''],
|
||||
'expires': expiration, 'signatures': [],
|
||||
'paths': relative_targetpaths, 'delegations': {'keys': {},
|
||||
'roles': []}}
|
||||
|
|
@ -1537,39 +1544,20 @@ def _generate_and_write_metadata(rolename, filenames, write_partial,
|
|||
|
||||
signable = sign_metadata(metadata, roleinfo['signing_keyids'],
|
||||
metadata_filename)
|
||||
|
||||
# Increment version number if this is a new/first partial write.
|
||||
if write_partial:
|
||||
temp_signable = sign_metadata(metadata, [],
|
||||
metadata_filename)
|
||||
temp_signable['signatures'].extend(roleinfo['signatures'])
|
||||
status = tuf.sig.get_signature_status(temp_signable, rolename)
|
||||
if len(status['good_sigs']) == 0:
|
||||
metadata['version'] = metadata['version'] + 1
|
||||
signable = sign_metadata(metadata, roleinfo['signing_keyids'],
|
||||
metadata_filename)
|
||||
# non-partial write()
|
||||
else:
|
||||
if tuf.sig.verify(signable, rolename) and not roleinfo['partial_loaded']:
|
||||
metadata['version'] = metadata['version'] + 1
|
||||
signable = sign_metadata(metadata, roleinfo['signing_keyids'],
|
||||
metadata_filename)
|
||||
|
||||
# Write the metadata to file if contains a threshold of signatures.
|
||||
signable['signatures'].extend(roleinfo['signatures'])
|
||||
|
||||
|
||||
if tuf.sig.verify(signable, rolename) or write_partial:
|
||||
_remove_invalid_and_duplicate_signatures(signable)
|
||||
if not write_partial:
|
||||
_remove_invalid_and_duplicate_signatures(signable)
|
||||
for compression in roleinfo['compressions']:
|
||||
write_metadata_file(signable, metadata_filename, compression)
|
||||
|
||||
return signable
|
||||
|
||||
# 'signable' contains an invalid threshold of signatures.
|
||||
return signable
|
||||
|
||||
else:
|
||||
message = 'Not enough signatures for '+repr(metadata_filename)
|
||||
raise tuf.Error(message, signable)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
@ -1622,19 +1610,6 @@ def _get_password(prompt='Password: ', confirm=False):
|
|||
|
||||
|
||||
|
||||
def _check_if_partial_loaded(rolename, signable, roleinfo):
|
||||
"""
|
||||
"""
|
||||
|
||||
status = tuf.sig.get_signature_status(signable, rolename)
|
||||
if len(status['good_sigs']) < status['threshold'] and \
|
||||
len(status['good_sigs']) >= 1:
|
||||
roleinfo['partial_loaded'] = True
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def _check_directory(directory):
|
||||
"""
|
||||
<Purpose>
|
||||
|
|
@ -1899,8 +1874,6 @@ def load_repository(repository_directory):
|
|||
if signature not in roleinfo['signatures']:
|
||||
roleinfo['signatures'].append(signature)
|
||||
|
||||
_check_if_partial_loaded('root', signable, roleinfo)
|
||||
|
||||
if os.path.exists(root_filename+'.gz'):
|
||||
roleinfo['compressions'].append('gz')
|
||||
tuf.roledb.update_roleinfo('root', roleinfo)
|
||||
|
|
@ -1929,8 +1902,6 @@ def load_repository(repository_directory):
|
|||
roleinfo['delegations'] = targets_metadata['delegations']
|
||||
if os.path.exists(targets_filename+'.gz'):
|
||||
roleinfo['compressions'].append('gz')
|
||||
|
||||
_check_if_partial_loaded('targets', signable, roleinfo)
|
||||
tuf.roledb.update_roleinfo('targets', roleinfo)
|
||||
|
||||
# Add the keys specified in the delegations field of the Targets role.
|
||||
|
|
@ -1942,12 +1913,9 @@ def load_repository(repository_directory):
|
|||
|
||||
for role in targets_metadata['delegations']['roles']:
|
||||
rolename = role['name']
|
||||
roleinfo = {'name': role['name'],
|
||||
'keyids': role['keyids'],
|
||||
'threshold': role['threshold'],
|
||||
'compressions': [''],
|
||||
'signing_keyids': [],
|
||||
'signatures': [],
|
||||
roleinfo = {'name': role['name'], 'keyids': role['keyids'],
|
||||
'threshold': role['threshold'], 'compressions': [''],
|
||||
'signing_keyids': [], 'signatures': [],
|
||||
'delegations': {'keys': {},
|
||||
'roles': []}}
|
||||
tuf.roledb.add_role(rolename, roleinfo)
|
||||
|
|
@ -1969,8 +1937,6 @@ def load_repository(repository_directory):
|
|||
roleinfo['version'] = release_metadata['version']
|
||||
if os.path.exists(release_filename+'.gz'):
|
||||
roleinfo['compressions'].append('gz')
|
||||
|
||||
_check_if_partial_loaded('release', signable, roleinfo)
|
||||
tuf.roledb.update_roleinfo('release', roleinfo)
|
||||
|
||||
else:
|
||||
|
|
@ -1989,8 +1955,6 @@ def load_repository(repository_directory):
|
|||
roleinfo['version'] = timestamp_metadata['version']
|
||||
if os.path.exists(timestamp_filename+'.gz'):
|
||||
roleinfo['compressions'].append('gz')
|
||||
|
||||
_check_if_partial_loaded('timestamp', signable, roleinfo)
|
||||
tuf.roledb.update_roleinfo('timestamp', roleinfo)
|
||||
|
||||
else:
|
||||
|
|
@ -2036,8 +2000,6 @@ def load_repository(repository_directory):
|
|||
|
||||
if os.path.exists(metadata_path+'.gz'):
|
||||
roleinfo['compressions'].append('gz')
|
||||
|
||||
_check_if_partial_loaded(metadata_name, signable, roleinfo)
|
||||
tuf.roledb.update_roleinfo(metadata_name, roleinfo)
|
||||
|
||||
new_targets_object = Targets(targets_directory, metadata_name, roleinfo)
|
||||
|
|
@ -2057,10 +2019,12 @@ def load_repository(repository_directory):
|
|||
|
||||
for role in metadata_object['delegations']['roles']:
|
||||
rolename = role['name']
|
||||
roleinfo = {'name': role['name'], 'keyids': role['keyids'],
|
||||
'threshold': role['threshold'], 'compressions': [''],
|
||||
'signing_keyids': [], 'signatures': [],
|
||||
'partial_loaded': False,
|
||||
roleinfo = {'name': role['name'],
|
||||
'keyids': role['keyids'],
|
||||
'threshold': role['threshold'],
|
||||
'compressions': [''],
|
||||
'signing_keyids': [],
|
||||
'signatures': [],
|
||||
'delegations': {'keys': {},
|
||||
'roles': []}}
|
||||
tuf.roledb.add_role(rolename, roleinfo)
|
||||
|
|
@ -2075,16 +2039,6 @@ def generate_and_write_rsa_keypair(filepath, bits=DEFAULT_RSA_KEY_BITS,
|
|||
password=None):
|
||||
"""
|
||||
<Purpose>
|
||||
Generate an RSA key file, create an encrypted PEM string (using 'password'
|
||||
as the pass phrase), and store it in 'filepath'. The public key portion
|
||||
of the generated RSA key is stored in <'filepath'>.pub.
|
||||
|
||||
Which cryptography library performs the cryptographic decryption is
|
||||
determined by the string set in 'tuf.conf.RSA_CRYPTO_LIBRARY'. PyCrypto
|
||||
currently supported.
|
||||
|
||||
The PEM private key is encrypted with 3DES and CBC the mode of operation.
|
||||
The pass phrase is strengthened with PBKDF1-MD5.
|
||||
|
||||
<Arguments>
|
||||
filepath:
|
||||
|
|
@ -2095,8 +2049,7 @@ def generate_and_write_rsa_keypair(filepath, bits=DEFAULT_RSA_KEY_BITS,
|
|||
The number of bits of the generated RSA key.
|
||||
|
||||
password:
|
||||
The passphrase to encrypt 'filepath'.
|
||||
|
||||
|
||||
<Exceptions>
|
||||
tuf.FormatError, if the arguments are improperly formatted.
|
||||
|
||||
|
|
@ -2107,11 +2060,11 @@ def generate_and_write_rsa_keypair(filepath, bits=DEFAULT_RSA_KEY_BITS,
|
|||
None.
|
||||
"""
|
||||
|
||||
# Do the arguments have the correct format?
|
||||
# This check ensures arguments have the appropriate number of objects and
|
||||
# object types, and that all dict keys are properly named.
|
||||
# Raise 'tuf.FormatError' if the check fails.
|
||||
# Does 'filepath' have the correct format?
|
||||
# Raise 'tuf.FormatError' if there is a mismatch.
|
||||
tuf.formats.PATH_SCHEMA.check_match(filepath)
|
||||
|
||||
# Does 'bits' have the correct format?
|
||||
tuf.formats.RSAKEYBITS_SCHEMA.check_match(bits)
|
||||
|
||||
# If the caller does not provide a password argument, prompt for one.
|
||||
|
|
@ -2121,24 +2074,20 @@ def generate_and_write_rsa_keypair(filepath, bits=DEFAULT_RSA_KEY_BITS,
|
|||
|
||||
# Does 'password' have the correct format?
|
||||
tuf.formats.PASSWORD_SCHEMA.check_match(password)
|
||||
|
||||
# Generate public and private RSA keys, encrypt the private portion, and
|
||||
# store them in PEM format.
|
||||
|
||||
rsa_key = tuf.keys.generate_rsa_key(bits)
|
||||
public = rsa_key['keyval']['public']
|
||||
private = rsa_key['keyval']['private']
|
||||
encrypted_pem = tuf.keys.create_rsa_encrypted_pem(private, password)
|
||||
|
||||
# Write public key (i.e., 'public', which is in PEM format) to
|
||||
# '<filepath>.pub'. If the parent directory of filepath does not exist,
|
||||
# create it, and all parent directories if necessary.
|
||||
# '<filepath>.pub'.
|
||||
tuf.util.ensure_parent_dir(filepath)
|
||||
|
||||
with open(filepath+'.pub', 'w') as file_object:
|
||||
file_object.write(public)
|
||||
|
||||
# Write the private key in encrypted PEM format to '<filepath>'.
|
||||
# Unlike the public key file, the private key does not have a file extension.
|
||||
with open(filepath, 'w') as file_object:
|
||||
file_object.write(encrypted_pem)
|
||||
|
||||
|
|
@ -2149,38 +2098,23 @@ def generate_and_write_rsa_keypair(filepath, bits=DEFAULT_RSA_KEY_BITS,
|
|||
def import_rsa_privatekey_from_file(filepath, password=None):
|
||||
"""
|
||||
<Purpose>
|
||||
Import the encrypted PEM file in 'filepath', decrypt it, and return the key
|
||||
object in 'tuf.formats.RSAKEY_SCHEMA' format.
|
||||
|
||||
Which cryptography library performs the cryptographic decryption is
|
||||
determined by the string set in 'tuf.conf.RSA_CRYPTO_LIBRARY'. PyCrypto
|
||||
currently supported.
|
||||
|
||||
The PEM private key is encrypted with 3DES and CBC the mode of operation.
|
||||
The pass phrase is strengthened with PBKDF1-MD5.
|
||||
|
||||
<Arguments>
|
||||
filepath:
|
||||
<filepath> file, an RSA encrypted PEM file. Unlike the public RSA PEM
|
||||
key file, 'filepath' does not have an extension.
|
||||
<filepath> file, an RSA encrypted PEM file.
|
||||
|
||||
password:
|
||||
The passphrase to decrypt 'filepath'.
|
||||
|
||||
<Exceptions>
|
||||
tuf.FormatError, if 'filepath' is improperly formatted.
|
||||
|
||||
<Side Effects>
|
||||
The contents of 'filepath' is read, decrypted, and the key stored.
|
||||
|
||||
<Returns>
|
||||
An RSA key object, conformant to 'tuf.formats.RSAKEY_SCHEMA'.
|
||||
"""
|
||||
|
||||
# Does 'filepath' have the correct format?
|
||||
# This check ensures arguments have the appropriate number of objects and
|
||||
# object types, and that all dict keys are properly named.
|
||||
# Raise 'tuf.FormatError' if the check fails.
|
||||
# Raise 'tuf.FormatError' if there is a mismatch.
|
||||
tuf.formats.PATH_SCHEMA.check_match(filepath)
|
||||
|
||||
# If the caller does not provide a password argument, prompt for one.
|
||||
|
|
@ -2193,11 +2127,9 @@ def import_rsa_privatekey_from_file(filepath, password=None):
|
|||
|
||||
encrypted_pem = None
|
||||
|
||||
# Read the contents of 'filepath' that should be an encrypted PEM.
|
||||
with open(filepath, 'rb') as file_object:
|
||||
encrypted_pem = file_object.read()
|
||||
|
||||
# Convert 'rsa_pubkey_pem' in 'tuf.formats.RSAKEY_SCHEMA' format.
|
||||
rsa_key = tuf.keys.import_rsakey_from_encrypted_pem(encrypted_pem, password)
|
||||
|
||||
return rsa_key
|
||||
|
|
@ -2209,14 +2141,8 @@ def import_rsa_privatekey_from_file(filepath, password=None):
|
|||
def import_rsa_publickey_from_file(filepath):
|
||||
"""
|
||||
<Purpose>
|
||||
Import the RSA key stored in 'filepath'. The key object returned is a TUF
|
||||
key, specifically 'tuf.formats.RSAKEY_SCHEMA'. If the RSA PEM in 'filepath'
|
||||
contains a private key, it is discarded.
|
||||
|
||||
Which cryptography library performs the cryptographic decryption is
|
||||
determined by the string set in 'tuf.conf.RSA_CRYPTO_LIBRARY'. PyCrypto
|
||||
currently supported.
|
||||
|
||||
If the RSA PEM in 'filepath' contains a private key, it is discarded.
|
||||
|
||||
<Arguments>
|
||||
filepath:
|
||||
<filepath>.pub file, an RSA PEM file.
|
||||
|
|
@ -2225,24 +2151,18 @@ def import_rsa_publickey_from_file(filepath):
|
|||
tuf.FormatError, if 'filepath' is improperly formatted.
|
||||
|
||||
<Side Effects>
|
||||
'filepath' is read and its contents extracted.
|
||||
|
||||
|
||||
<Returns>
|
||||
An RSA key object conformant to 'tuf.formats.RSAKEY_SCHEMA'.
|
||||
"""
|
||||
|
||||
# Does 'filepath' have the correct format?
|
||||
# This check ensures arguments have the appropriate number of objects and
|
||||
# object types, and that all dict keys are properly named.
|
||||
# Raise 'tuf.FormatError' if the check fails.
|
||||
# Raise 'tuf.FormatError' if there is a mismatch.
|
||||
tuf.formats.PATH_SCHEMA.check_match(filepath)
|
||||
|
||||
# Read the contents of the key file that should be in PEM format and contains
|
||||
# the public portion of the RSA key.
|
||||
with open(filepath, 'r+b') as file_object:
|
||||
rsa_pubkey_pem = file_object.read()
|
||||
|
||||
# Convert 'rsa_pubkey_pem' in 'tuf.formats.RSAKEY_SCHEMA' format.
|
||||
rsakey_dict = tuf.keys.format_rsakey_from_pem(rsa_pubkey_pem)
|
||||
|
||||
return rsakey_dict
|
||||
|
|
@ -2251,11 +2171,44 @@ def import_rsa_publickey_from_file(filepath):
|
|||
|
||||
|
||||
|
||||
def expiration_datetime_utc(input_datetime_utc):
|
||||
"""
|
||||
<Purpose>
|
||||
TODO: return 'input_datetime_utc' in ISO 8601 format.
|
||||
|
||||
<Arguments>
|
||||
input_datetime_utc:
|
||||
|
||||
<Exceptions>
|
||||
tuf.FormatError, if 'input_datetime_utc' is invalid.
|
||||
|
||||
<Side Effects>
|
||||
None.
|
||||
|
||||
<Returns>
|
||||
"""
|
||||
if not tuf.formats.DATETIME_SCHEMA.matches(input_datetime_utc):
|
||||
message = 'The datetime argument must be in "YYYY-MM-DD HH:MM:SS" format.'
|
||||
raise tuf.FormatError(message)
|
||||
try:
|
||||
unix_timestamp = tuf.formats.parse_time(input_datetime_utc+' UTC')
|
||||
except (tuf.FormatError, ValueError), e:
|
||||
raise tuf.FormatError('Invalid date entered.')
|
||||
|
||||
if unix_timestamp < time.time():
|
||||
message = 'The expiration date must occur after the current date.'
|
||||
raise tuf.FormatError(message)
|
||||
|
||||
return input_datetime_utc+' UTC'
|
||||
|
||||
|
||||
|
||||
|
||||
def get_metadata_filenames(metadata_directory=None):
|
||||
"""
|
||||
<Purpose>
|
||||
Return a dictionary containing the filenames of the top-level roles.
|
||||
If 'metadata_directory' were set to 'metadata', the dictionary
|
||||
If 'metadata_directory' is set to 'metadata', the dictionary
|
||||
returned would contain:
|
||||
|
||||
filenames = {'root': 'metadata/root.txt',
|
||||
|
|
@ -2285,15 +2238,10 @@ def get_metadata_filenames(metadata_directory=None):
|
|||
metadata_directory = '.'
|
||||
|
||||
# Does 'metadata_directory' have the correct format?
|
||||
# This check ensures arguments have the appropriate number of objects and
|
||||
# object types, and that all dict keys are properly named.
|
||||
# Raise 'tuf.FormatError' if the check fails.
|
||||
# Raise 'tuf.FormatError' if there is a mismatch.
|
||||
tuf.formats.PATH_SCHEMA.check_match(metadata_directory)
|
||||
|
||||
# Store the filepaths of the top-level roles, including the
|
||||
# 'metadata_directory' for each one.
|
||||
filenames = {}
|
||||
|
||||
filenames[ROOT_FILENAME] = \
|
||||
os.path.join(metadata_directory, ROOT_FILENAME)
|
||||
|
||||
|
|
@ -2315,7 +2263,7 @@ def get_metadata_filenames(metadata_directory=None):
|
|||
def get_metadata_file_info(filename):
|
||||
"""
|
||||
<Purpose>
|
||||
Retrieve the file information of 'filename'. The object returned
|
||||
Retrieve the file information for 'filename'. The object returned
|
||||
conforms to 'tuf.formats.FILEINFO_SCHEMA'. The information
|
||||
generated for 'filename' is stored in metadata files like 'targets.txt'.
|
||||
The fileinfo object returned has the form:
|
||||
|
|
@ -2325,7 +2273,7 @@ def get_metadata_file_info(filename):
|
|||
|
||||
<Arguments>
|
||||
filename:
|
||||
The metadata file whose file information is needed. It must exist.
|
||||
The metadata file whose file information is needed.
|
||||
|
||||
<Exceptions>
|
||||
tuf.FormatError, if 'filename' is improperly formatted.
|
||||
|
|
@ -2338,14 +2286,12 @@ def get_metadata_file_info(filename):
|
|||
|
||||
<Returns>
|
||||
A dictionary conformant to 'tuf.formats.FILEINFO_SCHEMA'. This
|
||||
dictionary contains the length, hashes, and custom data about the
|
||||
'filename' metadata file.
|
||||
dictionary contains the length, hashes, and custom data about
|
||||
the 'filename' metadata file.
|
||||
"""
|
||||
|
||||
# Does 'filename' have the correct format?
|
||||
# This check ensures arguments have the appropriate number of objects and
|
||||
# object types, and that all dict keys are properly named.
|
||||
# Raise 'tuf.FormatError' if the check fails.
|
||||
# Raise 'tuf.FormatError' if there is a mismatch.
|
||||
tuf.formats.PATH_SCHEMA.check_match(filename)
|
||||
|
||||
if not os.path.isfile(filename):
|
||||
|
|
@ -2370,7 +2316,7 @@ def generate_root_metadata(version, expiration_date):
|
|||
"""
|
||||
<Purpose>
|
||||
Create the root metadata. 'tuf.roledb.py' and 'tuf.keydb.py' are read and
|
||||
the information returned by these modules is used to generate the root
|
||||
the information returned by these modules are used to generate the root
|
||||
metadata object.
|
||||
|
||||
<Arguments>
|
||||
|
|
@ -2380,8 +2326,6 @@ def generate_root_metadata(version, expiration_date):
|
|||
trusted.
|
||||
|
||||
expiration_date:
|
||||
The expiration date, in UTC, of the metadata file. Conformant to
|
||||
'tuf.formats.TIME_SCHEMA'.
|
||||
|
||||
<Exceptions>
|
||||
tuf.FormatError, if the generated root metadata object could not
|
||||
|
|
@ -2398,29 +2342,23 @@ def generate_root_metadata(version, expiration_date):
|
|||
"""
|
||||
|
||||
# Do the arguments have the correct format?
|
||||
# This check ensures arguments have the appropriate number of objects and
|
||||
# object types, and that all dict keys are properly named.
|
||||
# Raise 'tuf.FormatError' if the check fails.
|
||||
# Raise 'tuf.FormatError' if any of the arguments are improperly formatted.
|
||||
tuf.formats.METADATAVERSION_SCHEMA.check_match(version)
|
||||
tuf.formats.TIME_SCHEMA.check_match(expiration_date)
|
||||
|
||||
# The role and key dictionaries to be saved in the root metadata object.
|
||||
# Conformant to 'ROLEDICT_SCHEMA' and 'KEYDICT_SCHEMA', respectively.
|
||||
roledict = {}
|
||||
keydict = {}
|
||||
|
||||
# Extract the role, threshold, and keyid information of the top-level roles,
|
||||
# which Root stores in its metadata. The necessary role metadata is generated
|
||||
# from this information.
|
||||
# Extract the role, threshold, and keyid information from the config.
|
||||
# The necessary role metadata is generated from this information.
|
||||
for rolename in ['root', 'targets', 'release', 'timestamp']:
|
||||
|
||||
# If a top-level role is missing from 'tuf.roledb.py', raise an exception.
|
||||
if not tuf.roledb.role_exists(rolename):
|
||||
raise tuf.Error(repr(rolename)+' not in "tuf.roledb".')
|
||||
|
||||
# Keep track of the keys loaded so that duplicates is avoided.
|
||||
keyids = []
|
||||
|
||||
keyids = []
|
||||
# Generate keys for the keyids listed by the role being processed.
|
||||
for keyid in tuf.roledb.get_role_keyids(rolename):
|
||||
key = tuf.keydb.get_key(keyid)
|
||||
|
|
@ -2471,39 +2409,41 @@ def generate_targets_metadata(targets_directory, target_files, version,
|
|||
expiration_date, delegations=None):
|
||||
"""
|
||||
<Purpose>
|
||||
Generate the targets metadata object. The targets in 'target_files' must
|
||||
exist at the corresponding path on the repository targets directory.
|
||||
'target_files' is a list of targets. The 'custom' field of the targets
|
||||
metadata is not currently supported.
|
||||
Generate the targets metadata object. The targets must exist at the same
|
||||
path they should on the repo. 'target_files' is a list of targets. We're
|
||||
not worrying about custom metadata at the moment. It is allowed to not
|
||||
provide keys.
|
||||
|
||||
<Arguments>
|
||||
targets_directory:
|
||||
The directory containing the target files and directories of the
|
||||
repository.
|
||||
The directory (absolute path) containing the target files and directories.
|
||||
|
||||
target_files:
|
||||
The target files listed in 'targets.txt'. 'target_files' is a list of
|
||||
target paths that are relative to the targets directory
|
||||
(e.g., ['file1.txt', 'Django/module.py']).
|
||||
|
||||
The target files tracked by 'targets.txt'. 'target_files' is a list of
|
||||
paths/directories of target files that are relative to the targets
|
||||
directory (e.g., ['file1.txt', 'Django/module.py']). If the target files
|
||||
are saved in
|
||||
the root folder 'targets' on the repository, then 'targets' must be
|
||||
included in the target paths. The repository does not have to name
|
||||
this folder 'targets'.
|
||||
|
||||
version:
|
||||
The metadata version number. Clients use the version number to
|
||||
determine if the downloaded version is newer than the one currently
|
||||
trusted.
|
||||
|
||||
expiration_date:
|
||||
The expiration date, in UTC, of the metadata file. Conformant to
|
||||
'tuf.formats.TIME_SCHEMA'.
|
||||
The expiration date, in UTC, of the metadata file.
|
||||
Conformant to 'tuf.formats.TIME_SCHEMA'.
|
||||
|
||||
delegations:
|
||||
The delegations made by the targets role to be generated. 'delegations'
|
||||
must match 'tuf.formats.DELEGATIONS_SCHEMA'.
|
||||
|
||||
|
||||
<Exceptions>
|
||||
tuf.FormatError, if an error occurred trying to generate the targets
|
||||
metadata object.
|
||||
|
||||
tuf.Error, if any of the target files cannot be read.
|
||||
tuf.Error, if any of the target files could not be read.
|
||||
|
||||
<Side Effects>
|
||||
The target files are read and file information generated about them.
|
||||
|
|
@ -2512,42 +2452,33 @@ def generate_targets_metadata(targets_directory, target_files, version,
|
|||
A targets metadata object, conformant to 'tuf.formats.TARGETS_SCHEMA'.
|
||||
"""
|
||||
|
||||
# Do the arguments have the correct format?
|
||||
# This check ensures arguments have the appropriate number of objects and
|
||||
# object types, and that all dict keys are properly named.
|
||||
# Raise 'tuf.FormatError' if the check fails.
|
||||
# Do the arguments have the correct format.
|
||||
# Raise 'tuf.FormatError' if there is a mismatch.
|
||||
tuf.formats.PATH_SCHEMA.check_match(targets_directory)
|
||||
tuf.formats.PATHS_SCHEMA.check_match(target_files)
|
||||
tuf.formats.METADATAVERSION_SCHEMA.check_match(version)
|
||||
tuf.formats.TIME_SCHEMA.check_match(expiration_date)
|
||||
|
||||
if delegations is not None:
|
||||
tuf.formats.DELEGATIONS_SCHEMA.check_match(delegations)
|
||||
|
||||
# Store the file attributes of the targets in 'target_files'. 'filedict',
|
||||
# conformant to 'tuf.formats.FILEDICT_SCHEMA', is added the targes metadata
|
||||
# object returned.
|
||||
|
||||
filedict = {}
|
||||
|
||||
# Ensure the user is aware of a non-existent 'targets_directory', and convert
|
||||
# it to its absolute path if it exists.
|
||||
targets_directory = _check_directory(targets_directory)
|
||||
|
||||
# Generate the fileinfo for all the target files listed in 'target_files'.
|
||||
# Generate the file info for all the target files listed in 'target_files'.
|
||||
for target in target_files:
|
||||
|
||||
# The root folder of the targets directory should not be included
|
||||
# (e.g., 'targets/more_targets/somefile.txt' -> 'more_targets/somefile.txt')
|
||||
# Strip 'targets/' from from 'target' and keep the rest (e.g.,
|
||||
# 'targets/more_targets/somefile.txt' -> 'more_targets/somefile.txt'
|
||||
#relative_targetpath = os.path.sep.join(target.split(os.path.sep)[1:])
|
||||
relative_targetpath = target
|
||||
target_path = os.path.join(targets_directory, target)
|
||||
|
||||
# Ensure all target files listed in 'target_files' exist. If just one of
|
||||
# these files does not exist, raise an exception.
|
||||
|
||||
if not os.path.exists(target_path):
|
||||
message = repr(target_path)+' cannot be read. Unable to generate '+ \
|
||||
'targets metadata.'
|
||||
raise tuf.Error(message)
|
||||
|
||||
# Update 'filedict' with the file attributes of 'target_path'.
|
||||
|
||||
filedict[relative_targetpath] = get_metadata_file_info(target_path)
|
||||
|
||||
# Generate the targets metadata object.
|
||||
|
|
|
|||
|
|
@ -579,5 +579,3 @@ def load_json_file(filepath):
|
|||
return json.load(fileobject)
|
||||
finally:
|
||||
fileobject.close()
|
||||
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue