Merge pull request #473 from vladimir-v-diaz/coverage

Coverage and bug fixes
This commit is contained in:
Vladimir Diaz 2017-07-19 11:02:22 -04:00 committed by GitHub
commit 67eb57ae1c
15 changed files with 189 additions and 55 deletions

View file

@ -19,7 +19,7 @@ cffi==1.7.0
pycrypto==2.6.1
pynacl==1.0.1
cryptography==1.4.0
securesystemslib==0.10.1
securesystemslib==0.10.6
# Testing requirements. The rest of the testing dependencies available in
# 'tox.ini'

View file

@ -105,7 +105,7 @@
'Topic :: Security',
'Topic :: Software Development'
],
install_requires = ['iso8601', 'six', 'securesystemslib>=0.10.5'],
install_requires = ['iso8601', 'six', 'securesystemslib>=0.10.6'],
packages = find_packages(exclude=['tests']),
scripts = [
'tuf/scripts/basic_client.py',

62
tests/test_exceptions.py Executable file
View file

@ -0,0 +1,62 @@
#!/usr/bin/env python
"""
<Program Name>
test_exceptions.py
<Author>
Vladimir Diaz
<Started>
July 13, 2017.
<Copyright>
See LICENSE for licensing information.
<Purpose>
Test cases for exceptions.py (mainly the exceptions defined there).
"""
# Help with Python 3 compatibility, where the print statement is a function, an
# implicit relative import is invalid, and the '/' operator performs true
# division. Example: print 'hello world' raises a 'SyntaxError' exception.
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import unittest
import logging
import tuf.exceptions
logger = logging.getLogger('test_exceptions')
class TestExceptions(unittest.TestCase):
def setUp(self):
pass
def tearDown(self):
pass
def test_bad_signature_error(self):
bad_signature_error = tuf.exceptions.BadSignatureError('bad sig')
logger.error(bad_signature_error)
def test_bad_hash_error(self):
bad_hash_error = tuf.exceptions.BadHashError('1234', '5678')
logger.error(bad_hash_error)
def test_decompression_error(self):
download_exception = tuf.exceptions.DownloadError()
decompression_error = tuf.exceptions.DecompressionError(download_exception)
logger.error(decompression_error)
# Run the unit tests.
if __name__ == '__main__':
unittest.main()

View file

@ -1018,6 +1018,11 @@ def test__load_top_level_metadata(self):
repo_lib.write_metadata_file(signable, root_file, 8, ['gz'], False)
# Attempt to load a repository that contains a compressed Root file.
repository = repo_tool.create_new_repository(repository_directory, repository_name)
filenames = repo_lib.get_metadata_filenames(metadata_directory)
repo_lib._load_top_level_metadata(repository, filenames, repository_name)
# Remove compressed metadata so that we can test for loading of a
# repository with no compression enabled.
for role_file in os.listdir(metadata_directory):

View file

@ -84,6 +84,9 @@ def tearDownClass(cls):
def setUp(self):
tuf.roledb.clear_roledb(clear_all=True)
tuf.keydb.clear_keydb(clear_all=True)
tuf.roledb.create_roledb('test_repository')
tuf.keydb.create_keydb('test_repository')

55
tests/test_unittest_toolbox.py Executable file
View file

@ -0,0 +1,55 @@
#!/usr/bin/env python
"""
<Program Name>
test_unittest_toolbox.py
<Author>
Vladimir Diaz
<Started>
July 14, 2017.
<Copyright>
See LICENSE for licensing information.
<Purpose>
Test cases for unittest_toolbox.py.
"""
# Help with Python 3 compatibility, where the print statement is a function, an
# implicit relative import is invalid, and the '/' operator performs true
# division. Example: print 'hello world' raises a 'SyntaxError' exception.
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import unittest
import logging
import shutil
import tuf.unittest_toolbox as unittest_toolbox
logger = logging.getLogger('test_unittest_toolbox')
class TestUnittestToolbox(unittest_toolbox.Modified_TestCase):
def setUp(self):
unittest_toolbox.Modified_TestCase.setUp(self)
def tearDown(self):
unittest_toolbox.Modified_TestCase.tearDown(self)
def test_tear_down_already_deleted_dir(self):
temp_directory = self.make_temp_directory()
# Delete the temp directory to make sure unittest_toolbox doesn't
# complain about the missing temp_directory.
shutil.rmtree(temp_directory)
# Run the unit tests.
if __name__ == '__main__':
unittest.main()

View file

@ -133,6 +133,8 @@ def tearDownClass(cls):
def setUp(self):
# We are inheriting from custom class.
unittest_toolbox.Modified_TestCase.setUp(self)
tuf.roledb.clear_roledb(clear_all=True)
tuf.keydb.clear_keydb(clear_all=True)
self.repository_name = 'test_repository'
@ -312,6 +314,14 @@ def test_1__load_metadata_from_file(self):
self.assertEqual(self.repository_updater.metadata['current']['role1'],
role1_meta['signed'])
# Verify that _load_metadata_from_file() doesn't raise an exception for
# improperly formatted metadata, and doesn't load the bad file.
with open(role1_filepath, 'a') as file_object:
file_object.write('bad JSON data')
self.repository_updater._load_metadata_from_file('current', 'role1')
self.assertEqual(len(self.repository_updater.metadata['current']), 5)
# Test if we fail gracefully if we can't deserialize a meta file
self.repository_updater._load_metadata_from_file('current', 'empty_file')
self.assertFalse('empty_file' in self.repository_updater.metadata['current'])
@ -325,29 +335,29 @@ def test_1__load_metadata_from_file(self):
"""
def test_1__rebuild_key_and_role_db(self):
# Setup
root_roleinfo = tuf.roledb.get_roleinfo('root', self.repository_name)
root_metadata = self.repository_updater.metadata['current']['root']
root_threshold = root_metadata['roles']['root']['threshold']
print('\nnumber of root keys: ' + str(len(root_metadata['keys'].keys())))
print('\nKeys in root metadata: ' + repr(root_metadata['keys'].keys()))
number_of_root_keys = len(root_metadata['keys'])
self.assertEqual(root_roleinfo['threshold'], root_threshold)
# Ensure we add 1 to the number of root keys (actually, the number of root
# Ensure we add 2 to the number of root keys (actually, the number of root
# keys multiplied by the number of keyid hash algorithms), to include the
# delegated targets key. The delegated roles of 'targets.json' are also
# loaded when the repository object is instantiated.
print('\ndifference: ' + repr(list(set(tuf.keydb._keydb_dict[self.repository_name].keys()) - set(root_metadata['keys'].keys()))))
self.assertEqual(number_of_root_keys * 2 + 1, len(tuf.keydb._keydb_dict[self.repository_name]))
# delegated targets key (+1 for its sha512 keyid). The delegated roles of
# 'targets.json' are also loaded when the repository object is
# instantiated.
self.assertEqual(number_of_root_keys * 2 + 2, len(tuf.keydb._keydb_dict[self.repository_name]))
# Test: normal case.
self.repository_updater._rebuild_key_and_role_db()
root_roleinfo = tuf.roledb.get_roleinfo('root', self.repository_name)
self.assertEqual(root_roleinfo['threshold'], root_threshold)
# _rebuild_key_and_role_db() will only rebuild the keys and roles specified
# in the 'root.json' file, unlike __init__(). Instantiating an updater
# object calls both _rebuild_key_and_role_db() and _import_delegations().
@ -363,7 +373,6 @@ def test_1__rebuild_key_and_role_db(self):
root_roleinfo = tuf.roledb.get_roleinfo('root', self.repository_name)
self.assertEqual(root_roleinfo['threshold'], 8)
self.assertEqual(number_of_root_keys * 2 - 2, len(tuf.keydb._keydb_dict[self.repository_name]))
"""
@ -459,14 +468,13 @@ def test_2__fileinfo_has_changed(self):
"""
def test_2__import_delegations(self):
# Setup.
# In order to test '_import_delegations' the parent of the delegation
# has to be in Repository.metadata['current'], but it has to be inserted
# there without using '_load_metadata_from_file()' since it calls
# '_import_delegations()'.
repository_name = self.repository_updater.updater_name
repository_name = self.repository_updater.repository_name
tuf.keydb.clear_keydb(repository_name)
tuf.roledb.clear_roledb(repository_name)
@ -476,10 +484,9 @@ def test_2__import_delegations(self):
self.repository_updater._rebuild_key_and_role_db()
self.assertEqual(len(tuf.roledb._roledb_dict[repository_name]), 4)
# Take into account the number of keyids algorithms supported by default,
# which this test condition expects to be two (sha256 and sha512).
print('\nkeydb_dict len: ' + repr(len(tuf.keydb._keydb_dict[repository_name].keys())))
print('\nkeydb_dict: ' + repr(tuf.keydb._keydb_dict[repository_name].keys()))
self.assertEqual(4 * 2, len(tuf.keydb._keydb_dict[repository_name]))
# Test: pass a role without delegations.
@ -497,8 +504,8 @@ def test_2__import_delegations(self):
self.assertEqual(len(tuf.roledb._roledb_dict[repository_name]), 5)
# The number of root keys (times the number of key hash algorithms) +
# delegation's key.
self.assertEqual(len(tuf.keydb._keydb_dict[repository_name]), 4 * 2 + 1)
# delegation's key (+1 for its sha512 keyid).
self.assertEqual(len(tuf.keydb._keydb_dict[repository_name]), 4 * 2 + 2)
# Verify that roledb dictionary was added.
self.assertTrue('role1' in tuf.roledb._roledb_dict[repository_name])
@ -526,24 +533,23 @@ def test_2__import_delegations(self):
self.repository_updater.metadata['current']['targets']\
['delegations']['keys'][existing_keyid]['keytype'] = 'ed25519'
# Verify that _import_delegations() raises an exception if any key in
# 'delegations' is improperly formatted (i.e., bad keyid).
tuf.keydb.clear_keydb(repository_name)
# Verify that _import_delegations() raises an exception if one of the
# delegated keys is malformed.
valid_keyval = self.repository_updater.metadata['current']['targets']\
['delegations']['keys'][existing_keyid]['keyval']
self.repository_updater.metadata['current']['targets']['delegations']\
['keys'].update({'123': self.repository_updater.metadata['current']\
['targets']['delegations']['keys'][existing_keyid]})
self.assertRaises(securesystemslib.exceptions.Error, self.repository_updater._import_delegations,
'targets')
# Restore the keyid of 'existing_keyids2'.
self.repository_updater.metadata['current']['targets']\
['delegations']['keys'][existing_keyid]['keyid'] = existing_keyid
['delegations']['keys'][existing_keyid]['keyval'] = 1
self.assertRaises(securesystemslib.exceptions.FormatError, self.repository_updater._import_delegations, 'targets')
# Verify that _import_delegations() raises an exception if it fails to add
# one of the roles loaded from parent role's 'delegations'.
"""
self.repository_updater.metadata['current']['targets']\
['delegations']['keys'][existing_keyid]['keyval'] = valid_keyval
# Verify that _import_delegations() raises an exception if one of the
# delegated roles is malformed.
self.repository_updater.metadata['current']['targets']\
['delegations']['roles'][0]['name'] = 1
self.assertRaises(securesystemslib.exceptions.FormatError, self.repository_updater._import_delegations, 'targets')
@ -1393,6 +1399,14 @@ def test_10__soft_check_file_length(self):
self.repository_updater._soft_check_file_length,
temp_file_object, 1)
# Verify that an exception is not raised if the file length <= the observed
# file length.
temp_file_object.seek(0)
self.repository_updater._soft_check_file_length(temp_file_object, 3)
temp_file_object.seek(0)
self.repository_updater._soft_check_file_length(temp_file_object, 4)
def test_10__targets_of_role(self):

View file

@ -540,12 +540,11 @@ def _import_delegations(self, parent_role):
# Iterate the keys of the delegated roles of 'parent_role' and load them.
for keyid, keyinfo in six.iteritems(keys_info):
if keyinfo['keytype'] in ['rsa', 'ed25519']:
key, keyids = securesystemslib.keys.format_metadata_to_key(keyinfo)
# We specify the keyid to ensure that it's the correct keyid
# for the key.
try:
tuf.keydb.add_key(key, keyid, self.repository_name)
key, keyids = securesystemslib.keys.format_metadata_to_key(keyinfo)
for keyid in keyids:
key['keyid'] = keyid
tuf.keydb.add_key(key, keyid=None, repository_name=self.repository_name)
@ -575,7 +574,7 @@ def _import_delegations(self, parent_role):
logger.warning('Role already exists: ' + rolename)
except:
logger.exception('Failed to add delegated role: ' + rolename + '.')
logger.exception('Failed to add delegated role: ' + repr(rolename) + '.')
raise

View file

@ -682,10 +682,7 @@ class VerifiedHTTPSConnection(six.moves.http_client.HTTPSConnection):
def connect(self):
self.connection_kwargs = {}
# for > py2.5
if hasattr(self, 'timeout'):
self.connection_kwargs.update(timeout = self.timeout)
self.connection_kwargs.update(timeout = self.timeout)
# for >= py2.7
if hasattr(self, 'source_address'):

View file

@ -737,7 +737,6 @@ def _load_top_level_metadata(repository, top_level_filenames, repository_name):
# repository maintainer should have also been made aware of the duplicate
# key when it was added.
try:
tuf.keydb.add_key(key_object, repository_name=repository_name)
for keyid in keyids: #pragma: no branch
key_object['keyid'] = keyid
tuf.keydb.add_key(key_object, keyid=None,

View file

@ -3126,9 +3126,12 @@ def load_repository(repository_directory, repository_name='default'):
# The repository maintainer should have also been made aware of the
# duplicate key when it was added.
for key_metadata in six.itervalues(metadata_object['delegations']['keys']):
key_object, junk = securesystemslib.keys.format_metadata_to_key(key_metadata)
key_object, keyids = securesystemslib.keys.format_metadata_to_key(key_metadata)
try:
tuf.keydb.add_key(key_object, repository_name=repository_name)
for keyid in keyids:
key_object['keyid'] = keyid
tuf.keydb.add_key(key_object, keyid=None,
repository_name=repository_name)
except securesystemslib.exceptions.KeyAlreadyExistsError:
pass

View file

@ -810,8 +810,8 @@ def get_role_threshold(rolename, repository_name='default'):
securesystemslib.exceptions.FormatError, if the arguments do not have the
correct object format.
securesystemslib.exceptions.UnknownRoleError, if 'rolename' cannot be found
in in the role database.
tuf.exceptions.UnknownRoleError, if 'rolename' cannot be found
in the role database.
securesystemslib.exceptions.InvalidNameError, if 'rolename' is incorrectly
formatted, or 'repository_name' does not exist in the role database.
@ -822,6 +822,7 @@ def get_role_threshold(rolename, repository_name='default'):
<Returns>
A threshold integer value.
"""
# Raise 'securesystemslib.exceptions.FormatError' if 'repository_name' is
# improperly formatted.
securesystemslib.formats.NAME_SCHEMA.check_match(repository_name)
@ -885,10 +886,6 @@ def get_role_paths(rolename, repository_name='default'):
global _roledb_dict
global _dirty_roles
if repository_name not in _roledb_dict or repository_name not in _dirty_roles:
raise securesystemslib.exceptions.InvalidNameError('Repository name does not' ' exist: ' +
repository_name)
roleinfo = _roledb_dict[repository_name][rolename]
# Paths won't exist for non-target roles.
@ -949,10 +946,6 @@ def get_delegated_rolenames(rolename, repository_name='default'):
global _roledb_dict
global _dirty_roles
if repository_name not in _roledb_dict or repository_name not in _dirty_roles:
raise securesystemslib.exceptions.InvalidNameError('Repository name does not'
' exist: ' + repository_name)
# get_roleinfo() raises a 'securesystemslib.exceptions.InvalidNameError' if
# 'repository_name' does not exist in the role database.
roleinfo = get_roleinfo(rolename, repository_name)

View file

@ -101,7 +101,7 @@ def get_signature_status(signable, role=None, repository_name='default',
securesystemslib.exceptions.FormatError, if 'signable' does not have the
correct format.
securesystemslib.exceptions.UnknownRoleError, if 'role' is not recognized.
tuf.exceptions.UnknownRoleError, if 'role' is not recognized.
<Side Effects>
None.
@ -188,7 +188,7 @@ def get_signature_status(signable, role=None, repository_name='default',
continue
# Unknown role, re-raise exception.
except securesystemslib.exceptions.UnknownRoleError:
except tuf.exceptions.UnknownRoleError:
raise
# This is an unset role, thus an unknown signature.
@ -211,7 +211,7 @@ def get_signature_status(signable, role=None, repository_name='default',
threshold = \
tuf.roledb.get_role_threshold(role, repository_name=repository_name)
except securesystemslib.exceptions.UnknownRoleError:
except tuf.exceptions.UnknownRoleError:
raise
else:

View file

@ -85,7 +85,7 @@ def tearDown(self):
try:
# OSError will occur if the directory was already removed.
cleanup_function()
except OSError:
pass
@ -93,11 +93,15 @@ def tearDown(self):
def make_temp_directory(self, directory=None):
"""Creates and returns an absolute path of a directory."""
prefix = self.__class__.__name__+'_'
temp_directory = tempfile.mkdtemp(prefix=prefix, dir=directory)
def _destroy_temp_directory():
shutil.rmtree(temp_directory)
self._cleanup.append(_destroy_temp_directory)
return temp_directory