diff --git a/tests/integration/test_replay_attack.py b/tests/integration/test_replay_attack.py index 8e12b684..284eba2d 100755 --- a/tests/integration/test_replay_attack.py +++ b/tests/integration/test_replay_attack.py @@ -5,34 +5,26 @@ test_replay_attack.py - Konstantin Andrianov + Konstantin Andrianov. - February 22, 2012 + February 22, 2012. + + April 5, 2014. + Refactored to use the 'unittest' module (test conditions in code, rather + than verifying text output), use pre-generated repository files, and + discontinue use of the old repository tools. Expanded comments. + -vladimir.v.diaz See LICENSE for licensing information. - Simulate a replay attack. A simple client update vs. client update - implementing TUF. - - In the replay attack an attacker is able to trick clients into installing - software that is older than that which the client previously knew to be - available. - - NOTE: The interposition provided by 'tuf.interposition' is used to intercept - all calls made by urllib/urillib2 to certain hostname specified in - the interposition configuration file. Look up interposition.py for more - information and illustration of a sample contents of the interposition - configuration file. Interposition was meant to make TUF integration with an - existing software updater an easy process. This allows for more flexibility - to the existing software updater. However, if you are planning to solely use - TUF there should be no need for interposition, all necessary calls will be - generated from within TUF. - - There is no difference between 'updates' and 'target' files. + Simulate a replay, or rollback, attack. In a replay attack, a client is + tricked into installing software that is older than that which the client + previously knew to be available. + Note: There is no difference between 'updates' and 'target' files. """ # Help with Python 3 compatability, where the print statement is a function, an @@ -43,162 +35,296 @@ from __future__ import division import os -import shutil import urllib import tempfile +import random +import time +import shutil +import json +import subprocess +import unittest +import logging -import tuf.interposition -import tuf.tests.util_test_tools as util_test_tools +import tuf.formats +import tuf.util +import tuf.log +import tuf.client.updater as updater +import tuf.repository_tool as repo_tool +import tuf.tests.unittest_toolbox as unittest_toolbox + +# The repository tool is imported and logs console messages by default. Disable +# console log messages generated by this unit test. +repo_tool.disable_console_log_messages() + +logger = logging.getLogger('tuf.test_replay_attack') -class TestSetupError(Exception): pass -class ReplayAttackAlert(Exception): pass +class TestReplayAttack(unittest_toolbox.Modified_TestCase): -def _download(url, filename, using_tuf=False): - if using_tuf: - tuf.interposition.urllib_tuf.urlretrieve(url, filename) + @classmethod + def setUpClass(cls): + # setUpClass() is called before any of the test cases are executed. - else: - urllib.urlretrieve(url, filename) + # Create a temporary directory to store the repository, metadata, and target + # files. 'temporary_directory' must be deleted in TearDownModule() so that + # temporary files are always removed, even when exceptions occur. + cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd()) + + # Launch a SimpleHTTPServer (serves files in the current directory). + # Test cases will request metadata and target files that have been + # pre-generated in 'tuf/tests/repository_data', which will be served by the + # SimpleHTTPServer launched here. The test cases of this unit test assume + # the pre-generated metadata files have a specific structure, such + # as a delegated role 'targets/role1', three target files, five key files, + # etc. + cls.SERVER_PORT = random.randint(30000, 45000) + command = ['python', 'simple_server.py', str(cls.SERVER_PORT)] + cls.server_process = subprocess.Popen(command, stderr=subprocess.PIPE) + logger.info('Server process started.') + logger.info('Server process id: '+str(cls.server_process.pid)) + logger.info('Serving on port: '+str(cls.SERVER_PORT)) + cls.url = 'http://localhost:'+str(cls.SERVER_PORT) + os.path.sep + + # NOTE: Following error is raised if a delay is not applied: + # + time.sleep(.2) + @classmethod + def tearDownClass(cls): + # tearDownModule() is called after all the test cases have run. + # http://docs.python.org/2/library/unittest.html#class-and-module-fixtures + + # Remove the temporary repository directory, which should contain all the + # metadata, targets, and key files generated of all the test cases. + shutil.rmtree(cls.temporary_directory) + + unittest_toolbox.Modified_TestCase.clear_toolbox() + + # Kill the SimpleHTTPServer process. + if cls.server_process.returncode is None: + logger.info('Server process '+str(cls.server_process.pid)+' terminated.') + cls.server_process.kill() -def test_replay_attack(using_tuf=False): - """ - - using_tuf: - If set to 'False' all directories that start with 'tuf_' are ignored, - indicating that tuf is not implemented. - - Illustrate replay attack vulnerability. + def setUp(self): + # We are inheriting from custom class. + unittest_toolbox.Modified_TestCase.setUp(self) + + # Copy the original repository files provided in the test folder so that + # any modifications made to repository files are restricted to the copies. + # The 'repository_data' directory is expected to exist in 'tuf/tests/'. + original_repository_files = os.path.join(os.getcwd(), os.pardir, + 'repository_data') + temporary_repository_root = \ + self.make_temp_directory(directory=self.temporary_directory) + + # The original repository, keystore, and client directories will be copied + # for each test case. + original_repository = os.path.join(original_repository_files, 'repository') + original_client = os.path.join(original_repository_files, 'client') + original_keystore = os.path.join(original_repository_files, 'keystore') + + # Save references to the often-needed client repository directories. + # Test cases need these references to access metadata and target files. + self.repository_directory = \ + os.path.join(temporary_repository_root, 'repository') + self.client_directory = os.path.join(temporary_repository_root, 'client') + self.keystore_directory = os.path.join(temporary_repository_root, 'keystore') + + # Copy the original 'repository', 'client', and 'keystore' directories + # to the temporary repository the test cases can use. + shutil.copytree(original_repository, self.repository_directory) + shutil.copytree(original_client, self.client_directory) + shutil.copytree(original_keystore, self.keystore_directory) + + # Set the url prefix required by the 'tuf/client/updater.py' updater. + # 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'. + repository_basepath = self.repository_directory[len(os.getcwd()):] + url_prefix = \ + 'http://localhost:' + str(self.SERVER_PORT) + repository_basepath + + # Setting 'tuf.conf.repository_directory' with the temporary client + # directory copied from the original repository files. + tuf.conf.repository_directory = self.client_directory + self.repository_mirrors = {'mirror1': {'url_prefix': url_prefix, + 'metadata_path': 'metadata', + 'targets_path': 'targets', + 'confined_target_dirs': ['']}} - """ + # Create the repository instance. The test cases will use this client + # updater to refresh metadata, fetch target files, etc. + self.repository_updater = updater.Updater('test_repository', + self.repository_mirrors) - ERROR_MSG = '\tReplay Attack was Successful!\n\n' - FIRST_CONTENT = 'Test A' - SECOND_CONTENT = 'Test B' - try: - # Setup. - root_repo, url, server_proc, keyids = \ - util_test_tools.init_repo(using_tuf=using_tuf) - reg_repo = os.path.join(root_repo, 'reg_repo') - tuf_repo = os.path.join(root_repo, 'tuf_repo') - tuf_repo_copy = os.path.join(root_repo, 'tuf_repo_copy') - downloads = os.path.join(root_repo, 'downloads') - tuf_targets = os.path.join(tuf_repo, 'targets') + def tearDown(self): + # Modified_TestCase.tearDown() automatically deletes temporary files and + # directories that may have been created during each test case. + unittest_toolbox.Modified_TestCase.tearDown(self) - # Add file to 'repo' directory: {root_repo} - filepath = util_test_tools.add_file_to_repository(reg_repo, FIRST_CONTENT) - file_basename = os.path.basename(filepath) - url_to_repo = url+'reg_repo/'+file_basename - downloaded_file = os.path.join(downloads, file_basename) - # Attacker saves the original file into 'evil_dir'. - evil_dir = tempfile.mkdtemp(dir=root_repo) - original_file = os.path.join(evil_dir, file_basename) - shutil.copy(filepath, evil_dir) - if using_tuf: - # Update TUF metadata before attacker modifies anything. - util_test_tools.tuf_refresh_repo(root_repo, keyids) - # Copy the first version of the repository for replay later. - shutil.copytree(tuf_repo, tuf_repo_copy) + def test_without_tuf(self): + # Scenario: + # 'timestamp.json' specifies the latest version of the repository files. + # A client should only accept the same version number (specified in the + # file) of the metadata, or greater. A version number less than the one + # currently trusted should be rejected. A non-TUF client may use a + # different mechanism for determining versions of metadata, but version + # numbers in this integrations because that is what TUF uses. + # + # Modify the repository's timestamp.json' so that a new version is generated + # and accepted by the client, and backup the previous version. The previous + # is then returned the next time the client requests an update. A non-TUF + # client (without a way to detect older versions of metadata, and thus + # updates) is expected to download older metadata and outdated files. + # Verify that the older version of timestamp.json' is downloaded by the + # non-TUF client. - # Modify the url. Remember that the interposition will intercept - # urls that have 'localhost:9999' hostname, which was specified in - # the json interposition configuration file. Look for 'hostname' - # in 'util_test_tools.py'. Further, the 'file_basename' is the target - # path relative to 'targets_dir'. - url_to_repo = 'http://localhost:9999/'+file_basename - # End of Setup. + # Backup the current version of 'timestamp'. It will be used as the + # outdated version returned to the client. The repository tool removes + # obsolete metadadata, so do *not* save the backup version in the + # repository's metadata directory. + timestamp_path = os.path.join(self.repository_directory, 'metadata', + 'timestamp.json') + backup_timestamp = os.path.join(self.repository_directory, + 'timestamp.json.backup') + shutil.copy(timestamp_path, backup_timestamp) + + # The fileinfo of the previous version is saved to verify that it is indeed + # accepted by the non-TUF client. + length, hashes = tuf.util.get_file_details(backup_timestamp) + previous_fileinfo = tuf.formats.make_fileinfo(length, hashes) + + # Modify the timestamp file on the remote repository. + repository = repo_tool.load_repository(self.repository_directory) + key_file = os.path.join(self.keystore_directory, 'timestamp_key') + timestamp_private = repo_tool.import_rsa_privatekey_from_file(key_file, + 'password') + repository.timestamp.load_signing_key(timestamp_private) + + # Set an arbitrary expiration so that the repository tool generates a new + # version. + repository.timestamp.expiration = '2088-01-01 12:12:00' + repository.write() + + # Move the staged metadata to the "live" metadata. + shutil.rmtree(os.path.join(self.repository_directory, 'metadata')) + shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'), + os.path.join(self.repository_directory, 'metadata')) - # Client performs initial update. - _download(url=url_to_repo, filename=downloaded_file, using_tuf=using_tuf) + # Save the fileinfo of the new version generated to verify that it is + # saved by the client. + length, hashes = tuf.util.get_file_details(timestamp_path) + new_fileinfo = tuf.formats.make_fileinfo(length, hashes) - # Downloads are stored in the same directory '{root_repo}/downloads/' - # for regular and tuf clients. - downloaded_content = util_test_tools.read_file_content(downloaded_file) - if FIRST_CONTENT != downloaded_content: - raise TestSetupError('[Initial Update] Failed to download the file.') + url_prefix = self.repository_mirrors['mirror1']['url_prefix'] + url_file = os.path.join(url_prefix, 'metadata', 'timestamp.json') + client_timestamp_path = os.path.join(self.client_directory, 'metadata', + 'current', 'timestamp.json') + + urllib.urlretrieve(url_file, client_timestamp_path) + + length, hashes = tuf.util.get_file_details(client_timestamp_path) + download_fileinfo = tuf.formats.make_fileinfo(length, hashes) + + # Verify 'download_fileinfo' is equal to the new version. + self.assertEqual(download_fileinfo, new_fileinfo) - # Developer patches the file and updates the repository. - util_test_tools.modify_file_at_repository(filepath, SECOND_CONTENT) + # Restore the previous version of 'timestamp.json' on the remote repository + # and verify that the non-TUF client downloads it (expected, but not ideal). + shutil.move(backup_timestamp, timestamp_path) + + urllib.urlretrieve(url_file, client_timestamp_path) + + length, hashes = tuf.util.get_file_details(client_timestamp_path) + download_fileinfo = tuf.formats.make_fileinfo(length, hashes) + + # Verify 'download_fileinfo' is equal to the previous version. + self.assertEqual(download_fileinfo, previous_fileinfo) + self.assertNotEqual(download_fileinfo, new_fileinfo) - # Updating tuf repository. This will copy files from regular repository - # into tuf repository and refresh the metadata - if using_tuf: - util_test_tools.tuf_refresh_repo(root_repo, keyids) - # Client downloads the patched file. - _download(url=url_to_repo, filename=downloaded_file, using_tuf=using_tuf) - # Content of the downloaded file. - downloaded_content = util_test_tools.read_file_content(downloaded_file) - if SECOND_CONTENT != downloaded_content: - raise TestSetupError('[Update] Failed to update the file.') + def test_with_tuf(self): + # The same scenario outlined in test_without_tuf() is followed here, except + # with a TUF client (scenario description provided in the opening comment + # block of that test case.) The TUF client performs a refresh of top-level + # metadata, which also includes 'timestamp.json'. + + # Backup the current version of 'timestamp'. It will be used as the + # outdated version returned to the client. The repository tool removes + # obsolete metadadata, so do *not* save the backup version in the + # repository's metadata directory. + timestamp_path = os.path.join(self.repository_directory, 'metadata', + 'timestamp.json') + backup_timestamp = os.path.join(self.repository_directory, + 'timestamp.json.backup') + shutil.copy(timestamp_path, backup_timestamp) + + # The fileinfo of the previous version is saved to verify that it is indeed + # accepted by the non-TUF client. + length, hashes = tuf.util.get_file_details(backup_timestamp) + previous_fileinfo = tuf.formats.make_fileinfo(length, hashes) + + # Modify the timestamp file on the remote repository. + repository = repo_tool.load_repository(self.repository_directory) + key_file = os.path.join(self.keystore_directory, 'timestamp_key') + timestamp_private = repo_tool.import_rsa_privatekey_from_file(key_file, + 'password') + repository.timestamp.load_signing_key(timestamp_private) + + # Set an arbitrary expiration so that the repository tool generates a new + # version. + repository.timestamp.expiration = '2088-01-01 12:12:00' + repository.write() + + # Move the staged metadata to the "live" metadata. + shutil.rmtree(os.path.join(self.repository_directory, 'metadata')) + shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'), + os.path.join(self.repository_directory, 'metadata')) - # Attacker tries to be clever, he manages to modifies regular and tuf - # targets directory by replacing a patched file with an old one. - if using_tuf: - # Delete the current TUF repository... - shutil.rmtree(tuf_repo) - # ...and replace it with a previous copy. - shutil.move(tuf_repo_copy, tuf_repo) - else: - # Delete the current file... - util_test_tools.delete_file_at_repository(filepath) - # ...and replace it with a previous copy. - shutil.copy(original_file, reg_repo) + # Save the fileinfo of the new version generated to verify that it is + # saved by the client. + length, hashes = tuf.util.get_file_details(timestamp_path) + new_fileinfo = tuf.formats.make_fileinfo(length, hashes) + # Refresh top-level metadata, including 'timestamp.json'. Installation of + # new version of 'timestamp.json' is expected. + self.repository_updater.refresh() + + client_timestamp_path = os.path.join(self.client_directory, 'metadata', + 'current', 'timestamp.json') + length, hashes = tuf.util.get_file_details(client_timestamp_path) + download_fileinfo = tuf.formats.make_fileinfo(length, hashes) + + # Verify 'download_fileinfo' is equal to the new version. + self.assertEqual(download_fileinfo, new_fileinfo) + + # Restore the previous version of 'timestamp.json' on the remote repository + # and verify that the non-TUF client downloads it (expected, but not ideal). + shutil.move(backup_timestamp, timestamp_path) + + # Verify that the TUF client detects replayed metadata and refuses to + # continue the update process. try: - # Client downloads the file once more. - _download(url=url_to_repo, filename=downloaded_file, using_tuf=using_tuf) + self.repository_updater.refresh() + + # Verify that the specific 'tuf.ReplayedMetadataError' is raised by each + # mirror. except tuf.NoWorkingMirrorError, exception: - replayed_metadata_attack = False - for mirror_url, mirror_error in exception.mirror_errors.iteritems(): - if isinstance(mirror_error, tuf.ReplayedMetadataError): - replayed_metadata_attack = True - break - - # In case we did not detect what was likely a replayed metadata attack, - # we reraise the exception to indicate that replayed metadata attack - # detection failed. - if not replayed_metadata_attack: raise - else: - # Check whether the attack succeeded by inspecting the content of the - # update. The update should contain 'Test NOT A'. - downloaded_content = util_test_tools.read_file_content(downloaded_file) - # If we ended up downloading replayed content, then we failed. - if FIRST_CONTENT == downloaded_content: - raise ReplayAttackAlert(ERROR_MSG) - - finally: - util_test_tools.cleanup(root_repo, server_proc) + url_prefix = self.repository_mirrors['mirror1']['url_prefix'] + url_file = os.path.join(url_prefix, 'metadata', 'timestamp.json') + + # Verify that 'timestamp.json' is the culprit. + self.assertEqual(url_file, mirror_url) + self.assertTrue(isinstance(mirror_error, tuf.ReplayedMetadataError)) - - - -try: - test_replay_attack(using_tuf=False) -except ReplayAttackAlert, exception: - print('Download without TUF fell prey to replayed metadata attack.') - - try: - test_replay_attack(using_tuf=True) - except ReplayAttackAlert, exception: - print('Download with TUF fell prey to replayed metadata attack!') - except Exception, exception: - print('Download with TUF failed due to: '+str(exception)) - else: - print('Download with TUF defended against replayed metadata attack.') -except Exception, exception: - print('Download without TUF failed due to: '+str(exception)) -else: - print('Download without TUF did NOT fail due to replayed metadata attack!') +if __name__ == '__main__': + unittest.main()