Add Travis CI, coveralls, and coverage-related updates.

This commit is contained in:
vladdd 2014-04-20 16:15:19 -04:00
parent 7b4c3218e1
commit 08a2bad2c0
26 changed files with 334 additions and 589 deletions

2
.gitignore vendored
View file

@ -9,3 +9,5 @@ build/*
*.swo
*.swp
tuf.egg-info
.coverage
.tox/*

17
.travis.yml Normal file
View file

@ -0,0 +1,17 @@
language: python
python:
- "2.7"
install:
- pip install tox
- pip install coveralls
script: tox
after_success:
coveralls
branches:
only:
- develop

15
tests/.coveragerc Normal file
View file

@ -0,0 +1,15 @@
[run]
branch = True
[report]
exclude_lines =
pragma: no cover
def _check_crypto_libraries
def __str__
if __name__ == .__main__.:
omit =
*/tuf/interposition/*
*/tuf/_vendor/*
*/tuf/compatibility/*

View file

@ -45,8 +45,8 @@
# Provide command-line option to randomize the order in which the tests run.
# Randomization might catch errors with unit tests that do not properly clean
# up or restore monkey-patched modules.
if '--random' in sys.argv:
random.shuffle(tests_without_extension)
#if '--random' in sys.argv:
random.shuffle(tests_without_extension)
suite = unittest.TestLoader().loadTestsFromNames(tests_without_extension)

View file

@ -48,7 +48,7 @@
import tuf.util
import tuf.log
import tuf.client.updater as updater
import tests.unittest_toolbox as unittest_toolbox
import tuf.unittest_toolbox as unittest_toolbox
logger = logging.getLogger('tuf.test_arbitrary_package_attack')
@ -93,8 +93,6 @@ def tearDownClass(cls):
# Remove the temporary repository directory, which should contain all the
# metadata, targets, and key files generated of all the test cases.
shutil.rmtree(cls.temporary_directory)
unittest_toolbox.Modified_TestCase.clear_toolbox()
# Kill the SimpleHTTPServer process.
if cls.server_process.returncode is None:

View file

@ -20,6 +20,7 @@
Otherwise, module that launches simple server would not be found.
"""
from __future__ import absolute_import
import hashlib
import logging
@ -35,7 +36,7 @@
import tuf.conf as conf
import tuf.download as download
import tuf.log
import tests.unittest_toolbox as unittest_toolbox
import tuf.unittest_toolbox as unittest_toolbox
logger = logging.getLogger('tuf.test_download')

View file

@ -51,7 +51,7 @@
import tuf.util
import tuf.log
import tuf.client.updater as updater
import tests.unittest_toolbox as unittest_toolbox
import tuf.unittest_toolbox as unittest_toolbox
logger = logging.getLogger('tuf.test_endless_data_attack')
@ -97,8 +97,6 @@ def tearDownClass(cls):
# metadata, targets, and key files generated of all the test cases.
shutil.rmtree(cls.temporary_directory)
unittest_toolbox.Modified_TestCase.clear_toolbox()
# Kill the SimpleHTTPServer process.
if cls.server_process.returncode is None:
logger.info('Server process '+str(cls.server_process.pid)+' terminated.')

View file

@ -52,7 +52,7 @@
import tuf.util
import tuf.log
import tuf.client.updater as updater
import tests.unittest_toolbox as unittest_toolbox
import tuf.unittest_toolbox as unittest_toolbox
logger = logging.getLogger('tuf.test_extraneous_dependencies_attack')
@ -100,8 +100,6 @@ def tearDownClass(cls):
# metadata, targets, and key files generated of all the test cases.
shutil.rmtree(cls.temporary_directory)
unittest_toolbox.Modified_TestCase.clear_toolbox()
# Kill the SimpleHTTPServer process.
if cls.server_process.returncode is None:
logger.info('Server process '+str(cls.server_process.pid)+' terminated.')

View file

@ -225,6 +225,21 @@ def _do_update_file_obj(self, library):
# to always seek to the beginning.
self.assertEqual(digest_object_truth.digest(), digest_object.digest())
def test_data_to_string(self):
self.assertEqual('12', tuf.hash.data_to_string('12'))
self.assertEqual(u'hello', tuf.hash.data_to_string(unicode('hello')))
self.assertEqual('12', tuf.hash.data_to_string(12))
def test_unsupported_digest_algorithm_and_library(self):
self.assertRaises(tuf.UnsupportedAlgorithmError, tuf.hash.digest,
'sha123', 'hashlib')
self.assertRaises(tuf.UnsupportedAlgorithmError, tuf.hash.digest,
'sha123', 'pycrypto')
self.assertRaises(tuf.UnsupportedLibraryError, tuf.hash.digest,
'sha256', 'badlib')
# Run unit test.

View file

@ -45,7 +45,7 @@
import tuf.log
import tuf.client.updater as updater
import tuf.repository_tool as repo_tool
import tests.unittest_toolbox as unittest_toolbox
import tuf.unittest_toolbox as unittest_toolbox
# The repository tool is imported and logs console messages by default. Disable
# console log messages generated by this unit test.
@ -95,8 +95,6 @@ def tearDownClass(cls):
# metadata, targets, and key files generated of all the test cases.
shutil.rmtree(cls.temporary_directory)
unittest_toolbox.Modified_TestCase.clear_toolbox()
# Kill the SimpleHTTPServer process.
if cls.server_process.returncode is None:
logger.info('Server process '+str(cls.server_process.pid)+' terminated.')

View file

@ -31,18 +31,13 @@
DATA = 'SOME DATA REQUIRING AUTHENTICITY.'
rsakey_dict = KEYS.generate_rsa_key()
temp_key_info_vals = rsakey_dict.values()
temp_key_vals = rsakey_dict['keyval'].values()
class TestKeys(unittest.TestCase):
def setUp(self):
rsakey_dict['keytype']=temp_key_info_vals[0]
rsakey_dict['keyid']=temp_key_info_vals[1]
rsakey_dict['keyval']=temp_key_info_vals[2]
rsakey_dict['keyval']['public']=temp_key_vals[0]
rsakey_dict['keyval']['private']=temp_key_vals[1]
@classmethod
def setUpClass(cls):
cls.rsakey_dict = KEYS.generate_rsa_key()
cls.ed25519key_dict = KEYS.generate_ed25519_key()
def test_generate_rsa_key(self):
@ -66,9 +61,10 @@ def test_generate_rsa_key(self):
self.assertTrue(tuf.formats.RSAKEY_SCHEMA.matches(KEYS.generate_rsa_key(4096)))
def test_format_keyval_to_metadata(self):
keyvalue = rsakey_dict['keyval']
keytype = rsakey_dict['keytype']
keyvalue = self.rsakey_dict['keyval']
keytype = self.rsakey_dict['keytype']
key_meta = KEYS.format_keyval_to_metadata(keytype, keyvalue)
# Check if the format of the object returned by this function corresponds
@ -87,39 +83,46 @@ def test_format_keyval_to_metadata(self):
self.assertRaises(tuf.FormatError, KEYS.format_keyval_to_metadata,
'bad_keytype', keyvalue)
public = keyvalue['public']
del keyvalue['public']
self.assertRaises(tuf.FormatError, KEYS.format_keyval_to_metadata,
keytype, keyvalue)
keyvalue['public'] = public
def test_format_metadata_to_key(self):
# Reconfiguring rsakey_dict to conform to KEY_SCHEMA
# i.e. {keytype: 'rsa', keyval: {public: pub_key, private: priv_key}}
#keyid = rsakey_dict['keyid']
del rsakey_dict['keyid']
keyid = self.rsakey_dict['keyid']
del self.rsakey_dict['keyid']
rsakey_dict_from_meta = KEYS.format_metadata_to_key(rsakey_dict)
rsakey_dict_from_meta = KEYS.format_metadata_to_key(self.rsakey_dict)
# Check if the format of the object returned by this function corresponds
# to RSAKEY_SCHEMA format.
self.assertEqual(None,
tuf.formats.RSAKEY_SCHEMA.check_match(rsakey_dict_from_meta),
FORMAT_ERROR_MSG)
self.rsakey_dict['keyid'] = keyid
# Supplying a wrong number of arguments.
self.assertRaises(TypeError, KEYS.format_metadata_to_key)
args = (rsakey_dict, rsakey_dict)
args = (self.rsakey_dict, self.rsakey_dict)
self.assertRaises(TypeError, KEYS.format_metadata_to_key, *args)
# Supplying a malformed argument to the function - should get FormatError
del rsakey_dict['keyval']
keyval = self.rsakey_dict['keyval']
del self.rsakey_dict['keyval']
self.assertRaises(tuf.FormatError, KEYS.format_metadata_to_key,
rsakey_dict)
self.rsakey_dict)
self.rsakey_dict['keyval'] = keyval
def test_helper_get_keyid(self):
keytype = rsakey_dict['keytype']
keyvalue = rsakey_dict['keyval']
keytype = self.rsakey_dict['keytype']
keyvalue = self.rsakey_dict['keyval']
# Check format of 'keytype'.
self.assertEqual(None, tuf.formats.KEYTYPE_SCHEMA.check_match(keytype),
@ -138,49 +141,60 @@ def test_helper_get_keyid(self):
def test_create_signature(self):
# Creating a signature for 'DATA'.
signature = KEYS.create_signature(rsakey_dict, DATA)
rsa_signature = KEYS.create_signature(self.rsakey_dict, DATA)
ed25519_signature = KEYS.create_signature(self.ed25519key_dict, DATA)
# Check format of output.
self.assertEqual(None,
tuf.formats.SIGNATURE_SCHEMA.check_match(signature),
tuf.formats.SIGNATURE_SCHEMA.check_match(rsa_signature),
FORMAT_ERROR_MSG)
self.assertEqual(None,
tuf.formats.SIGNATURE_SCHEMA.check_match(ed25519_signature),
FORMAT_ERROR_MSG)
# Removing private key from 'rsakey_dict' - should raise a TypeError.
rsakey_dict['keyval']['private'] = ''
private = self.rsakey_dict['keyval']['private']
self.rsakey_dict['keyval']['private'] = ''
args = (rsakey_dict, DATA)
args = (self.rsakey_dict, DATA)
self.assertRaises(TypeError, KEYS.create_signature, *args)
# Supplying an incorrect number of arguments.
self.assertRaises(TypeError, KEYS.create_signature)
self.rsakey_dict['keyval']['private'] = private
def test_verify_signature(self):
# Creating a signature 'signature' of 'DATA' to be verified.
signature = KEYS.create_signature(rsakey_dict, DATA)
# Creating a signature of 'DATA' to be verified.
rsa_signature = KEYS.create_signature(self.rsakey_dict, DATA)
ed25519_signature = KEYS.create_signature(self.ed25519key_dict, DATA)
# Verifying the 'signature' of 'DATA'.
verified = KEYS.verify_signature(rsakey_dict, signature, DATA)
verified = KEYS.verify_signature(self.rsakey_dict, rsa_signature, DATA)
self.assertTrue(verified, "Incorrect signature.")
# Verifying the 'ed25519_signature' of 'DATA'.
verified = KEYS.verify_signature(self.ed25519key_dict, ed25519_signature, DATA)
self.assertTrue(verified, "Incorrect signature.")
# Testing an invalid 'signature'. Same 'signature' is passed, with
# Testing an invalid 'rsa_signature'. Same 'rsa_signature' is passed, with
# 'DATA' different than the original 'DATA' that was used
# in creating the 'signature'. Function should return 'False'.
# in creating the 'rsa_signature'. Function should return 'False'.
# Modifying 'DATA'.
_DATA = '1111'+DATA+'1111'
# Verifying the 'signature' of modified '_DATA'.
verified = KEYS.verify_signature(rsakey_dict, signature, _DATA)
verified = KEYS.verify_signature(self.rsakey_dict, rsa_signature, _DATA)
self.assertFalse(verified,
'Returned \'True\' on an incorrect signature.')
# Modifying 'signature' to pass an incorrect method since only
# 'PyCrypto-PKCS#1 PSS'
# is accepted.
signature['method'] = 'Biff'
# 'PyCrypto-PKCS#1 PSS' is accepted.
rsa_signature['method'] = 'Biff'
args = (rsakey_dict, signature, DATA)
args = (self.rsakey_dict, rsa_signature, DATA)
self.assertRaises(tuf.UnknownMethodError, KEYS.verify_signature, *args)
# Passing incorrect number of arguments.

View file

@ -15,12 +15,14 @@
Unit test for 'mirrors.py'.
"""
from __future__ import absolute_import
import unittest
import tuf
import tuf.formats as formats
import tuf.mirrors as mirrors
import tests.unittest_toolbox as unittest_toolbox
import tuf.unittest_toolbox as unittest_toolbox

View file

@ -50,7 +50,7 @@
import tuf.log
import tuf.client.updater as updater
import tuf.repository_tool as repo_tool
import tests.unittest_toolbox as unittest_toolbox
import tuf.unittest_toolbox as unittest_toolbox
# The repository tool is imported and logs console messages by default. Disable
# console log messages generated by this unit test.
@ -100,8 +100,6 @@ def tearDownClass(cls):
# Remove the temporary repository directory, which should contain all the
# metadata, targets, and key files generated of all the test cases.
shutil.rmtree(cls.temporary_directory)
unittest_toolbox.Modified_TestCase.clear_toolbox()
# Kill the SimpleHTTPServer process.
if cls.server_process.returncode is None:

View file

@ -51,7 +51,7 @@
import tuf.log
import tuf.client.updater as updater
import tuf.repository_tool as repo_tool
import tests.unittest_toolbox as unittest_toolbox
import tuf.unittest_toolbox as unittest_toolbox
# The repository tool is imported and logs console messages by default. Disable
# console log messages generated by this unit test.
@ -102,8 +102,6 @@ def tearDownClass(cls):
# metadata, targets, and key files generated of all the test cases.
shutil.rmtree(cls.temporary_directory)
unittest_toolbox.Modified_TestCase.clear_toolbox()
# Kill the SimpleHTTPServer process.
if cls.server_process.returncode is None:
logger.info('Server process '+str(cls.server_process.pid)+' terminated.')

View file

@ -383,6 +383,25 @@ def test_create_roledb_from_root_metadata(self):
def test_update_roleinfo(self):
rolename = 'targets'
roleinfo = {'keyids': ['123'], 'threshold': 1}
tuf.roledb.add_role(rolename, roleinfo)
# Test normal case.
tuf.roledb.update_roleinfo(rolename, roleinfo)
# Test for an unknown role.
self.assertRaises(tuf.UnknownRoleError, tuf.roledb.update_roleinfo,
'unknown_rolename', roleinfo)
# Test improperly formatted arguments.
self.assertRaises(tuf.FormatError, tuf.roledb.update_roleinfo, 1, roleinfo)
self.assertRaises(tuf.FormatError, tuf.roledb.update_roleinfo, rolename, 1)
def _test_rolename(self, test_function):
# Private function that tests the 'rolename' argument of 'test_function'
# for format, invalid name, and unknown role exceptions.

View file

@ -53,7 +53,7 @@
import tuf.util
import tuf.log
import tuf.client.updater as updater
import tests.unittest_toolbox as unittest_toolbox
import tuf.unittest_toolbox as unittest_toolbox
logger = logging.getLogger('tuf.test_slow_retrieval_attack')
@ -81,10 +81,8 @@ def tearDownClass(cls):
# Remove the temporary repository directory, which should contain all the
# metadata, targets, and key files generated of all the test cases.
shutil.rmtree(cls.temporary_directory)
unittest_toolbox.Modified_TestCase.clear_toolbox()
def _start_slow_server(self, mode):
# Launch a SimpleHTTPServer (serves files in the current directory).

View file

@ -23,7 +23,7 @@
The 'unittest_toolbox.py' module was created to provide additional testing
tools, such as automatically deleting temporary files created in test cases.
For more information, see 'tuf/tests/unittest_toolbox.py'.
For more information, see 'tests/unittest_toolbox.py'.
<Methodology>
Test cases here should follow a specific order (i.e., independent methods are
@ -38,6 +38,8 @@
less dependent than 2.
"""
from __future__ import absolute_import
import os
import time
import shutil
@ -56,7 +58,7 @@
import tuf.keydb
import tuf.roledb
import tuf.repository_tool as repo_tool
import tests.unittest_toolbox as unittest_toolbox
import tuf.unittest_toolbox as unittest_toolbox
import tuf.client.updater as updater
logger = logging.getLogger('tuf.test_updater')
@ -104,8 +106,6 @@ def tearDownClass(cls):
# metadata, targets, and key files generated for the test cases.
shutil.rmtree(cls.temporary_directory)
unittest_toolbox.Modified_TestCase.clear_toolbox()
# Kill the SimpleHTTPServer process.
if cls.server_process.returncode is None:
logger.info('\tServer process '+str(cls.server_process.pid)+' terminated.')

View file

@ -16,6 +16,7 @@
<Purpose>
Unit test for 'util.py'
"""
from __future__ import absolute_import
import os
import sys
@ -29,7 +30,7 @@
import tuf.log
import tuf.hash
import tuf.util as util
import tests.unittest_toolbox as unittest_toolbox
import tuf.unittest_toolbox as unittest_toolbox
logger = logging.getLogger('tuf.test_util')
@ -221,7 +222,7 @@ def test_B1_get_file_details(self):
# Test: Incorrect input.
bogus_inputs = [self.random_string(), 1234, [self.random_string()],
{'a', 'a'}, None]
{'a': 'b'}, None]
for bogus_input in bogus_inputs:
if isinstance(bogus_input, basestring):
self.assertRaises(tuf.Error, util.get_file_details, bogus_input)

View file

@ -1,503 +0,0 @@
"""
<Program>
unittest_toolbox.py
<Author>
Konstantin Andrianov
<Started>
March 26, 2012
<Copyright>
See LICENSE for licensing information.
<Purpose>
Provides an array of various methods for unit testing. Use it instead of
actual unittest module. This module builds on unittest module.
Specifically, Modified_TestCase is a derived class from unittest.TestCase.
"""
import os
import sys
import shutil
import unittest
import tempfile
import random
import string
import ConfigParser
import tuf.keys
#import tuf.repo.keystore as keystore
# Modify the number of iterations (from the higher default count) so the unit
# tests run faster.
#keystore._PBKDF2_ITERATIONS = 1000
class Modified_TestCase(unittest.TestCase):
"""
<Purpose>
Provide additional test-setup methods to make testing
of module's methods-under-test as independent as possible.
If you want to modify setUp()/tearDown() do:
class Your_Test_Class(modified_TestCase):
def setUp():
your setup modification
your setup modification
...
modified_TestCase.setUp(self)
<Methods>
make_temp_directory(self, directory=None):
Creates and returns an absolute path of a temporary directory.
make_temp_file(self, suffix='.txt', directory=None):
Creates and returns an absolute path of an empty temp file.
make_temp_data_file(self, suffix='', directory=None, data = junk_data):
Returns an absolute path of a temp file containing some data.
make_temp_config_file(self, suffix='', directory=None, config_dict={}, expiration=None):
Creates a temporary file and puts a config dictionary in it using
ConfigParser. It then returns a (config_file_path, config_dictionary)
tuple.
make_temp_directory_with_data_files(self, _current_dir=None,directory_content=\
directory_dictionary, directory=None):
Creates a temp directory with files, directories and sub-directories
based on the dictionary supplied. It returns a temp directory, which
is parent of the structure supplied in the dictionary.
random_path(self, length = 7):
Generate a 'random' path consisting of n-length strings of random chars.
get_keystore_key(self, keyid):
This a monkey patch for keystore's get_key method.
Static Methods:
--------------
Following methods are static because they technically don't operate
on any instances of the class, what they do is: they modify class variables
(dictionaries) that are shared among all instances of the class. So
it is possible to call them without instantiating the class.
generate_rsakey():
Generate rsa key and put it into 'rsa_keystore' dictionary.
bind_keys_to_a_role(role, threshold=1):
Binds a key to a 'role' thus modifying 'semi_roledict' and
'rsa_keystore' dictionaries.
bind_keys_to_roles(role_thresholds={}):
Bind keys to top level roles. If dictionary of roles-thresholds is
supplied set - use it to crate appropriate amount of keys. If you
want to set a dictionary specifying a threshold each role should have,
the dictionary should look like this: {role : 2, ... } where role
might be 'root' and # is a threshold #.
random_string(length=7):
Generate a 'length' long string of random characters.
"""
# List of all top level roles.
role_list = ['root', 'targets', 'snapshot', 'timestamp']
# List of delegated roles.
delegated_role_list = ['targets/delegated_role1',
'targets/delegated_role1/delegated_role2']
# 'rsa_keyids' stores keyids of all created rsa keys.
rsa_keyids = []
# 'rsa_keystore' stores all created rsa keys, that are RSAKEY_SCHEMA
# conformant, as values for their corresponding keyid dictionary keys.
# {keyid : {-- rsa key --}, ...}
rsa_keystore = {}
# 'rsa_passwords' stores the passwords for all created rsa keys.
rsa_passwords = {}
# 'derived_keys' stores the salt and derived keys (e.g., PBKDF2) for the
# RSA keys.
rsa_derived_keys = {}
# 'semi_roledict' because it lacks an item that a fully pledged
# ROLEDICT_SCHEMA dictionary would have i.e. 'path' key is absent.
semi_roledict = {}
# 'top_level_role_info' same as 'semi_roledict' except that it only
# contains top-level roles.
top_level_role_info = {}
junk_data = 'Stored data.'
directory_dictionary = {'targets':[{'delegated_level1':
[{'delegated_level2':junk_data},
junk_data]},
junk_data,
junk_data]}
config_expiration = {'expiration':{'days':0, 'years':0,
'minutes':0, 'hours':0, 'seconds':0}}
mirrors = {'mirror1': {'url_prefix' : 'http://mirror1.com',
'metadata_path' : 'metadata',
'targets_path' : 'targets',
'confined_target_dirs' : ['']},
'mirror2': {'url_prefix' : 'http://mirror2.com',
'metadata_path' : 'metadata',
'targets_path' : 'targets',
'confined_target_dirs' : ['']},
'mirror3': {'url_prefix' : 'http://mirror3.com',
'metadata_path' : 'metadata',
'targets_path' : 'targets',
'confined_target_dirs' : ['']}}
def setUp(self):
self._cleanup = []
def tearDown(self):
for cleanup_function in self._cleanup:
# Perform clean up by executing clean-up functions.
try:
# OSError will occur if the directory was already removed.
cleanup_function()
except OSError:
pass
def make_temp_directory(self, directory=None):
"""Creates and returns an absolute path of a directory."""
prefix = self.__class__.__name__+'_'
temp_directory = tempfile.mkdtemp(prefix=prefix, dir=directory)
def _destroy_temp_directory():
shutil.rmtree(temp_directory)
self._cleanup.append(_destroy_temp_directory)
return temp_directory
def make_temp_file(self, suffix='.txt', directory=None):
"""Creates and returns an absolute path of an empty file."""
prefix='tmp_file_'+self.__class__.__name__+'_'
temp_file = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=directory)
def _destroy_temp_file():
os.unlink(temp_file[1])
self._cleanup.append(_destroy_temp_file)
return temp_file[1]
def make_temp_data_file(self, suffix='', directory=None, data = junk_data):
"""Returns an absolute path of a temp file containing data."""
temp_file_path = self.make_temp_file(suffix=suffix, directory=directory)
temp_file = open(temp_file_path, 'wb')
temp_file.write(data)
temp_file.close()
return temp_file_path
def make_temp_config_file(self, suffix='', directory=None, config_dict={}, expiration=None):
"""
Creates a temporary file and puts a simple config
dictionary in it using ConfigParser.
It then returns the temp file path, dictionary tuple.
"""
config = ConfigParser.RawConfigParser()
if not config_dict:
# Using the fact that empty sequences are false.
# Make some mock config data. Make sure it at least has 'keyid',
# 'threshold' and 'days' keys.
config_dict = {'expiration':{'days':100},
'root':{'keyids':['123abc','123abc'], 'threshold':2}}
if expiration:
config_dict['expiration'] = {}
config_dict['expiration'] = self.config_expiration['expiration']
config_dict['expiration']['days'] = expiration
for section in config_dict:
config.add_section(section)
for key in config_dict[section]:
config.set(section, key, config_dict[section][key])
config_path = self.make_temp_file(suffix=suffix, directory=directory)
config_file = open(config_path, 'wb')
config.write(config_file)
config_file.close()
return (config_path, config_dict)
def make_temp_directory_with_data_files(self, _current_dir=None,
directory_content=directory_dictionary, directory=None):
"""
Creates a temp directory with files, directories and sub-directories
based on the dictionary supplied. It returns a temp directory, which
is parent of the structure supplied in the dictionary. When nested
directories desired use lists as values ex. {'dir_1':[{dir2:None}]}
to get '/tmp/tmp_dir_Test_random/dir_1/dir_2' without files.
<Arguments>
directory: Specifies a path where to create the new directory in
(like repository directory). If 'None' temp directory would be
created (recommended).
_current_dir: Used internally. Represents a current directory, for
example '/tmp/tmp_dir_Test_random',
'/tmp/tmp_dir_Test_random/targets/' and
'/tmp/tmp_dir_Test_random/targets/more_targets' would all be
current directories in turn since they all contain either files
or other directories.
directory_content: Represents a dictionary with desired tree
structure to be attached to the 'directory'.
Example:
directory_dict = {'targets':[{'more_targets': junk_data},
junk_data, junk_data]}
self.make_temp_directory_with_data_files(directory_content=
directory_dict)
Creates:
/tmp/tmp_dir_Test_random/
/tmp/tmp_dir_Test_random/targets/
/tmp/tmp_dir_Test_random/targets/tmp_random1.txt
/tmp/tmp_dir_Test_random/targets/tmp_random2.txt
/tmp/tmp_dir_Test_random/targets/more_targets/
/tmp/tmp_dir_Test_random/targets/more_targets/tmp_random3.txt
Returns:
('/tmp/tmp_dir_Test_random/', [targets/tmp_random1.txt,
targets/tmp_random2.txt, targets/more_targets/tmp_random3.txt])
"""
if not _current_dir:
if directory:
_current_dir = directory
else:
_current_dir = self.make_temp_directory()
# Calls itself with _current_dir set.
self.make_temp_directory_with_data_files(_current_dir=_current_dir)
temp_target_files = []
for directory, _junk, files in os.walk(_current_dir):
for target in files:
full_path = os.path.join(directory, target)
rel_path = os.path.relpath(full_path, _current_dir)
temp_target_files.append(rel_path)
return _current_dir, temp_target_files
for key in directory_content:
# Create directory 'key'.
_new_current_dir = os.path.join(_current_dir, key)
os.mkdir(_new_current_dir)
# We have the directory. Check if value of key is a list or a str.
# If a list iterate through it.
# Else create a file with content of the item/value.
if isinstance(directory_content[key],list) and\
len(directory_content[key]) > 1:
# Check that there are more than 1 item in the list.
# else create a file with content of the item.
for item in range(len(directory_content[key])):
if isinstance(directory_content[key][item], dict):
# Pass current directory which is now '_new_current_dir' and the
# dictionary 'directory_content[key][item]'
self.make_temp_directory_with_data_files(
_current_dir=_new_current_dir,
directory_content=directory_content[key][item])
else:
# Create a file w/ data, returning its address.
self.make_temp_data_file(suffix='.txt',
directory=_new_current_dir,
data=directory_content[key][item])
else:
# Create a file w/ data, returning its address.
if directory_content[key]:
if isinstance(directory_content[key], str):
self.make_temp_data_file(suffix='.txt', directory=_new_current_dir,
data=directory_content[key])
elif isinstance(directory_content[key], list) and\
len(directory_content[key])==1:
self.make_temp_data_file(suffix='.txt', directory=_new_current_dir,
data=directory_content[key][0])
def random_path(self, length = 7):
"""Generate a 'random' path consisting of random n-length strings."""
rand_path = '/'+self.random_string(length)
for i in range(2):
rand_path = os.path.join(rand_path, self.random_string(length))
return rand_path
def get_keystore_key(self, keyid):
"""This is a monkey patch for keystore's get_key method."""
return self.rsa_keystore[keyid]
@staticmethod
def generate_rsakey():
"""
This method generates a rsa key as shown below. It puts it in
'rsa_keystore' and returns the 'keyid' of the created rsa dictionary.
{'keytype': 'rsa',
'keyid': keyid,
'keyval': {'public': '-----BEGIN RSA PUBLIC KEY----- ...',
'private': '-----BEGIN RSA PRIVATE KEY----- ...'}}
"""
rsakey = tuf.keys.generate_rsa_key()
keyid = rsakey['keyid']
Modified_TestCase.rsa_keyids.append(keyid)
password = Modified_TestCase.random_string()
Modified_TestCase.rsa_passwords[keyid] = password
salt, iterations, derived_key = keystore._generate_derived_key(password)
Modified_TestCase.rsa_derived_keys[keyid] = {'salt': salt,
'derived_key': derived_key,
'iterations': iterations}
Modified_TestCase.rsa_keystore[keyid] = rsakey
return keyid
def create_temp_keystore_directory(self, keystore_dicts=False):
if not self.rsa_keystore or not self.rsa_derived_keys:
msg = 'Populate \'rsa_keystore\' and \'rsa_passwords\''+\
' before invoking this method.'
sys.exit(msg)
temp_keystore_directory = self.make_temp_directory()
keystore._keystore = self.rsa_keystore
keystore._derived_keys = self.rsa_derived_keys
keystore.save_keystore_to_keyfiles(temp_keystore_directory)
if not keystore_dicts:
keystore._keystore={}
keystore._derived_keys={}
return temp_keystore_directory
@staticmethod
def bind_keys_to_a_role(role, threshold=1):
"""
Binds a key to a 'role' thus modifying 'semi_roledict'
and 'rsa_keystore' dictionaries. If 'threshold' is given,
'threshold' number of keys are added to the 'role', otherwise
'threshold' is set to 1. There might be existing keys bound
to the role, this method will add 'threshold' amount of keys
to already existing keys.
"""
if not Modified_TestCase.semi_roledict.has_key(role):
# If 'semi_roledict' doesn't contain the 'role', initialize it.
Modified_TestCase.semi_roledict[role] = {}
Modified_TestCase.semi_roledict[role]['keyids'] = []
Modified_TestCase.semi_roledict[role]['threshold'] = threshold
else:
# Update the role's threshold.
Modified_TestCase.semi_roledict[role]['threshold'] += threshold
for number in range(threshold):
# Create rsa keys and store their keyids in 'keyids' list.
# Side effect: rsa_keystore gets populated with rsa keys.
Modified_TestCase.semi_roledict[role]['keyids'].\
append(Modified_TestCase.generate_rsakey())
if role in Modified_TestCase.role_list:
Modified_TestCase.top_level_role_info[role] = {}
Modified_TestCase.top_level_role_info[role] = \
Modified_TestCase.semi_roledict[role]
@staticmethod
def bind_keys_to_roles(role_thresholds={}):
"""
Bind keys to top level roles. If dictionary of roles-thresholds
is supplied set - use it to create appropriate amount of keys. If you
want to set a dictionary specifying a threshold each role should have,
the dictionary should look like this: {role : 2, ... } where role
might be 'root' and # is a threshold #.
"""
list_of_all_roles = Modified_TestCase.role_list + \
Modified_TestCase.delegated_role_list
for role in list_of_all_roles:
if role_thresholds:
Modified_TestCase.bind_keys_to_a_role(role,
threshold=role_thresholds[role])
else:
Modified_TestCase.bind_keys_to_a_role(role)
@staticmethod
def random_string(length=15):
"""Generate a random string of specified length."""
rand_str = ''
for letter in range(length):
rand_str += random.choice('abcdefABCDEF'+string.digits)
return rand_str
@staticmethod
def clear_toolbox():
Modified_TestCase.rsa_keyids = []
Modified_TestCase.rsa_keystore.clear()
Modified_TestCase.rsa_passwords.clear()
Modified_TestCase.rsa_derived_keys.clear()
Modified_TestCase.semi_roledict.clear()
Modified_TestCase.top_level_role_info.clear()

20
tox.ini Normal file
View file

@ -0,0 +1,20 @@
# Tox (http://tox.testrun.org/) is a tool for running tests
# in multiple virtualenvs. This configuration file will run the
# test suite on all supported python versions. To use it, "pip install tox"
# and then run "tox" from this directory.
[tox]
envlist = py27
[testenv]
changedir = tests
commands =
coverage run --source tuf aggregate_tests.py
coverage report -m
deps =
coverage
pynacl
pycrypto

View file

@ -313,9 +313,11 @@ def __str__(self):
try:
# http://docs.python.org/2/library/urlparse.html#urlparse.urlparse
mirror_url_tokens = urlparse.urlparse(mirror_url)
except:
logging.exception('Failed to parse mirror URL: '+str(mirror_url))
mirror_netloc = mirror_url
else:
mirror_netloc = mirror_url_tokens.netloc

View file

@ -276,9 +276,8 @@ class SaferHTTPResponse(httplib.HTTPResponse):
def __init__(self, sock, debuglevel=0, strict=0, method=None,
buffering=False):
httplib.HTTPResponse.__init__(self, sock, debuglevel=debuglevel,
strict=strict, method=method,
buffering=buffering)
httplib.HTTPResponse.__init__(self, sock, debuglevel,
strict, method)
# Delete the previous socket file-like object...
del self.fp

View file

@ -50,7 +50,8 @@
from Crypto.Hash import SHA384
from Crypto.Hash import SHA512
_supported_libraries.append('pycrypto')
except ImportError:
except ImportError: # pragma: no cover
logger.debug('Pycrypto hash algorithms could not be imported. '
'Supported libraries: '+str(_SUPPORTED_LIB_LIST))
@ -61,13 +62,14 @@
try:
import hashlib
_supported_libraries.append('hashlib')
except ImportError:
except ImportError: # pragma: no cover
logger.debug('Hashlib could not be imported. '
'Supported libraries: '+str(_SUPPORTED_LIB_LIST))
pass
# Were we able to import any hash libraries?
if not _supported_libraries:
if not _supported_libraries: # pragma: no cover
# This is fatal, we'll have no way of generating hashes.
raise tuf.Error('Unable to import a hash library from the '
'following supported list: '+str(_SUPPORTED_LIB_LIST))
@ -132,6 +134,7 @@ def digest(algorithm=_DEFAULT_HASH_ALGORITHM,
if hash_library == 'hashlib' and hash_library in _supported_libraries:
try:
return hashlib.new(algorithm)
except ValueError:
raise tuf.UnsupportedAlgorithmError(algorithm)
@ -295,7 +298,9 @@ def data_to_string(data):
if isinstance(data, str):
return data
elif isinstance(data, unicode):
return data.encode("utf-8")
else:
return str(data)

View file

@ -80,7 +80,7 @@
import Crypto
import tuf.pycrypto_keys
_available_crypto_libraries.append('pycrypto')
except ImportError:
except ImportError: # pragma: no cover
pass
# Import the PyNaCl library, if available. It is recommended this library be
@ -99,7 +99,7 @@
# PyNaCl's 'cffi' dependency may raise an 'IOError' exception when importing
# 'nacl.signing'.
except (ImportError, IOError):
except (ImportError, IOError): # pragma: no cover
pass
# The optimized version of the ed25519 library provided by default is imported
@ -213,7 +213,8 @@ def generate_rsa_key(bits=_DEFAULT_RSA_KEY_BITS):
# tuf.formats.RSAKEYBITS_SCHEMA.check_match().
if _RSA_CRYPTO_LIBRARY == 'pycrypto':
public, private = tuf.pycrypto_keys.generate_rsa_public_and_private(bits)
else:
else: # pragma: no cover
message = 'Invalid crypto library: '+repr(_RSA_CRYPTO_LIBRARY)+'.'
raise tuf.UnsupportedLibraryError(message)
@ -295,7 +296,8 @@ def generate_ed25519_key():
if 'pynacl' in _available_crypto_libraries:
public, private = \
tuf.ed25519_keys.generate_public_and_private()
else:
else: # pragma: no cover
message = 'The required PyNaCl library is unavailable.'
raise tuf.UnsupportedLibraryError(message)
@ -652,7 +654,8 @@ def create_signature(key_dict, data):
if keytype == 'rsa':
if _RSA_CRYPTO_LIBRARY == 'pycrypto':
sig, method = tuf.pycrypto_keys.create_rsa_signature(private, data)
else:
else: # pragma: no cover
message = 'Unsupported "tuf.conf.RSA_CRYPTO_LIBRARY": '+\
repr(_RSA_CRYPTO_LIBRARY)+'.'
raise tuf.UnsupportedLibraryError(message)
@ -663,11 +666,12 @@ def create_signature(key_dict, data):
if 'pynacl' in _available_crypto_libraries:
sig, method = tuf.ed25519_keys.create_signature(public, private, data)
else:
else: # pragma: no cover
message = 'The required PyNaCl library is unavailable.'
raise tuf.UnsupportedLibraryError(message)
else:
# 'tuf.formats.ANYKEY_SCHEMA' should detect invalid key types.
else: # pragma: no cover
raise TypeError('Invalid key type.')
# Build the signature dictionary to be returned.
@ -778,15 +782,16 @@ def verify_signature(key_dict, signature, data):
# otherwise raise an exception.
if keytype == 'rsa':
if _RSA_CRYPTO_LIBRARY == 'pycrypto':
if 'pycrypto' not in _available_crypto_libraries:
if 'pycrypto' not in _available_crypto_libraries: # pragma: no cover
message = 'Metadata downloaded from the remote repository specified'+\
' an RSA signature. Verifying RSA signatures requires PyCrypto.' +\
'\n$ pip install PyCrypto, or pip install tuf[tools].'
raise tuf.UnsupportedLibraryError(message)
else:
valid_signature = tuf.pycrypto_keys.verify_rsa_signature(sig, method,
public, data)
else:
else:
message = 'Unsupported "tuf.conf.RSA_CRYPTO_LIBRARY": '+\
repr(_RSA_CRYPTO_LIBRARY)+'.'
raise tuf.UnsupportedLibraryError(message)
@ -803,7 +808,9 @@ def verify_signature(key_dict, signature, data):
valid_signature = tuf.ed25519_keys.verify_signature(public,
method, sig, data,
use_pynacl=False)
else:
# 'tuf.formats.ANYKEY_SCHEMA' should detect invalid key types.
else: # pragma: no cover
raise TypeError('Unsupported key type.')
return valid_signature
@ -1073,6 +1080,7 @@ def encrypt_key(key_object, password):
if _GENERAL_CRYPTO_LIBRARY == 'pycrypto':
encrypted_key = \
tuf.pycrypto_keys.encrypt_key(key_object, password)
else:
message = 'Invalid crypto library: '+repr(_GENERAL_CRYPTO_LIBRARY)+'.'
raise tuf.UnsupportedLibraryError(message)
@ -1169,6 +1177,7 @@ def decrypt_key(encrypted_key, passphrase):
if _GENERAL_CRYPTO_LIBRARY == 'pycrypto':
key_object = \
tuf.pycrypto_keys.decrypt_key(encrypted_key, passphrase)
else:
message = 'Invalid crypto library: '+repr(_GENERAL_CRYPTO_LIBRARY)+'.'
raise tuf.UnsupportedLibraryError(message)
@ -1250,6 +1259,7 @@ def create_rsa_encrypted_pem(private_key, passphrase):
if _RSA_CRYPTO_LIBRARY == 'pycrypto':
encrypted_pem = \
tuf.pycrypto_keys.create_rsa_encrypted_pem(private_key, passphrase)
else:
message = 'Invalid crypto library: '+repr(_RSA_CRYPTO_LIBRARY)+'.'
raise tuf.UnsupportedLibraryError(message)

View file

@ -406,8 +406,11 @@ def remove_role(rolename):
_check_rolename(rolename)
remove_delegated_roles(rolename)
if rolename in _roledb_dict:
del _roledb_dict[rolename]
# remove_delegated_roles() should have left 'rolename' in the database, and
# 'rolename' was verified to exist by _check_rolename().
# Remove 'rolename'.
del _roledb_dict[rolename]

137
tuf/unittest_toolbox.py Executable file
View file

@ -0,0 +1,137 @@
"""
<Program>
unittest_toolbox.py
<Author>
Konstantin Andrianov.
<Started>
March 26, 2012.
<Copyright>
See LICENSE for licensing information.
<Purpose>
Provides an array of various methods for unit testing. Use it instead of
actual unittest module. This module builds on unittest module.
Specifically, Modified_TestCase is a derived class from unittest.TestCase.
"""
import os
import sys
import shutil
import unittest
import tempfile
import random
import string
class Modified_TestCase(unittest.TestCase):
"""
<Purpose>
Provide additional test-setup methods to make testing
of module's methods-under-test as independent as possible.
If you want to modify setUp()/tearDown() do:
class Your_Test_Class(modified_TestCase):
def setUp():
your setup modification
your setup modification
...
modified_TestCase.setUp(self)
<Methods>
make_temp_directory(self, directory=None):
Creates and returns an absolute path of a temporary directory.
make_temp_file(self, suffix='.txt', directory=None):
Creates and returns an absolute path of an empty temp file.
make_temp_data_file(self, suffix='', directory=None, data = junk_data):
Returns an absolute path of a temp file containing some data.
random_path(self, length = 7):
Generate a 'random' path consisting of n-length strings of random chars.
Static Methods:
--------------
Following methods are static because they technically don't operate
on any instances of the class, what they do is: they modify class variables
(dictionaries) that are shared among all instances of the class. So
it is possible to call them without instantiating the class.
random_string(length=7):
Generate a 'length' long string of random characters.
"""
def setUp(self):
self._cleanup = []
def tearDown(self):
for cleanup_function in self._cleanup:
# Perform clean up by executing clean-up functions.
try:
# OSError will occur if the directory was already removed.
cleanup_function()
except OSError:
pass
def make_temp_directory(self, directory=None):
"""Creates and returns an absolute path of a directory."""
prefix = self.__class__.__name__+'_'
temp_directory = tempfile.mkdtemp(prefix=prefix, dir=directory)
def _destroy_temp_directory():
shutil.rmtree(temp_directory)
self._cleanup.append(_destroy_temp_directory)
return temp_directory
def make_temp_file(self, suffix='.txt', directory=None):
"""Creates and returns an absolute path of an empty file."""
prefix='tmp_file_'+self.__class__.__name__+'_'
temp_file = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=directory)
def _destroy_temp_file():
os.unlink(temp_file[1])
self._cleanup.append(_destroy_temp_file)
return temp_file[1]
def make_temp_data_file(self, suffix='', directory=None, data = 'junk data'):
"""Returns an absolute path of a temp file containing data."""
temp_file_path = self.make_temp_file(suffix=suffix, directory=directory)
temp_file = open(temp_file_path, 'wb')
temp_file.write(data)
temp_file.close()
return temp_file_path
def random_path(self, length = 7):
"""Generate a 'random' path consisting of random n-length strings."""
rand_path = '/'+self.random_string(length)
for i in range(2):
rand_path = os.path.join(rand_path, self.random_string(length))
return rand_path
@staticmethod
def random_string(length=15):
"""Generate a random string of specified length."""
rand_str = ''
for letter in range(length):
rand_str += random.choice('abcdefABCDEF'+string.digits)
return rand_str