diff --git a/dev-requirements.txt b/dev-requirements.txt index 4754221d..1922ad28 100644 --- a/dev-requirements.txt +++ b/dev-requirements.txt @@ -19,7 +19,7 @@ cffi==1.7.0 pycrypto==2.6.1 pynacl==1.0.1 cryptography==1.4.0 -securesystemslib==0.10.1 +securesystemslib==0.10.6 # Testing requirements. The rest of the testing dependencies available in # 'tox.ini' diff --git a/setup.py b/setup.py index 4a54bad2..6a926834 100755 --- a/setup.py +++ b/setup.py @@ -105,7 +105,7 @@ 'Topic :: Security', 'Topic :: Software Development' ], - install_requires = ['iso8601', 'six', 'securesystemslib>=0.10.5'], + install_requires = ['iso8601', 'six', 'securesystemslib>=0.10.6'], packages = find_packages(exclude=['tests']), scripts = [ 'tuf/scripts/basic_client.py', diff --git a/tests/repository_data/client/test_repository/metadata/current/root.json.gz b/tests/repository_data/client/test_repository/metadata/current/root.json.gz new file mode 100644 index 00000000..2df907ec Binary files /dev/null and b/tests/repository_data/client/test_repository/metadata/current/root.json.gz differ diff --git a/tests/test_exceptions.py b/tests/test_exceptions.py new file mode 100755 index 00000000..b29b72b8 --- /dev/null +++ b/tests/test_exceptions.py @@ -0,0 +1,62 @@ +#!/usr/bin/env python + +""" + + test_exceptions.py + + + Vladimir Diaz + + + July 13, 2017. + + + See LICENSE for licensing information. + + + Test cases for exceptions.py (mainly the exceptions defined there). +""" + +# Help with Python 3 compatibility, where the print statement is a function, an +# implicit relative import is invalid, and the '/' operator performs true +# division. Example: print 'hello world' raises a 'SyntaxError' exception. +from __future__ import print_function +from __future__ import absolute_import +from __future__ import division +from __future__ import unicode_literals + +import unittest +import logging + +import tuf.exceptions + +logger = logging.getLogger('test_exceptions') + +class TestExceptions(unittest.TestCase): + def setUp(self): + pass + + + def tearDown(self): + pass + + + def test_bad_signature_error(self): + bad_signature_error = tuf.exceptions.BadSignatureError('bad sig') + logger.error(bad_signature_error) + + + def test_bad_hash_error(self): + bad_hash_error = tuf.exceptions.BadHashError('1234', '5678') + logger.error(bad_hash_error) + + + def test_decompression_error(self): + download_exception = tuf.exceptions.DownloadError() + decompression_error = tuf.exceptions.DecompressionError(download_exception) + logger.error(decompression_error) + + +# Run the unit tests. +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py index 815ade6f..a3ef34ee 100755 --- a/tests/test_repository_lib.py +++ b/tests/test_repository_lib.py @@ -1018,6 +1018,11 @@ def test__load_top_level_metadata(self): repo_lib.write_metadata_file(signable, root_file, 8, ['gz'], False) + # Attempt to load a repository that contains a compressed Root file. + repository = repo_tool.create_new_repository(repository_directory, repository_name) + filenames = repo_lib.get_metadata_filenames(metadata_directory) + repo_lib._load_top_level_metadata(repository, filenames, repository_name) + # Remove compressed metadata so that we can test for loading of a # repository with no compression enabled. for role_file in os.listdir(metadata_directory): diff --git a/tests/test_repository_tool.py b/tests/test_repository_tool.py index 4acc4869..d43749c8 100755 --- a/tests/test_repository_tool.py +++ b/tests/test_repository_tool.py @@ -84,6 +84,9 @@ def tearDownClass(cls): def setUp(self): + tuf.roledb.clear_roledb(clear_all=True) + tuf.keydb.clear_keydb(clear_all=True) + tuf.roledb.create_roledb('test_repository') tuf.keydb.create_keydb('test_repository') diff --git a/tests/test_unittest_toolbox.py b/tests/test_unittest_toolbox.py new file mode 100755 index 00000000..18b39b3a --- /dev/null +++ b/tests/test_unittest_toolbox.py @@ -0,0 +1,55 @@ +#!/usr/bin/env python + +""" + + test_unittest_toolbox.py + + + Vladimir Diaz + + + July 14, 2017. + + + See LICENSE for licensing information. + + + Test cases for unittest_toolbox.py. +""" + +# Help with Python 3 compatibility, where the print statement is a function, an +# implicit relative import is invalid, and the '/' operator performs true +# division. Example: print 'hello world' raises a 'SyntaxError' exception. +from __future__ import print_function +from __future__ import absolute_import +from __future__ import division +from __future__ import unicode_literals + +import unittest +import logging +import shutil + +import tuf.unittest_toolbox as unittest_toolbox + +logger = logging.getLogger('test_unittest_toolbox') + + +class TestUnittestToolbox(unittest_toolbox.Modified_TestCase): + def setUp(self): + unittest_toolbox.Modified_TestCase.setUp(self) + + def tearDown(self): + unittest_toolbox.Modified_TestCase.tearDown(self) + + + def test_tear_down_already_deleted_dir(self): + temp_directory = self.make_temp_directory() + + # Delete the temp directory to make sure unittest_toolbox doesn't + # complain about the missing temp_directory. + shutil.rmtree(temp_directory) + + +# Run the unit tests. +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_updater.py b/tests/test_updater.py index 1163afa5..9bfd0ce3 100755 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -133,6 +133,8 @@ def tearDownClass(cls): def setUp(self): # We are inheriting from custom class. unittest_toolbox.Modified_TestCase.setUp(self) + tuf.roledb.clear_roledb(clear_all=True) + tuf.keydb.clear_keydb(clear_all=True) self.repository_name = 'test_repository' @@ -312,6 +314,14 @@ def test_1__load_metadata_from_file(self): self.assertEqual(self.repository_updater.metadata['current']['role1'], role1_meta['signed']) + # Verify that _load_metadata_from_file() doesn't raise an exception for + # improperly formatted metadata, and doesn't load the bad file. + with open(role1_filepath, 'a') as file_object: + file_object.write('bad JSON data') + + self.repository_updater._load_metadata_from_file('current', 'role1') + self.assertEqual(len(self.repository_updater.metadata['current']), 5) + # Test if we fail gracefully if we can't deserialize a meta file self.repository_updater._load_metadata_from_file('current', 'empty_file') self.assertFalse('empty_file' in self.repository_updater.metadata['current']) @@ -325,29 +335,29 @@ def test_1__load_metadata_from_file(self): - """ def test_1__rebuild_key_and_role_db(self): # Setup root_roleinfo = tuf.roledb.get_roleinfo('root', self.repository_name) root_metadata = self.repository_updater.metadata['current']['root'] root_threshold = root_metadata['roles']['root']['threshold'] - print('\nnumber of root keys: ' + str(len(root_metadata['keys'].keys()))) - print('\nKeys in root metadata: ' + repr(root_metadata['keys'].keys())) number_of_root_keys = len(root_metadata['keys']) self.assertEqual(root_roleinfo['threshold'], root_threshold) - # Ensure we add 1 to the number of root keys (actually, the number of root + + # Ensure we add 2 to the number of root keys (actually, the number of root # keys multiplied by the number of keyid hash algorithms), to include the - # delegated targets key. The delegated roles of 'targets.json' are also - # loaded when the repository object is instantiated. - print('\ndifference: ' + repr(list(set(tuf.keydb._keydb_dict[self.repository_name].keys()) - set(root_metadata['keys'].keys())))) - self.assertEqual(number_of_root_keys * 2 + 1, len(tuf.keydb._keydb_dict[self.repository_name])) + # delegated targets key (+1 for its sha512 keyid). The delegated roles of + # 'targets.json' are also loaded when the repository object is + # instantiated. + + self.assertEqual(number_of_root_keys * 2 + 2, len(tuf.keydb._keydb_dict[self.repository_name])) # Test: normal case. self.repository_updater._rebuild_key_and_role_db() root_roleinfo = tuf.roledb.get_roleinfo('root', self.repository_name) self.assertEqual(root_roleinfo['threshold'], root_threshold) + # _rebuild_key_and_role_db() will only rebuild the keys and roles specified # in the 'root.json' file, unlike __init__(). Instantiating an updater # object calls both _rebuild_key_and_role_db() and _import_delegations(). @@ -363,7 +373,6 @@ def test_1__rebuild_key_and_role_db(self): root_roleinfo = tuf.roledb.get_roleinfo('root', self.repository_name) self.assertEqual(root_roleinfo['threshold'], 8) self.assertEqual(number_of_root_keys * 2 - 2, len(tuf.keydb._keydb_dict[self.repository_name])) - """ @@ -459,14 +468,13 @@ def test_2__fileinfo_has_changed(self): - """ def test_2__import_delegations(self): # Setup. # In order to test '_import_delegations' the parent of the delegation # has to be in Repository.metadata['current'], but it has to be inserted # there without using '_load_metadata_from_file()' since it calls # '_import_delegations()'. - repository_name = self.repository_updater.updater_name + repository_name = self.repository_updater.repository_name tuf.keydb.clear_keydb(repository_name) tuf.roledb.clear_roledb(repository_name) @@ -476,10 +484,9 @@ def test_2__import_delegations(self): self.repository_updater._rebuild_key_and_role_db() self.assertEqual(len(tuf.roledb._roledb_dict[repository_name]), 4) + # Take into account the number of keyids algorithms supported by default, # which this test condition expects to be two (sha256 and sha512). - print('\nkeydb_dict len: ' + repr(len(tuf.keydb._keydb_dict[repository_name].keys()))) - print('\nkeydb_dict: ' + repr(tuf.keydb._keydb_dict[repository_name].keys())) self.assertEqual(4 * 2, len(tuf.keydb._keydb_dict[repository_name])) # Test: pass a role without delegations. @@ -497,8 +504,8 @@ def test_2__import_delegations(self): self.assertEqual(len(tuf.roledb._roledb_dict[repository_name]), 5) # The number of root keys (times the number of key hash algorithms) + - # delegation's key. - self.assertEqual(len(tuf.keydb._keydb_dict[repository_name]), 4 * 2 + 1) + # delegation's key (+1 for its sha512 keyid). + self.assertEqual(len(tuf.keydb._keydb_dict[repository_name]), 4 * 2 + 2) # Verify that roledb dictionary was added. self.assertTrue('role1' in tuf.roledb._roledb_dict[repository_name]) @@ -526,24 +533,23 @@ def test_2__import_delegations(self): self.repository_updater.metadata['current']['targets']\ ['delegations']['keys'][existing_keyid]['keytype'] = 'ed25519' - # Verify that _import_delegations() raises an exception if any key in - # 'delegations' is improperly formatted (i.e., bad keyid). - tuf.keydb.clear_keydb(repository_name) + # Verify that _import_delegations() raises an exception if one of the + # delegated keys is malformed. + valid_keyval = self.repository_updater.metadata['current']['targets']\ + ['delegations']['keys'][existing_keyid]['keyval'] - self.repository_updater.metadata['current']['targets']['delegations']\ - ['keys'].update({'123': self.repository_updater.metadata['current']\ - ['targets']['delegations']['keys'][existing_keyid]}) - self.assertRaises(securesystemslib.exceptions.Error, self.repository_updater._import_delegations, - 'targets') - - # Restore the keyid of 'existing_keyids2'. self.repository_updater.metadata['current']['targets']\ - ['delegations']['keys'][existing_keyid]['keyid'] = existing_keyid + ['delegations']['keys'][existing_keyid]['keyval'] = 1 + self.assertRaises(securesystemslib.exceptions.FormatError, self.repository_updater._import_delegations, 'targets') - # Verify that _import_delegations() raises an exception if it fails to add - # one of the roles loaded from parent role's 'delegations'. - """ + self.repository_updater.metadata['current']['targets']\ + ['delegations']['keys'][existing_keyid]['keyval'] = valid_keyval + # Verify that _import_delegations() raises an exception if one of the + # delegated roles is malformed. + self.repository_updater.metadata['current']['targets']\ + ['delegations']['roles'][0]['name'] = 1 + self.assertRaises(securesystemslib.exceptions.FormatError, self.repository_updater._import_delegations, 'targets') @@ -1393,6 +1399,14 @@ def test_10__soft_check_file_length(self): self.repository_updater._soft_check_file_length, temp_file_object, 1) + # Verify that an exception is not raised if the file length <= the observed + # file length. + temp_file_object.seek(0) + self.repository_updater._soft_check_file_length(temp_file_object, 3) + temp_file_object.seek(0) + self.repository_updater._soft_check_file_length(temp_file_object, 4) + + def test_10__targets_of_role(self): diff --git a/tuf/client/updater.py b/tuf/client/updater.py index 72f9574d..a331ff36 100755 --- a/tuf/client/updater.py +++ b/tuf/client/updater.py @@ -540,12 +540,11 @@ def _import_delegations(self, parent_role): # Iterate the keys of the delegated roles of 'parent_role' and load them. for keyid, keyinfo in six.iteritems(keys_info): if keyinfo['keytype'] in ['rsa', 'ed25519']: - key, keyids = securesystemslib.keys.format_metadata_to_key(keyinfo) # We specify the keyid to ensure that it's the correct keyid # for the key. try: - tuf.keydb.add_key(key, keyid, self.repository_name) + key, keyids = securesystemslib.keys.format_metadata_to_key(keyinfo) for keyid in keyids: key['keyid'] = keyid tuf.keydb.add_key(key, keyid=None, repository_name=self.repository_name) @@ -575,7 +574,7 @@ def _import_delegations(self, parent_role): logger.warning('Role already exists: ' + rolename) except: - logger.exception('Failed to add delegated role: ' + rolename + '.') + logger.exception('Failed to add delegated role: ' + repr(rolename) + '.') raise diff --git a/tuf/download.py b/tuf/download.py index 48bd77cd..e1f7ab2e 100755 --- a/tuf/download.py +++ b/tuf/download.py @@ -682,10 +682,7 @@ class VerifiedHTTPSConnection(six.moves.http_client.HTTPSConnection): def connect(self): self.connection_kwargs = {} - - # for > py2.5 - if hasattr(self, 'timeout'): - self.connection_kwargs.update(timeout = self.timeout) + self.connection_kwargs.update(timeout = self.timeout) # for >= py2.7 if hasattr(self, 'source_address'): diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index 0d56fc64..338e5049 100755 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -737,7 +737,6 @@ def _load_top_level_metadata(repository, top_level_filenames, repository_name): # repository maintainer should have also been made aware of the duplicate # key when it was added. try: - tuf.keydb.add_key(key_object, repository_name=repository_name) for keyid in keyids: #pragma: no branch key_object['keyid'] = keyid tuf.keydb.add_key(key_object, keyid=None, diff --git a/tuf/repository_tool.py b/tuf/repository_tool.py index 86a49f19..e236ff29 100755 --- a/tuf/repository_tool.py +++ b/tuf/repository_tool.py @@ -3126,9 +3126,12 @@ def load_repository(repository_directory, repository_name='default'): # The repository maintainer should have also been made aware of the # duplicate key when it was added. for key_metadata in six.itervalues(metadata_object['delegations']['keys']): - key_object, junk = securesystemslib.keys.format_metadata_to_key(key_metadata) + key_object, keyids = securesystemslib.keys.format_metadata_to_key(key_metadata) try: - tuf.keydb.add_key(key_object, repository_name=repository_name) + for keyid in keyids: + key_object['keyid'] = keyid + tuf.keydb.add_key(key_object, keyid=None, + repository_name=repository_name) except securesystemslib.exceptions.KeyAlreadyExistsError: pass diff --git a/tuf/roledb.py b/tuf/roledb.py index 1ad93cbd..26403569 100755 --- a/tuf/roledb.py +++ b/tuf/roledb.py @@ -810,8 +810,8 @@ def get_role_threshold(rolename, repository_name='default'): securesystemslib.exceptions.FormatError, if the arguments do not have the correct object format. - securesystemslib.exceptions.UnknownRoleError, if 'rolename' cannot be found - in in the role database. + tuf.exceptions.UnknownRoleError, if 'rolename' cannot be found + in the role database. securesystemslib.exceptions.InvalidNameError, if 'rolename' is incorrectly formatted, or 'repository_name' does not exist in the role database. @@ -822,6 +822,7 @@ def get_role_threshold(rolename, repository_name='default'): A threshold integer value. """ + # Raise 'securesystemslib.exceptions.FormatError' if 'repository_name' is # improperly formatted. securesystemslib.formats.NAME_SCHEMA.check_match(repository_name) @@ -885,10 +886,6 @@ def get_role_paths(rolename, repository_name='default'): global _roledb_dict global _dirty_roles - if repository_name not in _roledb_dict or repository_name not in _dirty_roles: - raise securesystemslib.exceptions.InvalidNameError('Repository name does not' ' exist: ' + - repository_name) - roleinfo = _roledb_dict[repository_name][rolename] # Paths won't exist for non-target roles. @@ -949,10 +946,6 @@ def get_delegated_rolenames(rolename, repository_name='default'): global _roledb_dict global _dirty_roles - if repository_name not in _roledb_dict or repository_name not in _dirty_roles: - raise securesystemslib.exceptions.InvalidNameError('Repository name does not' - ' exist: ' + repository_name) - # get_roleinfo() raises a 'securesystemslib.exceptions.InvalidNameError' if # 'repository_name' does not exist in the role database. roleinfo = get_roleinfo(rolename, repository_name) diff --git a/tuf/sig.py b/tuf/sig.py index d572f92b..0f3b45c9 100755 --- a/tuf/sig.py +++ b/tuf/sig.py @@ -101,7 +101,7 @@ def get_signature_status(signable, role=None, repository_name='default', securesystemslib.exceptions.FormatError, if 'signable' does not have the correct format. - securesystemslib.exceptions.UnknownRoleError, if 'role' is not recognized. + tuf.exceptions.UnknownRoleError, if 'role' is not recognized. None. @@ -188,7 +188,7 @@ def get_signature_status(signable, role=None, repository_name='default', continue # Unknown role, re-raise exception. - except securesystemslib.exceptions.UnknownRoleError: + except tuf.exceptions.UnknownRoleError: raise # This is an unset role, thus an unknown signature. @@ -211,7 +211,7 @@ def get_signature_status(signable, role=None, repository_name='default', threshold = \ tuf.roledb.get_role_threshold(role, repository_name=repository_name) - except securesystemslib.exceptions.UnknownRoleError: + except tuf.exceptions.UnknownRoleError: raise else: diff --git a/tuf/unittest_toolbox.py b/tuf/unittest_toolbox.py index 67ef7286..b9ced979 100755 --- a/tuf/unittest_toolbox.py +++ b/tuf/unittest_toolbox.py @@ -85,7 +85,7 @@ def tearDown(self): try: # OSError will occur if the directory was already removed. cleanup_function() - + except OSError: pass @@ -93,11 +93,15 @@ def tearDown(self): def make_temp_directory(self, directory=None): """Creates and returns an absolute path of a directory.""" + prefix = self.__class__.__name__+'_' temp_directory = tempfile.mkdtemp(prefix=prefix, dir=directory) + def _destroy_temp_directory(): shutil.rmtree(temp_directory) + self._cleanup.append(_destroy_temp_directory) + return temp_directory