diff --git a/tests/integration/test_extraneous_dependencies_attack.py b/tests/integration/test_extraneous_dependencies_attack.py index a97b5494..9545b5b1 100755 --- a/tests/integration/test_extraneous_dependencies_attack.py +++ b/tests/integration/test_extraneous_dependencies_attack.py @@ -5,10 +5,17 @@ test_extraneous_dependencies_attack.py - Zane Fisher + Zane Fisher. August 19, 2013 + + April 6, 2014. + Refactored to use the 'unittest' module (test conditions in code, rather + than verifying text output), use pre-generated repository files, and + discontinue use of the old repository tools. Modify the previous scenario + simulated for the mix-and-match attack. The metadata that specified the + dependencies of a project modified (previously a text file.) -vladimir.v.diaz See LICENSE for licensing information. @@ -20,20 +27,7 @@ target dependency even if it is found on the repository. Valid targets are listed and verified by TUF metadata, such as 'targets.txt'. - Target dependencies listed in the file are comma-separated. - - Note: The interposition provided by 'tuf.interposition' is used to intercept - all calls made by urllib/urillib2 to certain hostnames specified in - the interposition configuration file. Look up interposition.py for more - information and illustration of a sample contents of the interposition - configuration file. Interposition was meant to make TUF integration with an - existing software updater an easy process. This allows for more flexibility - to the existing software updater. However, if you are planning to solely use - TUF there should be no need for interposition, all necessary calls will be - generated from within TUF. - There is no difference between 'updates' and 'target' files. - """ # Help with Python 3 compatability, where the print statement is a function, an @@ -45,198 +39,190 @@ import os import urllib +import tempfile +import random +import time +import shutil +import json +import subprocess +import unittest +import logging -import tuf -import tuf.interposition -import tuf.tests.util_test_tools as util_test_tools +import tuf.formats +import tuf.util +import tuf.log +import tuf.client.updater as updater +import tuf.tests.unittest_toolbox as unittest_toolbox -class ExtraneousDependencyAlert(Exception): - pass +logger = logging.getLogger('tuf.test_extraneous_dependencies_attack') -# Interpret anything following 'requires:' in the contents of the file it -# downloads as a comma-separated list of dependent files from the same -# repository. -def _download(url, filename, directory, using_tuf=False): - destination = os.path.join(directory, filename) - if using_tuf: - tuf.interposition.urllib_tuf.urlretrieve(url, destination) - else: - urllib.urlretrieve(url, destination) +class TestExtraneousDependenciesAttack(unittest_toolbox.Modified_TestCase): - file_contents = util_test_tools.read_file_content(destination) + @classmethod + def setUpClass(cls): + # setUpClass() is called before any of the test cases are executed. + + # Create a temporary directory to store the repository, metadata, and target + # files. 'temporary_directory' must be deleted in TearDownModule() so that + # temporary files are always removed, even when exceptions occur. + cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd()) + + # Launch a SimpleHTTPServer (serves files in the current directory). + # Test cases will request metadata and target files that have been + # pre-generated in 'tuf/tests/repository_data', which will be served by the + # SimpleHTTPServer launched here. The test cases of this unit test assume + # the pre-generated metadata files have a specific structure, such + # as a delegated role 'targets/role1', three target files, five key files, + # etc. + cls.SERVER_PORT = random.randint(30000, 45000) + command = ['python', 'simple_server.py', str(cls.SERVER_PORT)] + cls.server_process = subprocess.Popen(command, stderr=subprocess.PIPE) + logger.info('Server process started.') + logger.info('Server process id: '+str(cls.server_process.pid)) + logger.info('Serving on port: '+str(cls.SERVER_PORT)) + cls.url = 'http://localhost:'+str(cls.SERVER_PORT) + os.path.sep - # Parse the list of required files (if it exists) and download them. - if file_contents.find('requires:') != -1: - required_files =\ - file_contents[file_contents.find('requires:') + 9:].split(',') - for required_filename in required_files: - required_file_url = os.path.dirname(url)+os.sep+required_filename - _download(required_file_url, required_filename, directory, using_tuf) + # NOTE: Following error is raised if a delay is not applied: + # + time.sleep(.2) - -def test_extraneous_dependency_attack(using_tuf=False, modify_metadata=False): - """ - - Illustrate extraneous dependency attack vulnerability. - - - using_tuf: - If set to 'False' all directories that start with 'tuf_' are ignored, - indicating that tuf is not implemented. - - """ - - ERROR_MSG = 'Extraneous Dependency Attack was Successful!' - - try: - # Setup. - root_repo, url, server_proc, keyids = util_test_tools.init_repo(using_tuf) - reg_repo = os.path.join(root_repo, 'reg_repo') - tuf_repo = os.path.join(root_repo, 'tuf_repo') - downloads = os.path.join(root_repo, 'downloads') - targets_dir = os.path.join(tuf_repo, 'targets') - - # Add files to 'repo' directory: {root_repo}. - good_dependency_filepath = util_test_tools.add_file_to_repository(reg_repo, - 'the file you need') - good_dependency_basename = os.path.basename(good_dependency_filepath) - - bad_dependency_filepath = util_test_tools.add_file_to_repository(reg_repo, - 'the file you don\'t need') - bad_dependency_basename = os.path.basename(bad_dependency_filepath) - - # The dependent file lists the good dependency. - dependent_filepath = util_test_tools.add_file_to_repository(reg_repo, - 'requires:'+good_dependency_basename) - dependent_basename = os.path.basename(dependent_filepath) - - url_to_repo = url+'reg_repo/'+dependent_basename - - # List the bad dependency first. If an attacker modifies a target by - # simply appending the file contents, tuf.download will ignore the appended - # data, downloading only as much data as the TUF metadata says the target - # should contain. - modified_dependency_list = bad_dependency_basename+','+\ - good_dependency_basename - - if using_tuf: - # Update TUF metadata before attacker modifies anything. - util_test_tools.tuf_refresh_repo(root_repo, keyids) - - # Modify the url. Remember that the interposition will intercept - # urls that have 'localhost:9999' hostname, which was specified in - # the json interposition configuration file. Look for 'hostname' - # in 'util_test_tools.py'. Further, the 'file_basename' is the target - # path relative to 'targets_dir'. - url_to_repo = 'http://localhost:9999/'+dependent_basename - - # Attacker modifies the depenent file in the targets repository, adding - # the bad dependency to its list. - dependent_target_filepath = os.path.join(targets_dir, dependent_basename) - util_test_tools.modify_file_at_repository(dependent_target_filepath, - 'requires:'+modified_dependency_list) - - if modify_metadata: - - # Modify targets metadata to reflect the change to the target file. - targets_metadata_filepath = os.path.join(tuf_repo, 'metadata', - 'targets.txt') - util_test_tools.update_target_in_metadata(dependent_target_filepath, - targets_metadata_filepath) - - # Modify release metadata to reflect the change to targets metadata. - release_metadata_filepath = os.path.join(tuf_repo, 'metadata', - 'release.txt') - util_test_tools.update_role_in_metadata(targets_metadata_filepath, - release_metadata_filepath) - - # Modify timestamp metadata to reflect the change to release metadata. - timestamp_metadata_filepath = os.path.join(tuf_repo, 'metadata', - 'timestamp.txt') - util_test_tools.update_role_in_metadata(release_metadata_filepath, - timestamp_metadata_filepath) - - - - - # Attacker modifies the depenent file in the regular repository, adding - # the bad dependency to its list. - util_test_tools.modify_file_at_repository(dependent_filepath, - 'requires:'+modified_dependency_list) - - # End of Setup. + @classmethod + def tearDownClass(cls): + # tearDownModule() is called after all the test cases have run. + # http://docs.python.org/2/library/unittest.html#class-and-module-fixtures + + # Remove the temporary repository directory, which should contain all the + # metadata, targets, and key files generated of all the test cases. + shutil.rmtree(cls.temporary_directory) + + unittest_toolbox.Modified_TestCase.clear_toolbox() + + # Kill the SimpleHTTPServer process. + if cls.server_process.returncode is None: + logger.info('Server process '+str(cls.server_process.pid)+' terminated.') + cls.server_process.kill() + + def setUp(self): + # We are inheriting from custom class. + unittest_toolbox.Modified_TestCase.setUp(self) + + # Copy the original repository files provided in the test folder so that + # any modifications made to repository files are restricted to the copies. + # The 'repository_data' directory is expected to exist in 'tuf/tests/'. + original_repository_files = os.path.join(os.getcwd(), os.pardir, + 'repository_data') + temporary_repository_root = \ + self.make_temp_directory(directory=self.temporary_directory) + + # The original repository, keystore, and client directories will be copied + # for each test case. + original_repository = os.path.join(original_repository_files, 'repository') + original_client = os.path.join(original_repository_files, 'client') + original_keystore = os.path.join(original_repository_files, 'keystore') + + # Save references to the often-needed client repository directories. + # Test cases need these references to access metadata and target files. + self.repository_directory = \ + os.path.join(temporary_repository_root, 'repository') + self.client_directory = os.path.join(temporary_repository_root, 'client') + self.keystore_directory = os.path.join(temporary_repository_root, 'keystore') + + # Copy the original 'repository', 'client', and 'keystore' directories + # to the temporary repository the test cases can use. + shutil.copytree(original_repository, self.repository_directory) + shutil.copytree(original_client, self.client_directory) + shutil.copytree(original_keystore, self.keystore_directory) + + # Set the url prefix required by the 'tuf/client/updater.py' updater. + # 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'. + repository_basepath = self.repository_directory[len(os.getcwd()):] + url_prefix = \ + 'http://localhost:' + str(self.SERVER_PORT) + repository_basepath + + # Setting 'tuf.conf.repository_directory' with the temporary client + # directory copied from the original repository files. + tuf.conf.repository_directory = self.client_directory + self.repository_mirrors = {'mirror1': {'url_prefix': url_prefix, + 'metadata_path': 'metadata', + 'targets_path': 'targets', + 'confined_target_dirs': ['']}} + + # Create the repository instance. The test cases will use this client + # updater to refresh metadata, fetch target files, etc. + self.repository_updater = updater.Updater('test_repository', + self.repository_mirrors) + + + def tearDown(self): + # Modified_TestCase.tearDown() automatically deletes temporary files and + # directories that may have been created during each test case. + unittest_toolbox.Modified_TestCase.tearDown(self) + + + + def test_with_tuf(self): + # An attacker tries to trick a client into installing an extraneous target + # file (a valid file on the repository, in this case) by listing it in the + # project's metadata file. For the purposes of test_with_tuf(), + # 'targets/role1.json' is treated as the metadata file that indicates all + # the files needed to install/update the 'role1' project. The attacker + # simply adds the extraneous target file to 'role1.json', which the TUF + # client should reject as untrusted. + role1_filepath = os.path.join(self.repository_directory, 'metadata', + 'targets', 'role1.json') + file1_filepath = os.path.join(self.repository_directory, 'targets', + 'file1.txt') + length, hashes = tuf.util.get_file_details(file1_filepath) + + role1_metadata = tuf.util.load_json_file(role1_filepath) + role1_metadata['signed']['targets']['/file2.txt'] = {} + role1_metadata['signed']['targets']['/file2.txt']['hashes'] = hashes + role1_metadata['signed']['targets']['/file2.txt']['length'] = length + + tuf.formats.check_signable_object_format(role1_metadata) + + with open(role1_filepath, 'wb') as file_object: + json.dump(role1_metadata, file_object, indent=1, sort_keys=True) + + # Un-install the metadata of the top-level roles so that the client can + # download and detect the invalid 'role1.json'. + os.remove(os.path.join(self.client_directory, 'metadata', 'current', + 'snapshot.json')) + os.remove(os.path.join(self.client_directory, 'metadata', 'current', + 'targets.json')) + os.remove(os.path.join(self.client_directory, 'metadata', 'current', + 'timestamp.json')) + os.remove(os.path.join(self.client_directory, 'metadata', 'current', + 'targets', 'role1.json')) + + # Verify that the TUF client rejects the invalid metadata and refuses to + # continue the update process. + self.repository_updater.refresh() + try: - # Client downloads (tries to download) the file. - _download(url_to_repo, dependent_basename, downloads, using_tuf) - - except tuf.NoWorkingMirrorError, error: - # We only set up one mirror, so if it fails, we expect a - # NoWorkingMirrorError. If TUF has worked as intended, the mirror error - # contained within should be a BadHashError or a BadSignatureError, - # depending on whether the metadata was modified. - if modify_metadata: - mirror_error = \ - error.mirror_errors[url+'tuf_repo/metadata/timestamp.txt'] - - assert isinstance(mirror_error, tuf.BadSignatureError) - - else: - mirror_error = \ - error.mirror_errors[url+'tuf_repo/targets/'+dependent_basename] - - assert isinstance(mirror_error, tuf.BadHashError) + self.repository_updater.targets_of_role('targets/role1') + + # Verify that the specific 'tuf.BadHashError' exception is raised by each + # mirror. + except tuf.NoWorkingMirrorError, exception: + for mirror_url, mirror_error in exception.mirror_errors.iteritems(): + url_prefix = self.repository_mirrors['mirror1']['url_prefix'] + url_file = os.path.join(url_prefix, 'metadata', 'targets', 'role1.json') + + # Verify that 'role1.json' is the culprit. + self.assertEqual(url_file, mirror_url) + self.assertTrue(isinstance(mirror_error, tuf.BadHashError)) else: - # Check if the legitimate dependency was downloaded. - if not(os.path.exists(os.path.join(downloads, good_dependency_basename))): - raise tuf.DownloadError - - # Check if the extraneous dependency was downloaded. - if os.path.exists(os.path.join(downloads, bad_dependency_basename)): - raise ExtraneousDependencyAlert(ERROR_MSG) - - finally: - util_test_tools.cleanup(root_repo, server_proc) + self.fail('TUF did not prevent an extraneous dependencies attack.') - -print('Attempting extraneous dependency attack without TUF:') -try: - test_extraneous_dependency_attack(using_tuf=False) - -except ExtraneousDependencyAlert, error: - print(error) - -else: - print('Extraneous dependency attack failed.') -print() - -print('Attempting extraneous dependency attack with TUF:') -try: - test_extraneous_dependency_attack(using_tuf=True, modify_metadata=False) - -except ExtraneousDependencyAlert, error: - print(error) - -else: - print('Extraneous dependency attack failed.') -print() - -print('Attempting extraneous dependency attack with TUF'+\ - ' (and tampering with metadata):') -try: - test_extraneous_dependency_attack(using_tuf=True, modify_metadata=True) - -except ExtraneousDependencyAlert, error: - print(error) - -else: - print('Extraneous dependency attack failed.') -print() +if __name__ == '__main__': + unittest.main()