diff --git a/tests/integration/test_replay_attack.py b/tests/integration/test_replay_attack.py index 8e12b684..284eba2d 100755 --- a/tests/integration/test_replay_attack.py +++ b/tests/integration/test_replay_attack.py @@ -5,34 +5,26 @@ test_replay_attack.py - Konstantin Andrianov + Konstantin Andrianov. - February 22, 2012 + February 22, 2012. + + April 5, 2014. + Refactored to use the 'unittest' module (test conditions in code, rather + than verifying text output), use pre-generated repository files, and + discontinue use of the old repository tools. Expanded comments. + -vladimir.v.diaz See LICENSE for licensing information. - Simulate a replay attack. A simple client update vs. client update - implementing TUF. - - In the replay attack an attacker is able to trick clients into installing - software that is older than that which the client previously knew to be - available. - - NOTE: The interposition provided by 'tuf.interposition' is used to intercept - all calls made by urllib/urillib2 to certain hostname specified in - the interposition configuration file. Look up interposition.py for more - information and illustration of a sample contents of the interposition - configuration file. Interposition was meant to make TUF integration with an - existing software updater an easy process. This allows for more flexibility - to the existing software updater. However, if you are planning to solely use - TUF there should be no need for interposition, all necessary calls will be - generated from within TUF. - - There is no difference between 'updates' and 'target' files. + Simulate a replay, or rollback, attack. In a replay attack, a client is + tricked into installing software that is older than that which the client + previously knew to be available. + Note: There is no difference between 'updates' and 'target' files. """ # Help with Python 3 compatability, where the print statement is a function, an @@ -43,162 +35,296 @@ from __future__ import division import os -import shutil import urllib import tempfile +import random +import time +import shutil +import json +import subprocess +import unittest +import logging -import tuf.interposition -import tuf.tests.util_test_tools as util_test_tools +import tuf.formats +import tuf.util +import tuf.log +import tuf.client.updater as updater +import tuf.repository_tool as repo_tool +import tuf.tests.unittest_toolbox as unittest_toolbox + +# The repository tool is imported and logs console messages by default. Disable +# console log messages generated by this unit test. +repo_tool.disable_console_log_messages() + +logger = logging.getLogger('tuf.test_replay_attack') -class TestSetupError(Exception): pass -class ReplayAttackAlert(Exception): pass +class TestReplayAttack(unittest_toolbox.Modified_TestCase): -def _download(url, filename, using_tuf=False): - if using_tuf: - tuf.interposition.urllib_tuf.urlretrieve(url, filename) + @classmethod + def setUpClass(cls): + # setUpClass() is called before any of the test cases are executed. - else: - urllib.urlretrieve(url, filename) + # Create a temporary directory to store the repository, metadata, and target + # files. 'temporary_directory' must be deleted in TearDownModule() so that + # temporary files are always removed, even when exceptions occur. + cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd()) + + # Launch a SimpleHTTPServer (serves files in the current directory). + # Test cases will request metadata and target files that have been + # pre-generated in 'tuf/tests/repository_data', which will be served by the + # SimpleHTTPServer launched here. The test cases of this unit test assume + # the pre-generated metadata files have a specific structure, such + # as a delegated role 'targets/role1', three target files, five key files, + # etc. + cls.SERVER_PORT = random.randint(30000, 45000) + command = ['python', 'simple_server.py', str(cls.SERVER_PORT)] + cls.server_process = subprocess.Popen(command, stderr=subprocess.PIPE) + logger.info('Server process started.') + logger.info('Server process id: '+str(cls.server_process.pid)) + logger.info('Serving on port: '+str(cls.SERVER_PORT)) + cls.url = 'http://localhost:'+str(cls.SERVER_PORT) + os.path.sep + + # NOTE: Following error is raised if a delay is not applied: + # + time.sleep(.2) + @classmethod + def tearDownClass(cls): + # tearDownModule() is called after all the test cases have run. + # http://docs.python.org/2/library/unittest.html#class-and-module-fixtures + + # Remove the temporary repository directory, which should contain all the + # metadata, targets, and key files generated of all the test cases. + shutil.rmtree(cls.temporary_directory) + + unittest_toolbox.Modified_TestCase.clear_toolbox() + + # Kill the SimpleHTTPServer process. + if cls.server_process.returncode is None: + logger.info('Server process '+str(cls.server_process.pid)+' terminated.') + cls.server_process.kill() -def test_replay_attack(using_tuf=False): - """ - - using_tuf: - If set to 'False' all directories that start with 'tuf_' are ignored, - indicating that tuf is not implemented. - - Illustrate replay attack vulnerability. + def setUp(self): + # We are inheriting from custom class. + unittest_toolbox.Modified_TestCase.setUp(self) + + # Copy the original repository files provided in the test folder so that + # any modifications made to repository files are restricted to the copies. + # The 'repository_data' directory is expected to exist in 'tuf/tests/'. + original_repository_files = os.path.join(os.getcwd(), os.pardir, + 'repository_data') + temporary_repository_root = \ + self.make_temp_directory(directory=self.temporary_directory) + + # The original repository, keystore, and client directories will be copied + # for each test case. + original_repository = os.path.join(original_repository_files, 'repository') + original_client = os.path.join(original_repository_files, 'client') + original_keystore = os.path.join(original_repository_files, 'keystore') + + # Save references to the often-needed client repository directories. + # Test cases need these references to access metadata and target files. + self.repository_directory = \ + os.path.join(temporary_repository_root, 'repository') + self.client_directory = os.path.join(temporary_repository_root, 'client') + self.keystore_directory = os.path.join(temporary_repository_root, 'keystore') + + # Copy the original 'repository', 'client', and 'keystore' directories + # to the temporary repository the test cases can use. + shutil.copytree(original_repository, self.repository_directory) + shutil.copytree(original_client, self.client_directory) + shutil.copytree(original_keystore, self.keystore_directory) + + # Set the url prefix required by the 'tuf/client/updater.py' updater. + # 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'. + repository_basepath = self.repository_directory[len(os.getcwd()):] + url_prefix = \ + 'http://localhost:' + str(self.SERVER_PORT) + repository_basepath + + # Setting 'tuf.conf.repository_directory' with the temporary client + # directory copied from the original repository files. + tuf.conf.repository_directory = self.client_directory + self.repository_mirrors = {'mirror1': {'url_prefix': url_prefix, + 'metadata_path': 'metadata', + 'targets_path': 'targets', + 'confined_target_dirs': ['']}} - """ + # Create the repository instance. The test cases will use this client + # updater to refresh metadata, fetch target files, etc. + self.repository_updater = updater.Updater('test_repository', + self.repository_mirrors) - ERROR_MSG = '\tReplay Attack was Successful!\n\n' - FIRST_CONTENT = 'Test A' - SECOND_CONTENT = 'Test B' - try: - # Setup. - root_repo, url, server_proc, keyids = \ - util_test_tools.init_repo(using_tuf=using_tuf) - reg_repo = os.path.join(root_repo, 'reg_repo') - tuf_repo = os.path.join(root_repo, 'tuf_repo') - tuf_repo_copy = os.path.join(root_repo, 'tuf_repo_copy') - downloads = os.path.join(root_repo, 'downloads') - tuf_targets = os.path.join(tuf_repo, 'targets') + def tearDown(self): + # Modified_TestCase.tearDown() automatically deletes temporary files and + # directories that may have been created during each test case. + unittest_toolbox.Modified_TestCase.tearDown(self) - # Add file to 'repo' directory: {root_repo} - filepath = util_test_tools.add_file_to_repository(reg_repo, FIRST_CONTENT) - file_basename = os.path.basename(filepath) - url_to_repo = url+'reg_repo/'+file_basename - downloaded_file = os.path.join(downloads, file_basename) - # Attacker saves the original file into 'evil_dir'. - evil_dir = tempfile.mkdtemp(dir=root_repo) - original_file = os.path.join(evil_dir, file_basename) - shutil.copy(filepath, evil_dir) - if using_tuf: - # Update TUF metadata before attacker modifies anything. - util_test_tools.tuf_refresh_repo(root_repo, keyids) - # Copy the first version of the repository for replay later. - shutil.copytree(tuf_repo, tuf_repo_copy) + def test_without_tuf(self): + # Scenario: + # 'timestamp.json' specifies the latest version of the repository files. + # A client should only accept the same version number (specified in the + # file) of the metadata, or greater. A version number less than the one + # currently trusted should be rejected. A non-TUF client may use a + # different mechanism for determining versions of metadata, but version + # numbers in this integrations because that is what TUF uses. + # + # Modify the repository's timestamp.json' so that a new version is generated + # and accepted by the client, and backup the previous version. The previous + # is then returned the next time the client requests an update. A non-TUF + # client (without a way to detect older versions of metadata, and thus + # updates) is expected to download older metadata and outdated files. + # Verify that the older version of timestamp.json' is downloaded by the + # non-TUF client. - # Modify the url. Remember that the interposition will intercept - # urls that have 'localhost:9999' hostname, which was specified in - # the json interposition configuration file. Look for 'hostname' - # in 'util_test_tools.py'. Further, the 'file_basename' is the target - # path relative to 'targets_dir'. - url_to_repo = 'http://localhost:9999/'+file_basename - # End of Setup. + # Backup the current version of 'timestamp'. It will be used as the + # outdated version returned to the client. The repository tool removes + # obsolete metadadata, so do *not* save the backup version in the + # repository's metadata directory. + timestamp_path = os.path.join(self.repository_directory, 'metadata', + 'timestamp.json') + backup_timestamp = os.path.join(self.repository_directory, + 'timestamp.json.backup') + shutil.copy(timestamp_path, backup_timestamp) + + # The fileinfo of the previous version is saved to verify that it is indeed + # accepted by the non-TUF client. + length, hashes = tuf.util.get_file_details(backup_timestamp) + previous_fileinfo = tuf.formats.make_fileinfo(length, hashes) + + # Modify the timestamp file on the remote repository. + repository = repo_tool.load_repository(self.repository_directory) + key_file = os.path.join(self.keystore_directory, 'timestamp_key') + timestamp_private = repo_tool.import_rsa_privatekey_from_file(key_file, + 'password') + repository.timestamp.load_signing_key(timestamp_private) + + # Set an arbitrary expiration so that the repository tool generates a new + # version. + repository.timestamp.expiration = '2088-01-01 12:12:00' + repository.write() + + # Move the staged metadata to the "live" metadata. + shutil.rmtree(os.path.join(self.repository_directory, 'metadata')) + shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'), + os.path.join(self.repository_directory, 'metadata')) - # Client performs initial update. - _download(url=url_to_repo, filename=downloaded_file, using_tuf=using_tuf) + # Save the fileinfo of the new version generated to verify that it is + # saved by the client. + length, hashes = tuf.util.get_file_details(timestamp_path) + new_fileinfo = tuf.formats.make_fileinfo(length, hashes) - # Downloads are stored in the same directory '{root_repo}/downloads/' - # for regular and tuf clients. - downloaded_content = util_test_tools.read_file_content(downloaded_file) - if FIRST_CONTENT != downloaded_content: - raise TestSetupError('[Initial Update] Failed to download the file.') + url_prefix = self.repository_mirrors['mirror1']['url_prefix'] + url_file = os.path.join(url_prefix, 'metadata', 'timestamp.json') + client_timestamp_path = os.path.join(self.client_directory, 'metadata', + 'current', 'timestamp.json') + + urllib.urlretrieve(url_file, client_timestamp_path) + + length, hashes = tuf.util.get_file_details(client_timestamp_path) + download_fileinfo = tuf.formats.make_fileinfo(length, hashes) + + # Verify 'download_fileinfo' is equal to the new version. + self.assertEqual(download_fileinfo, new_fileinfo) - # Developer patches the file and updates the repository. - util_test_tools.modify_file_at_repository(filepath, SECOND_CONTENT) + # Restore the previous version of 'timestamp.json' on the remote repository + # and verify that the non-TUF client downloads it (expected, but not ideal). + shutil.move(backup_timestamp, timestamp_path) + + urllib.urlretrieve(url_file, client_timestamp_path) + + length, hashes = tuf.util.get_file_details(client_timestamp_path) + download_fileinfo = tuf.formats.make_fileinfo(length, hashes) + + # Verify 'download_fileinfo' is equal to the previous version. + self.assertEqual(download_fileinfo, previous_fileinfo) + self.assertNotEqual(download_fileinfo, new_fileinfo) - # Updating tuf repository. This will copy files from regular repository - # into tuf repository and refresh the metadata - if using_tuf: - util_test_tools.tuf_refresh_repo(root_repo, keyids) - # Client downloads the patched file. - _download(url=url_to_repo, filename=downloaded_file, using_tuf=using_tuf) - # Content of the downloaded file. - downloaded_content = util_test_tools.read_file_content(downloaded_file) - if SECOND_CONTENT != downloaded_content: - raise TestSetupError('[Update] Failed to update the file.') + def test_with_tuf(self): + # The same scenario outlined in test_without_tuf() is followed here, except + # with a TUF client (scenario description provided in the opening comment + # block of that test case.) The TUF client performs a refresh of top-level + # metadata, which also includes 'timestamp.json'. + + # Backup the current version of 'timestamp'. It will be used as the + # outdated version returned to the client. The repository tool removes + # obsolete metadadata, so do *not* save the backup version in the + # repository's metadata directory. + timestamp_path = os.path.join(self.repository_directory, 'metadata', + 'timestamp.json') + backup_timestamp = os.path.join(self.repository_directory, + 'timestamp.json.backup') + shutil.copy(timestamp_path, backup_timestamp) + + # The fileinfo of the previous version is saved to verify that it is indeed + # accepted by the non-TUF client. + length, hashes = tuf.util.get_file_details(backup_timestamp) + previous_fileinfo = tuf.formats.make_fileinfo(length, hashes) + + # Modify the timestamp file on the remote repository. + repository = repo_tool.load_repository(self.repository_directory) + key_file = os.path.join(self.keystore_directory, 'timestamp_key') + timestamp_private = repo_tool.import_rsa_privatekey_from_file(key_file, + 'password') + repository.timestamp.load_signing_key(timestamp_private) + + # Set an arbitrary expiration so that the repository tool generates a new + # version. + repository.timestamp.expiration = '2088-01-01 12:12:00' + repository.write() + + # Move the staged metadata to the "live" metadata. + shutil.rmtree(os.path.join(self.repository_directory, 'metadata')) + shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'), + os.path.join(self.repository_directory, 'metadata')) - # Attacker tries to be clever, he manages to modifies regular and tuf - # targets directory by replacing a patched file with an old one. - if using_tuf: - # Delete the current TUF repository... - shutil.rmtree(tuf_repo) - # ...and replace it with a previous copy. - shutil.move(tuf_repo_copy, tuf_repo) - else: - # Delete the current file... - util_test_tools.delete_file_at_repository(filepath) - # ...and replace it with a previous copy. - shutil.copy(original_file, reg_repo) + # Save the fileinfo of the new version generated to verify that it is + # saved by the client. + length, hashes = tuf.util.get_file_details(timestamp_path) + new_fileinfo = tuf.formats.make_fileinfo(length, hashes) + # Refresh top-level metadata, including 'timestamp.json'. Installation of + # new version of 'timestamp.json' is expected. + self.repository_updater.refresh() + + client_timestamp_path = os.path.join(self.client_directory, 'metadata', + 'current', 'timestamp.json') + length, hashes = tuf.util.get_file_details(client_timestamp_path) + download_fileinfo = tuf.formats.make_fileinfo(length, hashes) + + # Verify 'download_fileinfo' is equal to the new version. + self.assertEqual(download_fileinfo, new_fileinfo) + + # Restore the previous version of 'timestamp.json' on the remote repository + # and verify that the non-TUF client downloads it (expected, but not ideal). + shutil.move(backup_timestamp, timestamp_path) + + # Verify that the TUF client detects replayed metadata and refuses to + # continue the update process. try: - # Client downloads the file once more. - _download(url=url_to_repo, filename=downloaded_file, using_tuf=using_tuf) + self.repository_updater.refresh() + + # Verify that the specific 'tuf.ReplayedMetadataError' is raised by each + # mirror. except tuf.NoWorkingMirrorError, exception: - replayed_metadata_attack = False - for mirror_url, mirror_error in exception.mirror_errors.iteritems(): - if isinstance(mirror_error, tuf.ReplayedMetadataError): - replayed_metadata_attack = True - break - - # In case we did not detect what was likely a replayed metadata attack, - # we reraise the exception to indicate that replayed metadata attack - # detection failed. - if not replayed_metadata_attack: raise - else: - # Check whether the attack succeeded by inspecting the content of the - # update. The update should contain 'Test NOT A'. - downloaded_content = util_test_tools.read_file_content(downloaded_file) - # If we ended up downloading replayed content, then we failed. - if FIRST_CONTENT == downloaded_content: - raise ReplayAttackAlert(ERROR_MSG) - - finally: - util_test_tools.cleanup(root_repo, server_proc) + url_prefix = self.repository_mirrors['mirror1']['url_prefix'] + url_file = os.path.join(url_prefix, 'metadata', 'timestamp.json') + + # Verify that 'timestamp.json' is the culprit. + self.assertEqual(url_file, mirror_url) + self.assertTrue(isinstance(mirror_error, tuf.ReplayedMetadataError)) - - - -try: - test_replay_attack(using_tuf=False) -except ReplayAttackAlert, exception: - print('Download without TUF fell prey to replayed metadata attack.') - - try: - test_replay_attack(using_tuf=True) - except ReplayAttackAlert, exception: - print('Download with TUF fell prey to replayed metadata attack!') - except Exception, exception: - print('Download with TUF failed due to: '+str(exception)) - else: - print('Download with TUF defended against replayed metadata attack.') -except Exception, exception: - print('Download without TUF failed due to: '+str(exception)) -else: - print('Download without TUF did NOT fail due to replayed metadata attack!') +if __name__ == '__main__': + unittest.main() diff --git a/tuf/formats.py b/tuf/formats.py index f79a52e7..50e5bbbc 100755 --- a/tuf/formats.py +++ b/tuf/formats.py @@ -158,8 +158,9 @@ # http://www.emc.com/emc-plus/rsa-labs/historical/twirl-and-rsa-key-size.htm#table1 RSAKEYBITS_SCHEMA = SCHEMA.Integer(lo=2048) -# The number of bins used to delegate to hashed roles. -NUMBINS_SCHEMA = SCHEMA.Integer(lo=16) +# The number of bins, or the requested number of delegated hashed roles. +# Expected to be a power of 2. +NUMBINS_SCHEMA = SCHEMA.Integer(lo=1) # A PyCrypto signature. PYCRYPTOSIGNATURE_SCHEMA = SCHEMA.AnyString() diff --git a/tuf/repository_tool.py b/tuf/repository_tool.py index 27e12c6d..69211c4e 100755 --- a/tuf/repository_tool.py +++ b/tuf/repository_tool.py @@ -445,7 +445,7 @@ def status(self): @staticmethod - def get_filepaths_in_directory(self, files_directory, recursive_walk=False, + def get_filepaths_in_directory(files_directory, recursive_walk=False, followlinks=True): """ @@ -2151,7 +2151,7 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins, number_of_bins=1024): """ - Distribute a large number of target files into multiple delegated roles + Distribute a large number of target files over multiple delegated roles (hashed bins). The metadata files of delegated roles will be nearly equal in size (i.e., 'list_of_targets' is uniformly distributed by calculating the target filepath's hash and determing which bin it should reside in. @@ -2184,14 +2184,14 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins, number_of_bins: The number of delegated roles, or hashed bins, that should be generated and contain the target file attributes listed in 'list_of_targets'. - 'number_of_bins' must be a multiple of 16. Each bin may contain a + 'number_of_bins' must be a power of 2. Each bin may contain a range of path hash prefixes (e.g., target filepath digests that range from [000]... - [003]..., where the series of digits in brackets is considered the hash prefix). tuf.FormatError, if the arguments are improperly formatted, - 'number_of_bins' is not a multiple of 16, or one of the targets + 'number_of_bins' is not a power of 2, or one of the targets in 'list_of_targets' is not located under the repository's targets directory. @@ -2210,28 +2210,35 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins, tuf.formats.ANYKEYLIST_SCHEMA.check_match(keys_of_hashed_bins) tuf.formats.NUMBINS_SCHEMA.check_match(number_of_bins) - # Determine the hex number of hashed bins from 'number_of_bins' and the - # maximum number of bins provided by the total number of hex digits needed. - # Strip the '0x' from the Python hex representation. 'prefix_length' - # and 'max_number_of_bins' affect hashed bin rolenames and the range of - # prefixes of each bin. + # Convert 'number_of_bins' to hexadecimal and determine the number of + # hexadecimal digits needed by each hash prefix. Calculate the total number + # of hash prefixes (e.g., 000 - FFF total values) to be spread over + # 'number_of_bins' and strip the first two characters ('0x') from Python's + # representation of hexadecimal values (so that they are not used in + # the calculation of the prefix length.) + # Example: number_of_bins = 32, total_hash_prefixes = 256, and each hashed + # bin is responsible for 8 hash prefixes. + # Hashed bin roles created = 00-07.json, 08-0f.json, ..., f8-ff.json. prefix_length = len(hex(number_of_bins - 1)[2:]) - max_number_of_bins = 16 ** prefix_length + total_hash_prefixes = 16 ** prefix_length - # For simplicity, ensure that we can evenly distribute 'max_number_of_bins' - # over 'number_of_bins'. Each bin will contain - # max_number_of_bin/number_of_bins hash prefixes. - if max_number_of_bins % number_of_bins != 0: - message = 'The number of bins argument must be a multiple of 16.' + # For simplicity, ensure that 'total_hash_prefixes' (16 ^ n) can be evenly + # distributed over 'number_of_bins' (must be 2 ^ n). Each bin will contain + # (total_hash_prefixes / number_of_bins) hash prefixes. + if total_hash_prefixes % number_of_bins != 0: + message = 'The "number_of_bins" argument must be a power of 2.' raise tuf.FormatError(message) - logger.info('There are '+str(len(list_of_targets))+' total targets.') + logger.info('Creating hashed bin delegations.') + logger.info(repr(len(list_of_targets)) + ' total targets.') + logger.info(repr(number_of_bins) + ' hashed bins.') + logger.info(repr(total_hash_prefixes) + ' total hash prefixes.') # Store the target paths that fall into each bin. The digest of the # target path, reduced to the first 'prefix_length' hex digits, is # calculated to determine which 'bin_index' is should go. target_paths_in_bin = {} - for bin_index in xrange(max_number_of_bins): + for bin_index in xrange(total_hash_prefixes): target_paths_in_bin[bin_index] = [] # Assign every path to its bin. Ensure every target is located under the @@ -2261,16 +2268,18 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins, # later added to the targets of the 'bin_index' role. target_paths_in_bin[bin_index].append(target_path) - # Calculate the path hash prefixes of each bin_offset stored in the parent + # Calculate the path hash prefixes of each 'bin_offset' stored in the parent # role. For example: 'targets/unclaimed/000-003' may list the path hash # prefixes "000", "001", "002", "003" in the delegations dict of # 'targets/unclaimed'. - bin_offset = max_number_of_bins // number_of_bins - + bin_offset = total_hash_prefixes // number_of_bins + + logger.info('Each bin ranges over ' + repr(bin_offset) + ' hash prefixes.') + # The parent roles will list bin roles starting from "0" to - # 'max_number_of_bins' in 'bin_offset' increments. The skipped bin roles + # 'total_hash_prefixes' in 'bin_offset' increments. The skipped bin roles # are listed in 'path_hash_prefixes' of 'outer_bin_index. - for outer_bin_index in xrange(0, max_number_of_bins, bin_offset): + for outer_bin_index in xrange(0, total_hash_prefixes, bin_offset): # The bin index is hex padded from the left with zeroes for up to the # 'prefix_length' (e.g., 'targets/unclaimed/000-003'). Ensure the correct # hash bin name is generated if a prefix range is unneeded.