From 08a2bad2c07cc0622293d97bdd22ac8e9f97a555 Mon Sep 17 00:00:00 2001 From: vladdd Date: Sun, 20 Apr 2014 16:15:19 -0400 Subject: [PATCH] Add Travis CI, coveralls, and coverage-related updates. --- .gitignore | 2 + .travis.yml | 17 + tests/.coveragerc | 15 + tests/aggregate_tests.py | 4 +- tests/test_arbitrary_package_attack.py | 4 +- tests/test_download.py | 3 +- tests/test_endless_data_attack.py | 4 +- tests/test_extraneous_dependencies_attack.py | 4 +- tests/test_hash.py | 15 + tests/test_indefinite_freeze_attack.py | 4 +- tests/test_keys.py | 86 ++-- tests/test_mirrors.py | 4 +- tests/test_mix_and_match_attack.py | 4 +- tests/test_replay_attack.py | 4 +- tests/test_roledb.py | 19 + tests/test_slow_retrieval_attack.py | 6 +- tests/test_updater.py | 8 +- tests/test_util.py | 5 +- tests/unittest_toolbox.py | 503 ------------------- tox.ini | 20 + tuf/__init__.py | 2 + tuf/download.py | 5 +- tuf/hash.py | 11 +- tuf/keys.py | 30 +- tuf/roledb.py | 7 +- tuf/unittest_toolbox.py | 137 +++++ 26 files changed, 334 insertions(+), 589 deletions(-) create mode 100644 .travis.yml create mode 100644 tests/.coveragerc delete mode 100755 tests/unittest_toolbox.py create mode 100644 tox.ini create mode 100755 tuf/unittest_toolbox.py diff --git a/.gitignore b/.gitignore index 0f43b246..ba102aff 100644 --- a/.gitignore +++ b/.gitignore @@ -9,3 +9,5 @@ build/* *.swo *.swp tuf.egg-info +.coverage +.tox/* diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 00000000..e5defd7a --- /dev/null +++ b/.travis.yml @@ -0,0 +1,17 @@ +language: python + +python: + - "2.7" + +install: + - pip install tox + - pip install coveralls + +script: tox + +after_success: + coveralls + +branches: + only: + - develop diff --git a/tests/.coveragerc b/tests/.coveragerc new file mode 100644 index 00000000..bb2d6cff --- /dev/null +++ b/tests/.coveragerc @@ -0,0 +1,15 @@ +[run] +branch = True + + +[report] +exclude_lines = + pragma: no cover + def _check_crypto_libraries + def __str__ + if __name__ == .__main__.: + +omit = + */tuf/interposition/* + */tuf/_vendor/* + */tuf/compatibility/* diff --git a/tests/aggregate_tests.py b/tests/aggregate_tests.py index 0713e217..78a9c9bd 100755 --- a/tests/aggregate_tests.py +++ b/tests/aggregate_tests.py @@ -45,8 +45,8 @@ # Provide command-line option to randomize the order in which the tests run. # Randomization might catch errors with unit tests that do not properly clean # up or restore monkey-patched modules. -if '--random' in sys.argv: - random.shuffle(tests_without_extension) +#if '--random' in sys.argv: +random.shuffle(tests_without_extension) suite = unittest.TestLoader().loadTestsFromNames(tests_without_extension) diff --git a/tests/test_arbitrary_package_attack.py b/tests/test_arbitrary_package_attack.py index 3a45b028..f533ef7c 100755 --- a/tests/test_arbitrary_package_attack.py +++ b/tests/test_arbitrary_package_attack.py @@ -48,7 +48,7 @@ import tuf.util import tuf.log import tuf.client.updater as updater -import tests.unittest_toolbox as unittest_toolbox +import tuf.unittest_toolbox as unittest_toolbox logger = logging.getLogger('tuf.test_arbitrary_package_attack') @@ -93,8 +93,6 @@ def tearDownClass(cls): # Remove the temporary repository directory, which should contain all the # metadata, targets, and key files generated of all the test cases. shutil.rmtree(cls.temporary_directory) - - unittest_toolbox.Modified_TestCase.clear_toolbox() # Kill the SimpleHTTPServer process. if cls.server_process.returncode is None: diff --git a/tests/test_download.py b/tests/test_download.py index e67f1f56..bdef71f2 100755 --- a/tests/test_download.py +++ b/tests/test_download.py @@ -20,6 +20,7 @@ Otherwise, module that launches simple server would not be found. """ +from __future__ import absolute_import import hashlib import logging @@ -35,7 +36,7 @@ import tuf.conf as conf import tuf.download as download import tuf.log -import tests.unittest_toolbox as unittest_toolbox +import tuf.unittest_toolbox as unittest_toolbox logger = logging.getLogger('tuf.test_download') diff --git a/tests/test_endless_data_attack.py b/tests/test_endless_data_attack.py index 7a820327..6da1e496 100755 --- a/tests/test_endless_data_attack.py +++ b/tests/test_endless_data_attack.py @@ -51,7 +51,7 @@ import tuf.util import tuf.log import tuf.client.updater as updater -import tests.unittest_toolbox as unittest_toolbox +import tuf.unittest_toolbox as unittest_toolbox logger = logging.getLogger('tuf.test_endless_data_attack') @@ -97,8 +97,6 @@ def tearDownClass(cls): # metadata, targets, and key files generated of all the test cases. shutil.rmtree(cls.temporary_directory) - unittest_toolbox.Modified_TestCase.clear_toolbox() - # Kill the SimpleHTTPServer process. if cls.server_process.returncode is None: logger.info('Server process '+str(cls.server_process.pid)+' terminated.') diff --git a/tests/test_extraneous_dependencies_attack.py b/tests/test_extraneous_dependencies_attack.py index 0fa1c1d1..bd0be5b9 100755 --- a/tests/test_extraneous_dependencies_attack.py +++ b/tests/test_extraneous_dependencies_attack.py @@ -52,7 +52,7 @@ import tuf.util import tuf.log import tuf.client.updater as updater -import tests.unittest_toolbox as unittest_toolbox +import tuf.unittest_toolbox as unittest_toolbox logger = logging.getLogger('tuf.test_extraneous_dependencies_attack') @@ -100,8 +100,6 @@ def tearDownClass(cls): # metadata, targets, and key files generated of all the test cases. shutil.rmtree(cls.temporary_directory) - unittest_toolbox.Modified_TestCase.clear_toolbox() - # Kill the SimpleHTTPServer process. if cls.server_process.returncode is None: logger.info('Server process '+str(cls.server_process.pid)+' terminated.') diff --git a/tests/test_hash.py b/tests/test_hash.py index 8e634a81..c38a4b2b 100755 --- a/tests/test_hash.py +++ b/tests/test_hash.py @@ -225,6 +225,21 @@ def _do_update_file_obj(self, library): # to always seek to the beginning. self.assertEqual(digest_object_truth.digest(), digest_object.digest()) + + def test_data_to_string(self): + self.assertEqual('12', tuf.hash.data_to_string('12')) + self.assertEqual(u'hello', tuf.hash.data_to_string(unicode('hello'))) + self.assertEqual('12', tuf.hash.data_to_string(12)) + + + def test_unsupported_digest_algorithm_and_library(self): + self.assertRaises(tuf.UnsupportedAlgorithmError, tuf.hash.digest, + 'sha123', 'hashlib') + self.assertRaises(tuf.UnsupportedAlgorithmError, tuf.hash.digest, + 'sha123', 'pycrypto') + + self.assertRaises(tuf.UnsupportedLibraryError, tuf.hash.digest, + 'sha256', 'badlib') # Run unit test. diff --git a/tests/test_indefinite_freeze_attack.py b/tests/test_indefinite_freeze_attack.py index 2c90a409..f6cc1c72 100755 --- a/tests/test_indefinite_freeze_attack.py +++ b/tests/test_indefinite_freeze_attack.py @@ -45,7 +45,7 @@ import tuf.log import tuf.client.updater as updater import tuf.repository_tool as repo_tool -import tests.unittest_toolbox as unittest_toolbox +import tuf.unittest_toolbox as unittest_toolbox # The repository tool is imported and logs console messages by default. Disable # console log messages generated by this unit test. @@ -95,8 +95,6 @@ def tearDownClass(cls): # metadata, targets, and key files generated of all the test cases. shutil.rmtree(cls.temporary_directory) - unittest_toolbox.Modified_TestCase.clear_toolbox() - # Kill the SimpleHTTPServer process. if cls.server_process.returncode is None: logger.info('Server process '+str(cls.server_process.pid)+' terminated.') diff --git a/tests/test_keys.py b/tests/test_keys.py index 758c1009..c1a2aad2 100755 --- a/tests/test_keys.py +++ b/tests/test_keys.py @@ -31,18 +31,13 @@ DATA = 'SOME DATA REQUIRING AUTHENTICITY.' -rsakey_dict = KEYS.generate_rsa_key() -temp_key_info_vals = rsakey_dict.values() -temp_key_vals = rsakey_dict['keyval'].values() - class TestKeys(unittest.TestCase): - def setUp(self): - rsakey_dict['keytype']=temp_key_info_vals[0] - rsakey_dict['keyid']=temp_key_info_vals[1] - rsakey_dict['keyval']=temp_key_info_vals[2] - rsakey_dict['keyval']['public']=temp_key_vals[0] - rsakey_dict['keyval']['private']=temp_key_vals[1] + + @classmethod + def setUpClass(cls): + cls.rsakey_dict = KEYS.generate_rsa_key() + cls.ed25519key_dict = KEYS.generate_ed25519_key() def test_generate_rsa_key(self): @@ -66,9 +61,10 @@ def test_generate_rsa_key(self): self.assertTrue(tuf.formats.RSAKEY_SCHEMA.matches(KEYS.generate_rsa_key(4096))) + def test_format_keyval_to_metadata(self): - keyvalue = rsakey_dict['keyval'] - keytype = rsakey_dict['keytype'] + keyvalue = self.rsakey_dict['keyval'] + keytype = self.rsakey_dict['keytype'] key_meta = KEYS.format_keyval_to_metadata(keytype, keyvalue) # Check if the format of the object returned by this function corresponds @@ -87,39 +83,46 @@ def test_format_keyval_to_metadata(self): self.assertRaises(tuf.FormatError, KEYS.format_keyval_to_metadata, 'bad_keytype', keyvalue) + public = keyvalue['public'] del keyvalue['public'] self.assertRaises(tuf.FormatError, KEYS.format_keyval_to_metadata, keytype, keyvalue) + keyvalue['public'] = public + def test_format_metadata_to_key(self): # Reconfiguring rsakey_dict to conform to KEY_SCHEMA # i.e. {keytype: 'rsa', keyval: {public: pub_key, private: priv_key}} - #keyid = rsakey_dict['keyid'] - del rsakey_dict['keyid'] + keyid = self.rsakey_dict['keyid'] + del self.rsakey_dict['keyid'] - rsakey_dict_from_meta = KEYS.format_metadata_to_key(rsakey_dict) + rsakey_dict_from_meta = KEYS.format_metadata_to_key(self.rsakey_dict) # Check if the format of the object returned by this function corresponds # to RSAKEY_SCHEMA format. self.assertEqual(None, tuf.formats.RSAKEY_SCHEMA.check_match(rsakey_dict_from_meta), FORMAT_ERROR_MSG) - + self.rsakey_dict['keyid'] = keyid + # Supplying a wrong number of arguments. self.assertRaises(TypeError, KEYS.format_metadata_to_key) - args = (rsakey_dict, rsakey_dict) + args = (self.rsakey_dict, self.rsakey_dict) self.assertRaises(TypeError, KEYS.format_metadata_to_key, *args) # Supplying a malformed argument to the function - should get FormatError - del rsakey_dict['keyval'] + keyval = self.rsakey_dict['keyval'] + del self.rsakey_dict['keyval'] self.assertRaises(tuf.FormatError, KEYS.format_metadata_to_key, - rsakey_dict) + self.rsakey_dict) + self.rsakey_dict['keyval'] = keyval + def test_helper_get_keyid(self): - keytype = rsakey_dict['keytype'] - keyvalue = rsakey_dict['keyval'] + keytype = self.rsakey_dict['keytype'] + keyvalue = self.rsakey_dict['keyval'] # Check format of 'keytype'. self.assertEqual(None, tuf.formats.KEYTYPE_SCHEMA.check_match(keytype), @@ -138,49 +141,60 @@ def test_helper_get_keyid(self): def test_create_signature(self): # Creating a signature for 'DATA'. - signature = KEYS.create_signature(rsakey_dict, DATA) - + rsa_signature = KEYS.create_signature(self.rsakey_dict, DATA) + ed25519_signature = KEYS.create_signature(self.ed25519key_dict, DATA) + # Check format of output. self.assertEqual(None, - tuf.formats.SIGNATURE_SCHEMA.check_match(signature), + tuf.formats.SIGNATURE_SCHEMA.check_match(rsa_signature), + FORMAT_ERROR_MSG) + self.assertEqual(None, + tuf.formats.SIGNATURE_SCHEMA.check_match(ed25519_signature), FORMAT_ERROR_MSG) # Removing private key from 'rsakey_dict' - should raise a TypeError. - rsakey_dict['keyval']['private'] = '' + private = self.rsakey_dict['keyval']['private'] + self.rsakey_dict['keyval']['private'] = '' - args = (rsakey_dict, DATA) + args = (self.rsakey_dict, DATA) self.assertRaises(TypeError, KEYS.create_signature, *args) # Supplying an incorrect number of arguments. self.assertRaises(TypeError, KEYS.create_signature) + self.rsakey_dict['keyval']['private'] = private + def test_verify_signature(self): - # Creating a signature 'signature' of 'DATA' to be verified. - signature = KEYS.create_signature(rsakey_dict, DATA) + # Creating a signature of 'DATA' to be verified. + rsa_signature = KEYS.create_signature(self.rsakey_dict, DATA) + ed25519_signature = KEYS.create_signature(self.ed25519key_dict, DATA) # Verifying the 'signature' of 'DATA'. - verified = KEYS.verify_signature(rsakey_dict, signature, DATA) + verified = KEYS.verify_signature(self.rsakey_dict, rsa_signature, DATA) + self.assertTrue(verified, "Incorrect signature.") + + # Verifying the 'ed25519_signature' of 'DATA'. + verified = KEYS.verify_signature(self.ed25519key_dict, ed25519_signature, DATA) self.assertTrue(verified, "Incorrect signature.") - # Testing an invalid 'signature'. Same 'signature' is passed, with + # Testing an invalid 'rsa_signature'. Same 'rsa_signature' is passed, with # 'DATA' different than the original 'DATA' that was used - # in creating the 'signature'. Function should return 'False'. + # in creating the 'rsa_signature'. Function should return 'False'. # Modifying 'DATA'. _DATA = '1111'+DATA+'1111' # Verifying the 'signature' of modified '_DATA'. - verified = KEYS.verify_signature(rsakey_dict, signature, _DATA) + verified = KEYS.verify_signature(self.rsakey_dict, rsa_signature, _DATA) self.assertFalse(verified, 'Returned \'True\' on an incorrect signature.') # Modifying 'signature' to pass an incorrect method since only - # 'PyCrypto-PKCS#1 PSS' - # is accepted. - signature['method'] = 'Biff' + # 'PyCrypto-PKCS#1 PSS' is accepted. + rsa_signature['method'] = 'Biff' - args = (rsakey_dict, signature, DATA) + args = (self.rsakey_dict, rsa_signature, DATA) self.assertRaises(tuf.UnknownMethodError, KEYS.verify_signature, *args) # Passing incorrect number of arguments. diff --git a/tests/test_mirrors.py b/tests/test_mirrors.py index 603d5bf1..67572d9f 100755 --- a/tests/test_mirrors.py +++ b/tests/test_mirrors.py @@ -15,12 +15,14 @@ Unit test for 'mirrors.py'. """ +from __future__ import absolute_import + import unittest import tuf import tuf.formats as formats import tuf.mirrors as mirrors -import tests.unittest_toolbox as unittest_toolbox +import tuf.unittest_toolbox as unittest_toolbox diff --git a/tests/test_mix_and_match_attack.py b/tests/test_mix_and_match_attack.py index 5d7aa0bb..4b1191ed 100755 --- a/tests/test_mix_and_match_attack.py +++ b/tests/test_mix_and_match_attack.py @@ -50,7 +50,7 @@ import tuf.log import tuf.client.updater as updater import tuf.repository_tool as repo_tool -import tests.unittest_toolbox as unittest_toolbox +import tuf.unittest_toolbox as unittest_toolbox # The repository tool is imported and logs console messages by default. Disable # console log messages generated by this unit test. @@ -100,8 +100,6 @@ def tearDownClass(cls): # Remove the temporary repository directory, which should contain all the # metadata, targets, and key files generated of all the test cases. shutil.rmtree(cls.temporary_directory) - - unittest_toolbox.Modified_TestCase.clear_toolbox() # Kill the SimpleHTTPServer process. if cls.server_process.returncode is None: diff --git a/tests/test_replay_attack.py b/tests/test_replay_attack.py index 8906e665..ddb34d28 100755 --- a/tests/test_replay_attack.py +++ b/tests/test_replay_attack.py @@ -51,7 +51,7 @@ import tuf.log import tuf.client.updater as updater import tuf.repository_tool as repo_tool -import tests.unittest_toolbox as unittest_toolbox +import tuf.unittest_toolbox as unittest_toolbox # The repository tool is imported and logs console messages by default. Disable # console log messages generated by this unit test. @@ -102,8 +102,6 @@ def tearDownClass(cls): # metadata, targets, and key files generated of all the test cases. shutil.rmtree(cls.temporary_directory) - unittest_toolbox.Modified_TestCase.clear_toolbox() - # Kill the SimpleHTTPServer process. if cls.server_process.returncode is None: logger.info('Server process '+str(cls.server_process.pid)+' terminated.') diff --git a/tests/test_roledb.py b/tests/test_roledb.py index 117d4eba..eb34e43f 100755 --- a/tests/test_roledb.py +++ b/tests/test_roledb.py @@ -383,6 +383,25 @@ def test_create_roledb_from_root_metadata(self): + def test_update_roleinfo(self): + rolename = 'targets' + roleinfo = {'keyids': ['123'], 'threshold': 1} + tuf.roledb.add_role(rolename, roleinfo) + + # Test normal case. + tuf.roledb.update_roleinfo(rolename, roleinfo) + + # Test for an unknown role. + self.assertRaises(tuf.UnknownRoleError, tuf.roledb.update_roleinfo, + 'unknown_rolename', roleinfo) + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, tuf.roledb.update_roleinfo, 1, roleinfo) + self.assertRaises(tuf.FormatError, tuf.roledb.update_roleinfo, rolename, 1) + + + + def _test_rolename(self, test_function): # Private function that tests the 'rolename' argument of 'test_function' # for format, invalid name, and unknown role exceptions. diff --git a/tests/test_slow_retrieval_attack.py b/tests/test_slow_retrieval_attack.py index 3e4cf023..e497410c 100755 --- a/tests/test_slow_retrieval_attack.py +++ b/tests/test_slow_retrieval_attack.py @@ -53,7 +53,7 @@ import tuf.util import tuf.log import tuf.client.updater as updater -import tests.unittest_toolbox as unittest_toolbox +import tuf.unittest_toolbox as unittest_toolbox logger = logging.getLogger('tuf.test_slow_retrieval_attack') @@ -81,10 +81,8 @@ def tearDownClass(cls): # Remove the temporary repository directory, which should contain all the # metadata, targets, and key files generated of all the test cases. shutil.rmtree(cls.temporary_directory) - - unittest_toolbox.Modified_TestCase.clear_toolbox() - + def _start_slow_server(self, mode): # Launch a SimpleHTTPServer (serves files in the current directory). diff --git a/tests/test_updater.py b/tests/test_updater.py index 97348b84..9397177f 100755 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -23,7 +23,7 @@ The 'unittest_toolbox.py' module was created to provide additional testing tools, such as automatically deleting temporary files created in test cases. - For more information, see 'tuf/tests/unittest_toolbox.py'. + For more information, see 'tests/unittest_toolbox.py'. Test cases here should follow a specific order (i.e., independent methods are @@ -38,6 +38,8 @@ less dependent than 2. """ +from __future__ import absolute_import + import os import time import shutil @@ -56,7 +58,7 @@ import tuf.keydb import tuf.roledb import tuf.repository_tool as repo_tool -import tests.unittest_toolbox as unittest_toolbox +import tuf.unittest_toolbox as unittest_toolbox import tuf.client.updater as updater logger = logging.getLogger('tuf.test_updater') @@ -104,8 +106,6 @@ def tearDownClass(cls): # metadata, targets, and key files generated for the test cases. shutil.rmtree(cls.temporary_directory) - unittest_toolbox.Modified_TestCase.clear_toolbox() - # Kill the SimpleHTTPServer process. if cls.server_process.returncode is None: logger.info('\tServer process '+str(cls.server_process.pid)+' terminated.') diff --git a/tests/test_util.py b/tests/test_util.py index 4f4ae895..17a26262 100755 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -16,6 +16,7 @@ Unit test for 'util.py' """ +from __future__ import absolute_import import os import sys @@ -29,7 +30,7 @@ import tuf.log import tuf.hash import tuf.util as util -import tests.unittest_toolbox as unittest_toolbox +import tuf.unittest_toolbox as unittest_toolbox logger = logging.getLogger('tuf.test_util') @@ -221,7 +222,7 @@ def test_B1_get_file_details(self): # Test: Incorrect input. bogus_inputs = [self.random_string(), 1234, [self.random_string()], - {'a', 'a'}, None] + {'a': 'b'}, None] for bogus_input in bogus_inputs: if isinstance(bogus_input, basestring): self.assertRaises(tuf.Error, util.get_file_details, bogus_input) diff --git a/tests/unittest_toolbox.py b/tests/unittest_toolbox.py deleted file mode 100755 index ee0ac170..00000000 --- a/tests/unittest_toolbox.py +++ /dev/null @@ -1,503 +0,0 @@ -""" - - unittest_toolbox.py - - - Konstantin Andrianov - - - March 26, 2012 - - - See LICENSE for licensing information. - - - Provides an array of various methods for unit testing. Use it instead of - actual unittest module. This module builds on unittest module. - Specifically, Modified_TestCase is a derived class from unittest.TestCase. -""" - -import os -import sys -import shutil -import unittest -import tempfile -import random -import string -import ConfigParser - -import tuf.keys -#import tuf.repo.keystore as keystore - -# Modify the number of iterations (from the higher default count) so the unit -# tests run faster. -#keystore._PBKDF2_ITERATIONS = 1000 - - -class Modified_TestCase(unittest.TestCase): - """ - - Provide additional test-setup methods to make testing - of module's methods-under-test as independent as possible. - - If you want to modify setUp()/tearDown() do: - class Your_Test_Class(modified_TestCase): - def setUp(): - your setup modification - your setup modification - ... - modified_TestCase.setUp(self) - - - make_temp_directory(self, directory=None): - Creates and returns an absolute path of a temporary directory. - - make_temp_file(self, suffix='.txt', directory=None): - Creates and returns an absolute path of an empty temp file. - - make_temp_data_file(self, suffix='', directory=None, data = junk_data): - Returns an absolute path of a temp file containing some data. - - make_temp_config_file(self, suffix='', directory=None, config_dict={}, expiration=None): - Creates a temporary file and puts a config dictionary in it using - ConfigParser. It then returns a (config_file_path, config_dictionary) - tuple. - - make_temp_directory_with_data_files(self, _current_dir=None,directory_content=\ - directory_dictionary, directory=None): - Creates a temp directory with files, directories and sub-directories - based on the dictionary supplied. It returns a temp directory, which - is parent of the structure supplied in the dictionary. - - random_path(self, length = 7): - Generate a 'random' path consisting of n-length strings of random chars. - - get_keystore_key(self, keyid): - This a monkey patch for keystore's get_key method. - - - Static Methods: - -------------- - Following methods are static because they technically don't operate - on any instances of the class, what they do is: they modify class variables - (dictionaries) that are shared among all instances of the class. So - it is possible to call them without instantiating the class. - - generate_rsakey(): - Generate rsa key and put it into 'rsa_keystore' dictionary. - - bind_keys_to_a_role(role, threshold=1): - Binds a key to a 'role' thus modifying 'semi_roledict' and - 'rsa_keystore' dictionaries. - - bind_keys_to_roles(role_thresholds={}): - Bind keys to top level roles. If dictionary of roles-thresholds is - supplied set - use it to crate appropriate amount of keys. If you - want to set a dictionary specifying a threshold each role should have, - the dictionary should look like this: {role : 2, ... } where role - might be 'root' and # is a threshold #. - - random_string(length=7): - Generate a 'length' long string of random characters. - """ - - # List of all top level roles. - role_list = ['root', 'targets', 'snapshot', 'timestamp'] - - # List of delegated roles. - delegated_role_list = ['targets/delegated_role1', - 'targets/delegated_role1/delegated_role2'] - - # 'rsa_keyids' stores keyids of all created rsa keys. - rsa_keyids = [] - - # 'rsa_keystore' stores all created rsa keys, that are RSAKEY_SCHEMA - # conformant, as values for their corresponding keyid dictionary keys. - # {keyid : {-- rsa key --}, ...} - rsa_keystore = {} - - # 'rsa_passwords' stores the passwords for all created rsa keys. - rsa_passwords = {} - - # 'derived_keys' stores the salt and derived keys (e.g., PBKDF2) for the - # RSA keys. - rsa_derived_keys = {} - - # 'semi_roledict' because it lacks an item that a fully pledged - # ROLEDICT_SCHEMA dictionary would have i.e. 'path' key is absent. - semi_roledict = {} - - # 'top_level_role_info' same as 'semi_roledict' except that it only - # contains top-level roles. - top_level_role_info = {} - - junk_data = 'Stored data.' - - directory_dictionary = {'targets':[{'delegated_level1': - [{'delegated_level2':junk_data}, - junk_data]}, - junk_data, - junk_data]} - - config_expiration = {'expiration':{'days':0, 'years':0, - 'minutes':0, 'hours':0, 'seconds':0}} - - - mirrors = {'mirror1': {'url_prefix' : 'http://mirror1.com', - 'metadata_path' : 'metadata', - 'targets_path' : 'targets', - 'confined_target_dirs' : ['']}, - 'mirror2': {'url_prefix' : 'http://mirror2.com', - 'metadata_path' : 'metadata', - 'targets_path' : 'targets', - 'confined_target_dirs' : ['']}, - 'mirror3': {'url_prefix' : 'http://mirror3.com', - 'metadata_path' : 'metadata', - 'targets_path' : 'targets', - 'confined_target_dirs' : ['']}} - - - - - def setUp(self): - self._cleanup = [] - - - def tearDown(self): - for cleanup_function in self._cleanup: - # Perform clean up by executing clean-up functions. - try: - # OSError will occur if the directory was already removed. - cleanup_function() - except OSError: - pass - - - - - - def make_temp_directory(self, directory=None): - """Creates and returns an absolute path of a directory.""" - prefix = self.__class__.__name__+'_' - temp_directory = tempfile.mkdtemp(prefix=prefix, dir=directory) - def _destroy_temp_directory(): - shutil.rmtree(temp_directory) - self._cleanup.append(_destroy_temp_directory) - return temp_directory - - - - - - def make_temp_file(self, suffix='.txt', directory=None): - """Creates and returns an absolute path of an empty file.""" - prefix='tmp_file_'+self.__class__.__name__+'_' - temp_file = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=directory) - def _destroy_temp_file(): - os.unlink(temp_file[1]) - self._cleanup.append(_destroy_temp_file) - return temp_file[1] - - - - - - def make_temp_data_file(self, suffix='', directory=None, data = junk_data): - """Returns an absolute path of a temp file containing data.""" - temp_file_path = self.make_temp_file(suffix=suffix, directory=directory) - temp_file = open(temp_file_path, 'wb') - temp_file.write(data) - temp_file.close() - return temp_file_path - - - - - - def make_temp_config_file(self, suffix='', directory=None, config_dict={}, expiration=None): - """ - Creates a temporary file and puts a simple config - dictionary in it using ConfigParser. - It then returns the temp file path, dictionary tuple. - """ - - config = ConfigParser.RawConfigParser() - if not config_dict: - # Using the fact that empty sequences are false. - # Make some mock config data. Make sure it at least has 'keyid', - # 'threshold' and 'days' keys. - config_dict = {'expiration':{'days':100}, - 'root':{'keyids':['123abc','123abc'], 'threshold':2}} - if expiration: - config_dict['expiration'] = {} - config_dict['expiration'] = self.config_expiration['expiration'] - config_dict['expiration']['days'] = expiration - for section in config_dict: - config.add_section(section) - for key in config_dict[section]: - config.set(section, key, config_dict[section][key]) - config_path = self.make_temp_file(suffix=suffix, directory=directory) - config_file = open(config_path, 'wb') - config.write(config_file) - config_file.close() - return (config_path, config_dict) - - - - - - def make_temp_directory_with_data_files(self, _current_dir=None, - directory_content=directory_dictionary, directory=None): - """ - Creates a temp directory with files, directories and sub-directories - based on the dictionary supplied. It returns a temp directory, which - is parent of the structure supplied in the dictionary. When nested - directories desired use lists as values ex. {'dir_1':[{dir2:None}]} - to get '/tmp/tmp_dir_Test_random/dir_1/dir_2' without files. - - - directory: Specifies a path where to create the new directory in - (like repository directory). If 'None' temp directory would be - created (recommended). - - _current_dir: Used internally. Represents a current directory, for - example '/tmp/tmp_dir_Test_random', - '/tmp/tmp_dir_Test_random/targets/' and - '/tmp/tmp_dir_Test_random/targets/more_targets' would all be - current directories in turn since they all contain either files - or other directories. - - directory_content: Represents a dictionary with desired tree - structure to be attached to the 'directory'. - - Example: - - directory_dict = {'targets':[{'more_targets': junk_data}, - junk_data, junk_data]} - - self.make_temp_directory_with_data_files(directory_content= - directory_dict) - Creates: - /tmp/tmp_dir_Test_random/ - /tmp/tmp_dir_Test_random/targets/ - /tmp/tmp_dir_Test_random/targets/tmp_random1.txt - /tmp/tmp_dir_Test_random/targets/tmp_random2.txt - /tmp/tmp_dir_Test_random/targets/more_targets/ - /tmp/tmp_dir_Test_random/targets/more_targets/tmp_random3.txt - Returns: - ('/tmp/tmp_dir_Test_random/', [targets/tmp_random1.txt, - targets/tmp_random2.txt, targets/more_targets/tmp_random3.txt]) - """ - - if not _current_dir: - if directory: - _current_dir = directory - else: - _current_dir = self.make_temp_directory() - - # Calls itself with _current_dir set. - self.make_temp_directory_with_data_files(_current_dir=_current_dir) - temp_target_files = [] - - for directory, _junk, files in os.walk(_current_dir): - for target in files: - full_path = os.path.join(directory, target) - rel_path = os.path.relpath(full_path, _current_dir) - temp_target_files.append(rel_path) - - return _current_dir, temp_target_files - - for key in directory_content: - # Create directory 'key'. - _new_current_dir = os.path.join(_current_dir, key) - os.mkdir(_new_current_dir) - - # We have the directory. Check if value of key is a list or a str. - # If a list iterate through it. - # Else create a file with content of the item/value. - if isinstance(directory_content[key],list) and\ - len(directory_content[key]) > 1: - - # Check that there are more than 1 item in the list. - # else create a file with content of the item. - for item in range(len(directory_content[key])): - if isinstance(directory_content[key][item], dict): - # Pass current directory which is now '_new_current_dir' and the - # dictionary 'directory_content[key][item]' - self.make_temp_directory_with_data_files( - _current_dir=_new_current_dir, - directory_content=directory_content[key][item]) - else: - # Create a file w/ data, returning its address. - self.make_temp_data_file(suffix='.txt', - directory=_new_current_dir, - data=directory_content[key][item]) - - else: - # Create a file w/ data, returning its address. - if directory_content[key]: - if isinstance(directory_content[key], str): - self.make_temp_data_file(suffix='.txt', directory=_new_current_dir, - data=directory_content[key]) - - elif isinstance(directory_content[key], list) and\ - len(directory_content[key])==1: - self.make_temp_data_file(suffix='.txt', directory=_new_current_dir, - data=directory_content[key][0]) - - - - - - def random_path(self, length = 7): - """Generate a 'random' path consisting of random n-length strings.""" - - rand_path = '/'+self.random_string(length) - for i in range(2): - rand_path = os.path.join(rand_path, self.random_string(length)) - - return rand_path - - - - - - def get_keystore_key(self, keyid): - """This is a monkey patch for keystore's get_key method.""" - - return self.rsa_keystore[keyid] - - - - - - @staticmethod - def generate_rsakey(): - """ - This method generates a rsa key as shown below. It puts it in - 'rsa_keystore' and returns the 'keyid' of the created rsa dictionary. - - {'keytype': 'rsa', - 'keyid': keyid, - 'keyval': {'public': '-----BEGIN RSA PUBLIC KEY----- ...', - 'private': '-----BEGIN RSA PRIVATE KEY----- ...'}} - """ - - rsakey = tuf.keys.generate_rsa_key() - keyid = rsakey['keyid'] - Modified_TestCase.rsa_keyids.append(keyid) - password = Modified_TestCase.random_string() - Modified_TestCase.rsa_passwords[keyid] = password - salt, iterations, derived_key = keystore._generate_derived_key(password) - Modified_TestCase.rsa_derived_keys[keyid] = {'salt': salt, - 'derived_key': derived_key, - 'iterations': iterations} - Modified_TestCase.rsa_keystore[keyid] = rsakey - - return keyid - - - - - - def create_temp_keystore_directory(self, keystore_dicts=False): - - if not self.rsa_keystore or not self.rsa_derived_keys: - msg = 'Populate \'rsa_keystore\' and \'rsa_passwords\''+\ - ' before invoking this method.' - sys.exit(msg) - - temp_keystore_directory = self.make_temp_directory() - keystore._keystore = self.rsa_keystore - keystore._derived_keys = self.rsa_derived_keys - keystore.save_keystore_to_keyfiles(temp_keystore_directory) - if not keystore_dicts: - keystore._keystore={} - keystore._derived_keys={} - - return temp_keystore_directory - - - - - - @staticmethod - def bind_keys_to_a_role(role, threshold=1): - """ - Binds a key to a 'role' thus modifying 'semi_roledict' - and 'rsa_keystore' dictionaries. If 'threshold' is given, - 'threshold' number of keys are added to the 'role', otherwise - 'threshold' is set to 1. There might be existing keys bound - to the role, this method will add 'threshold' amount of keys - to already existing keys. - """ - - if not Modified_TestCase.semi_roledict.has_key(role): - # If 'semi_roledict' doesn't contain the 'role', initialize it. - Modified_TestCase.semi_roledict[role] = {} - Modified_TestCase.semi_roledict[role]['keyids'] = [] - Modified_TestCase.semi_roledict[role]['threshold'] = threshold - else: - # Update the role's threshold. - Modified_TestCase.semi_roledict[role]['threshold'] += threshold - - for number in range(threshold): - # Create rsa keys and store their keyids in 'keyids' list. - # Side effect: rsa_keystore gets populated with rsa keys. - Modified_TestCase.semi_roledict[role]['keyids'].\ - append(Modified_TestCase.generate_rsakey()) - - if role in Modified_TestCase.role_list: - Modified_TestCase.top_level_role_info[role] = {} - Modified_TestCase.top_level_role_info[role] = \ - Modified_TestCase.semi_roledict[role] - - - - - - @staticmethod - def bind_keys_to_roles(role_thresholds={}): - """ - Bind keys to top level roles. If dictionary of roles-thresholds - is supplied set - use it to create appropriate amount of keys. If you - want to set a dictionary specifying a threshold each role should have, - the dictionary should look like this: {role : 2, ... } where role - might be 'root' and # is a threshold #. - """ - - list_of_all_roles = Modified_TestCase.role_list + \ - Modified_TestCase.delegated_role_list - for role in list_of_all_roles: - if role_thresholds: - Modified_TestCase.bind_keys_to_a_role(role, - threshold=role_thresholds[role]) - else: - Modified_TestCase.bind_keys_to_a_role(role) - - - - - - @staticmethod - def random_string(length=15): - """Generate a random string of specified length.""" - - rand_str = '' - for letter in range(length): - rand_str += random.choice('abcdefABCDEF'+string.digits) - - return rand_str - - - - - - @staticmethod - def clear_toolbox(): - Modified_TestCase.rsa_keyids = [] - Modified_TestCase.rsa_keystore.clear() - Modified_TestCase.rsa_passwords.clear() - Modified_TestCase.rsa_derived_keys.clear() - Modified_TestCase.semi_roledict.clear() - Modified_TestCase.top_level_role_info.clear() diff --git a/tox.ini b/tox.ini new file mode 100644 index 00000000..2875a56e --- /dev/null +++ b/tox.ini @@ -0,0 +1,20 @@ +# Tox (http://tox.testrun.org/) is a tool for running tests +# in multiple virtualenvs. This configuration file will run the +# test suite on all supported python versions. To use it, "pip install tox" +# and then run "tox" from this directory. + +[tox] +envlist = py27 + + +[testenv] +changedir = tests + +commands = + coverage run --source tuf aggregate_tests.py + coverage report -m + +deps = + coverage + pynacl + pycrypto diff --git a/tuf/__init__.py b/tuf/__init__.py index eefde2d8..61a83dc4 100755 --- a/tuf/__init__.py +++ b/tuf/__init__.py @@ -313,9 +313,11 @@ def __str__(self): try: # http://docs.python.org/2/library/urlparse.html#urlparse.urlparse mirror_url_tokens = urlparse.urlparse(mirror_url) + except: logging.exception('Failed to parse mirror URL: '+str(mirror_url)) mirror_netloc = mirror_url + else: mirror_netloc = mirror_url_tokens.netloc diff --git a/tuf/download.py b/tuf/download.py index cd8bf2ac..6013e38d 100755 --- a/tuf/download.py +++ b/tuf/download.py @@ -276,9 +276,8 @@ class SaferHTTPResponse(httplib.HTTPResponse): def __init__(self, sock, debuglevel=0, strict=0, method=None, buffering=False): - httplib.HTTPResponse.__init__(self, sock, debuglevel=debuglevel, - strict=strict, method=method, - buffering=buffering) + httplib.HTTPResponse.__init__(self, sock, debuglevel, + strict, method) # Delete the previous socket file-like object... del self.fp diff --git a/tuf/hash.py b/tuf/hash.py index 161a181c..4fe96455 100755 --- a/tuf/hash.py +++ b/tuf/hash.py @@ -50,7 +50,8 @@ from Crypto.Hash import SHA384 from Crypto.Hash import SHA512 _supported_libraries.append('pycrypto') -except ImportError: + +except ImportError: # pragma: no cover logger.debug('Pycrypto hash algorithms could not be imported. ' 'Supported libraries: '+str(_SUPPORTED_LIB_LIST)) @@ -61,13 +62,14 @@ try: import hashlib _supported_libraries.append('hashlib') -except ImportError: + +except ImportError: # pragma: no cover logger.debug('Hashlib could not be imported. ' 'Supported libraries: '+str(_SUPPORTED_LIB_LIST)) pass # Were we able to import any hash libraries? -if not _supported_libraries: +if not _supported_libraries: # pragma: no cover # This is fatal, we'll have no way of generating hashes. raise tuf.Error('Unable to import a hash library from the ' 'following supported list: '+str(_SUPPORTED_LIB_LIST)) @@ -132,6 +134,7 @@ def digest(algorithm=_DEFAULT_HASH_ALGORITHM, if hash_library == 'hashlib' and hash_library in _supported_libraries: try: return hashlib.new(algorithm) + except ValueError: raise tuf.UnsupportedAlgorithmError(algorithm) @@ -295,7 +298,9 @@ def data_to_string(data): if isinstance(data, str): return data + elif isinstance(data, unicode): return data.encode("utf-8") + else: return str(data) diff --git a/tuf/keys.py b/tuf/keys.py index b59df3aa..3b32b8bb 100755 --- a/tuf/keys.py +++ b/tuf/keys.py @@ -80,7 +80,7 @@ import Crypto import tuf.pycrypto_keys _available_crypto_libraries.append('pycrypto') -except ImportError: +except ImportError: # pragma: no cover pass # Import the PyNaCl library, if available. It is recommended this library be @@ -99,7 +99,7 @@ # PyNaCl's 'cffi' dependency may raise an 'IOError' exception when importing # 'nacl.signing'. - except (ImportError, IOError): + except (ImportError, IOError): # pragma: no cover pass # The optimized version of the ed25519 library provided by default is imported @@ -213,7 +213,8 @@ def generate_rsa_key(bits=_DEFAULT_RSA_KEY_BITS): # tuf.formats.RSAKEYBITS_SCHEMA.check_match(). if _RSA_CRYPTO_LIBRARY == 'pycrypto': public, private = tuf.pycrypto_keys.generate_rsa_public_and_private(bits) - else: + + else: # pragma: no cover message = 'Invalid crypto library: '+repr(_RSA_CRYPTO_LIBRARY)+'.' raise tuf.UnsupportedLibraryError(message) @@ -295,7 +296,8 @@ def generate_ed25519_key(): if 'pynacl' in _available_crypto_libraries: public, private = \ tuf.ed25519_keys.generate_public_and_private() - else: + + else: # pragma: no cover message = 'The required PyNaCl library is unavailable.' raise tuf.UnsupportedLibraryError(message) @@ -652,7 +654,8 @@ def create_signature(key_dict, data): if keytype == 'rsa': if _RSA_CRYPTO_LIBRARY == 'pycrypto': sig, method = tuf.pycrypto_keys.create_rsa_signature(private, data) - else: + + else: # pragma: no cover message = 'Unsupported "tuf.conf.RSA_CRYPTO_LIBRARY": '+\ repr(_RSA_CRYPTO_LIBRARY)+'.' raise tuf.UnsupportedLibraryError(message) @@ -663,11 +666,12 @@ def create_signature(key_dict, data): if 'pynacl' in _available_crypto_libraries: sig, method = tuf.ed25519_keys.create_signature(public, private, data) - else: + else: # pragma: no cover message = 'The required PyNaCl library is unavailable.' raise tuf.UnsupportedLibraryError(message) - else: + # 'tuf.formats.ANYKEY_SCHEMA' should detect invalid key types. + else: # pragma: no cover raise TypeError('Invalid key type.') # Build the signature dictionary to be returned. @@ -778,15 +782,16 @@ def verify_signature(key_dict, signature, data): # otherwise raise an exception. if keytype == 'rsa': if _RSA_CRYPTO_LIBRARY == 'pycrypto': - if 'pycrypto' not in _available_crypto_libraries: + if 'pycrypto' not in _available_crypto_libraries: # pragma: no cover message = 'Metadata downloaded from the remote repository specified'+\ ' an RSA signature. Verifying RSA signatures requires PyCrypto.' +\ '\n$ pip install PyCrypto, or pip install tuf[tools].' raise tuf.UnsupportedLibraryError(message) + else: valid_signature = tuf.pycrypto_keys.verify_rsa_signature(sig, method, public, data) - else: + else: message = 'Unsupported "tuf.conf.RSA_CRYPTO_LIBRARY": '+\ repr(_RSA_CRYPTO_LIBRARY)+'.' raise tuf.UnsupportedLibraryError(message) @@ -803,7 +808,9 @@ def verify_signature(key_dict, signature, data): valid_signature = tuf.ed25519_keys.verify_signature(public, method, sig, data, use_pynacl=False) - else: + + # 'tuf.formats.ANYKEY_SCHEMA' should detect invalid key types. + else: # pragma: no cover raise TypeError('Unsupported key type.') return valid_signature @@ -1073,6 +1080,7 @@ def encrypt_key(key_object, password): if _GENERAL_CRYPTO_LIBRARY == 'pycrypto': encrypted_key = \ tuf.pycrypto_keys.encrypt_key(key_object, password) + else: message = 'Invalid crypto library: '+repr(_GENERAL_CRYPTO_LIBRARY)+'.' raise tuf.UnsupportedLibraryError(message) @@ -1169,6 +1177,7 @@ def decrypt_key(encrypted_key, passphrase): if _GENERAL_CRYPTO_LIBRARY == 'pycrypto': key_object = \ tuf.pycrypto_keys.decrypt_key(encrypted_key, passphrase) + else: message = 'Invalid crypto library: '+repr(_GENERAL_CRYPTO_LIBRARY)+'.' raise tuf.UnsupportedLibraryError(message) @@ -1250,6 +1259,7 @@ def create_rsa_encrypted_pem(private_key, passphrase): if _RSA_CRYPTO_LIBRARY == 'pycrypto': encrypted_pem = \ tuf.pycrypto_keys.create_rsa_encrypted_pem(private_key, passphrase) + else: message = 'Invalid crypto library: '+repr(_RSA_CRYPTO_LIBRARY)+'.' raise tuf.UnsupportedLibraryError(message) diff --git a/tuf/roledb.py b/tuf/roledb.py index 078ad089..835e5bb0 100755 --- a/tuf/roledb.py +++ b/tuf/roledb.py @@ -406,8 +406,11 @@ def remove_role(rolename): _check_rolename(rolename) remove_delegated_roles(rolename) - if rolename in _roledb_dict: - del _roledb_dict[rolename] + + # remove_delegated_roles() should have left 'rolename' in the database, and + # 'rolename' was verified to exist by _check_rolename(). + # Remove 'rolename'. + del _roledb_dict[rolename] diff --git a/tuf/unittest_toolbox.py b/tuf/unittest_toolbox.py new file mode 100755 index 00000000..7a5efb33 --- /dev/null +++ b/tuf/unittest_toolbox.py @@ -0,0 +1,137 @@ +""" + + unittest_toolbox.py + + + Konstantin Andrianov. + + + March 26, 2012. + + + See LICENSE for licensing information. + + + Provides an array of various methods for unit testing. Use it instead of + actual unittest module. This module builds on unittest module. + Specifically, Modified_TestCase is a derived class from unittest.TestCase. +""" + +import os +import sys +import shutil +import unittest +import tempfile +import random +import string + + +class Modified_TestCase(unittest.TestCase): + """ + + Provide additional test-setup methods to make testing + of module's methods-under-test as independent as possible. + + If you want to modify setUp()/tearDown() do: + class Your_Test_Class(modified_TestCase): + def setUp(): + your setup modification + your setup modification + ... + modified_TestCase.setUp(self) + + + make_temp_directory(self, directory=None): + Creates and returns an absolute path of a temporary directory. + + make_temp_file(self, suffix='.txt', directory=None): + Creates and returns an absolute path of an empty temp file. + + make_temp_data_file(self, suffix='', directory=None, data = junk_data): + Returns an absolute path of a temp file containing some data. + + random_path(self, length = 7): + Generate a 'random' path consisting of n-length strings of random chars. + + + Static Methods: + -------------- + Following methods are static because they technically don't operate + on any instances of the class, what they do is: they modify class variables + (dictionaries) that are shared among all instances of the class. So + it is possible to call them without instantiating the class. + + random_string(length=7): + Generate a 'length' long string of random characters. + """ + + + def setUp(self): + self._cleanup = [] + + + + def tearDown(self): + for cleanup_function in self._cleanup: + # Perform clean up by executing clean-up functions. + try: + # OSError will occur if the directory was already removed. + cleanup_function() + + except OSError: + pass + + + + def make_temp_directory(self, directory=None): + """Creates and returns an absolute path of a directory.""" + prefix = self.__class__.__name__+'_' + temp_directory = tempfile.mkdtemp(prefix=prefix, dir=directory) + def _destroy_temp_directory(): + shutil.rmtree(temp_directory) + self._cleanup.append(_destroy_temp_directory) + return temp_directory + + + + def make_temp_file(self, suffix='.txt', directory=None): + """Creates and returns an absolute path of an empty file.""" + prefix='tmp_file_'+self.__class__.__name__+'_' + temp_file = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=directory) + def _destroy_temp_file(): + os.unlink(temp_file[1]) + self._cleanup.append(_destroy_temp_file) + return temp_file[1] + + + + def make_temp_data_file(self, suffix='', directory=None, data = 'junk data'): + """Returns an absolute path of a temp file containing data.""" + temp_file_path = self.make_temp_file(suffix=suffix, directory=directory) + temp_file = open(temp_file_path, 'wb') + temp_file.write(data) + temp_file.close() + return temp_file_path + + + + def random_path(self, length = 7): + """Generate a 'random' path consisting of random n-length strings.""" + + rand_path = '/'+self.random_string(length) + for i in range(2): + rand_path = os.path.join(rand_path, self.random_string(length)) + + return rand_path + + + + @staticmethod + def random_string(length=15): + """Generate a random string of specified length.""" + + rand_str = '' + for letter in range(length): + rand_str += random.choice('abcdefABCDEF'+string.digits) + + return rand_str