mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Merge remote-tracking branch 'origin/master' into log-to-console
This commit is contained in:
commit
c70dbbeeef
24 changed files with 1060 additions and 236 deletions
|
|
@ -191,7 +191,7 @@ def parse_options():
|
|||
# Ensure the '--repo' option was set by the user.
|
||||
if options.REPOSITORY_MIRROR is None:
|
||||
message = '"--repo" must be set on the command-line.'
|
||||
option_parser.error(message)
|
||||
parser.error(message)
|
||||
|
||||
# Return the repository mirror containing the metadata and target files.
|
||||
return options.REPOSITORY_MIRROR
|
||||
|
|
|
|||
|
|
@ -561,12 +561,12 @@
|
|||
{ "keys" : {
|
||||
KEYID : KEY,
|
||||
... },
|
||||
"roles" : {
|
||||
ROLE : {
|
||||
"keyids" : [ KEYID, ... ] ,
|
||||
"threshold" : THRESHOLD,
|
||||
"paths" : [ PATHPATTERN, ... ] }
|
||||
, ... }
|
||||
"roles" : [{
|
||||
"name": ROLE,
|
||||
"keyids" : [ KEYID, ... ] ,
|
||||
"threshold" : THRESHOLD,
|
||||
"paths" : [ PATHPATTERN, ... ]
|
||||
}, ... ]
|
||||
}
|
||||
|
||||
The "paths" list describes paths that the role is trusted to provide.
|
||||
|
|
@ -586,15 +586,8 @@
|
|||
on. The metadata of the first delegation will override that of the second delegation,
|
||||
the metadata of the second delegation will override that of the third
|
||||
delegation, and so on. In order to accommodate this scheme, the "roles" key
|
||||
in the DELEGATIONS object above should point to an array, instead of a hash
|
||||
table, of delegated roles, like so:
|
||||
|
||||
"roles" : [{
|
||||
"role": ROLE,
|
||||
"keyids" : [ KEYID, ... ] ,
|
||||
"threshold" : THRESHOLD,
|
||||
"paths" : [ PATHPATTERN, ... ]
|
||||
}, ... ]
|
||||
in the DELEGATIONS object above points to an array, instead of a hash
|
||||
table, of delegated roles.
|
||||
|
||||
Another priority tag scheme would have the clients prefer the delegated role
|
||||
with the latest metadata for a conflicting target path. Similar ideas were
|
||||
|
|
|
|||
|
|
@ -51,6 +51,9 @@
|
|||
class EnvelopeError(evp.SSLError):
|
||||
pass
|
||||
|
||||
class KeygenError(evp.SSLError):
|
||||
pass
|
||||
|
||||
def _build_dkey_from_file(keyfile):
|
||||
fp = evp.fopen(keyfile, "r")
|
||||
if not fp:
|
||||
|
|
|
|||
|
|
@ -289,7 +289,7 @@ def build_repository(project_directory):
|
|||
os.mkdir(keystore_directory)
|
||||
# 'OSError' raised if the directory cannot be created.
|
||||
except OSError, e:
|
||||
if e.errno == EEXIST:
|
||||
if e.errno == errno.EEXIST:
|
||||
pass
|
||||
else:
|
||||
raise
|
||||
|
|
|
|||
3
setup.py
3
setup.py
|
|
@ -75,7 +75,8 @@
|
|||
'tuf.interposition',
|
||||
'tuf.pushtools',
|
||||
'tuf.pushtools.transfer',
|
||||
'tuf.repo'
|
||||
'tuf.repo',
|
||||
'tuf.tests'
|
||||
],
|
||||
scripts=[
|
||||
'quickstart.py',
|
||||
|
|
|
|||
|
|
@ -100,19 +100,20 @@
|
|||
|
||||
"""
|
||||
|
||||
import os
|
||||
import time
|
||||
import logging
|
||||
import shutil
|
||||
import errno
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import time
|
||||
|
||||
import tuf.conf
|
||||
import tuf.download
|
||||
import tuf.formats
|
||||
import tuf.keydb
|
||||
import tuf.roledb
|
||||
import tuf.mirrors
|
||||
import tuf.download
|
||||
import tuf.conf
|
||||
import tuf.log
|
||||
import tuf.mirrors
|
||||
import tuf.repo.signerlib
|
||||
import tuf.roledb
|
||||
import tuf.sig
|
||||
import tuf.util
|
||||
|
||||
|
|
@ -407,6 +408,7 @@ def _load_metadata_from_file(self, metadata_set, metadata_role):
|
|||
if metadata_role == 'root':
|
||||
self._rebuild_key_and_role_db()
|
||||
elif metadata_object['_type'] == 'Targets':
|
||||
# TODO: Should we also remove the keys of the delegated roles?
|
||||
tuf.roledb.remove_delegated_roles(metadata_role)
|
||||
self._import_delegations(metadata_role)
|
||||
|
||||
|
|
@ -489,7 +491,7 @@ def _import_delegations(self, parent_role):
|
|||
|
||||
# This could be quite slow with a huge number of delegations.
|
||||
keys_info = current_parent_metadata['delegations'].get('keys', {})
|
||||
roles_info = current_parent_metadata['delegations'].get('roles', {})
|
||||
roles_info = current_parent_metadata['delegations'].get('roles', [])
|
||||
|
||||
logger.debug('Adding roles delegated from '+repr(parent_role)+'.')
|
||||
|
||||
|
|
@ -514,14 +516,18 @@ def _import_delegations(self, parent_role):
|
|||
continue
|
||||
|
||||
# Add the roles to the role database.
|
||||
for rolename, roleinfo in roles_info.items():
|
||||
logger.debug('Adding delegated role: '+repr(rolename)+'.')
|
||||
for roleinfo in roles_info:
|
||||
try:
|
||||
# NOTE: tuf.roledb.add_role will take care
|
||||
# of the case where rolename is None.
|
||||
rolename = roleinfo.get('name')
|
||||
logger.debug('Adding delegated role: '+str(rolename)+'.')
|
||||
tuf.roledb.add_role(rolename, roleinfo)
|
||||
except tuf.RoleAlreadyExistsError, e:
|
||||
logger.warn('Role already exists: '+rolename)
|
||||
except (tuf.FormatError, tuf.InvalidNameError), e:
|
||||
except:
|
||||
logger.exception('Failed to add delegated role: '+rolename+'.')
|
||||
raise
|
||||
|
||||
|
||||
|
||||
|
|
@ -854,6 +860,7 @@ def _update_metadata_if_changed(self, metadata_role, referenced_metadata='releas
|
|||
# may not be trusted anymore.
|
||||
if metadata_role == 'targets' or metadata_role.startswith('targets/'):
|
||||
logger.debug('Removing delegated roles of '+repr(metadata_role)+'.')
|
||||
# TODO: Should we also remove the keys of the delegated roles?
|
||||
tuf.roledb.remove_delegated_roles(metadata_role)
|
||||
self._import_delegations(metadata_role)
|
||||
|
||||
|
|
@ -893,28 +900,47 @@ def _ensure_all_targets_allowed(self, metadata_role, metadata_object):
|
|||
None.
|
||||
|
||||
"""
|
||||
|
||||
|
||||
MISSING_ROLE_MESSAGE = '{parent_role} has not delegated to ' \
|
||||
'{metadata_role}!'
|
||||
UNDELEGATED_TARGETS_MESSAGE = 'Role {metadata_role} specifies targets ' \
|
||||
'{undelegated_targets} which are not ' \
|
||||
'allowed paths according to the ' \
|
||||
'delegations set by {parent_role}.'
|
||||
|
||||
# Return if 'metadata_role' is 'targets'. 'targets' is not
|
||||
# a delegated role.
|
||||
if metadata_role == 'targets':
|
||||
return
|
||||
|
||||
# The targets of delegated roles are stored in the parent's
|
||||
# metadata file. Retrieve the parent role of 'metadata_role'
|
||||
# to confirm 'metadata_role' contains valid targets.
|
||||
parent_role = tuf.roledb.get_parent_rolename(metadata_role)
|
||||
else:
|
||||
# The targets of delegated roles are stored in the parent's
|
||||
# metadata file. Retrieve the parent role of 'metadata_role'
|
||||
# to confirm 'metadata_role' contains valid targets.
|
||||
parent_role = tuf.roledb.get_parent_rolename(metadata_role)
|
||||
|
||||
# Iterate through the targets of 'metadata_role' and confirm
|
||||
# these targets with the paths listed in the parent role.
|
||||
for target_filepath in metadata_object['targets'].keys():
|
||||
if target_filepath not in self.metadata['current'][parent_role] \
|
||||
['delegations']['roles'] \
|
||||
[metadata_role]['paths']:
|
||||
|
||||
message = 'Role '+repr(metadata_role)+' specifies target '+ \
|
||||
target_filepath+' which is not an allowed path according '+ \
|
||||
'to the delegations set by '+repr(parent_role)+'.'
|
||||
raise tuf.RepositoryError(message)
|
||||
# Iterate through the targets of 'metadata_role' and confirm
|
||||
# these targets with the paths listed in the parent role.
|
||||
roles = self.metadata['current'][parent_role]['delegations']['roles']
|
||||
role_index = tuf.repo.signerlib.find_delegated_role(roles, metadata_role)
|
||||
|
||||
if role_index is None:
|
||||
raise tuf.RepositoryError(MISSING_ROLE_MESSAGE.format(
|
||||
parent_role=parent_role,
|
||||
metadata_role=metadata_role))
|
||||
else:
|
||||
role = roles[role_index]
|
||||
|
||||
# Test for breach of delegation with set operations; asymptotically, this
|
||||
# is faster than a linear scan, but at the expense of memory.
|
||||
delegated_targets = set(role['paths'])
|
||||
signed_targets = set(metadata_object['targets'].keys())
|
||||
undelegated_targets = signed_targets - delegated_targets
|
||||
|
||||
if len(undelegated_targets) > 0:
|
||||
raise tuf.RepositoryError(UNDELEGATED_TARGETS_MESSAGE.format(
|
||||
metadata_role=metadata_role,
|
||||
undelegated_targets=undelegated_targets,
|
||||
parent_role=parent_role))
|
||||
|
||||
|
||||
|
||||
|
|
@ -1426,8 +1452,11 @@ def targets_of_role(self, rolename='targets'):
|
|||
def target(self, target_filepath):
|
||||
"""
|
||||
<Purpose>
|
||||
Return the target file information for 'target_filepath'.
|
||||
|
||||
Return the target file information for 'target_filepath'. We interrogate
|
||||
the tree of target delegations in order of appearance (which implicitly
|
||||
order trustworthiness), and return the matching target found in the most
|
||||
trusted role.
|
||||
|
||||
<Arguments>
|
||||
target_filepath:
|
||||
The path to the target file on the repository. This
|
||||
|
|
@ -1439,8 +1468,10 @@ def target(self, target_filepath):
|
|||
If 'target_filepath' is improperly formatted.
|
||||
|
||||
tuf.RepositoryError:
|
||||
If 'target_filepath' was not found or there were more multiple
|
||||
versions (same file path but different file attributes).
|
||||
If 'target_filepath' was not found.
|
||||
|
||||
Exception:
|
||||
In case of an unforeseen runtime error.
|
||||
|
||||
<Side Effects>
|
||||
The metadata for updated delegated roles are download and stored.
|
||||
|
|
@ -1457,51 +1488,57 @@ def target(self, target_filepath):
|
|||
|
||||
# Refresh the target metadata for all the delegated roles.
|
||||
self._refresh_targets_metadata(include_delegations=True)
|
||||
all_rolenames = tuf.roledb.get_rolenames()
|
||||
|
||||
# Iterate through all the target metadata. Take precautions
|
||||
# to avoid duplicate files.
|
||||
target = []
|
||||
for rolename in all_rolenames:
|
||||
if self.metadata['current'][rolename]['_type'] != 'Targets':
|
||||
continue
|
||||
# We have a target role. Extract the filepath and fileinfo
|
||||
# and compare it to 'target_filepath'. Compare the fileinfo
|
||||
# to avoid duplicates.
|
||||
for filepath, fileinfo in self.metadata['current'][rolename] \
|
||||
['targets'].items():
|
||||
if target_filepath == filepath:
|
||||
# If 'target' is empty, we can just go ahead and add 'target_filepath'
|
||||
# No need to check for duplicates in this case.
|
||||
if len(target) == 0:
|
||||
new_target = {}
|
||||
new_target['filepath'] = filepath
|
||||
new_target['fileinfo'] = fileinfo
|
||||
target.append(new_target)
|
||||
continue
|
||||
# It appears we have a duplicate. If the fileinfo match,
|
||||
# do not add the duplicate. Move on to the next target.
|
||||
elif len(target) == 1:
|
||||
if target[0]['fileinfo'] == fileinfo:
|
||||
continue
|
||||
# TODO: What if an existing file, that is listed in the targets
|
||||
# metadata, gets delegated? This needs to be looked at.
|
||||
# Okay, we have a matching filepath but a different fileinfo
|
||||
# for the duplicate. Which one is the client expecting?
|
||||
# And why would the metadata list two different versions of the
|
||||
# same file? Raise an exception.
|
||||
else:
|
||||
message = 'Found multiple '+repr(target_filepath)+'.'
|
||||
logger.error(message)
|
||||
#raise tuf.RepositoryError(message)
|
||||
|
||||
# Riase an exception if the target information could not be retrieved.
|
||||
if len(target) == 0:
|
||||
message = repr(target_filepath)+' not found.'
|
||||
logger.error(message)
|
||||
raise tuf.RepositoryError(message)
|
||||
|
||||
return target[0]
|
||||
# The target is assumed to be missing until proven otherwise.
|
||||
target = None
|
||||
|
||||
try:
|
||||
current_metadata = self.metadata['current']
|
||||
role_names = ['targets']
|
||||
|
||||
# Preorder depth-first traversal of the tree of target delegations.
|
||||
while len(role_names) > 0 and target is None:
|
||||
# Pop the role name from the top of the stack.
|
||||
role_name = role_names.pop(-1)
|
||||
role_metadata = current_metadata[role_name]
|
||||
targets = role_metadata['targets']
|
||||
delegations = role_metadata.get('delegations', {})
|
||||
child_roles = delegations.get('roles', [])
|
||||
|
||||
# Does the current role name have our target?
|
||||
logger.info('Asking role '+role_name+' about target '+target_filepath)
|
||||
for filepath, fileinfo in targets.iteritems():
|
||||
if filepath == target_filepath:
|
||||
logger.info('Found target '+target_filepath+' in role '+role_name)
|
||||
target = {'filepath': filepath, 'fileinfo': fileinfo}
|
||||
break
|
||||
|
||||
# Push children in reverse order of appearance onto the stack.
|
||||
for child_role in reversed(child_roles):
|
||||
child_role_name = child_role['name']
|
||||
child_role_paths = child_role['paths']
|
||||
|
||||
# Ensure that we explore only delegated roles trusted with the target.
|
||||
# We assume conservation of delegated paths in the complete tree of
|
||||
# delegations. Note that the call to _ensure_all_targets_allowed in
|
||||
# _update_metadata should already ensure that all targets metadata is
|
||||
# valid; i.e. that the targets signed by a delegatee is a proper
|
||||
# subset of the targets delegated to it by the delegator.
|
||||
# Nevertheless, we check it again here for performance and safety
|
||||
# reasons.
|
||||
if target_filepath in child_role_paths:
|
||||
role_names.append(child_role_name)
|
||||
except:
|
||||
raise
|
||||
finally:
|
||||
# Raise an exception if the target information could not be retrieved.
|
||||
if target is None:
|
||||
message = target_filepath+' not found.'
|
||||
logger.error(message)
|
||||
raise tuf.RepositoryError(message)
|
||||
# Otherwise, return the found target.
|
||||
else:
|
||||
return target
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
122
tuf/download.py
122
tuf/download.py
|
|
@ -226,7 +226,91 @@ def _check_hashes(input_file, trusted_hashes):
|
|||
|
||||
|
||||
|
||||
def download_url_to_tempfileobj(url, required_hashes=None, required_length=None):
|
||||
def _download_fixed_amount_of_data(connection, temp_file, file_length,
|
||||
required_length):
|
||||
"""
|
||||
<Purpose>
|
||||
This is a helper function, where the download really happens. While-block
|
||||
reads data from connection a fixed chunk of data at a time, or less, until
|
||||
'file_length' is reached.
|
||||
|
||||
<Arguments>
|
||||
connection:
|
||||
The object that the _open_connection returns for communicating with the
|
||||
server about the contents of a URL.
|
||||
|
||||
temp_file:
|
||||
A temporary file where the contents at the URL specified by the
|
||||
'connection' object will be stored.
|
||||
|
||||
file_length:
|
||||
The number of bytes that the server claims is the size of the file.
|
||||
|
||||
required_length:
|
||||
The number of bytes that we must download for the file. This is almost
|
||||
always specified by the TUF metadata for the data file in question
|
||||
(except in the case of timestamp metadata, in which case we would fix a
|
||||
reasonable upper bound).
|
||||
|
||||
<Side Effects>
|
||||
Data from the server will be written to 'temp_file'.
|
||||
|
||||
<Exceptions>
|
||||
Runtime or network exceptions will be raised without question.
|
||||
|
||||
<Returns>
|
||||
total_downloaded:
|
||||
The total number of bytes we have downloaded for the desired file and
|
||||
which should be equal to 'required_length'.
|
||||
|
||||
"""
|
||||
|
||||
# The maximum chunk of data, in bytes, we would download in every round.
|
||||
BLOCK_SIZE = 8192
|
||||
|
||||
# Keep track of total bytes downloaded.
|
||||
total_downloaded = 0
|
||||
|
||||
try:
|
||||
while True:
|
||||
# We download a fixed chunk of data in every round. This is so that we
|
||||
# can defend against slow retrieval attacks. Furthermore, we do not wish
|
||||
# to download an extremely large file in one shot.
|
||||
data = connection.read(min(BLOCK_SIZE, file_length-total_downloaded))
|
||||
|
||||
# We might have no more data to read. Check number of bytes downloaded.
|
||||
if not data:
|
||||
message = 'Downloaded '+str(total_downloaded)+'/'+ \
|
||||
str(file_length)+' bytes.'
|
||||
logger.debug(message)
|
||||
|
||||
# Did we download the correct amount indicated by 'Content-Length'
|
||||
# or user? Because file_length is always eaqual to required_length
|
||||
# we just need check one of them.
|
||||
if total_downloaded != file_length:
|
||||
message = 'Downloaded '+str(total_downloaded)+'. Expected '+ \
|
||||
str(file_length)+' for '+url
|
||||
raise tuf.DownloadError(message)
|
||||
|
||||
# Finally, we signal that the download is complete.
|
||||
break
|
||||
|
||||
# Data successfully read from the connection. Store it.
|
||||
temp_file.write(data)
|
||||
total_downloaded = total_downloaded + len(data)
|
||||
except:
|
||||
raise
|
||||
else:
|
||||
return total_downloaded
|
||||
finally:
|
||||
connection.close()
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def download_url_to_tempfileobj(url, required_hashes=None,
|
||||
required_length=None):
|
||||
"""
|
||||
<Purpose>
|
||||
Given the url, hashes and length of the desired file, this function
|
||||
|
|
@ -260,7 +344,7 @@ def download_url_to_tempfileobj(url, required_hashes=None, required_length=None)
|
|||
|
||||
<Returns>
|
||||
'tuf.util.TempFile' instance.
|
||||
|
||||
|
||||
"""
|
||||
|
||||
# Do all of the arguments have the appropriate format?
|
||||
|
|
@ -279,8 +363,6 @@ def download_url_to_tempfileobj(url, required_hashes=None, required_length=None)
|
|||
connection = _open_connection(url)
|
||||
temp_file = tuf.util.TempFile()
|
||||
|
||||
# Keep track of total bytes downloaded.
|
||||
total_downloaded = 0
|
||||
|
||||
try:
|
||||
# info().get('Content-Length') gets the length of the url file.
|
||||
|
|
@ -314,32 +396,14 @@ def download_url_to_tempfileobj(url, required_hashes=None, required_length=None)
|
|||
', got '+str(file_length)+' bytes.'
|
||||
raise tuf.DownloadError(message)
|
||||
|
||||
# While-block reads data from connection 8192-bytes at a time, or less,
|
||||
# until 'file_length' is reached.
|
||||
while True:
|
||||
data = connection.read(min(8192, file_length - total_downloaded))
|
||||
# We might have no more data to read. Let us check bytes downloaded.
|
||||
if not data:
|
||||
message = 'Downloaded '+str(total_downloaded)+'/' \
|
||||
+str(file_length)+' bytes.'
|
||||
logger.debug(message)
|
||||
# Did we download the correct amount indicated by 'Content-Length'?
|
||||
if total_downloaded != file_length:
|
||||
message = 'Downloaded '+str(total_downloaded)+'. Expected '+ \
|
||||
str(file_length)+' for '+url
|
||||
raise tuf.DownloadError(message)
|
||||
# Did we download the correct amount indicated by the user?
|
||||
if required_length is not None and total_downloaded != required_length:
|
||||
message = 'The user-required length of '+str(required_length)+ \
|
||||
'did not match the '+str(len(total_downloaded))+' downloaded'
|
||||
raise tuf.DownloadError(message)
|
||||
break
|
||||
# Data successfully read from the connection. Store it.
|
||||
temp_file.write(data)
|
||||
total_downloaded = total_downloaded + len(data)
|
||||
# For readibility, we perform the download in a separate function, which
|
||||
# returns the total number of downloaded bytes; this number should be equal
|
||||
# to required_length.
|
||||
total_downloaded = _download_fixed_amount_of_data(connection, temp_file,
|
||||
file_length,
|
||||
required_length)
|
||||
|
||||
# We appear to have downloaded the correct amount. Check the hashes.
|
||||
connection.close()
|
||||
if required_length is not None and required_hashes is not None:
|
||||
_check_hashes(temp_file, required_hashes)
|
||||
|
||||
|
|
@ -351,3 +415,5 @@ def download_url_to_tempfileobj(url, required_hashes=None, required_length=None)
|
|||
raise tuf.DownloadError(e)
|
||||
|
||||
return temp_file
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -267,13 +267,14 @@
|
|||
targets_directory=PATH_SCHEMA,
|
||||
backup_directory=PATH_SCHEMA))
|
||||
|
||||
# Role object in {'keyids': [keydids..], 'threshold': 1, 'paths':[filepaths..]}
|
||||
# format.
|
||||
# Role object in {'keyids': [keydids..], 'name': 'ABC', 'threshold': 1,
|
||||
# 'paths':[filepaths..]} # format.
|
||||
ROLE_SCHEMA = SCHEMA.Object(
|
||||
object_name='role',
|
||||
keyids=SCHEMA.ListOf(KEYID_SCHEMA),
|
||||
name=SCHEMA.Optional(ROLENAME_SCHEMA),
|
||||
threshold=THRESHOLD_SCHEMA,
|
||||
paths=SCHEMA.Optional(SCHEMA.ListOf(RELPATH_SCHEMA)))
|
||||
paths=SCHEMA.Optional(RELPATHS_SCHEMA))
|
||||
|
||||
# A dict of roles where the dict keys are role names and the dict values holding
|
||||
# the role data/information.
|
||||
|
|
@ -281,6 +282,9 @@
|
|||
key_schema=ROLENAME_SCHEMA,
|
||||
value_schema=ROLE_SCHEMA)
|
||||
|
||||
# Like ROLEDICT_SCHEMA, except that ROLE_SCHEMA instances are stored in order.
|
||||
ROLELIST_SCHEMA = SCHEMA.ListOf(ROLE_SCHEMA)
|
||||
|
||||
# The root: indicates root keys and top-level roles.
|
||||
ROOT_SCHEMA = SCHEMA.Object(
|
||||
object_name='root',
|
||||
|
|
@ -299,7 +303,7 @@
|
|||
targets=FILEDICT_SCHEMA,
|
||||
delegations=SCHEMA.Optional(SCHEMA.Object(
|
||||
keys=KEYDICT_SCHEMA,
|
||||
roles=ROLEDICT_SCHEMA)))
|
||||
roles=ROLELIST_SCHEMA)))
|
||||
|
||||
# A Release: indicates the latest versions of all metadata (except timestamp).
|
||||
RELEASE_SCHEMA = SCHEMA.Object(
|
||||
|
|
@ -324,7 +328,7 @@
|
|||
url_prefix=URL_SCHEMA,
|
||||
metadata_path=RELPATH_SCHEMA,
|
||||
targets_path=RELPATH_SCHEMA,
|
||||
confined_target_dirs=SCHEMA.ListOf(RELPATH_SCHEMA),
|
||||
confined_target_dirs=RELPATHS_SCHEMA,
|
||||
custom=SCHEMA.Optional(SCHEMA.Object()))
|
||||
|
||||
# A dictionary of mirrors where the dict keys hold the mirror's name and
|
||||
|
|
@ -818,7 +822,7 @@ def make_fileinfo(length, hashes, custom=None):
|
|||
|
||||
|
||||
|
||||
def make_role_metadata(keyids, threshold, paths=None):
|
||||
def make_role_metadata(keyids, threshold, name=None, paths=None):
|
||||
"""
|
||||
<Purpose>
|
||||
Create a dictionary conforming to 'tuf.formats.ROLE_SCHEMA',
|
||||
|
|
@ -833,6 +837,9 @@ def make_role_metadata(keyids, threshold, paths=None):
|
|||
An integer denoting the number of required keys
|
||||
for the signing role.
|
||||
|
||||
name:
|
||||
A string that is the name of this role.
|
||||
|
||||
paths:
|
||||
The 'Target' role stores the paths of target files
|
||||
in its metadata file. 'paths' is a list of
|
||||
|
|
@ -856,6 +863,10 @@ def make_role_metadata(keyids, threshold, paths=None):
|
|||
role_meta = {}
|
||||
role_meta['keyids'] = keyids
|
||||
role_meta['threshold'] = threshold
|
||||
|
||||
if name is not None:
|
||||
role_meta['name'] = name
|
||||
|
||||
if paths is not None:
|
||||
role_meta['paths'] = paths
|
||||
|
||||
|
|
|
|||
|
|
@ -167,6 +167,46 @@ def __urllib2_urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
|
|||
|
||||
|
||||
|
||||
def __read_configuration(configuration_handler,
|
||||
filename="tuf.interposition.json",
|
||||
parent_repository_directory=None,
|
||||
parent_ssl_certificates_directory=None):
|
||||
"""
|
||||
A generic function to read a TUF interposition configuration off the disk,
|
||||
and handle it. configuration_handler must be a function which accepts a
|
||||
tuf.interposition.Configuration instance."""
|
||||
|
||||
INVALID_TUF_CONFIGURATION = "Invalid configuration for {network_location}!"
|
||||
INVALID_TUF_INTERPOSITION_JSON = "Invalid configuration in {filename}!"
|
||||
NO_CONFIGURATIONS = "No configurations found in configuration in {filename}!"
|
||||
|
||||
try:
|
||||
with open(filename) as tuf_interposition_json:
|
||||
tuf_interpositions = json.load(tuf_interposition_json)
|
||||
configurations = tuf_interpositions.get("configurations", {})
|
||||
|
||||
if len(configurations) == 0:
|
||||
raise InvalidConfiguration(NO_CONFIGURATIONS.format(filename=filename))
|
||||
|
||||
else:
|
||||
for network_location, configuration in configurations.iteritems():
|
||||
try:
|
||||
configuration_parser = ConfigurationParser(network_location,
|
||||
configuration, parent_repository_directory=parent_repository_directory,
|
||||
parent_ssl_certificates_directory=parent_ssl_certificates_directory)
|
||||
|
||||
configuration = configuration_parser.parse()
|
||||
configuration_handler(configuration)
|
||||
|
||||
except:
|
||||
Logger.exception(INVALID_TUF_CONFIGURATION.format(network_location=network_location))
|
||||
raise
|
||||
|
||||
except:
|
||||
Logger.exception(INVALID_TUF_INTERPOSITION_JSON.format(filename=filename))
|
||||
raise
|
||||
|
||||
|
||||
|
||||
|
||||
# TODO: Is parent_repository_directory a security risk? For example, would it
|
||||
|
|
@ -221,35 +261,18 @@ def configure(filename="tuf.interposition.json",
|
|||
optional; it must specify certificates bundled as PEM (RFC 1422).
|
||||
"""
|
||||
|
||||
INVALID_TUF_CONFIGURATION = "Invalid configuration for {network_location}!"
|
||||
INVALID_TUF_INTERPOSITION_JSON = "Invalid configuration in {filename}!"
|
||||
NO_CONFIGURATIONS = "No configurations found in configuration in {filename}!"
|
||||
__read_configuration(__updater_controller.add, filename=filename,
|
||||
parent_repository_directory=parent_repository_directory,
|
||||
parent_ssl_certificates_directory=parent_ssl_certificates_directory)
|
||||
|
||||
try:
|
||||
with open(filename) as tuf_interposition_json:
|
||||
tuf_interpositions = json.load(tuf_interposition_json)
|
||||
configurations = tuf_interpositions.get("configurations", {})
|
||||
|
||||
if len(configurations) == 0:
|
||||
raise InvalidConfiguration(NO_CONFIGURATIONS.format(filename=filename))
|
||||
|
||||
else:
|
||||
for network_location, configuration in configurations.iteritems():
|
||||
try:
|
||||
configuration_parser = ConfigurationParser(network_location,
|
||||
configuration, parent_repository_directory=parent_repository_directory,
|
||||
parent_ssl_certificates_directory=parent_ssl_certificates_directory)
|
||||
|
||||
configuration = configuration_parser.parse()
|
||||
__updater_controller.add(configuration)
|
||||
|
||||
except:
|
||||
Logger.exception(INVALID_TUF_CONFIGURATION.format(network_location=network_location))
|
||||
raise
|
||||
def deconfigure(filename="tuf.interposition.json"):
|
||||
"""Remove TUF interposition for a previously read configuration."""
|
||||
|
||||
except:
|
||||
Logger.exception(INVALID_TUF_INTERPOSITION_JSON.format(filename=filename))
|
||||
raise
|
||||
__read_configuration(__updater_controller.remove, filename=filename)
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -51,6 +51,24 @@ def __repr__(self):
|
|||
return MESSAGE.format(network_location=self.network_location)
|
||||
|
||||
|
||||
def get_repository_mirror_hostnames(self):
|
||||
"""Get a set of hostnames of every repository mirror of this
|
||||
configuration."""
|
||||
|
||||
# Parse TUF server repository mirrors.
|
||||
repository_mirrors = self.repository_mirrors
|
||||
repository_mirror_hostnames = set()
|
||||
|
||||
for repository_mirror in repository_mirrors:
|
||||
mirror_configuration = repository_mirrors[repository_mirror]
|
||||
url_prefix = mirror_configuration["url_prefix"]
|
||||
parsed_url = urlparse.urlparse(url_prefix)
|
||||
mirror_hostname = parsed_url.hostname
|
||||
repository_mirror_hostnames.add(mirror_hostname)
|
||||
|
||||
return repository_mirror_hostnames
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -177,7 +177,7 @@ def __init__(self):
|
|||
self.__repository_mirror_hostnames = set()
|
||||
|
||||
|
||||
def __check_configuration(self, configuration):
|
||||
def __check_configuration_on_add(self, configuration):
|
||||
"""
|
||||
If the given Configuration is invalid, I raise an exception.
|
||||
Otherwise, I return some information about the Configuration,
|
||||
|
|
@ -198,42 +198,33 @@ def __check_configuration(self, configuration):
|
|||
assert configuration.hostname not in self.__updaters
|
||||
assert configuration.hostname not in self.__repository_mirror_hostnames
|
||||
|
||||
# Parse TUF server repository mirrors.
|
||||
repository_mirrors = configuration.repository_mirrors
|
||||
repository_mirror_hostnames = set()
|
||||
|
||||
for repository_mirror in repository_mirrors:
|
||||
mirror_configuration = repository_mirrors[repository_mirror]
|
||||
# Check for redundancy in server repository mirrors.
|
||||
repository_mirror_hostnames = configuration.get_repository_mirror_hostnames()
|
||||
|
||||
for mirror_hostname in repository_mirror_hostnames:
|
||||
try:
|
||||
url_prefix = mirror_configuration["url_prefix"]
|
||||
parsed_url = urlparse.urlparse(url_prefix)
|
||||
mirror_hostname = parsed_url.hostname
|
||||
|
||||
# Restrict each (incoming, outgoing) hostname pair to be unique
|
||||
# across configurations; this prevents interposition cycles,
|
||||
# Restrict each hostname in every (incoming, outgoing) pair to be
|
||||
# unique across configurations; this prevents interposition cycles,
|
||||
# amongst other things.
|
||||
assert mirror_hostname not in self.__updaters
|
||||
assert mirror_hostname not in self.__repository_mirror_hostnames
|
||||
|
||||
# Remember this mirror's hostname for the next network_location.
|
||||
repository_mirror_hostnames.add(mirror_hostname)
|
||||
|
||||
except:
|
||||
error_message = \
|
||||
INVALID_REPOSITORY_MIRROR.format(repository_mirror=repository_mirror)
|
||||
INVALID_REPOSITORY_MIRROR.format(repository_mirror=mirror_hostname)
|
||||
Logger.exception(error_message)
|
||||
raise InvalidConfiguration(error_message)
|
||||
|
||||
return repository_mirror_hostnames
|
||||
|
||||
|
||||
|
||||
def add(self, configuration):
|
||||
"""Add an Updater based on the given Configuration."""
|
||||
|
||||
UPDATER_ADDED_MESSAGE = "Updater added for {configuration}."
|
||||
|
||||
repository_mirror_hostnames = self.__check_configuration(configuration)
|
||||
repository_mirror_hostnames = self.__check_configuration_on_add(configuration)
|
||||
|
||||
# If all is well, build and store an Updater, and remember hostnames.
|
||||
self.__updaters[configuration.hostname] = Updater(configuration)
|
||||
|
|
@ -296,3 +287,23 @@ def get(self, url):
|
|||
Logger.warn(GENERIC_WARNING_MESSAGE.format(url=url))
|
||||
|
||||
return updater
|
||||
|
||||
|
||||
def remove(self, configuration):
|
||||
"""Remove an Updater matching the given Configuration."""
|
||||
|
||||
UPDATER_REMOVED_MESSAGE = "Updater removed for {configuration}."
|
||||
|
||||
assert isinstance(configuration, Configuration)
|
||||
|
||||
repository_mirror_hostnames = configuration.get_repository_mirror_hostnames()
|
||||
|
||||
assert configuration.hostname in self.__updaters
|
||||
assert repository_mirror_hostnames.issubset(self.__repository_mirror_hostnames)
|
||||
|
||||
# If all is well, remove the stored Updater as well as its associated
|
||||
# repository mirror hostnames.
|
||||
del self.__updaters[configuration.hostname]
|
||||
self.__repository_mirror_hostnames.difference_update(repository_mirror_hostnames)
|
||||
|
||||
Logger.info(UPDATER_REMOVED_MESSAGE.format(configuration=configuration))
|
||||
|
|
|
|||
|
|
@ -1035,13 +1035,27 @@ def make_delegation(keystore_directory):
|
|||
delegated_role, delegated_keyids = _get_delegated_role(keystore_directory,
|
||||
metadata_directory)
|
||||
|
||||
# Retrieve the file paths for the delegated targets.
|
||||
delegated_paths =\
|
||||
tuf.repo.signerlib.get_targets(delegated_targets_directory,
|
||||
recursive_walk=recursive_walk,
|
||||
followlinks=True)
|
||||
|
||||
# The target paths need to be relative to the repository's targets
|
||||
# directory (e.g., 'targets/role1/target_file.gif').
|
||||
# [len(repository_directory)+1:] strips the repository path, including
|
||||
# its trailing path separator.
|
||||
repository_directory, junk = os.path.split(metadata_directory)
|
||||
|
||||
for index in xrange(len(delegated_paths)):
|
||||
full_target_path = delegated_paths[index]
|
||||
assert full_target_path.startswith(repository_directory)
|
||||
relative_target_path = full_target_path[len(repository_directory)+1:]
|
||||
delegated_paths[index] = relative_target_path
|
||||
|
||||
# Create, sign, and write the delegated role's metadata file.
|
||||
delegated_paths = _make_delegated_metadata(metadata_directory,
|
||||
delegated_targets_directory,
|
||||
parent_role, delegated_role,
|
||||
delegated_keyids,
|
||||
recursive_walk=recursive_walk,
|
||||
followlinks=True)
|
||||
_make_delegated_metadata(metadata_directory, delegated_paths, parent_role,
|
||||
delegated_role, delegated_keyids)
|
||||
|
||||
# Update the parent role's metadata file. The parent role's delegation
|
||||
# field must be updated with the newly created delegated role.
|
||||
|
|
@ -1140,39 +1154,17 @@ def _get_delegated_role(keystore_directory, metadata_directory):
|
|||
|
||||
|
||||
|
||||
def _make_delegated_metadata(metadata_directory, delegated_targets_directory,
|
||||
parent_role, delegated_role, delegated_keyids,
|
||||
recursive_walk=False, followlinks=False):
|
||||
def _make_delegated_metadata(metadata_directory, delegated_paths, parent_role,
|
||||
delegated_role, delegated_keyids):
|
||||
"""
|
||||
Create, sign, and write the metadata file for the newly added delegated
|
||||
role. Determine the delegated paths for the target files found in
|
||||
'delegated_targets_directory' and the other information needed
|
||||
role. Determine the delegated paths for the target files in
|
||||
'delegated_paths' and the other information needed
|
||||
to generate the targets metadata file for 'delegated_role'. Return
|
||||
the delegated paths to the caller.
|
||||
|
||||
"""
|
||||
|
||||
# Retrieve the file paths for the delegated targets.
|
||||
delegated_paths = []
|
||||
|
||||
# The target paths need to be relative to the repository's targets
|
||||
# directory (e.g., 'targets/role1/target_file.gif').
|
||||
# [len(repository_directory)+1:] strips the repository path, including
|
||||
# its trailing path separator.
|
||||
repository_directory, junk = os.path.split(metadata_directory)
|
||||
|
||||
for dirpath, dirnames, filenames in os.walk(delegated_targets_directory,
|
||||
followlinks=followlinks):
|
||||
for filename in filenames:
|
||||
full_target_path = os.path.join(dirpath, filename)
|
||||
relative_target_path = full_target_path[len(repository_directory)+1:]
|
||||
delegated_paths.append(relative_target_path)
|
||||
|
||||
# Prune the subdirectories to walk right now if we do not wish to
|
||||
# recursively walk the delegated targets directory.
|
||||
if recursive_walk is False:
|
||||
del dirnames[:]
|
||||
|
||||
message = 'There are '+str(len(delegated_paths))+' target paths for '+\
|
||||
str(delegated_role)
|
||||
logger.info(message)
|
||||
|
|
@ -1191,6 +1183,7 @@ def _make_delegated_metadata(metadata_directory, delegated_targets_directory,
|
|||
# When creating a delegated role, if the parent directory already
|
||||
# exists, this means a prior delegation has been perform by the parent.
|
||||
parent_directory = os.path.join(metadata_directory, parent_role)
|
||||
|
||||
try:
|
||||
os.mkdir(parent_directory)
|
||||
except OSError, e:
|
||||
|
|
@ -1198,6 +1191,7 @@ def _make_delegated_metadata(metadata_directory, delegated_targets_directory,
|
|||
pass
|
||||
else:
|
||||
raise
|
||||
|
||||
delegated_role_filename = delegated_role+'.txt'
|
||||
metadata_filename = os.path.join(parent_directory, delegated_role_filename)
|
||||
repository_directory, junk = os.path.split(metadata_directory)
|
||||
|
|
@ -1206,8 +1200,6 @@ def _make_delegated_metadata(metadata_directory, delegated_targets_directory,
|
|||
_sign_and_write_metadata(delegated_metadata, delegated_keyids,
|
||||
metadata_filename)
|
||||
|
||||
return delegated_paths
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
@ -1248,15 +1240,26 @@ def _update_parent_metadata(metadata_directory, delegated_role, delegated_keyids
|
|||
delegations['keys'] = keys
|
||||
|
||||
# Update the 'roles' field.
|
||||
roles = delegations.get('roles', {})
|
||||
roles = delegations.get('roles', [])
|
||||
threshold = len(delegated_keyids)
|
||||
delegated_role = parent_role+'/'+delegated_role
|
||||
relative_paths = []
|
||||
for path in delegated_paths:
|
||||
relative_paths.append(os.path.sep.join(path.split(os.path.sep)[1:]))
|
||||
roles[delegated_role] = tuf.formats.make_role_metadata(delegated_keyids,
|
||||
threshold,
|
||||
relative_paths)
|
||||
role_metadata = tuf.formats.make_role_metadata(delegated_keyids, threshold,
|
||||
name=delegated_role,
|
||||
paths=relative_paths)
|
||||
role_index = tuf.repo.signerlib.find_delegated_role(roles, delegated_role)
|
||||
|
||||
if role_index is None:
|
||||
# Append role to the end of the list of delegated roles.
|
||||
logger.info('Appending role '+delegated_role+' to '+parent_role)
|
||||
roles.append(role_metadata)
|
||||
else:
|
||||
# Update role with the same name.
|
||||
logger.info('Replacing role '+delegated_role+' in '+parent_role)
|
||||
roles[role_index] = role_metadata
|
||||
|
||||
delegations['roles'] = roles
|
||||
|
||||
# Update the larger metadata structure.
|
||||
|
|
|
|||
|
|
@ -1148,10 +1148,10 @@ def build_delegated_role_file(delegated_targets_directory, delegated_keyids,
|
|||
delegation_role_name):
|
||||
"""
|
||||
<Purpose>
|
||||
Build the targets metadata file using the signing keys in 'targets_keyids'.
|
||||
The generated metadata file is saved to 'metadata_directory'. The target
|
||||
files located in 'targets_directory' will be tracked by the built targets
|
||||
metadata.
|
||||
Build the targets metadata file using the signing keys in
|
||||
'delegated_keyids'. The generated metadata file is saved to
|
||||
'metadata_directory'. The target files located in 'targets_directory' will
|
||||
be tracked by the built targets metadata.
|
||||
|
||||
<Arguments>
|
||||
delegated_targets_directory:
|
||||
|
|
@ -1216,3 +1216,146 @@ def build_delegated_role_file(delegated_targets_directory, delegated_keyids,
|
|||
signable = sign_metadata(targets_metadata, delegated_keyids, targets_filepath)
|
||||
|
||||
return write_metadata_file(signable, targets_filepath)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def find_delegated_role(roles, delegated_role):
|
||||
"""
|
||||
<Purpose>
|
||||
Find the index, if any, of a role with a given name in a list of roles.
|
||||
|
||||
<Arguments>
|
||||
roles:
|
||||
The list of roles, each of which must have a name.
|
||||
|
||||
delegated_role:
|
||||
The name of the role to be found in the list of roles.
|
||||
|
||||
<Exceptions>
|
||||
tuf.RepositoryError, if the list of roles has invalid data.
|
||||
|
||||
<Side Effects>
|
||||
No known side effects.
|
||||
|
||||
<Returns>
|
||||
None, if the role with the given name does not exist, or its unique index
|
||||
in the list of roles.
|
||||
|
||||
"""
|
||||
|
||||
# Check argument types.
|
||||
tuf.formats.ROLELIST_SCHEMA.check_match(roles)
|
||||
tuf.formats.ROLENAME_SCHEMA.check_match(delegated_role)
|
||||
|
||||
# The index of a role, if any, with the same name.
|
||||
role_index = None
|
||||
|
||||
for index in xrange(len(roles)):
|
||||
role = roles[index]
|
||||
name = role.get('name')
|
||||
# This role has no name.
|
||||
if name is None:
|
||||
no_name_message = 'Role with no name!'
|
||||
raise tuf.RepositoryError(no_name_message)
|
||||
# Does this role have the same name?
|
||||
else:
|
||||
# This role has the same name, and...
|
||||
if name == delegated_role:
|
||||
# ...it is the only known role with the same name.
|
||||
if role_index is None:
|
||||
role_index = index
|
||||
# ...there are at least two roles with the same name!
|
||||
else:
|
||||
duplicate_role_message = 'Duplicate role ('+str(delegated_role)+')!'
|
||||
raise tuf.RepositoryError(duplicate_role_message)
|
||||
# This role has a different name.
|
||||
else:
|
||||
continue
|
||||
|
||||
return role_index
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def accept_any_file(full_target_path):
|
||||
"""
|
||||
<Purpose>
|
||||
Simply accept any given file.
|
||||
|
||||
<Arguments>
|
||||
full_target_path:
|
||||
The absolute path to a target file.
|
||||
|
||||
<Exceptions>
|
||||
None.
|
||||
|
||||
<Side Effects>
|
||||
None.
|
||||
|
||||
<Returns>
|
||||
True.
|
||||
"""
|
||||
|
||||
return True
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def get_targets(files_directory, recursive_walk=False, followlinks=True,
|
||||
file_predicate=accept_any_file):
|
||||
"""
|
||||
<Purpose>
|
||||
Walk the given files_directory to build a list of target files in it.
|
||||
|
||||
<Arguments>
|
||||
files_directory:
|
||||
The path to a directory of target files.
|
||||
|
||||
recursive_walk:
|
||||
To recursively walk the directory, set recursive_walk=True.
|
||||
|
||||
followlinks:
|
||||
To follow symbolic links, set followlinks=True.
|
||||
|
||||
file_predicate:
|
||||
To filter a file based on a predicate, set file_predicate to a function
|
||||
which accepts a full path to a file and returns a Boolean.
|
||||
|
||||
<Exceptions>
|
||||
Python IO exceptions.
|
||||
|
||||
<Side Effects>
|
||||
None.
|
||||
|
||||
<Returns>
|
||||
A list of absolute paths to target files in the given files_directory.
|
||||
"""
|
||||
|
||||
targets = []
|
||||
|
||||
# FIXME: We need a way to tell Python 2, but not Python 3, to return
|
||||
# filenames in Unicode; see #61 and:
|
||||
# http://docs.python.org/2/howto/unicode.html#unicode-filenames
|
||||
|
||||
for dirpath, dirnames, filenames in os.walk(files_directory,
|
||||
followlinks=followlinks):
|
||||
for filename in filenames:
|
||||
full_target_path = os.path.join(dirpath, filename)
|
||||
if file_predicate(full_target_path):
|
||||
targets.append(full_target_path)
|
||||
|
||||
# Prune the subdirectories to walk right now if we do not wish to
|
||||
# recursively walk files_directory.
|
||||
if recursive_walk is False:
|
||||
del dirnames[:]
|
||||
|
||||
return targets
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -538,7 +538,8 @@ def get_delegated_rolenames(rolename):
|
|||
None.
|
||||
|
||||
<Returns>
|
||||
A list of rolenames.
|
||||
A list of rolenames. Note that the rolenames are *NOT* sorted by order of
|
||||
delegation!
|
||||
|
||||
"""
|
||||
|
||||
|
|
|
|||
|
|
@ -105,7 +105,7 @@ def generate(bits=_DEFAULT_RSA_KEY_BITS):
|
|||
|
||||
try:
|
||||
public_key, private_key = evpy.envelope.keygen(bits, pem=True)
|
||||
except (EnvelopeError, KeygenError, MemoryError), e:
|
||||
except (evpy.envelope.EnvelopeError, evpy.envelope.KeygenError, MemoryError), e:
|
||||
raise tuf.CryptoError(e)
|
||||
|
||||
# Generate the keyid for the RSA key. 'key_value' corresponds to the
|
||||
|
|
|
|||
|
|
@ -141,8 +141,10 @@ def _mock_prompt(msg, junk):
|
|||
return delegated_targets_dir
|
||||
elif msg.startswith('\nChoose and enter the parent'):
|
||||
return parent_role
|
||||
elif msg.endswith('\nEnter the delegated role\'s name: '):
|
||||
elif msg.startswith('\nEnter the delegated role\'s name: '):
|
||||
return delegated_role_name
|
||||
elif msg.startswith('Recursively walk the given directory? (Y)es/(N)o: '):
|
||||
return 'N'
|
||||
else:
|
||||
error_msg = ('Prompt: '+'\''+msg+'\''+
|
||||
' did not match any predefined mock prompts.')
|
||||
|
|
|
|||
473
tuf/tests/system_tests/test_delegations.py
Executable file
473
tuf/tests/system_tests/test_delegations.py
Executable file
|
|
@ -0,0 +1,473 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
"""
|
||||
<Program Name>
|
||||
test_delegations.py
|
||||
|
||||
<Author>
|
||||
Konstantin Andrianov
|
||||
|
||||
<Started>
|
||||
February 19, 2012
|
||||
|
||||
<Copyright>
|
||||
See LICENSE for licensing information.
|
||||
|
||||
<Purpose>
|
||||
Ensure that TUF meets expectations about target delegations.
|
||||
"""
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
import os
|
||||
import tempfile
|
||||
import unittest
|
||||
|
||||
import tuf.formats
|
||||
import tuf.repo.keystore as keystore
|
||||
import tuf.repo.signercli as signercli
|
||||
import tuf.repo.signerlib as signerlib
|
||||
import util_test_tools
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class TestDelegationFunctions(unittest.TestCase):
|
||||
|
||||
|
||||
def do_update(self):
|
||||
# Client side repository.
|
||||
tuf_client = os.path.join(self.root_repo, 'tuf_client')
|
||||
downloads_dir = os.path.join(self.root_repo, 'downloads')
|
||||
|
||||
# Adjust client's configuration file.
|
||||
tuf.conf.repository_directory = tuf_client
|
||||
|
||||
updater = tuf.client.updater.Updater('my_repo', self.mirrors)
|
||||
|
||||
# Refresh the repository's top-level roles, store the target information for
|
||||
# all the targets tracked, and determine which of these targets have been
|
||||
# updated.
|
||||
updater.refresh()
|
||||
|
||||
# Obtain a list of available targets.
|
||||
targets = []
|
||||
relative_target_filepaths = self.relpath_from_targets(self.target_filepaths)
|
||||
for target_filepath in relative_target_filepaths:
|
||||
target_info = updater.target(target_filepath)
|
||||
targets.append(target_info)
|
||||
|
||||
# Download each of these updated targets and save them locally.
|
||||
updated_targets = updater.updated_targets(targets, downloads_dir)
|
||||
for target in updated_targets:
|
||||
updater.download_target(target, downloads_dir)
|
||||
|
||||
# Return metadata about downloaded targets.
|
||||
make_fileinfo = signerlib.get_metadata_file_info
|
||||
targets_metadata = {}
|
||||
for target_filepath in relative_target_filepaths:
|
||||
download_filepath = os.path.join(downloads_dir, target_filepath)
|
||||
target_fileinfo = signerlib.get_metadata_file_info(download_filepath)
|
||||
targets_metadata[target_filepath] = target_fileinfo
|
||||
return targets_metadata
|
||||
|
||||
|
||||
def make_targets_metadata(self):
|
||||
"""Subclasses will override this method to generate metadata for all
|
||||
targets roles, with the understanding that there is a fixed structure of
|
||||
the targets roles."""
|
||||
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
def relpath_from_targets(self, target_filepaths):
|
||||
"""Ex: 'targets/more_targets/somefile.txt' -> 'more_targets/somefile.txt'
|
||||
i.e. 'targets/' is removed from 'target'."""
|
||||
|
||||
new_target_filepaths = []
|
||||
for target in target_filepaths:
|
||||
relative_targetpath = os.path.sep.join(target.split(os.path.sep)[1:])
|
||||
new_target_filepaths.append(relative_targetpath)
|
||||
return new_target_filepaths
|
||||
|
||||
|
||||
def setUp(self):
|
||||
"""
|
||||
The target delegations tree is fixed as such:
|
||||
targets -> [T1, T2]
|
||||
T1 -> [T3]
|
||||
"""
|
||||
|
||||
root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=True)
|
||||
|
||||
# Server side repository.
|
||||
tuf_repo = os.path.join(root_repo, 'tuf_repo')
|
||||
keystore_dir = os.path.join(tuf_repo, 'keystore')
|
||||
metadata_dir = os.path.join(tuf_repo, 'metadata')
|
||||
targets_dir = os.path.join(tuf_repo, 'targets')
|
||||
|
||||
# We need to provide clients with a way to reach the tuf repository.
|
||||
tuf_repo_relpath = os.path.basename(tuf_repo)
|
||||
tuf_url = url+tuf_repo_relpath
|
||||
|
||||
# Add files to the server side repository.
|
||||
# target1 = 'targets_dir/[random].txt'
|
||||
# target2 = 'targets_dir/[random].txt'
|
||||
add_target = util_test_tools.add_file_to_repository
|
||||
target1_path = add_target(targets_dir, data='target1')
|
||||
target2_path = add_target(targets_dir, data='target2')
|
||||
|
||||
# Target paths relative to the 'targets_dir'.
|
||||
# Ex: targetX = 'targets/delegator/delegatee.txt'
|
||||
target1 = os.path.relpath(target1_path, tuf_repo)
|
||||
target2 = os.path.relpath(target2_path, tuf_repo)
|
||||
|
||||
# Relative to repository's targets directory.
|
||||
target_filepaths = [target1, target2]
|
||||
|
||||
# Store in self only the variables relevant for tests.
|
||||
self.root_repo = root_repo
|
||||
self.tuf_repo = tuf_repo
|
||||
self.server_proc = server_proc
|
||||
self.target_filepaths = target_filepaths
|
||||
# Targets delegated from A to B.
|
||||
self.delegated_targets = {}
|
||||
# Targets actually signed by B.
|
||||
self.signed_targets = {}
|
||||
self.mirrors = {
|
||||
"mirror1": {
|
||||
"url_prefix": tuf_url,
|
||||
"metadata_path": "metadata",
|
||||
"targets_path": "targets",
|
||||
"confined_target_dirs": [""]
|
||||
}
|
||||
}
|
||||
# Aliases for targets roles.
|
||||
self.T0 = 'targets'
|
||||
self.T1 = 'targets/T1'
|
||||
self.T2 = 'targets/T2'
|
||||
self.T3 = 'targets/T1/T3'
|
||||
|
||||
# Get tracked and assigned targets, and generate targets metadata.
|
||||
self.make_targets_metadata()
|
||||
assert hasattr(self, 'T0_metadata')
|
||||
assert hasattr(self, 'T1_metadata')
|
||||
assert hasattr(self, 'T2_metadata')
|
||||
assert hasattr(self, 'T3_metadata')
|
||||
|
||||
# Make delegation directories at the server's repository.
|
||||
metadata_targets_dir = os.path.join(metadata_dir, 'targets')
|
||||
metadata_T1_dir = os.path.join(metadata_targets_dir, 'T1')
|
||||
os.makedirs(metadata_T1_dir)
|
||||
|
||||
# Delegations metadata paths for the 3 delegated targets roles.
|
||||
T0_path = os.path.join(metadata_dir, 'targets.txt')
|
||||
T1_path = os.path.join(metadata_targets_dir, 'T1.txt')
|
||||
T2_path = os.path.join(metadata_targets_dir, 'T2.txt')
|
||||
T3_path = os.path.join(metadata_T1_dir, 'T3.txt')
|
||||
|
||||
# Generate RSA keys for the 3 delegatees.
|
||||
key1 = signerlib.generate_and_save_rsa_key(keystore_dir, 'T1')
|
||||
key2 = signerlib.generate_and_save_rsa_key(keystore_dir, 'T2')
|
||||
key3 = signerlib.generate_and_save_rsa_key(keystore_dir, 'T3')
|
||||
|
||||
# ID for each of the 3 keys.
|
||||
key1_id = key1['keyid']
|
||||
key2_id = key2['keyid']
|
||||
key3_id = key3['keyid']
|
||||
|
||||
# ID, in a list, for each of the 3 keys.
|
||||
key1_ids = [key1_id]
|
||||
key2_ids = [key2_id]
|
||||
key3_ids = [key3_id]
|
||||
|
||||
# Public-key JSON for each of the 3 keys.
|
||||
key1_val = tuf.rsa_key.create_in_metadata_format(key1['keyval'])
|
||||
key2_val = tuf.rsa_key.create_in_metadata_format(key2['keyval'])
|
||||
key3_val = tuf.rsa_key.create_in_metadata_format(key3['keyval'])
|
||||
|
||||
# Create delegation role metadata for each of the 3 delegated targets roles.
|
||||
make_role_metadata = tuf.formats.make_role_metadata
|
||||
|
||||
T1_targets = self.relpath_from_targets(self.delegated_targets[self.T1])
|
||||
T1_role = make_role_metadata(key1_ids, 1, name=self.T1, paths=T1_targets)
|
||||
|
||||
T2_targets = self.relpath_from_targets(self.delegated_targets[self.T2])
|
||||
T2_role = make_role_metadata(key2_ids, 1, name=self.T2, paths=T2_targets)
|
||||
|
||||
T3_targets = self.relpath_from_targets(self.delegated_targets[self.T3])
|
||||
T3_role = make_role_metadata(key3_ids, 1, name=self.T3, paths=T3_targets)
|
||||
|
||||
# Assign 'delegations' object for 'targets':
|
||||
self.T0_metadata['signed']['delegations'] = {
|
||||
'keys': {key1_id: key1_val, key2_id: key2_val},
|
||||
'roles': [T1_role, T2_role]
|
||||
}
|
||||
|
||||
# Assign 'delegations' object for 'targets/T1':
|
||||
self.T1_metadata['signed']['delegations'] = {
|
||||
'keys': {key3_id: key3_val},
|
||||
'roles': [T3_role]
|
||||
}
|
||||
|
||||
sign = signerlib.sign_metadata
|
||||
write = signerlib.write_metadata_file
|
||||
|
||||
# Sign new metadata objects.
|
||||
T0_signable = sign(self.T0_metadata, keyids, T0_path)
|
||||
T1_signable = sign(self.T1_metadata, key1_ids, T1_path)
|
||||
T2_signable = sign(self.T2_metadata, key2_ids, T2_path)
|
||||
T3_signable = sign(self.T3_metadata, key3_ids, T3_path)
|
||||
# Save new metadata objects.
|
||||
write(T0_signable, T0_path)
|
||||
write(T1_signable, T1_path)
|
||||
write(T2_signable, T2_path)
|
||||
write(T3_signable, T3_path)
|
||||
|
||||
# Timestamp a new release to reflect latest targets.
|
||||
signerlib.build_release_file(keyids, metadata_dir)
|
||||
signerlib.build_timestamp_file(keyids, metadata_dir)
|
||||
|
||||
# Unload all keys.
|
||||
keystore.clear_keystore()
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
util_test_tools.cleanup(self.root_repo, server_process=self.server_proc)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class TestInitialUpdateWithTargetDelegations(TestDelegationFunctions):
|
||||
"""We show that making target delegations results in a successful initial
|
||||
update of targets."""
|
||||
|
||||
|
||||
def make_targets_metadata(self):
|
||||
make_metadata = signerlib.generate_targets_metadata
|
||||
target1, target2 = self.target_filepaths
|
||||
|
||||
# Targets signed for by each of the targets roles.
|
||||
self.signed_targets[self.T0] = [target1]
|
||||
self.signed_targets[self.T1] = [target1]
|
||||
self.signed_targets[self.T2] = [target2]
|
||||
self.signed_targets[self.T3] = [target1, target2]
|
||||
|
||||
# Targets delegated to each of the delegated targets roles.
|
||||
self.delegated_targets[self.T1] = [target1]
|
||||
self.delegated_targets[self.T2] = [target2]
|
||||
self.delegated_targets[self.T3] = [target1, target2]
|
||||
|
||||
self.T0_metadata =\
|
||||
make_metadata(self.tuf_repo, self.signed_targets[self.T0])
|
||||
self.T1_metadata =\
|
||||
make_metadata(self.tuf_repo, self.signed_targets[self.T1])
|
||||
self.T2_metadata =\
|
||||
make_metadata(self.tuf_repo, self.signed_targets[self.T2])
|
||||
self.T3_metadata = \
|
||||
make_metadata(self.tuf_repo, self.signed_targets[self.T3])
|
||||
|
||||
|
||||
def test_that_initial_update_works_with_target_delegations(self):
|
||||
# Get relative target paths, because that is what TUF recognizes.
|
||||
relative_target_filepaths = self.relpath_from_targets(self.target_filepaths)
|
||||
# Get metadata about downloaded targets.
|
||||
targets_metadata = self.do_update()
|
||||
# Do we have metadata about all the expected targets?
|
||||
for target_filepath in relative_target_filepaths:
|
||||
self.assertIn(target_filepath, targets_metadata)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class TestBreachOfTargetDelegation(TestDelegationFunctions):
|
||||
"""We show that a delegated targets role B cannot talk about targets that A
|
||||
did not delegate to B."""
|
||||
|
||||
|
||||
def make_targets_metadata(self):
|
||||
make_metadata = signerlib.generate_targets_metadata
|
||||
target1, target2 = self.target_filepaths
|
||||
|
||||
# Targets signed for by each of the targets roles.
|
||||
self.signed_targets[self.T0] = []
|
||||
self.signed_targets[self.T1] = [target2]
|
||||
self.signed_targets[self.T2] = [target1]
|
||||
self.signed_targets[self.T3] = []
|
||||
|
||||
# Targets delegated to each of the delegated targets roles.
|
||||
self.delegated_targets[self.T1] = [target1]
|
||||
self.delegated_targets[self.T2] = [target2]
|
||||
self.delegated_targets[self.T3] = []
|
||||
|
||||
self.T0_metadata =\
|
||||
make_metadata(self.tuf_repo, self.signed_targets[self.T0])
|
||||
self.T1_metadata =\
|
||||
make_metadata(self.tuf_repo, self.signed_targets[self.T1])
|
||||
self.T2_metadata =\
|
||||
make_metadata(self.tuf_repo, self.signed_targets[self.T2])
|
||||
self.T3_metadata = \
|
||||
make_metadata(self.tuf_repo, self.signed_targets[self.T3])
|
||||
|
||||
|
||||
def test_that_initial_update_fails_with_undelegated_signing_of_targets(self):
|
||||
# Expect to see a particular exception on initial update.
|
||||
self.assertRaises(tuf.MetadataNotAvailableError, self.do_update)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class TestOrderOfTargetDelegationWithSuccess(TestDelegationFunctions):
|
||||
"""We show that when multiple delegated targets roles talk about a target,
|
||||
the first one in order of appearance of delegation wins.
|
||||
|
||||
In this case, the first role has the correct metadata about the target."""
|
||||
|
||||
|
||||
def make_targets_metadata(self):
|
||||
make_metadata = signerlib.generate_targets_metadata
|
||||
target1, target2 = self.target_filepaths
|
||||
|
||||
# Targets signed for by each of the targets roles.
|
||||
self.signed_targets[self.T0] = [target2]
|
||||
self.signed_targets[self.T1] = []
|
||||
self.signed_targets[self.T2] = [target1]
|
||||
self.signed_targets[self.T3] = [target1]
|
||||
|
||||
# Targets delegated to each of the delegated targets roles.
|
||||
self.delegated_targets[self.T1] = [target1]
|
||||
self.delegated_targets[self.T2] = [target1]
|
||||
self.delegated_targets[self.T3] = [target1]
|
||||
|
||||
self.T0_metadata =\
|
||||
make_metadata(self.tuf_repo, self.signed_targets[self.T0])
|
||||
self.T1_metadata =\
|
||||
make_metadata(self.tuf_repo, self.signed_targets[self.T1])
|
||||
self.T2_metadata =\
|
||||
make_metadata(self.tuf_repo, self.signed_targets[self.T2])
|
||||
self.T3_metadata = \
|
||||
make_metadata(self.tuf_repo, self.signed_targets[self.T3])
|
||||
|
||||
# Modify the hash for target1 in T2.
|
||||
for target_filepath in self.relpath_from_targets([target1]):
|
||||
target_metadata = self.T2_metadata['signed']['targets'][target_filepath]
|
||||
sha256_hash = target_metadata['hashes']['sha256']
|
||||
last_character = sha256_hash[-1]
|
||||
last_character = chr(ord(last_character)-1)
|
||||
# "Subtract" the last character of the hash.
|
||||
target_metadata['hashes']['sha256'] = sha256_hash[:-1] + last_character
|
||||
|
||||
|
||||
def test_that_initial_update_works_with_many_roles_sharing_a_target(self):
|
||||
# Get relative target paths, because that is what TUF recognizes.
|
||||
relative_target_filepaths = self.relpath_from_targets(self.target_filepaths)
|
||||
# Get metadata about downloaded targets.
|
||||
targets_metadata = self.do_update()
|
||||
# Do we have metadata about all the expected targets?
|
||||
for target_filepath in relative_target_filepaths:
|
||||
self.assertIn(target_filepath, targets_metadata)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class TestOrderOfTargetDelegationWithFailure(TestDelegationFunctions):
|
||||
"""We show that when multiple delegated targets roles talk about a target,
|
||||
the first one in order of appearance of delegation wins.
|
||||
|
||||
In this case, the first role has the wrong metadata about the target."""
|
||||
|
||||
|
||||
def make_targets_metadata(self):
|
||||
make_metadata = signerlib.generate_targets_metadata
|
||||
target1, target2 = self.target_filepaths
|
||||
|
||||
# Targets signed for by each of the targets roles.
|
||||
self.signed_targets[self.T0] = [target2]
|
||||
self.signed_targets[self.T1] = []
|
||||
self.signed_targets[self.T2] = [target1]
|
||||
self.signed_targets[self.T3] = [target1]
|
||||
|
||||
# Targets delegated to each of the delegated targets roles.
|
||||
self.delegated_targets[self.T1] = [target1]
|
||||
self.delegated_targets[self.T2] = [target1]
|
||||
self.delegated_targets[self.T3] = [target1]
|
||||
|
||||
self.T0_metadata =\
|
||||
make_metadata(self.tuf_repo, self.signed_targets[self.T0])
|
||||
self.T1_metadata =\
|
||||
make_metadata(self.tuf_repo, self.signed_targets[self.T1])
|
||||
self.T2_metadata =\
|
||||
make_metadata(self.tuf_repo, self.signed_targets[self.T2])
|
||||
self.T3_metadata = \
|
||||
make_metadata(self.tuf_repo, self.signed_targets[self.T3])
|
||||
|
||||
# Modify the hash for target1 in T3.
|
||||
for target_filepath in self.relpath_from_targets([target1]):
|
||||
target_metadata = self.T3_metadata['signed']['targets'][target_filepath]
|
||||
sha256_hash = target_metadata['hashes']['sha256']
|
||||
last_character = sha256_hash[-1]
|
||||
last_character = chr(ord(last_character)-1)
|
||||
# "Subtract" the last character of the hash.
|
||||
target_metadata['hashes']['sha256'] = sha256_hash[:-1] + last_character
|
||||
|
||||
|
||||
def test_that_initial_update_fails_with_many_roles_sharing_a_target(self):
|
||||
# Expect to see a particular exception on initial update.
|
||||
self.assertRaises(tuf.DownloadError, self.do_update)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class TestConservationOfTargetDelegation(TestDelegationFunctions):
|
||||
"""We show that delegated targets roles have to neither sign for targets
|
||||
delegated to them nor further delegate them."""
|
||||
|
||||
|
||||
def make_targets_metadata(self):
|
||||
make_metadata = signerlib.generate_targets_metadata
|
||||
target1, target2 = self.target_filepaths
|
||||
|
||||
# Targets signed for by each of the targets roles.
|
||||
self.signed_targets[self.T0] = []
|
||||
self.signed_targets[self.T1] = [target1]
|
||||
self.signed_targets[self.T2] = [target2]
|
||||
self.signed_targets[self.T3] = []
|
||||
|
||||
# Targets delegated to each of the delegated targets roles.
|
||||
self.delegated_targets[self.T1] = [target1, target2]
|
||||
self.delegated_targets[self.T2] = [target1, target2]
|
||||
self.delegated_targets[self.T3] = []
|
||||
|
||||
self.T0_metadata =\
|
||||
make_metadata(self.tuf_repo, self.signed_targets[self.T0])
|
||||
self.T1_metadata =\
|
||||
make_metadata(self.tuf_repo, self.signed_targets[self.T1])
|
||||
self.T2_metadata =\
|
||||
make_metadata(self.tuf_repo, self.signed_targets[self.T2])
|
||||
self.T3_metadata = \
|
||||
make_metadata(self.tuf_repo, self.signed_targets[self.T3])
|
||||
|
||||
|
||||
def test_that_initial_update_works_with_unconserved_targets(self):
|
||||
# Get relative target paths, because that is what TUF recognizes.
|
||||
relative_target_filepaths = self.relpath_from_targets(self.target_filepaths)
|
||||
# Get metadata about downloaded targets.
|
||||
targets_metadata = self.do_update()
|
||||
# Do we have metadata about all the expected targets?
|
||||
for target_filepath in relative_target_filepaths:
|
||||
self.assertIn(target_filepath, targets_metadata)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
@ -1,3 +1,5 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
"""
|
||||
<Program Name>
|
||||
test_endless_data_attack.py
|
||||
|
|
@ -143,7 +145,7 @@ def test_arbitrary_package_attack(TUF=False):
|
|||
test_arbitrary_package_attack(TUF=False)
|
||||
|
||||
except EndlessDataAttack, error:
|
||||
print error
|
||||
print('Without TUF: '+str(error))
|
||||
|
||||
|
||||
|
||||
|
|
@ -151,4 +153,4 @@ def test_arbitrary_package_attack(TUF=False):
|
|||
test_arbitrary_package_attack(TUF=True)
|
||||
|
||||
except EndlessDataAttack, error:
|
||||
print error
|
||||
print('With TUF: '+str(error))
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
"""
|
||||
<Program Name>
|
||||
test_extraneous_dependencies_attack.py
|
||||
|
|
@ -179,4 +181,4 @@ def _write_rogue_metadata():
|
|||
try:
|
||||
test_extraneous_dependencies_attack()
|
||||
except ExtraneousDependenciesAttackAlert, error:
|
||||
print 'error'
|
||||
print 'error'
|
||||
|
|
|
|||
|
|
@ -204,6 +204,11 @@ def cleanup(root_repo, server_process=None):
|
|||
# Clear the keystore.
|
||||
keystore.clear_keystore()
|
||||
|
||||
# Deconfigure interposition.
|
||||
interpose_json = os.path.join(root_repo, 'tuf.interposition.json')
|
||||
if os.path.exists(interpose_json):
|
||||
tuf.interposition.deconfigure(filename=interpose_json)
|
||||
|
||||
# Removing repository directory.
|
||||
try:
|
||||
shutil.rmtree(root_repo)
|
||||
|
|
@ -381,9 +386,8 @@ def create_interposition_config(root_repo, url):
|
|||
"targets_path": "targets",
|
||||
"confined_target_dirs": [ "" ]}}}}}
|
||||
|
||||
# "target_paths": [ { "(.*\\.html)": "{0}" } ]
|
||||
|
||||
junk, interpose_json = tempfile.mkstemp(prefix='conf_', dir=root_repo)
|
||||
# We write the interposition JSON configuration at a deterministic location.
|
||||
interpose_json = os.path.join(root_repo, 'tuf.interposition.json')
|
||||
with open(interpose_json, 'wb') as fileobj:
|
||||
tuf.util.json.dump(interposition_dict, fileobj)
|
||||
|
||||
|
|
@ -549,8 +553,10 @@ def _mock_prompt(msg, junk, targets_path=delegated_targets_path,
|
|||
return targets_path
|
||||
elif msg.startswith('\nChoose and enter the parent'):
|
||||
return parent_role
|
||||
elif msg.endswith('\nEnter the delegated role\'s name: '):
|
||||
elif msg.startswith('\nEnter the delegated role\'s name: '):
|
||||
return new_role_name
|
||||
elif msg.startswith('Recursively walk the given directory? (Y)es/(N)o: '):
|
||||
return 'N'
|
||||
else:
|
||||
error_msg = ('Prompt: '+'\''+msg+'\''+
|
||||
' did not match any predefined mock prompts.')
|
||||
|
|
@ -585,4 +591,4 @@ def _mock_get_keyid(junk, keyid=keyid):
|
|||
signercli._get_keyids = original_get_keyids
|
||||
signercli._get_password = original_get_password
|
||||
signercli._prompt = original_prompt
|
||||
signercli._get_metadata_directory = original_get_metadata_directory
|
||||
signercli._get_metadata_directory = original_get_metadata_directory
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
"""
|
||||
<Program>
|
||||
test_download.py
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
"""
|
||||
<Program Name>
|
||||
test_formats.py
|
||||
|
|
@ -199,9 +201,9 @@ def test_schemas(self):
|
|||
'delegations': {'keys': {'123abc': {'keytype':'rsa',
|
||||
'keyval': {'public': 'pubkey',
|
||||
'private': 'privkey'}}},
|
||||
'roles': {'root': {'keyids': ['123abc'],
|
||||
'threshold': 1,
|
||||
'paths': ['path1/', 'path2']}}}}),
|
||||
'roles': [{'name': 'root', 'keyids': ['123abc'],
|
||||
'threshold': 1,
|
||||
'paths': ['path1/', 'path2']}]}}),
|
||||
|
||||
'RELEASE_SCHEMA': (tuf.formats.RELEASE_SCHEMA,
|
||||
{'_type': 'Release',
|
||||
|
|
@ -365,9 +367,8 @@ def test_TargetsFile(self):
|
|||
delegations = {'keys': {'123abc': {'keytype':'rsa',
|
||||
'keyval': {'public': 'pubkey',
|
||||
'private': 'privkey'}}},
|
||||
'roles': {'root': {'keyids': ['123abc'],
|
||||
'threshold': 1,
|
||||
'paths': ['path1/', 'path2']}}}
|
||||
'roles': [{'name': 'root', 'keyids': ['123abc'],
|
||||
'threshold': 1, 'paths': ['path1/', 'path2']}]}
|
||||
|
||||
make_metadata = tuf.formats.TargetsFile.make_metadata
|
||||
from_metadata = tuf.formats.TargetsFile.from_metadata
|
||||
|
|
@ -491,23 +492,38 @@ def test_make_role_metadata(self):
|
|||
keyids = ['123abc', 'abc123']
|
||||
threshold = 2
|
||||
paths = ['path1/', 'path2']
|
||||
name = '123'
|
||||
|
||||
ROLE_SCHEMA = tuf.formats.ROLE_SCHEMA
|
||||
make_role = tuf.formats.make_role_metadata
|
||||
self.assertTrue(ROLE_SCHEMA.matches(make_role(keyids, threshold, paths)))
|
||||
|
||||
self.assertTrue(ROLE_SCHEMA.matches(make_role(keyids, threshold)))
|
||||
self.assertTrue(ROLE_SCHEMA.matches(make_role(keyids, threshold, name=name)))
|
||||
self.assertTrue(ROLE_SCHEMA.matches(make_role(keyids, threshold, paths=paths)))
|
||||
self.assertTrue(ROLE_SCHEMA.matches(make_role(keyids, threshold, name=name, paths=paths)))
|
||||
|
||||
# Test conditions for invalid arguments.
|
||||
bad_keyids = 'bad'
|
||||
bad_threshold = 'bad'
|
||||
bad_paths = 'bad'
|
||||
bad_name = 123
|
||||
|
||||
self.assertRaises(tuf.FormatError, make_role, bad_keyids, threshold, paths)
|
||||
self.assertRaises(tuf.FormatError, make_role, keyids, bad_threshold, paths)
|
||||
self.assertRaises(tuf.FormatError, make_role, keyids, threshold, bad_paths)
|
||||
self.assertRaises(tuf.FormatError, make_role, bad_keyids, threshold)
|
||||
self.assertRaises(tuf.FormatError, make_role, keyids, bad_threshold)
|
||||
|
||||
self.assertRaises(tuf.FormatError, make_role, bad_keyids, threshold, paths=paths)
|
||||
self.assertRaises(tuf.FormatError, make_role, keyids, bad_threshold, paths=paths)
|
||||
self.assertRaises(tuf.FormatError, make_role, keyids, threshold, paths=bad_paths)
|
||||
|
||||
self.assertRaises(tuf.FormatError, make_role, bad_keyids, threshold, name=name)
|
||||
self.assertRaises(tuf.FormatError, make_role, keyids, bad_threshold, name=name)
|
||||
self.assertRaises(tuf.FormatError, make_role, keyids, threshold, name=bad_name)
|
||||
|
||||
self.assertRaises(tuf.FormatError, make_role, bad_keyids, threshold, name=name, paths=paths)
|
||||
self.assertRaises(tuf.FormatError, make_role, keyids, bad_threshold, name=name, paths=paths)
|
||||
self.assertRaises(tuf.FormatError, make_role, keyids, threshold, name=bad_name, paths=paths)
|
||||
self.assertRaises(tuf.FormatError, make_role, keyids, threshold, name=name, paths=bad_paths)
|
||||
|
||||
|
||||
|
||||
def test_get_role_class(self):
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
"""
|
||||
<Program Name>
|
||||
test_signercli.py
|
||||
|
|
@ -1350,8 +1352,10 @@ def _mock_prompt(msg, junk):
|
|||
return delegated_targets_dir
|
||||
elif msg.startswith('\nChoose and enter the parent'):
|
||||
return parent_role
|
||||
elif msg.endswith('\nEnter the delegated role\'s name: '):
|
||||
elif msg.startswith('\nEnter the delegated role\'s name: '):
|
||||
return delegated_role
|
||||
elif msg.startswith('Recursively walk the given directory? (Y)es/(N)o: '):
|
||||
return 'N'
|
||||
else:
|
||||
error_msg = ('Prompt: '+'\''+msg+'\''+
|
||||
' did not match any predefined mock prompts.')
|
||||
|
|
@ -1464,8 +1468,13 @@ def _mock_get_keyids(junk):
|
|||
parent_role_file = os.path.join(meta_dir, parent_role+'.txt')
|
||||
signable = signerlib.read_metadata_file(parent_role_file)
|
||||
delegated_rolename = parent_role+'/'+delegated_role
|
||||
threshold = signable['signed']['delegations']['roles']\
|
||||
[delegated_rolename]['threshold']
|
||||
|
||||
roles = signable['signed']['delegations']['roles']
|
||||
role_index = signerlib.find_delegated_role(roles, delegated_rolename)
|
||||
self.assertIsNotNone(role_index)
|
||||
role = roles[role_index]
|
||||
|
||||
threshold = role['threshold']
|
||||
self.assertTrue(threshold == 2)
|
||||
|
||||
# RESTORE
|
||||
|
|
|
|||
|
|
@ -1,3 +1,5 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
"""
|
||||
<Program Name>
|
||||
test_updater.py
|
||||
|
|
|
|||
Loading…
Reference in a new issue