Review Zane's unit test fixes and resolve merge conflicts

This commit is contained in:
vladdd 2013-08-30 14:56:33 -04:00
commit c99a8d1a0c
6 changed files with 193 additions and 185 deletions

View file

@ -19,20 +19,23 @@
<Purpose>
Run all the unit tests from every .py file beginning with "test_" in 'tuf/tests'.
Use --random to run the tests in random order.
"""
import sys
import unittest
import glob
import tuf.keydb as keydb
import tuf.repo.keystore as keystore
import tuf.roledb as roledb
import random
tests_list = glob.glob('test_*.py')
# Remove '.py' from each filename.
tests_list = [test[:-3] for test in tests_list]
if '--random' in sys.argv:
random.shuffle(tests_list)
suite = unittest.TestLoader().loadTestsFromNames(tests_list)
unittest.TextTestRunner(verbosity=2).run(suite)

View file

@ -31,17 +31,21 @@
import tuf.tests.unittest_toolbox as unittest_toolbox
# Populating 'rsa_keystore' and 'rsa_passwords' dictionaries.
# We will need them in creating keystore directory.
unittest_toolbox.Modified_TestCase.bind_keys_to_roles()
# Role:keyids dictionary.
role_keyids = {}
for role in unittest_toolbox.Modified_TestCase.semi_roledict.keys():
role_keyids[role] = unittest_toolbox.Modified_TestCase.semi_roledict[role]['keyids']
def _init_role_keyids():
# Populating 'rsa_keystore' and 'rsa_passwords' dictionaries.
# We will need them in creating keystore directory.
unittest_toolbox.Modified_TestCase.bind_keys_to_roles()
global role_keyids
for role in unittest_toolbox.Modified_TestCase.semi_roledict.keys():
role_keyids[role] = unittest_toolbox.Modified_TestCase.semi_roledict[role]['keyids']
@ -275,6 +279,7 @@ def create_repositories():
"""
_init_role_keyids()
# Make a temporary general repository directory.
repository_dir = tempfile.mkdtemp()

View file

@ -1,189 +1,171 @@
#!/usr/bin/env python
"""
<Program Name>
test_extraneous_dependencies_attack.py
<Author>
Konstantin Andrianov
Zane Fisher
<Started>
February 19, 2012
August 19, 2013
<Copyright>
See LICENSE for licensing information.
<Purpose>
Simulate an extraneous dependencies attack.
Simulate an extraneous dependencies attack. The client attempts to download
a file, which lists all the target dependencies, with one legitimate
dependency, and one extraneous dependency. A client should not download a
target dependency even if it is found on the repository. Valid targets are
listed and verified by TUF metadata, such as 'targets.txt'.
In an extraneous dependencies attack, attacker is able to cause clients to
download software dependencies that are not the intended dependencies.
Target dependencies listed in the file are comma-separated.
Note: The interposition provided by 'tuf.interposition' is used to intercept
all calls made by urllib/urillib2 to certain hostnames specified in
the interposition configuration file. Look up interposition.py for more
information and illustration of a sample contents of the interposition
configuration file. Interposition was meant to make TUF integration with an
existing software updater an easy process. This allows for more flexibility
to the existing software updater. However, if you are planning to solely use
TUF there should be no need for interposition, all necessary calls will be
generated from within TUF.
There is no difference between 'updates' and 'target' files.
"""
import os
import sys
import shutil
import urllib
import tempfile
import time
import tuf
import tuf.formats
import tuf.tests.system_tests.util_test_tools
import tuf.repo.keystore
import tuf.repo.signerlib as signerlib
import tuf.repo.signercli as signercli
from tuf.interposition import urllib_tuf
version = 1
import tuf.interposition
import util_test_tools
class ExtraneousDependenciesAttackAlert(Exception):
class ExtraneousDependencyAlert(Exception):
pass
def test_extraneous_dependencies_attack():
# Interpret the contents of the file it downloads as a list of dependent
# files from the same repository.
def _download(url, filename, directory, TUF=False):
destination = os.path.join(directory, filename)
if TUF:
tuf.interposition.urllib_tuf.urlretrieve(url, destination)
else:
urllib.urlretrieve(url, destination)
if util_test_tools.read_file_content(destination) != '':
required_files = util_test_tools.read_file_content(destination).split(',')
for required_filename in required_files:
required_file_url = os.path.dirname(url)+os.sep+required_filename
_download(required_file_url, required_filename, directory, TUF)
def test_extraneous_dependency_attack(TUF=False):
"""
<Purpose>
Illustrate arbitrary package attack vulnerability.
<Arguments>
TUF:
If set to 'False' all directories that start with 'tuf_' are ignored,
indicating that tuf is not implemented.
"""
ERROR_MSG = 'Extraneous Dependency Attack was Successful!\n'
ERROR_MSG = '\tExtraneous Dependencies Attack Succeeded!\n\n'
try:
# Setup.
root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=True)
root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF)
reg_repo = os.path.join(root_repo, 'reg_repo')
tuf_repo = os.path.join(root_repo, 'tuf_repo')
keystore_dir = os.path.join(tuf_repo, 'keystore')
metadata_dir = os.path.join(tuf_repo, 'metadata')
downloads_dir = os.path.join(root_repo, 'downloads')
downloads = os.path.join(root_repo, 'downloads')
targets_dir = os.path.join(tuf_repo, 'targets')
# 'roles' holds information about delegated roles.
roles = {'role1':{'password':['pass1']},
'role2':{'password':['pass2']}}
# Add files to 'repo' directory: {root_repo}.
good_dependency_filepath = util_test_tools.add_file_to_repository(reg_repo, '')
good_dependency_basename = os.path.basename(good_dependency_filepath)
# Add files to 'reg_repo' directory: {root_repo}
role1_path = tempfile.mkdtemp(dir=reg_repo)
roles['role1']['filepath'] = \
util_test_tools.add_file_to_repository(role1_path, 'Test A')
bad_dependency_filepath = util_test_tools.add_file_to_repository(reg_repo, '')
bad_dependency_basename = os.path.basename(bad_dependency_filepath)
role2_path = tempfile.mkdtemp(dir=reg_repo)
roles['role2']['filepath'] = \
util_test_tools.add_file_to_repository(role2_path, 'Test B')
# The dependent file lists the good dependency.
dependent_filepath = util_test_tools.add_file_to_repository(reg_repo,
good_dependency_basename)
dependent_basename = os.path.basename(dependent_filepath)
# Update TUF repository.
util_test_tools.make_targets_meta(root_repo)
util_test_tools.make_release_meta(root_repo)
util_test_tools.make_timestamp_meta(root_repo)
url_to_repo = url+'reg_repo/'+dependent_basename
modified_dependency_list = good_dependency_basename+','+\
bad_dependency_basename
def _make_delegation(rolename):
expiration_date = tuf.formats.format_time(time.time()+86400)
expiration_date = expiration_date[0:expiration_date.rfind(' UTC')]
# Indicate which file client downloads.
rel_filepath = os.path.relpath(roles[rolename]['filepath'], reg_repo)
roles[rolename]['target_path'] = os.path.join(targets_dir, rel_filepath)
rolepath, file_basename = os.path.split(roles[rolename]['filepath'])
junk, role_relpath = os.path.split(rolepath)
roles[rolename]['targets_dir'] = os.path.join(targets_dir, role_relpath)
roles[rolename]['metadata_dir'] = os.path.join(metadata_dir, 'targets')
# Create a key to sign a new delegated role.
password = roles[rolename]['password'][0]
key = signerlib.generate_and_save_rsa_key(keystore_dir, password)
roles[rolename]['keyid'] = [key['keyid']]
roles[rolename]['dest_path'] = os.path.join(downloads_dir, file_basename)
# Create delegation one.
util_test_tools.create_delegation(tuf_repo,
roles[rolename]['targets_dir'],
roles[rolename]['keyid'], password,
'targets', rolename, expiration_date)
# Update TUF repository.
# util_test_tools.make_targets_meta(root_repo)
util_test_tools.make_release_meta(root_repo)
util_test_tools.make_timestamp_meta(root_repo)
if TUF:
# Update TUF metadata before attacker modifies anything.
util_test_tools.tuf_refresh_repo(root_repo, keyids)
# Modify the url. Remember that the interposition will intercept
# urls that have 'localhost:9999' hostname, which was specified in
# the json interposition configuration file. Look for 'hostname'
# in 'util_test_tools.py'. Further, the 'file_basename' is the target
# path relative to 'targets_dir'.
roles[rolename]['url'] = 'http://localhost:9999/'+rel_filepath
url_to_repo = 'http://localhost:9999/'+dependent_basename
# Perform a client download.
urllib_tuf.urlretrieve(roles[rolename]['url'],
roles[rolename]['dest_path'])
# Attacker adds the dependency in the targets repository.
target = os.path.join(targets_dir, dependent_basename)
util_test_tools.modify_file_at_repository(target,
modified_dependency_list)
# Attacker adds the dependency in the regular repository.
util_test_tools.modify_file_at_repository(dependent_filepath,
modified_dependency_list)
# End of Setup.
_make_delegation('role1')
_make_delegation('role2')
# The attacks.
def _write_rogue_metadata():
global version
version = version+1
expiration_date = tuf.formats.format_time(time.time()+86400)
# Load the keystore before rebuilding the metadata.
tuf.repo.keystore.load_keystore_from_keyfiles(keystore_dir,
roles['role1']['keyid'],
roles['role1']['password'])
# Rebuild the delegation role metadata.
signerlib.build_delegated_role_file(roles['role2']['targets_dir'],
roles['role1']['keyid'], metadata_dir,
roles['role1']['metadata_dir'],
'role1.txt', version, expiration_date)
# Update release and timestamp metadata.
util_test_tools.make_release_meta(root_repo)
util_test_tools.make_timestamp_meta(root_repo)
# Modify a target that was delegated to 'role2'.
util_test_tools.modify_file_at_repository(roles['role2']['target_path'],
'Test NOT B')
# Update rogue delegatee metadata.
_write_rogue_metadata()
# Perform another client download.
try:
urllib_tuf.urlretrieve(roles['role2']['url'], roles['role2']['dest_path'])
except tuf.MetadataNotAvailableError, e:
# Client downloads (tries to download) the file.
_download(url=url_to_repo, filename=dependent_basename,
directory=downloads, TUF=TUF)
except tuf.DownloadError:
# If tuf.DownloadError is raised, this means that TUF has prevented
# the download of an unrecognized file. Enable the logging to see,
# what actually happened.
pass
else:
raise ExtraneousDependenciesAttackAlert(ERROR_MSG)
# Add a target file to the directory delegated to 'role2' but not 'role1'.
util_test_tools.add_file_to_repository(roles['role2']['targets_dir'], 'AAAA')
# Update rogue delegatee metadata.
_write_rogue_metadata()
# Perform another client download.
try:
urllib_tuf.urlretrieve(roles['role2']['url'], roles['role2']['dest_path'])
except tuf.MetadataNotAvailableError, e:
pass
else:
raise ExtraneousDependenciesAttackAlert(ERROR_MSG)
# Check if the legitimate dependency was downloaded.
if not(os.path.exists(os.path.join(downloads, good_dependency_basename))):
raise tuf.DownloadError
# Check if the extraneous dependency was downloaded.
if os.path.exists(os.path.join(downloads, bad_dependency_basename)):
raise ExtraneousDependencyAlert(ERROR_MSG)
finally:
server_proc.kill()
#util_test_tools.cleanup(root_repo, server_proc)
util_test_tools.cleanup(root_repo, server_proc)
print 'Attempting extraneous dependency attack without TUF:'
try:
test_extraneous_dependencies_attack()
except ExtraneousDependenciesAttackAlert, error:
print 'error'
test_extraneous_dependency_attack(TUF=False)
except ExtraneousDependencyAlert, error:
print error
print 'Attempting extraneous dependency attack with TUF:'
try:
test_extraneous_dependency_attack(TUF=True)
except ExtraneousDependencyAlert, error:
print error

View file

@ -56,9 +56,6 @@ class guarantees the order of unit tests. So that, 'test_something_A'
logger = logging.getLogger('tuf.test_signercli')
# Populating 'rsa_keystore' and 'rsa_passwords' dictionaries.
# We will need them when creating keystore directories.
unittest_toolbox.Modified_TestCase.bind_keys_to_roles()
class TestSignercli(unittest_toolbox.Modified_TestCase):
@ -1556,5 +1553,15 @@ def _mock_get_keyids(junk):
signercli._get_metadata_directory = original_get_metadata_directory
def setUpModule():
# Populating 'rsa_keystore' and 'rsa_passwords' dictionaries.
# We will need them when creating keystore directories.
unittest_toolbox.Modified_TestCase.bind_keys_to_roles()
def tearDownModule():
unittest_toolbox.Modified_TestCase.clear_toolbox()
if __name__ == '__main__':
unittest.main()

View file

@ -68,8 +68,6 @@
# 'unittest_toolbox.Modified_TestCase' is too long, I'll set it to 'unit_tbox'.
unit_tbox = tuf.tests.unittest_toolbox.Modified_TestCase
# Generate rsa keys and roles dictionary dictionaries.
unit_tbox.bind_keys_to_roles()
class TestSignerlib(unit_tbox):
@ -973,5 +971,14 @@ def _get_signed_role_info(self, role, directory=None):
return signed_meta, role_info
def setUpModule():
# Generate rsa keys and roles dictionary dictionaries.
unit_tbox.bind_keys_to_roles()
def tearDownModule():
unit_tbox.clear_toolbox()
tuf.repo.keystore.clear_keystore()
if __name__ == '__main__':
unittest.main()

View file

@ -56,10 +56,6 @@ class guarantees the order of unit tests. So that, 'test_something_A'
logger = logging.getLogger('tuf.test_updater')
# References to roledb and keydb dictionaries (improve readability).
roledb = tuf.roledb
keydb = tuf.keydb
class TestUpdater_init_(unittest_toolbox.Modified_TestCase):
@ -105,39 +101,43 @@ def test__init__exceptions(self):
os.remove(role_filepath)
updater.Updater('Repo_Name', self.mirrors)
# Remove all created repositories.
# Remove all created repositories and roles.
setup.remove_all_repositories(repositories['main_repository'])
tuf.roledb.clear_roledb()
class TestUpdater(unittest_toolbox.Modified_TestCase):
# Create repositories. 'repositories' is a tuple that looks like this:
# (repository_dir, client_repository_dir, server_repository_dir), see
# repository_setup.py odule.
repositories = setup.create_repositories()
# Save references to repository directories and metadata.
# Server side references.
server_repo_dir = repositories['server_repository']
server_meta_dir = os.path.join(server_repo_dir, 'metadata')
root_filepath = os.path.join(server_meta_dir, 'root.txt')
timestamp_filepath = os.path.join(server_meta_dir, 'timestamp.txt')
targets_filepath = os.path.join(server_meta_dir, 'targets.txt')
release_filepath = os.path.join(server_meta_dir, 'release.txt')
@classmethod
def setUpClass(cls):
# Create repositories. 'repositories' is a tuple that looks like this:
# (repository_dir, client_repository_dir, server_repository_dir), see
# repository_setup.py odule.
cls.repositories = setup.create_repositories()
# References to delegated metadata paths and directories.
delegated_dir1 = os.path.join(server_meta_dir, 'targets')
delegated_filepath1 = os.path.join(delegated_dir1, 'delegated_role1.txt')
delegated_dir2 = os.path.join(delegated_dir1, 'delegated_role1')
delegated_filepath2 = os.path.join(delegated_dir2, 'delegated_role2.txt')
targets_dir = os.path.join(server_repo_dir, 'targets')
# Save references to repository directories and metadata.
# Server side references.
cls.server_repo_dir = cls.repositories['server_repository']
cls.server_meta_dir = os.path.join(cls.server_repo_dir, 'metadata')
cls.root_filepath = os.path.join(cls.server_meta_dir, 'root.txt')
cls.timestamp_filepath = os.path.join(cls.server_meta_dir, 'timestamp.txt')
cls.targets_filepath = os.path.join(cls.server_meta_dir, 'targets.txt')
cls.release_filepath = os.path.join(cls.server_meta_dir, 'release.txt')
# Client side references.
client_repo_dir = repositories['client_repository']
client_meta_dir = os.path.join(client_repo_dir, 'metadata')
client_current_dir = os.path.join(client_meta_dir, 'current')
client_previous_dir = os.path.join(client_meta_dir, 'previous')
# References to delegated metadata paths and directories.
cls.delegated_dir1 = os.path.join(cls.server_meta_dir, 'targets')
cls.delegated_filepath1 = os.path.join(cls.delegated_dir1, 'delegated_role1.txt')
cls.delegated_dir2 = os.path.join(cls.delegated_dir1, 'delegated_role1')
cls.delegated_filepath2 = os.path.join(cls.delegated_dir2, 'delegated_role2.txt')
cls.targets_dir = os.path.join(cls.server_repo_dir, 'targets')
# Client side references.
cls.client_repo_dir = cls.repositories['client_repository']
cls.client_meta_dir = os.path.join(cls.client_repo_dir, 'metadata')
cls.client_current_dir = os.path.join(cls.client_meta_dir, 'current')
cls.client_previous_dir = os.path.join(cls.client_meta_dir, 'previous')
@ -176,8 +176,8 @@ def tearDown(self):
unittest_toolbox.Modified_TestCase.tearDown(self)
# Clear roledb and keydb dictionaries.
roledb.clear_roledb()
keydb.clear_keydb()
tuf.roledb.clear_roledb()
tuf.keydb.clear_keydb()
@ -379,14 +379,14 @@ def test_1__rebuild_key_and_role_db(self):
# are populated. 'top_level_role_info' is a unittest_toolbox's dict
# that contains top level role information it corresponds to a
# ROLEDICT_SCHEMA where roles are keys and role information their values.
self.assertEqual(roledb._roledb_dict, self.top_level_role_info)
self.assertEqual(len(keydb._keydb_dict), 4)
self.assertEqual(tuf.roledb._roledb_dict, self.top_level_role_info)
self.assertEqual(len(tuf.keydb._keydb_dict), 4)
# Verify that keydb dictionary was updated.
for role in self.role_list:
keyids = self.top_level_role_info[role]['keyids']
for keyid in keyids:
self.assertTrue(keyid in keydb._keydb_dict)
self.assertTrue(keyid in tuf.keydb._keydb_dict)
@ -408,22 +408,22 @@ def test_2__import_delegations(self):
# Verify that there was no change in roledb and keydb dictionaries
# by checking the number of elements in the dictionaries.
self.assertEqual(len(roledb._roledb_dict), 5)
self.assertEqual(len(keydb._keydb_dict), 5)
self.assertEqual(len(tuf.roledb._roledb_dict), 5)
self.assertEqual(len(tuf.keydb._keydb_dict), 5)
# Test: normal case, first level delegation.
self.Repository._import_delegations('targets/delegated_role1')
self.assertEqual(len(roledb._roledb_dict), 6)
self.assertEqual(len(keydb._keydb_dict), 6)
self.assertEqual(len(tuf.roledb._roledb_dict), 6)
self.assertEqual(len(tuf.keydb._keydb_dict), 6)
# Verify that roledb dictionary was updated.
self.assertTrue('targets/delegated_role1' in roledb._roledb_dict)
self.assertTrue('targets/delegated_role1' in tuf.roledb._roledb_dict)
# Verify that keydb dictionary was updated.
keyids = self.semi_roledict['targets/delegated_role1']['keyids']
for keyid in keyids:
self.assertTrue(keyid in keydb._keydb_dict)
self.assertTrue(keyid in tuf.keydb._keydb_dict)
@ -1149,5 +1149,9 @@ def test_8_remove_obsolete_targets(self):
tuf.download.download_url_to_tempfileobj = original_download
def tearDownModule():
setup.remove_all_repositories(TestUpdater.repositories['main_repository'])
unittest_toolbox.Modified_TestCase.clear_toolbox()
if __name__ == '__main__':
unittest.main()