From b52194cdf4f5a24bedce2ceac01a3849aa7f5cc4 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Thu, 3 Apr 2014 11:59:50 -0400 Subject: [PATCH] Refactor test_endless_data_attack.py Refactored 'test_endless_data_attack.py' to use the 'unittest' module (test conditions in code, rather than verifying text output), use pre-generated repository files, and discontinue use of the old repository tools. Minor edits to the test cases. --- tests/integration/test_endless_data_attack.py | 376 +++++++++++------- 1 file changed, 234 insertions(+), 142 deletions(-) diff --git a/tests/integration/test_endless_data_attack.py b/tests/integration/test_endless_data_attack.py index cb555dbc..caf996cc 100755 --- a/tests/integration/test_endless_data_attack.py +++ b/tests/integration/test_endless_data_attack.py @@ -5,30 +5,27 @@ test_endless_data_attack.py - Konstantin Andrianov + Konstantin Andrianov. - March 13, 2012 + March 13, 2012. + + April 3, 2014. + Refactored to use the 'unittest' module (test conditions in code, rather + than verifying text output), use pre-generated repository files, and + discontinue use of the old repository tools. Minor edits to the test cases. + -vladimir.v.diaz See LICENSE for licensing information. - Simulate an endless data attack. A simple client update vs. client - update implementing TUF. - - Note: The interposition provided by 'tuf.interposition' is used to intercept - all calls made by urllib/urillib2 to certain hostname specified in - the interposition configuration file. Look up interposition.py for more - information and illustration of a sample contents of the interposition - configuration file. Interposition was meant to make TUF integration with an - existing software updater an easy process. This allows for more flexibility - to the existing software updater. However, if you are planning to solely use - TUF there should be no need for interposition, all necessary calls will be - generated from within TUF. + Simulate an endless data attack, where an updater client tries to download a + target file modified by an attacker to contain a large amount of data (a TUF + client should only download up to the file's expected length). TUF and + non-TUF client scenarios are tested. There is no difference between 'updates' and 'target' files. - """ # Help with Python 3 compatability, where the print statement is a function, an @@ -40,150 +37,245 @@ import os import urllib +import tempfile +import random +import time +import shutil +import json +import subprocess +import unittest +import logging import tuf -import tuf.interposition -import tuf.tests.util_test_tools as util_test_tools +import tuf.formats +import tuf.util +import tuf.log +import tuf.client.updater as updater +import tuf.tests.unittest_toolbox as unittest_toolbox + +logger = logging.getLogger('tuf.test_endless_data_attack') + + +class TestEndlessDataAttack(unittest_toolbox.Modified_TestCase): + + @classmethod + def setUpClass(cls): + # setUpClass() is called before any of the test cases are executed. + + # Create a temporary directory to store the repository, metadata, and target + # files. 'temporary_directory' must be deleted in TearDownModule() so that + # temporary files are always removed, even when exceptions occur. + cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd()) + + # Launch a SimpleHTTPServer (serves files in the current directory). + # Test cases will request metadata and target files that have been + # pre-generated in 'tuf/tests/repository_data', which will be served by the + # SimpleHTTPServer launched here. The test cases of this unit test assume + # the pre-generated metadata files have a specific structure, such + # as a delegated role 'targets/role1', three target files, five key files, + # etc. + cls.SERVER_PORT = random.randint(30000, 45000) + command = ['python', 'simple_server.py', str(cls.SERVER_PORT)] + cls.server_process = subprocess.Popen(command, stderr=subprocess.PIPE) + logger.info('Server process started.') + logger.info('Server process id: '+str(cls.server_process.pid)) + logger.info('Serving on port: '+str(cls.SERVER_PORT)) + cls.url = 'http://localhost:'+str(cls.SERVER_PORT) + os.path.sep + + # NOTE: Following error is raised if a delay is not applied: + # + time.sleep(.2) -class EndlessDataAttack(Exception): - pass + @classmethod + def tearDownClass(cls): + # tearDownModule() is called after all the test cases have run. + # http://docs.python.org/2/library/unittest.html#class-and-module-fixtures + + # Remove the temporary repository directory, which should contain all the + # metadata, targets, and key files generated of all the test cases. + shutil.rmtree(cls.temporary_directory) + + unittest_toolbox.Modified_TestCase.clear_toolbox() + + # Kill the SimpleHTTPServer process. + if cls.server_process.returncode is None: + logger.info('Server process '+str(cls.server_process.pid)+' terminated.') + cls.server_process.kill() -def _download(url, filename, using_tuf=False): - if using_tuf: - tuf.interposition.urllib_tuf.urlretrieve(url, filename) - else: - urllib.urlretrieve(url, filename) + def setUp(self): + # We are inheriting from custom class. + unittest_toolbox.Modified_TestCase.setUp(self) + + # Copy the original repository files provided in the test folder so that + # any modifications made to repository files are restricted to the copies. + # The 'repository_data' directory is expected to exist in 'tuf/tests/'. + original_repository_files = os.path.join(os.getcwd(), os.pardir, + 'repository_data') + temporary_repository_root = \ + self.make_temp_directory(directory=self.temporary_directory) + + # The original repository, keystore, and client directories will be copied + # for each test case. + original_repository = os.path.join(original_repository_files, 'repository') + original_client = os.path.join(original_repository_files, 'client') + + # Save references to the often-needed client repository directories. + # Test cases need these references to access metadata and target files. + self.repository_directory = \ + os.path.join(temporary_repository_root, 'repository') + self.client_directory = os.path.join(temporary_repository_root, 'client') + + # Copy the original 'repository', 'client', and 'keystore' directories + # to the temporary repository the test cases can use. + shutil.copytree(original_repository, self.repository_directory) + shutil.copytree(original_client, self.client_directory) + + # Set the url prefix required by the 'tuf/client/updater.py' updater. + # 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'. + repository_basepath = self.repository_directory[len(os.getcwd()):] + url_prefix = \ + 'http://localhost:' + str(self.SERVER_PORT) + repository_basepath + + # Setting 'tuf.conf.repository_directory' with the temporary client + # directory copied from the original repository files. + tuf.conf.repository_directory = self.client_directory + self.repository_mirrors = {'mirror1': {'url_prefix': url_prefix, + 'metadata_path': 'metadata', + 'targets_path': 'targets', + 'confined_target_dirs': ['']}} + + # Create the repository instance. The test cases will use this client + # updater to refresh metadata, fetch target files, etc. + self.repository_updater = updater.Updater('test_repository', + self.repository_mirrors) + + + def tearDown(self): + # Modified_TestCase.tearDown() automatically deletes temporary files and + # directories that may have been created during each test case. + unittest_toolbox.Modified_TestCase.tearDown(self) -def test_endless_data_attack(using_tuf=False, TIMESTAMP=False): - """ - - Illustrate endless data attack vulnerability. + def test_without_tuf(self): + # Verify that a target file replaced with a larger malicious version (to + # simulate an endless data attack) is downloaded by a non-TUF client (i.e., + # a non-TUF client that does not verify hashes, detect mix-and-mix attacks, + # etc.) A tuf client, on the other hand, should only download target files + # up to their expected lengths, as explicitly specified in metadata, or + # 'tuf/conf.py' (when retrieving 'timestamp.json' and 'root.json unsafely'.) + + # Test: Download a valid target file from the repository. + # Ensure the target file to be downloaded has not already been downloaded, + # and generate its file size and digest. The file size and digest is needed + # to verify that the malicious file was indeed downloaded. + target_path = os.path.join(self.repository_directory, 'targets', 'file1.txt') + client_target_path = os.path.join(self.client_directory, 'file1.txt') + self.assertFalse(os.path.exists(client_target_path)) + length, hashes = tuf.util.get_file_details(target_path) + fileinfo = tuf.formats.make_fileinfo(length, hashes) + + url_prefix = self.repository_mirrors['mirror1']['url_prefix'] + url_file = os.path.join(url_prefix, 'targets', 'file1.txt') + urllib.urlretrieve(url_file, client_target_path) + + self.assertTrue(os.path.exists(client_target_path)) + length, hashes = tuf.util.get_file_details(client_target_path) + download_fileinfo = tuf.formats.make_fileinfo(length, hashes) + self.assertEqual(fileinfo, download_fileinfo) + + # Test: Download a target file that has been modified by an attacker with + # extra data. + with open(target_path, 'r+b') as file_object: + original_content = file_object.read() + file_object.write(original_content+('append large amount of data' * 100000)) + large_length, hashes = tuf.util.get_file_details(target_path) + malicious_fileinfo = tuf.formats.make_fileinfo(large_length, hashes) + + # Is the modified file actually larger? + self.assertTrue(large_length > length) + + urllib.urlretrieve(url_file, client_target_path) + + length, hashes = tuf.util.get_file_details(client_target_path) + download_fileinfo = tuf.formats.make_fileinfo(length, hashes) + + # Verify 'download_fileinfo' is unequal to the original trusted version. + self.assertNotEqual(download_fileinfo, fileinfo) - - using_tuf: - If set to 'False' all directories that start with 'tuf_' are ignored, - indicating that tuf is not implemented. - - """ - - ERROR_MSG = 'Endless Data Attack was Successful!\n' - - try: - # Setup. - root_repo, url, server_proc, keyids = util_test_tools.init_repo(using_tuf) - reg_repo = os.path.join(root_repo, 'reg_repo') - tuf_repo = os.path.join(root_repo, 'tuf_repo') - downloads = os.path.join(root_repo, 'downloads') - tuf_targets = os.path.join(tuf_repo, 'targets') - - # Original data. - INTENDED_DATA = 'Test A' - - # Add a file to 'repo' directory: {root_repo} - filepath = util_test_tools.add_file_to_repository(reg_repo, INTENDED_DATA) - file_basename = os.path.basename(filepath) - url_to_repo = url+'reg_repo/'+file_basename - downloaded_file = os.path.join(downloads, file_basename) - # We do not deliver truly endless data, but we will extend the original - # file by many bytes. - noisy_data = 'X'*100000 + # Verify 'download_fileinfo' is equal to the malicious version. + self.assertEqual(download_fileinfo, malicious_fileinfo) - if using_tuf: - # Update TUF metadata before attacker modifies anything. - util_test_tools.tuf_refresh_repo(root_repo, keyids) - # Modify the url. Remember that the interposition will intercept - # urls that have 'localhost:9999' hostname, which was specified in - # the json interposition configuration file. Look for 'hostname' - # in 'util_test_tools.py'. Further, the 'file_basename' is the target - # path relative to 'targets_dir'. - url_to_repo = 'http://localhost:9999/'+file_basename - # Attacker modifies the file at the targets repository. - target = os.path.join(tuf_targets, file_basename) - original_data = util_test_tools.read_file_content(target) - larger_original_data = original_data + noisy_data - util_test_tools.modify_file_at_repository(target, larger_original_data) + def test_with_tuf(self): + # Verify that a target file (on the remote repository) modified by an + # attacker, to contain a large amount of extra data, is not downloaded by + # the TUF client. First test that the valid target file is successfully + # downloaded. + file1_fileinfo = self.repository_updater.target('file1.txt') + destination = os.path.join(self.client_directory) + self.repository_updater.download_target(file1_fileinfo, destination) + client_target_path = os.path.join(destination, 'file1.txt') + self.assertTrue(os.path.exists(client_target_path)) + + # Verify the client's downloaded file matches the repository's. + target_path = os.path.join(self.repository_directory, 'targets', 'file1.txt') + length, hashes = tuf.util.get_file_details(client_target_path) + fileinfo = tuf.formats.make_fileinfo(length, hashes) + + length, hashes = tuf.util.get_file_details(client_target_path) + download_fileinfo = tuf.formats.make_fileinfo(length, hashes) + self.assertEqual(fileinfo, download_fileinfo) - # Attacker modifies the timestamp.txt metadata. - if TIMESTAMP: - metadata = os.path.join(tuf_repo, 'metadata') - timestamp = os.path.join(metadata, 'timestamp.txt') - original_data = util_test_tools.read_file_content(timestamp) - larger_original_data = original_data + noisy_data - util_test_tools.modify_file_at_repository(timestamp, - larger_original_data) - - # Attacker modifies the file at the regular repository. - original_data = util_test_tools.read_file_content(filepath) - larger_original_data = original_data + noisy_data - util_test_tools.modify_file_at_repository(filepath, larger_original_data) - - # End Setup. - - - # Client downloads (tries to download) the file. + # Modify 'file1.txt' and confirm that the TUF client only downloads up to + # the expected file length. + with open(target_path, 'r+b') as file_object: + original_content = file_object.read() + file_object.write(original_content+('append large amount of data' * 10000)) + + # Is the modified file actually larger? + large_length, hashes = tuf.util.get_file_details(target_path) + self.assertTrue(large_length > length) + + os.remove(client_target_path) + self.repository_updater.download_target(file1_fileinfo, destination) + + # A large amount of data has been appended to the original content. The + # extra data appended should be discarded by the client, so the downloaded + # file size and hash should not have changed. + length, hashes = tuf.util.get_file_details(client_target_path) + download_fileinfo = tuf.formats.make_fileinfo(length, hashes) + self.assertEqual(fileinfo, download_fileinfo) + + # Test that the TUF client does not download large metadata files, as well. + timestamp_path = os.path.join(self.repository_directory, 'metadata', + 'timestamp.json') + + original_length, hashes = tuf.util.get_file_details(timestamp_path) + + with open(timestamp_path, 'r+b') as file_object: + original_content = file_object.read() + file_object.write(original_content+('append large amount of data' * 10000)) + + modified_length, hashes = tuf.util.get_file_details(timestamp_path) + self.assertTrue(modified_length > original_length) + + # Does the TUF client download the upper limit of an unsafely fetched + # 'timestamp.json'? 'timestamp.json' must not be greater than + # 'tuf.conf.DEFAULT_TIMESTAMP_REQUIRED_LENGTH'. try: - _download(url_to_repo, downloaded_file, using_tuf) - except Exception, exception: - # Because we are extending the true timestamp TUF metadata with invalid - # JSON, we except to catch an error about invalid metadata JSON. - if using_tuf and TIMESTAMP: - endless_data_attack = False - - for mirror_url, mirror_error in exception.mirror_errors.iteritems(): - if isinstance(mirror_error, tuf.InvalidMetadataJSONError): - endless_data_attack = True - break - - # In case we did not detect what was likely an endless data attack, we - # reraise the exception to indicate that endless data attack detection - # failed. - if not endless_data_attack: raise - else: raise - - # When we test downloading "endless" timestamp with TUF, we want to skip - # the following test because downloading the timestamp should have failed. - if not (using_tuf and TIMESTAMP): - # Check whether the attack succeeded by inspecting the content of the - # update. The update should contain 'Test A'. Technically it suffices - # to check whether the file was downloaded or not. - downloaded_content = util_test_tools.read_file_content(downloaded_file) - if downloaded_content != INTENDED_DATA: - raise EndlessDataAttack(ERROR_MSG) - - finally: - util_test_tools.cleanup(root_repo, server_proc) + self.repository_updater.refresh() + + except tuf.NoWorkingMirrorError, exception: + for mirror_url, mirror_error in exception.mirror_errors.iteritems(): + self.assertTrue(isinstance(mirror_error, tuf.InvalidMetadataJSONError)) - - -try: - test_endless_data_attack(using_tuf=False, TIMESTAMP=False) -except EndlessDataAttack, error: - print('Endless data attack worked on download without TUF!') - -try: - test_endless_data_attack(using_tuf=True, TIMESTAMP=False) -except EndlessDataAttack, error: - print('Endless data attack worked on download without TUF!') - print(str(error)) -else: - print('Endless data attack did not work on download with TUF!') - -try: - # This test fails because the timestamp metadata has been extended with - # random data from its true length, thereby resulting in invalid JSON. - test_endless_data_attack(using_tuf=True, TIMESTAMP=True) -except EndlessDataAttack, error: - print('Endless data attack worked on download without TUF!') - print(str(error)) -else: - print('Endless data attack did not work on download with TUF!') +if __name__ == '__main__': + unittest.main()