From 3ebf76182d7d34fc438b07257db40d4aa9f3b49d Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Wed, 11 Jan 2017 16:41:13 -0500 Subject: [PATCH] Fix imports statements in test_arbitrary_package_attack.py --- tests/test_arbitrary_package_attack.py | 111 ++++++++++++------------- 1 file changed, 55 insertions(+), 56 deletions(-) diff --git a/tests/test_arbitrary_package_attack.py b/tests/test_arbitrary_package_attack.py index 50d52561..b76301d4 100755 --- a/tests/test_arbitrary_package_attack.py +++ b/tests/test_arbitrary_package_attack.py @@ -20,7 +20,7 @@ Simulate an arbitrary package attack, where an updater client attempts to - download a malicious file. TUF and non-TUF client scenarios are tested. + download a malicious file. TUF and non-TUF client scenarios are tested. There is no difference between 'updates' and 'target' files. """ @@ -48,18 +48,17 @@ import unittest else: - import unittest2 as unittest + import unittest2 as unittest import tuf import tuf.formats -import tuf.ssl_crypto.util import tuf.roledb -import tuf.ssl_crypto.keydb +import tuf.keydb import tuf.log import tuf.client.updater as updater import tuf.unittest_toolbox as unittest_toolbox -from simple_settings import settings +import securesystemslib import six logger = logging.getLogger('tuf.test_arbitrary_package_attack') @@ -70,16 +69,16 @@ class TestArbitraryPackageAttack(unittest_toolbox.Modified_TestCase): @classmethod def setUpClass(cls): # setUpClass() is called before any of the test cases are executed. - + # Create a temporary directory to store the repository, metadata, and target # files. 'temporary_directory' must be deleted in TearDownModule() so that - # temporary files are always removed, even when exceptions occur. + # temporary files are always removed, even when exceptions occur. cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd()) - + # Launch a SimpleHTTPServer (serves files in the current directory). # Test cases will request metadata and target files that have been # pre-generated in 'tuf/tests/repository_data', which will be served by the - # SimpleHTTPServer launched here. The test cases of this unit test assume + # SimpleHTTPServer launched here. The test cases of this unit test assume # the pre-generated metadata files have a specific structure, such # as a delegated role 'targets/role1', three target files, five key files, # etc. @@ -97,15 +96,15 @@ def setUpClass(cls): - @classmethod + @classmethod def tearDownClass(cls): # tearDownModule() is called after all the test cases have run. # http://docs.python.org/2/library/unittest.html#class-and-module-fixtures - + # Remove the temporary repository directory, which should contain all the # metadata, targets, and key files generated of all the test cases. shutil.rmtree(cls.temporary_directory) - + # Kill the SimpleHTTPServer process. if cls.server_process.returncode is None: logger.info('Server process '+str(cls.server_process.pid)+' terminated.') @@ -116,21 +115,21 @@ def tearDownClass(cls): def setUp(self): # We are inheriting from custom class. unittest_toolbox.Modified_TestCase.setUp(self) - + # Copy the original repository files provided in the test folder so that # any modifications made to repository files are restricted to the copies. # The 'repository_data' directory is expected to exist in 'tuf/tests/'. - original_repository_files = os.path.join(os.getcwd(), 'repository_data') + original_repository_files = os.path.join(os.getcwd(), 'repository_data') temporary_repository_root = \ self.make_temp_directory(directory=self.temporary_directory) - + # The original repository, keystore, and client directories will be copied - # for each test case. + # for each test case. original_repository = os.path.join(original_repository_files, 'repository') original_client = os.path.join(original_repository_files, 'client') # Save references to the often-needed client repository directories. - # Test cases need these references to access metadata and target files. + # Test cases need these references to access metadata and target files. self.repository_directory = \ os.path.join(temporary_repository_root, 'repository') self.client_directory = os.path.join(temporary_repository_root, 'client') @@ -141,14 +140,14 @@ def setUp(self): shutil.copytree(original_client, self.client_directory) # Set the url prefix required by the 'tuf/client/updater.py' updater. - # 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'. + # 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'. repository_basepath = self.repository_directory[len(os.getcwd()):] url_prefix = \ - 'http://localhost:' + str(self.SERVER_PORT) + repository_basepath - - # Setting 'settings.repository_directory' with the temporary client + 'http://localhost:' + str(self.SERVER_PORT) + repository_basepath + + # Setting 'tuf.settings.repository_directory' with the temporary client # directory copied from the original repository files. - settings.repository_directory = self.client_directory + tuf.settings.repository_directory = self.client_directory self.repository_mirrors = {'mirror1': {'url_prefix': url_prefix, 'metadata_path': 'metadata', 'targets_path': 'targets', @@ -164,46 +163,46 @@ def tearDown(self): # Modified_TestCase.tearDown() automatically deletes temporary files and # directories that may have been created during each test case. unittest_toolbox.Modified_TestCase.tearDown(self) - # updater.Updater() populates the roledb with the name "test_repository" + # updater.Updater() populates the roledb with the name "test_repository" tuf.roledb.clear_roledb(clear_all=True) - tuf.ssl_crypto.keydb.clear_keydb(clear_all=True) + tuf.keydb.clear_keydb(clear_all=True) def test_without_tuf(self): # Verify that a target file replaced with a malicious version is downloaded # by a non-TUF client (i.e., a non-TUF client that does not verify hashes, # detect mix-and-mix attacks, etc.) A tuf client, on the other hand, should # detect that the downloaded target file is invalid. - + # Test: Download a valid target file from the repository. # Ensure the target file to be downloaded has not already been downloaded, # and generate its file size and digest. The file size and digest is needed # to check that the malicious file was indeed downloaded. target_path = os.path.join(self.repository_directory, 'targets', 'file1.txt') - client_target_path = os.path.join(self.client_directory, 'file1.txt') + client_target_path = os.path.join(self.client_directory, 'file1.txt') self.assertFalse(os.path.exists(client_target_path)) - length, hashes = tuf.ssl_crypto.util.get_file_details(target_path) + length, hashes = securesystemslib.util.get_file_details(target_path) fileinfo = tuf.formats.make_fileinfo(length, hashes) - + url_prefix = self.repository_mirrors['mirror1']['url_prefix'] url_file = os.path.join(url_prefix, 'targets', 'file1.txt') six.moves.urllib.request.urlretrieve(url_file, client_target_path) - + self.assertTrue(os.path.exists(client_target_path)) - length, hashes = tuf.ssl_crypto.util.get_file_details(client_target_path) + length, hashes = securesystemslib.util.get_file_details(client_target_path) download_fileinfo = tuf.formats.make_fileinfo(length, hashes) self.assertEqual(fileinfo, download_fileinfo) - + # Test: Download a target file that has been modified by an attacker. with open(target_path, 'wt') as file_object: file_object.write('add malicious content.') - length, hashes = tuf.ssl_crypto.util.get_file_details(target_path) + length, hashes = securesystemslib.util.get_file_details(target_path) malicious_fileinfo = tuf.formats.make_fileinfo(length, hashes) - + six.moves.urllib.request.urlretrieve(url_file, client_target_path) - - length, hashes = tuf.ssl_crypto.util.get_file_details(client_target_path) + + length, hashes = securesystemslib.util.get_file_details(client_target_path) download_fileinfo = tuf.formats.make_fileinfo(length, hashes) - + # Verify 'download_fileinfo' is unequal to the original trusted version. self.assertNotEqual(download_fileinfo, fileinfo) @@ -226,32 +225,32 @@ def test_with_tuf(self): target_path = os.path.join(self.repository_directory, 'targets', 'file1.txt') with open(target_path, 'wt') as file_object: file_object.write('malicious content, size 33 bytes.') - + try: self.repository_updater.download_target(file1_fileinfo, destination) - - except tuf.ssl_commons.exceptions.NoWorkingMirrorError as exception: + + except tuf.exceptions.NoWorkingMirrorError as exception: url_prefix = self.repository_mirrors['mirror1']['url_prefix'] url_file = os.path.join(url_prefix, 'targets', 'file1.txt') # Verify that only one exception is raised for 'url_file'. self.assertTrue(len(exception.mirror_errors), 1) - # Verify that the expected 'tuf.ssl_commons.exceptions.DownloadLengthMismatchError' exception + # Verify that the expected 'tuf.exceptions.DownloadLengthMismatchError' exception # is raised for 'url_file'. self.assertTrue(url_file in exception.mirror_errors) self.assertTrue(isinstance(exception.mirror_errors[url_file], - tuf.ssl_commons.exceptions.BadHashError)) - + securesystemslib.exceptions.BadHashError)) + else: self.fail('TUF did not prevent an arbitrary package attack.') - - + + def test_with_tuf_and_metadata_tampering(self): # Test that a TUF client does not download a malicious target file, and a # 'targets.json' metadata file that has also been modified by the attacker. # The attacker does not attach a valid signature to 'targets.json' - + # An attacker modifies 'file1.txt'. target_path = os.path.join(self.repository_directory, 'targets', 'file1.txt') with open(target_path, 'wt') as file_object: @@ -259,20 +258,20 @@ def test_with_tuf_and_metadata_tampering(self): # An attacker also tries to add the malicious target's length and digest # to its metadata file. - length, hashes = tuf.ssl_crypto.util.get_file_details(target_path) + length, hashes = securesystemslib.util.get_file_details(target_path) metadata_path = \ os.path.join(self.repository_directory, 'metadata', 'targets.json') - - metadata = tuf.ssl_crypto.util.load_json_file(metadata_path) + + metadata = securesystemslib.util.load_json_file(metadata_path) metadata['signed']['targets']['/file1.txt']['hashes'] = hashes metadata['signed']['targets']['/file1.txt']['length'] = length - tuf.formats.check_signable_object_format(metadata) - + tuf.formats.check_signable_object_format(metadata) + with open(metadata_path, 'wb') as file_object: - json.dumps(metadata, file_object, indent=1, sort_keys=True).encode('utf-8') - + json.dumps(metadata, file_object, indent=1, sort_keys=True).encode('utf-8') + # Verify that the malicious 'targets.json' is not downloaded. Perform # a refresh of top-level metadata to demonstrate that the malicious # 'targets.json' is not downloaded. @@ -281,8 +280,8 @@ def test_with_tuf_and_metadata_tampering(self): file1_fileinfo = self.repository_updater.get_one_valid_targetinfo('file1.txt') destination = os.path.join(self.client_directory) self.repository_updater.download_target(file1_fileinfo, destination) - - except tuf.ssl_commons.exceptions.NoWorkingMirrorError as exception: + + except tuf.exceptions.NoWorkingMirrorError as exception: url_prefix = self.repository_mirrors['mirror1']['url_prefix'] url_file = os.path.join(url_prefix, 'targets', 'file1.txt') @@ -292,8 +291,8 @@ def test_with_tuf_and_metadata_tampering(self): # Verify that the specific and expected mirror exception is raised. self.assertTrue(url_file in exception.mirror_errors) self.assertTrue(isinstance(exception.mirror_errors[url_file], - tuf.ssl_commons.exceptions.BadHashError)) - + securesystemslib.exceptions.BadHashError)) + else: self.fail('TUF did not prevent an arbitrary package attack.')