Merge branch 'zanefisher-fix_tests' into develop

This commit is contained in:
vladdd 2013-08-30 14:59:45 -04:00
commit 471aedb5a1
6 changed files with 223 additions and 190 deletions

View file

@ -11,28 +11,45 @@
<Started>
January 26, 2013
August 2013. Modified previous behavior that explicitly imported individual
August 2013.
Modified previous behavior that explicitly imported individual
unit tests. -Zane Fisher
<Copyright>
See LICENSE for licensing information.
<Purpose>
Run all the unit tests from every .py file beginning with "test_" in 'tuf/tests'.
Run all the unit tests from every .py file beginning with "test_" in
'tuf/tests'. Use --random to run the tests in random order.
"""
import sys
import unittest
import glob
import tuf.keydb as keydb
import tuf.repo.keystore as keystore
import tuf.roledb as roledb
import random
# Generate a list of pathnames that match a pattern (i.e., that begin with
# 'test_' and end with '.py'. A shell-style wildcard is used with glob() to
# match desired filenames. All the tests matching the pattern will be loaded
# and run in a test suite.
tests_list = glob.glob('test_*.py')
# Remove '.py' from each filename.
tests_list = [test[:-3] for test in tests_list]
# Remove '.py' from each filename to allow loadTestsFromNames() (called below)
# to properly load the file as a module.
tests_without_extension = []
for test in tests_list:
test = test[:-3]
tests_without_extension.append(test)
suite = unittest.TestLoader().loadTestsFromNames(tests_list)
# Provide command-line option to randomize the order in which the tests run.
# Randomization might catch errors with unit tests that do not properly clean
# up or restore monkey-patched modules.
if '--random' in sys.argv:
random.shuffle(tests_without_extension)
suite = unittest.TestLoader().loadTestsFromNames(tests_without_extension)
unittest.TextTestRunner(verbosity=2).run(suite)

View file

@ -31,17 +31,21 @@
import tuf.tests.unittest_toolbox as unittest_toolbox
# Populating 'rsa_keystore' and 'rsa_passwords' dictionaries.
# We will need them in creating keystore directory.
unittest_toolbox.Modified_TestCase.bind_keys_to_roles()
# Role:keyids dictionary.
role_keyids = {}
for role in unittest_toolbox.Modified_TestCase.semi_roledict.keys():
role_keyids[role] = unittest_toolbox.Modified_TestCase.semi_roledict[role]['keyids']
def _init_role_keyids():
# Populating 'rsa_keystore' and 'rsa_passwords' dictionaries.
# We will need them in creating the keystore directory and metadata files.
unittest_toolbox.Modified_TestCase.bind_keys_to_roles()
global role_keyids
for role in unittest_toolbox.Modified_TestCase.semi_roledict.keys():
role_keyids[role] = unittest_toolbox.Modified_TestCase.semi_roledict[role]['keyids']
@ -275,6 +279,9 @@ def create_repositories():
"""
# Ensure the keyids for the required roles are loaded. Role keyids are
# needed for the creation of metadata file and the keystore.
_init_role_keyids()
# Make a temporary general repository directory.
repository_dir = tempfile.mkdtemp()

View file

@ -1,189 +1,171 @@
#!/usr/bin/env python
"""
<Program Name>
test_extraneous_dependencies_attack.py
<Author>
Konstantin Andrianov
Zane Fisher
<Started>
February 19, 2012
August 19, 2013
<Copyright>
See LICENSE for licensing information.
<Purpose>
Simulate an extraneous dependencies attack.
Simulate an extraneous dependencies attack. The client attempts to download
a file, which lists all the target dependencies, with one legitimate
dependency, and one extraneous dependency. A client should not download a
target dependency even if it is found on the repository. Valid targets are
listed and verified by TUF metadata, such as 'targets.txt'.
In an extraneous dependencies attack, attacker is able to cause clients to
download software dependencies that are not the intended dependencies.
Target dependencies listed in the file are comma-separated.
Note: The interposition provided by 'tuf.interposition' is used to intercept
all calls made by urllib/urillib2 to certain hostnames specified in
the interposition configuration file. Look up interposition.py for more
information and illustration of a sample contents of the interposition
configuration file. Interposition was meant to make TUF integration with an
existing software updater an easy process. This allows for more flexibility
to the existing software updater. However, if you are planning to solely use
TUF there should be no need for interposition, all necessary calls will be
generated from within TUF.
There is no difference between 'updates' and 'target' files.
"""
import os
import sys
import shutil
import urllib
import tempfile
import time
import tuf
import tuf.formats
import tuf.tests.system_tests.util_test_tools
import tuf.repo.keystore
import tuf.repo.signerlib as signerlib
import tuf.repo.signercli as signercli
from tuf.interposition import urllib_tuf
version = 1
import tuf.interposition
import util_test_tools
class ExtraneousDependenciesAttackAlert(Exception):
class ExtraneousDependencyAlert(Exception):
pass
def test_extraneous_dependencies_attack():
# Interpret the contents of the file it downloads as a list of dependent
# files from the same repository.
def _download(url, filename, directory, TUF=False):
destination = os.path.join(directory, filename)
if TUF:
tuf.interposition.urllib_tuf.urlretrieve(url, destination)
else:
urllib.urlretrieve(url, destination)
if util_test_tools.read_file_content(destination) != '':
required_files = util_test_tools.read_file_content(destination).split(',')
for required_filename in required_files:
required_file_url = os.path.dirname(url)+os.sep+required_filename
_download(required_file_url, required_filename, directory, TUF)
def test_extraneous_dependency_attack(TUF=False):
"""
<Purpose>
Illustrate arbitrary package attack vulnerability.
<Arguments>
TUF:
If set to 'False' all directories that start with 'tuf_' are ignored,
indicating that tuf is not implemented.
"""
ERROR_MSG = 'Extraneous Dependency Attack was Successful!\n'
ERROR_MSG = '\tExtraneous Dependencies Attack Succeeded!\n\n'
try:
# Setup.
root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=True)
root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF)
reg_repo = os.path.join(root_repo, 'reg_repo')
tuf_repo = os.path.join(root_repo, 'tuf_repo')
keystore_dir = os.path.join(tuf_repo, 'keystore')
metadata_dir = os.path.join(tuf_repo, 'metadata')
downloads_dir = os.path.join(root_repo, 'downloads')
downloads = os.path.join(root_repo, 'downloads')
targets_dir = os.path.join(tuf_repo, 'targets')
# 'roles' holds information about delegated roles.
roles = {'role1':{'password':['pass1']},
'role2':{'password':['pass2']}}
# Add files to 'repo' directory: {root_repo}.
good_dependency_filepath = util_test_tools.add_file_to_repository(reg_repo, '')
good_dependency_basename = os.path.basename(good_dependency_filepath)
# Add files to 'reg_repo' directory: {root_repo}
role1_path = tempfile.mkdtemp(dir=reg_repo)
roles['role1']['filepath'] = \
util_test_tools.add_file_to_repository(role1_path, 'Test A')
bad_dependency_filepath = util_test_tools.add_file_to_repository(reg_repo, '')
bad_dependency_basename = os.path.basename(bad_dependency_filepath)
role2_path = tempfile.mkdtemp(dir=reg_repo)
roles['role2']['filepath'] = \
util_test_tools.add_file_to_repository(role2_path, 'Test B')
# The dependent file lists the good dependency.
dependent_filepath = util_test_tools.add_file_to_repository(reg_repo,
good_dependency_basename)
dependent_basename = os.path.basename(dependent_filepath)
# Update TUF repository.
util_test_tools.make_targets_meta(root_repo)
util_test_tools.make_release_meta(root_repo)
util_test_tools.make_timestamp_meta(root_repo)
url_to_repo = url+'reg_repo/'+dependent_basename
modified_dependency_list = good_dependency_basename+','+\
bad_dependency_basename
def _make_delegation(rolename):
expiration_date = tuf.formats.format_time(time.time()+86400)
expiration_date = expiration_date[0:expiration_date.rfind(' UTC')]
# Indicate which file client downloads.
rel_filepath = os.path.relpath(roles[rolename]['filepath'], reg_repo)
roles[rolename]['target_path'] = os.path.join(targets_dir, rel_filepath)
rolepath, file_basename = os.path.split(roles[rolename]['filepath'])
junk, role_relpath = os.path.split(rolepath)
roles[rolename]['targets_dir'] = os.path.join(targets_dir, role_relpath)
roles[rolename]['metadata_dir'] = os.path.join(metadata_dir, 'targets')
# Create a key to sign a new delegated role.
password = roles[rolename]['password'][0]
key = signerlib.generate_and_save_rsa_key(keystore_dir, password)
roles[rolename]['keyid'] = [key['keyid']]
roles[rolename]['dest_path'] = os.path.join(downloads_dir, file_basename)
# Create delegation one.
util_test_tools.create_delegation(tuf_repo,
roles[rolename]['targets_dir'],
roles[rolename]['keyid'], password,
'targets', rolename, expiration_date)
# Update TUF repository.
# util_test_tools.make_targets_meta(root_repo)
util_test_tools.make_release_meta(root_repo)
util_test_tools.make_timestamp_meta(root_repo)
if TUF:
# Update TUF metadata before attacker modifies anything.
util_test_tools.tuf_refresh_repo(root_repo, keyids)
# Modify the url. Remember that the interposition will intercept
# urls that have 'localhost:9999' hostname, which was specified in
# the json interposition configuration file. Look for 'hostname'
# in 'util_test_tools.py'. Further, the 'file_basename' is the target
# path relative to 'targets_dir'.
roles[rolename]['url'] = 'http://localhost:9999/'+rel_filepath
url_to_repo = 'http://localhost:9999/'+dependent_basename
# Perform a client download.
urllib_tuf.urlretrieve(roles[rolename]['url'],
roles[rolename]['dest_path'])
# Attacker adds the dependency in the targets repository.
target = os.path.join(targets_dir, dependent_basename)
util_test_tools.modify_file_at_repository(target,
modified_dependency_list)
# Attacker adds the dependency in the regular repository.
util_test_tools.modify_file_at_repository(dependent_filepath,
modified_dependency_list)
# End of Setup.
_make_delegation('role1')
_make_delegation('role2')
# The attacks.
def _write_rogue_metadata():
global version
version = version+1
expiration_date = tuf.formats.format_time(time.time()+86400)
# Load the keystore before rebuilding the metadata.
tuf.repo.keystore.load_keystore_from_keyfiles(keystore_dir,
roles['role1']['keyid'],
roles['role1']['password'])
# Rebuild the delegation role metadata.
signerlib.build_delegated_role_file(roles['role2']['targets_dir'],
roles['role1']['keyid'], metadata_dir,
roles['role1']['metadata_dir'],
'role1.txt', version, expiration_date)
# Update release and timestamp metadata.
util_test_tools.make_release_meta(root_repo)
util_test_tools.make_timestamp_meta(root_repo)
# Modify a target that was delegated to 'role2'.
util_test_tools.modify_file_at_repository(roles['role2']['target_path'],
'Test NOT B')
# Update rogue delegatee metadata.
_write_rogue_metadata()
# Perform another client download.
try:
urllib_tuf.urlretrieve(roles['role2']['url'], roles['role2']['dest_path'])
except tuf.MetadataNotAvailableError, e:
# Client downloads (tries to download) the file.
_download(url=url_to_repo, filename=dependent_basename,
directory=downloads, TUF=TUF)
except tuf.DownloadError:
# If tuf.DownloadError is raised, this means that TUF has prevented
# the download of an unrecognized file. Enable the logging to see,
# what actually happened.
pass
else:
raise ExtraneousDependenciesAttackAlert(ERROR_MSG)
# Add a target file to the directory delegated to 'role2' but not 'role1'.
util_test_tools.add_file_to_repository(roles['role2']['targets_dir'], 'AAAA')
# Update rogue delegatee metadata.
_write_rogue_metadata()
# Perform another client download.
try:
urllib_tuf.urlretrieve(roles['role2']['url'], roles['role2']['dest_path'])
except tuf.MetadataNotAvailableError, e:
pass
else:
raise ExtraneousDependenciesAttackAlert(ERROR_MSG)
# Check if the legitimate dependency was downloaded.
if not(os.path.exists(os.path.join(downloads, good_dependency_basename))):
raise tuf.DownloadError
# Check if the extraneous dependency was downloaded.
if os.path.exists(os.path.join(downloads, bad_dependency_basename)):
raise ExtraneousDependencyAlert(ERROR_MSG)
finally:
server_proc.kill()
#util_test_tools.cleanup(root_repo, server_proc)
util_test_tools.cleanup(root_repo, server_proc)
print 'Attempting extraneous dependency attack without TUF:'
try:
test_extraneous_dependencies_attack()
except ExtraneousDependenciesAttackAlert, error:
print 'error'
test_extraneous_dependency_attack(TUF=False)
except ExtraneousDependencyAlert, error:
print error
print 'Attempting extraneous dependency attack with TUF:'
try:
test_extraneous_dependency_attack(TUF=True)
except ExtraneousDependencyAlert, error:
print error

View file

@ -56,9 +56,6 @@ class guarantees the order of unit tests. So that, 'test_something_A'
logger = logging.getLogger('tuf.test_signercli')
# Populating 'rsa_keystore' and 'rsa_passwords' dictionaries.
# We will need them when creating keystore directories.
unittest_toolbox.Modified_TestCase.bind_keys_to_roles()
class TestSignercli(unittest_toolbox.Modified_TestCase):
@ -1556,5 +1553,17 @@ def _mock_get_keyids(junk):
signercli._get_metadata_directory = original_get_metadata_directory
def setUpModule():
# setUpModule() is called before any test cases run.
# Populating 'rsa_keystore' and 'rsa_passwords' dictionaries.
# We will need them when creating keystore directories.
unittest_toolbox.Modified_TestCase.bind_keys_to_roles()
def tearDownModule():
# tearDownModule() is called after all the test cases have run.
unittest_toolbox.Modified_TestCase.clear_toolbox()
if __name__ == '__main__':
unittest.main()

View file

@ -68,8 +68,6 @@
# 'unittest_toolbox.Modified_TestCase' is too long, I'll set it to 'unit_tbox'.
unit_tbox = tuf.tests.unittest_toolbox.Modified_TestCase
# Generate rsa keys and roles dictionary dictionaries.
unit_tbox.bind_keys_to_roles()
class TestSignerlib(unit_tbox):
@ -973,5 +971,16 @@ def _get_signed_role_info(self, role, directory=None):
return signed_meta, role_info
def setUpModule():
# setUpModule() is called before any test cases run.
# Generate rsa keys and roles dictionary dictionaries.
unit_tbox.bind_keys_to_roles()
def tearDownModule():
# tearDownModule() is called after all the test cases have run.
unit_tbox.clear_toolbox()
tuf.repo.keystore.clear_keystore()
if __name__ == '__main__':
unittest.main()

View file

@ -56,10 +56,6 @@ class guarantees the order of unit tests. So that, 'test_something_A'
logger = logging.getLogger('tuf.test_updater')
# References to roledb and keydb dictionaries (improve readability).
roledb = tuf.roledb
keydb = tuf.keydb
class TestUpdater_init_(unittest_toolbox.Modified_TestCase):
@ -105,39 +101,46 @@ def test__init__exceptions(self):
os.remove(role_filepath)
updater.Updater('Repo_Name', self.mirrors)
# Remove all created repositories.
# Remove all created repositories and roles.
setup.remove_all_repositories(repositories['main_repository'])
tuf.roledb.clear_roledb()
class TestUpdater(unittest_toolbox.Modified_TestCase):
# Create repositories. 'repositories' is a tuple that looks like this:
# (repository_dir, client_repository_dir, server_repository_dir), see
# repository_setup.py odule.
repositories = setup.create_repositories()
# Save references to repository directories and metadata.
# Server side references.
server_repo_dir = repositories['server_repository']
server_meta_dir = os.path.join(server_repo_dir, 'metadata')
root_filepath = os.path.join(server_meta_dir, 'root.txt')
timestamp_filepath = os.path.join(server_meta_dir, 'timestamp.txt')
targets_filepath = os.path.join(server_meta_dir, 'targets.txt')
release_filepath = os.path.join(server_meta_dir, 'release.txt')
@classmethod
def setUpClass(cls):
# setUpClass() is called before tests in an individual class run.
# Create repositories. 'repositories' is a tuple that looks like this:
# (repository_dir, client_repository_dir, server_repository_dir), see
# 'repository_setup.py' module.
cls.repositories = setup.create_repositories()
# References to delegated metadata paths and directories.
delegated_dir1 = os.path.join(server_meta_dir, 'targets')
delegated_filepath1 = os.path.join(delegated_dir1, 'delegated_role1.txt')
delegated_dir2 = os.path.join(delegated_dir1, 'delegated_role1')
delegated_filepath2 = os.path.join(delegated_dir2, 'delegated_role2.txt')
targets_dir = os.path.join(server_repo_dir, 'targets')
# Save references to repository directories and metadata.
# Server side references.
cls.server_repo_dir = cls.repositories['server_repository']
cls.server_meta_dir = os.path.join(cls.server_repo_dir, 'metadata')
cls.root_filepath = os.path.join(cls.server_meta_dir, 'root.txt')
cls.timestamp_filepath = os.path.join(cls.server_meta_dir, 'timestamp.txt')
cls.targets_filepath = os.path.join(cls.server_meta_dir, 'targets.txt')
cls.release_filepath = os.path.join(cls.server_meta_dir, 'release.txt')
# Client side references.
client_repo_dir = repositories['client_repository']
client_meta_dir = os.path.join(client_repo_dir, 'metadata')
client_current_dir = os.path.join(client_meta_dir, 'current')
client_previous_dir = os.path.join(client_meta_dir, 'previous')
# References to delegated metadata paths and directories.
cls.delegated_dir1 = os.path.join(cls.server_meta_dir, 'targets')
cls.delegated_filepath1 = os.path.join(cls.delegated_dir1,
'delegated_role1.txt')
cls.delegated_dir2 = os.path.join(cls.delegated_dir1, 'delegated_role1')
cls.delegated_filepath2 = os.path.join(cls.delegated_dir2,
'delegated_role2.txt')
cls.targets_dir = os.path.join(cls.server_repo_dir, 'targets')
# Client side references.
cls.client_repo_dir = cls.repositories['client_repository']
cls.client_meta_dir = os.path.join(cls.client_repo_dir, 'metadata')
cls.client_current_dir = os.path.join(cls.client_meta_dir, 'current')
cls.client_previous_dir = os.path.join(cls.client_meta_dir, 'previous')
@ -176,8 +179,8 @@ def tearDown(self):
unittest_toolbox.Modified_TestCase.tearDown(self)
# Clear roledb and keydb dictionaries.
roledb.clear_roledb()
keydb.clear_keydb()
tuf.roledb.clear_roledb()
tuf.keydb.clear_keydb()
@ -379,14 +382,14 @@ def test_1__rebuild_key_and_role_db(self):
# are populated. 'top_level_role_info' is a unittest_toolbox's dict
# that contains top level role information it corresponds to a
# ROLEDICT_SCHEMA where roles are keys and role information their values.
self.assertEqual(roledb._roledb_dict, self.top_level_role_info)
self.assertEqual(len(keydb._keydb_dict), 4)
self.assertEqual(tuf.roledb._roledb_dict, self.top_level_role_info)
self.assertEqual(len(tuf.keydb._keydb_dict), 4)
# Verify that keydb dictionary was updated.
for role in self.role_list:
keyids = self.top_level_role_info[role]['keyids']
for keyid in keyids:
self.assertTrue(keyid in keydb._keydb_dict)
self.assertTrue(keyid in tuf.keydb._keydb_dict)
@ -408,22 +411,22 @@ def test_2__import_delegations(self):
# Verify that there was no change in roledb and keydb dictionaries
# by checking the number of elements in the dictionaries.
self.assertEqual(len(roledb._roledb_dict), 5)
self.assertEqual(len(keydb._keydb_dict), 5)
self.assertEqual(len(tuf.roledb._roledb_dict), 5)
self.assertEqual(len(tuf.keydb._keydb_dict), 5)
# Test: normal case, first level delegation.
self.Repository._import_delegations('targets/delegated_role1')
self.assertEqual(len(roledb._roledb_dict), 6)
self.assertEqual(len(keydb._keydb_dict), 6)
self.assertEqual(len(tuf.roledb._roledb_dict), 6)
self.assertEqual(len(tuf.keydb._keydb_dict), 6)
# Verify that roledb dictionary was updated.
self.assertTrue('targets/delegated_role1' in roledb._roledb_dict)
self.assertTrue('targets/delegated_role1' in tuf.roledb._roledb_dict)
# Verify that keydb dictionary was updated.
keyids = self.semi_roledict['targets/delegated_role1']['keyids']
for keyid in keyids:
self.assertTrue(keyid in keydb._keydb_dict)
self.assertTrue(keyid in tuf.keydb._keydb_dict)
@ -1149,5 +1152,11 @@ def test_8_remove_obsolete_targets(self):
tuf.download.download_url_to_tempfileobj = original_download
def tearDownModule():
# tearDownModule() is called after all the tests have run.
# http://docs.python.org/2/library/unittest.html#class-and-module-fixtures
setup.remove_all_repositories(TestUpdater.repositories['main_repository'])
unittest_toolbox.Modified_TestCase.clear_toolbox()
if __name__ == '__main__':
unittest.main()