From eeb03833764c8f6d027bf3e7ea5cdbc281d3f2fe Mon Sep 17 00:00:00 2001 From: Kon Date: Wed, 6 Mar 2013 08:00:03 -0500 Subject: [PATCH] Added test_extraneous_dependencies_attack.py --- tuf/repo/signerlib.py | 78 ++++++++ .../test_extraneous_dependencies_attack.py | 189 ++++++++++++++++++ tuf/tests/system_tests/util_test_tools.py | 13 +- 3 files changed, 279 insertions(+), 1 deletion(-) create mode 100644 tuf/tests/system_tests/test_extraneous_dependencies_attack.py diff --git a/tuf/repo/signerlib.py b/tuf/repo/signerlib.py index 98b88a1d..2ed337a3 100755 --- a/tuf/repo/signerlib.py +++ b/tuf/repo/signerlib.py @@ -1138,3 +1138,81 @@ def build_timestamp_file(timestamp_keyids, metadata_directory): signable = sign_metadata(timestamp_metadata, timestamp_keyids, timestamp_filepath) return write_metadata_file(signable, timestamp_filepath) + + + + + +def build_delegated_role_file(delegated_targets_directory, delegated_keyids, + metadata_directory, delegation_metadata_directory, + delegation_role_name): + """ + + Build the targets metadata file using the signing keys in 'targets_keyids'. + The generated metadata file is saved to 'metadata_directory'. The target + files located in 'targets_directory' will be tracked by the built targets + metadata. + + + delegated_targets_directory: + The directory (absolute path) containing all the delegated target + files. + + delegated_keyids: + The list of keyids to be used as the signing keys for the delegated + role file. + + metadata_directory: + The metadata directory (absolute path) containing all the metadata files. + + delegation_metadata_directory: + The location of the delegated role's metadata. + + delegation_role_name: + The delegated role's file name ending in '.txt'. Ex: 'role1.txt' + + + tuf.FormatError, if any of the arguments are improperly formatted. + + tuf.Error, if there was an error while building the targets file. + + + The targets metadata file is written to a file. + + + The path for the written targets metadata file. + + """ + + # Do the arguments have the correct format? + # Raise 'tuf.FormatError' if there is a mismatch. + tuf.formats.PATH_SCHEMA.check_match(delegated_targets_directory) + tuf.formats.KEYIDS_SCHEMA.check_match(delegated_keyids) + tuf.formats.PATH_SCHEMA.check_match(metadata_directory) + tuf.formats.PATH_SCHEMA.check_match(delegation_metadata_directory) + tuf.formats.NAME_SCHEMA.check_match(delegation_role_name) + + # Check if 'targets_directory' and 'metadata_directory' are valid. + targets_directory = check_directory(delegated_targets_directory) + metadata_directory = check_directory(metadata_directory) + + repository_directory, junk = os.path.split(metadata_directory) + repository_directory_length = len(repository_directory) + + # Get the list of targets. + targets = [] + for root, directories, files in os.walk(targets_directory): + for target_file in files: + # Note: '+1' in the line below is there to remove '/'. + filename = os.path.join(root, target_file)[repository_directory_length+1:] + targets.append(filename) + + # Create the targets metadata object. + targets_metadata = generate_targets_metadata(repository_directory, targets) + + # Sign it. + targets_filepath = os.path.join(delegation_metadata_directory, + delegation_role_name) + signable = sign_metadata(targets_metadata, delegated_keyids, targets_filepath) + + return write_metadata_file(signable, targets_filepath) \ No newline at end of file diff --git a/tuf/tests/system_tests/test_extraneous_dependencies_attack.py b/tuf/tests/system_tests/test_extraneous_dependencies_attack.py new file mode 100644 index 00000000..485b2791 --- /dev/null +++ b/tuf/tests/system_tests/test_extraneous_dependencies_attack.py @@ -0,0 +1,189 @@ +""" + + test_extraneous_dependencies_attack.py + + + Konstantin Andrianov + + + February 19, 2012 + + + See LICENSE for licensing information. + + + + + 1) Create a delegation role. + 2) Make sure a client is able to download changes made by a delegation role. + 3) Make sure a client is unable to download changes made by a delegation to + to parts of repository that he has no authority over. + +""" + +import os +import sys +import urllib +import tempfile + +import util_test_tools +import tuf.interposition +import tuf.repo.signerlib as signerlib +import tuf.repo.signercli as signercli + + + + + +def create_delegation(tuf_repo, delegated_targets_path, keyid, keyid_password, + parent_role, new_role_name): + keystore_dir = os.path.join(tuf_repo, 'keystore') + metadata_dir = os.path.join(tuf_repo, 'metadata') + + # Create method to patch signercli._get_metadata_directory() + def _mock_get_meta_dir(directory=metadata_dir): + return directory + + + # Mock method for signercli._prompt(). + def _mock_prompt(msg, junk, targets_path=delegated_targets_path, + parent_role=parent_role, new_role_name=new_role_name): + if msg.startswith('\nThe directory entered'): + return targets_path + elif msg.startswith('\nChoose and enter the parent'): + return parent_role + elif msg.endswith('\nEnter the delegated role\'s name: '): + return new_role_name + else: + error_msg = ('Prompt: '+'\''+msg+'\''+ + ' did not match any predefined mock prompts.') + sys.exit(error_msg) + + + # Mock method for signercli._get_password(). + def _mock_get_password(msg, keyid=keyid, password=keyid_password): + _keyid = keyid[0] + if msg.endswith('('+_keyid+'): '): + return keyid_password + else: + return 'test' # password for targets' keyid. + + + # Method to patch signercli._get_keyids() + def _mock_get_keyid(junk, keyid=keyid): + return keyid + + + # Patch signercli._get_metadata_directory() + signercli._get_metadata_directory = _mock_get_meta_dir + + # Patch signercli._prompt(). + signercli._prompt = _mock_prompt + + # Patch signercli._get_password(). + signercli._get_password = _mock_get_password + + # Patch signercli._get_keyids(). + signercli._get_keyids = _mock_get_keyid + + signercli.make_delegation(keystore_dir) + + + + + + +def test(): + + try: + + # Setup. + root_repo, url, server_proc, keyids, interpose_json = \ + util_test_tools.init_repo(tuf=True) + + # Implement interposition! + tuf.interposition.configure(interpose_json) + tuf.interposition.interpose() + + reg_repo = os.path.join(root_repo, 'reg_repo') + tuf_repo = os.path.join(root_repo, 'tuf_repo') + keystore_dir = os.path.join(tuf_repo, 'keystore') + metadata_dir = os.path.join(tuf_repo, 'metadata') + downloads_dir = os.path.join(root_repo, 'downloads') + targets_dir = os.path.join(tuf_repo, 'targets') + + # Add two files to 'reg_repo' directory: {root_repo} + role1_path = tempfile.mkdtemp(dir=reg_repo) + filepath_1 = util_test_tools.add_file_to_repository(role1_path, 'Test A') + # Update TUF repository. + util_test_tools.tuf_refresh_repo(root_repo, keyids) + rel_filepath_1 = os.path.relpath(filepath_1, reg_repo) + # Indicate which file client downloads. + url_to_file = url+'reg_repo/'+rel_filepath_1 + target_1 = os.path.join(targets_dir, rel_filepath_1) + junk, role1_relpath = os.path.split(role1_path) + delegated_targets_path = os.path.join(targets_dir, role1_relpath) + delegated_role_metadata_dir = os.path.join(metadata_dir, 'targets') + # Create a key to sign a new delegated role. + key = signerlib.generate_and_save_rsa_key(keystore_dir, 'pass1') + delegated_targets_keyids = [key['keyid']] + downloaded_file = os.path.join(downloads_dir, os.path.basename(filepath_1)) + + # Create a delegation. + create_delegation(tuf_repo, delegated_targets_path, delegated_targets_keyids, + 'pass1', 'targets', 'role1') + util_test_tools.tuf_refresh_repo(root_repo, keyids) + + # END Setup. + + + # Perform a client download. + urllib.urlretrieve(url_to_file, downloaded_file) + + # Check whether the attack succeeded by inspecting the content of the + # update. The update should contain 'Test NOT A'. + downloaded_content = util_test_tools.read_file_content(downloaded_file) + msg = 'OUCH 1' + if 'Test A' != downloaded_content: + print msg + + + # Modify a target that was delegated to 'role1'. + util_test_tools.modify_file_at_repository(target_1, 'Test NOT A') + + # This is not correct!!!??? + util_test_tools.tuf_refresh_release_timestamp(metadata_dir, keyids) + + # util_test_tools.tuf_refresh_repo(root_repo, keyids) + + # Rebuild the delegation role metadata. + signerlib.build_delegated_role_file(delegated_targets_path, + delegated_targets_keyids, metadata_dir, + delegated_role_metadata_dir, 'role1.txt') + + + # Perform another client download. + urllib.urlretrieve(url_to_file, downloaded_file) + + # Check whether the attack succeeded by inspecting the content of the + # update. The update should contain 'Test NOT A'. + downloaded_content = util_test_tools.read_file_content(downloaded_file) + msg = 'OUCH 2' + if 'Test NOT A' != downloaded_content: + print msg + + + finally: + tuf.interposition.go_away() + + # NOTE: temporary files are created and NOT removed at this point. This + # is done in order to investigate the structure manually. + # TODO: use cleanup()! + server_proc.kill() + + print 'Done.' + + + + +test() \ No newline at end of file diff --git a/tuf/tests/system_tests/util_test_tools.py b/tuf/tests/system_tests/util_test_tools.py index ed4758cf..30d0849d 100644 --- a/tuf/tests/system_tests/util_test_tools.py +++ b/tuf/tests/system_tests/util_test_tools.py @@ -358,7 +358,7 @@ def init_tuf(root_repo, url): "confined_target_dirs": [ "" ]}}, "target_paths": [ { "(.*\\.html)": "{0}" } ]}}} - junk, interpose_json = tempfile.mkstemp(dir=root_repo) + junk, interpose_json = tempfile.mkstemp(prefix='conf_', dir=root_repo) with open(interpose_json, 'wb') as fileobj: tuf.util.json.dump(interposition_dict, fileobj) @@ -398,3 +398,14 @@ def tuf_refresh_repo(root_repo, keyids): # Regenerate the 'timestamp.txt' metadata file. signerlib.build_timestamp_file(keyids, metadata_dir) + + + + + +def tuf_refresh_release_timestamp(metadata_dir, keyids): + # Regenerate the 'release.txt' metadata file. + signerlib.build_release_file(keyids, metadata_dir) + + # Regenerate the 'timestamp.txt' metadata file. + signerlib.build_timestamp_file(keyids, metadata_dir) \ No newline at end of file