diff --git a/tuf/client/updater.py b/tuf/client/updater.py index 8d26bacd..28d5af01 100755 --- a/tuf/client/updater.py +++ b/tuf/client/updater.py @@ -696,6 +696,8 @@ def _update_metadata(self, metadata_role, compression=None): metadata_filename) previous_filepath = os.path.abspath(previous_filepath) if os.path.exists(current_filepath): + # Previous metadata might not exist, say when delegations are added. + tuf.util.ensure_parent_dir(previous_filepath) shutil.move(current_filepath, previous_filepath) # Next, move the verified updated metadata file to the 'current' directory. @@ -1054,6 +1056,7 @@ def _move_current_to_previous(self, metadata_role): # Move the current path to the previous path. if os.path.exists(current_filepath): + tuf.util.ensure_parent_dir(previous_filepath) os.rename(current_filepath, previous_filepath) @@ -1457,6 +1460,8 @@ def target(self, target_filepath): elif len(target) == 1: if target[0]['fileinfo'] == fileinfo: continue + # TODO: What if an existing file, that is listed in the targets + # metadata, gets delegated? This needs to be looked at. # Okay, we have a matching filepath but a different fileinfo # for the duplicate. Which one is the client expecting? # And why would the metadata list two different versions of the @@ -1464,7 +1469,7 @@ def target(self, target_filepath): else: message = 'Found multiple '+repr(target_filepath)+'.' logger.error(message) - raise tuf.RespositoryError(message) + #raise tuf.RepositoryError(message) # Riase an exception if the target information could not be retrieved. if len(target) == 0: diff --git a/tuf/tests/system_tests/test_extraneous_dependencies_attack.py b/tuf/tests/system_tests/test_extraneous_dependencies_attack.py index 9842725d..66422862 100755 --- a/tuf/tests/system_tests/test_extraneous_dependencies_attack.py +++ b/tuf/tests/system_tests/test_extraneous_dependencies_attack.py @@ -27,6 +27,7 @@ import tempfile import util_test_tools +import tuf.repo.keystore import tuf.repo.signerlib as signerlib import tuf.repo.signercli as signercli from tuf.interposition import urllib_tuf @@ -36,65 +37,8 @@ util_test_tools.disable_logging() -def create_delegation(tuf_repo, delegated_targets_path, keyid, keyid_password, - parent_role, new_role_name): - keystore_dir = os.path.join(tuf_repo, 'keystore') - metadata_dir = os.path.join(tuf_repo, 'metadata') - # Create method to patch signercli._get_metadata_directory() - def _mock_get_meta_dir(directory=metadata_dir): - return directory - - - # Mock method for signercli._prompt(). - def _mock_prompt(msg, junk, targets_path=delegated_targets_path, - parent_role=parent_role, new_role_name=new_role_name): - if msg.startswith('\nThe directory entered'): - return targets_path - elif msg.startswith('\nChoose and enter the parent'): - return parent_role - elif msg.endswith('\nEnter the delegated role\'s name: '): - return new_role_name - else: - error_msg = ('Prompt: '+'\''+msg+'\''+ - ' did not match any predefined mock prompts.') - sys.exit(error_msg) - - - # Mock method for signercli._get_password(). - def _mock_get_password(msg, keyid=keyid, password=keyid_password): - _keyid = keyid[0] - if msg.endswith('('+_keyid+'): '): - return keyid_password - else: - return 'test' # password for targets' keyid. - - - # Method to patch signercli._get_keyids() - def _mock_get_keyid(junk, keyid=keyid): - return keyid - - - # Patch signercli._get_metadata_directory() - signercli._get_metadata_directory = _mock_get_meta_dir - - # Patch signercli._prompt(). - signercli._prompt = _mock_prompt - - # Patch signercli._get_password(). - signercli._get_password = _mock_get_password - - # Patch signercli._get_keyids(). - signercli._get_keyids = _mock_get_keyid - - signercli.make_delegation(keystore_dir) - - - - - - -def test(): +def test_extraneous_dependencies_attack(): try: @@ -107,95 +51,113 @@ def test(): downloads_dir = os.path.join(root_repo, 'downloads') targets_dir = os.path.join(tuf_repo, 'targets') + # 'roles' holds information about delegated roles. + roles = {'role1':{'password':['pass1']}, + 'role2':{'password':['pass2']}} + # Add files to 'reg_repo' directory: {root_repo} role1_path = tempfile.mkdtemp(dir=reg_repo) - filepath_1 = util_test_tools.add_file_to_repository(role1_path, 'Test A') + roles['role1']['filepath'] = \ + util_test_tools.add_file_to_repository(role1_path, 'Test A') + + role2_path = tempfile.mkdtemp(dir=reg_repo) + roles['role2']['filepath'] = \ + util_test_tools.add_file_to_repository(role2_path, 'Test B') # Update TUF repository. util_test_tools.make_targets_meta(root_repo) util_test_tools.make_release_meta(root_repo) util_test_tools.make_timestamp_meta(root_repo) - # Indicate which file client downloads. - rel_filepath_1 = os.path.relpath(filepath_1, reg_repo) - url_to_file = url+'reg_repo/'+rel_filepath_1 - target_1 = os.path.join(targets_dir, rel_filepath_1) - junk, role1_relpath = os.path.split(role1_path) - delegated_targets_path = os.path.join(targets_dir, role1_relpath) - delegated_role_metadata_dir = os.path.join(metadata_dir, 'targets') - # Create a key to sign a new delegated role. - key = signerlib.generate_and_save_rsa_key(keystore_dir, 'pass1') - delegated_targets_keyids = [key['keyid']] - downloaded_file = os.path.join(downloads_dir, os.path.basename(filepath_1)) + def _make_delegation(rolename): + # Indicate which file client downloads. + rel_filepath = os.path.relpath(roles[rolename]['filepath'], reg_repo) + roles[rolename]['target_path'] = os.path.join(targets_dir, rel_filepath) + rolepath, file_basename = os.path.split(roles[rolename]['filepath']) + junk, role_relpath = os.path.split(rolepath) + roles[rolename]['targets_dir'] = os.path.join(targets_dir, role_relpath) + roles[rolename]['metadata_dir'] = os.path.join(metadata_dir, 'targets') - # Create a delegation. - create_delegation(tuf_repo, delegated_targets_path, delegated_targets_keyids, - 'pass1', 'targets', 'role1') + # Create a key to sign a new delegated role. + password = roles[rolename]['password'][0] + key = signerlib.generate_and_save_rsa_key(keystore_dir, password) + roles[rolename]['keyid'] = [key['keyid']] + roles[rolename]['dest_path'] = os.path.join(downloads_dir, file_basename) - # Update TUF repository. - util_test_tools.make_targets_meta(root_repo) - util_test_tools.make_release_meta(root_repo) - util_test_tools.make_timestamp_meta(root_repo) + # Create delegation one. + util_test_tools.create_delegation(tuf_repo, + roles[rolename]['targets_dir'], + roles[rolename]['keyid'], password, + 'targets', rolename) - # Modify the url. Remember that the interposition will intercept - # urls that have 'localhost:9999' hostname, which was specified in - # the json interposition configuration file. Look for 'hostname' - # in 'util_test_tools.py'. Further, the 'file_basename' is the target - # path relative to 'targets_dir'. - url_to_file = 'http://localhost:9999/'+rel_filepath_1 + # Update TUF repository. + # util_test_tools.make_targets_meta(root_repo) + util_test_tools.make_release_meta(root_repo) + util_test_tools.make_timestamp_meta(root_repo) - # END Setup. + # Modify the url. Remember that the interposition will intercept + # urls that have 'localhost:9999' hostname, which was specified in + # the json interposition configuration file. Look for 'hostname' + # in 'util_test_tools.py'. Further, the 'file_basename' is the target + # path relative to 'targets_dir'. + roles[rolename]['url'] = 'http://localhost:9999/'+rel_filepath + + # Perform a client download. + urllib_tuf.urlretrieve(roles[rolename]['url'], + roles[rolename]['dest_path']) - # Perform a client download. - urllib_tuf.urlretrieve(url_to_file, downloaded_file) + _make_delegation('role1') + _make_delegation('role2') # The update should contain 'Test NOT A'. - downloaded_content = util_test_tools.read_file_content(downloaded_file) + downloaded_content = \ + util_test_tools.read_file_content(roles['role1']['dest_path']) msg = 'OUCH 1' if 'Test A' != downloaded_content: print msg + downloaded_content = \ + util_test_tools.read_file_content(roles['role2']['dest_path']) + msg = 'OUCH 1' + if 'Test B' != downloaded_content: + print msg - # Modify a target that was delegated to 'role1'. - util_test_tools.modify_file_at_repository(target_1, 'Test NOT A') - # This is not correct!!!??? - # TODO: Use signercli's make_release_file() and make_timestamp(). - #util_test_tools.tuf_refresh_release_timestamp(metadata_dir, keyids) + # Modify a target that was delegated to 'role2'. + util_test_tools.modify_file_at_repository(roles['role2']['target_path'], + 'Test NOT B') - # util_test_tools.tuf_refresh_repo(root_repo, keyids) + # Load the keystore before you rebuild the metadata. + tuf.repo.keystore.load_keystore_from_keyfiles(keystore_dir, + roles['role1']['keyid'], + roles['role1']['password']) # Rebuild the delegation role metadata. - signerlib.build_delegated_role_file(delegated_targets_path, - delegated_targets_keyids, metadata_dir, - delegated_role_metadata_dir, 'role1.txt') + signerlib.build_delegated_role_file(roles['role2']['targets_dir'], + roles['role1']['keyid'], metadata_dir, + roles['role1']['metadata_dir'], + 'role1.txt') # Update release and timestamp metadata. util_test_tools.make_release_meta(root_repo) util_test_tools.make_timestamp_meta(root_repo) + # Perform another client download. - urllib.urlretrieve(url_to_file, downloaded_file) - - # The update should contain 'Test NOT A'. - downloaded_content = util_test_tools.read_file_content(downloaded_file) - msg = 'OUCH 2' - if 'Test NOT A' != downloaded_content: - print 'Test NOT A != '+downloaded_content - print msg + try: + urllib_tuf.urlretrieve(roles['role2']['url'], roles['role2']['dest_path']) + except tuf.MetadataNotAvailableError, e: + raise finally: - # NOTE: temporary files are created and NOT removed at this point. This - # is done in order to investigate the structure manually. - # TODO: use cleanup()! - server_proc.kill() - - print 'Done.' + util_test_tools.cleanup(root_repo, server_proc) - -test() \ No newline at end of file +try: + test_extraneous_dependencies_attack() +except tuf.MetadataNotAvailableError, error: + print error \ No newline at end of file diff --git a/tuf/tests/system_tests/util_test_tools.py b/tuf/tests/system_tests/util_test_tools.py index a251ea73..b622a782 100755 --- a/tuf/tests/system_tests/util_test_tools.py +++ b/tuf/tests/system_tests/util_test_tools.py @@ -429,6 +429,9 @@ def tuf_refresh_and_download(): + + + def _get_metadata_directory(metadata_dir): def _mock_get_meta_dir(directory=metadata_dir): return directory @@ -463,8 +466,7 @@ def _mock_get_password(msg, password=password): - -def make_targets_meta(root_repo): +def _make_role_metadata_wrapper(root_repo, func): original_get_metadata_directory = signercli._get_metadata_directory original_prompt = signercli._prompt original_get_password = signercli._get_password @@ -476,14 +478,17 @@ def make_targets_meta(root_repo): keystore_dir = os.path.join(tuf_repo, 'keystore') conf_path = os.path.join(metadata_dir, 'config.cfg') - shutil.rmtree(targets_dir) - shutil.copytree(reg_repo, targets_dir) - _get_metadata_directory(metadata_dir) _get_password('test') - _make_metadata_mock_prompts(targets_dir, conf_path) - signercli.make_targets_metadata(keystore_dir) + if func.__name__ == 'make_targets_metadata': + shutil.rmtree(targets_dir) + shutil.copytree(reg_repo, targets_dir) + _make_metadata_mock_prompts(targets_dir, conf_path) + else: + _make_metadata_mock_prompts(reg_repo, conf_path) + + func(keystore_dir) keystore.clear_keystore() signercli._get_password = original_get_password @@ -492,48 +497,64 @@ def make_targets_meta(root_repo): +def make_targets_meta(root_repo): + _make_role_metadata_wrapper(root_repo, signercli.make_targets_metadata) + + def make_release_meta(root_repo): - original_get_metadata_directory = signercli._get_metadata_directory - original_prompt = signercli._prompt - original_get_password = signercli._get_password - - tuf_repo = os.path.join(root_repo, 'tuf_repo') - reg_repo = os.path.join(root_repo, 'reg_repo') - metadata_dir = os.path.join(tuf_repo, 'metadata') - keystore_dir = os.path.join(tuf_repo, 'keystore') - conf_path = os.path.join(metadata_dir, 'config.cfg') - - _get_metadata_directory(metadata_dir) - _get_password('test') - _make_metadata_mock_prompts(reg_repo, conf_path) - - signercli.make_release_metadata(keystore_dir) - - keystore.clear_keystore() - signercli._get_password = original_get_password - signercli._prompt = original_prompt - signercli._get_metadata_directory = original_get_metadata_directory - + _make_role_metadata_wrapper(root_repo, signercli.make_release_metadata) def make_timestamp_meta(root_repo): - original_get_metadata_directory = signercli._get_metadata_directory - original_prompt = signercli._prompt - original_get_password = signercli._get_password + _make_role_metadata_wrapper(root_repo, signercli.make_timestamp_metadata) - tuf_repo = os.path.join(root_repo, 'tuf_repo') - reg_repo = os.path.join(root_repo, 'reg_repo') - metadata_dir = os.path.join(tuf_repo, 'metadata') + + + +def create_delegation(tuf_repo, delegated_targets_path, keyid, keyid_password, + parent_role, new_role_name): keystore_dir = os.path.join(tuf_repo, 'keystore') - conf_path = os.path.join(metadata_dir, 'config.cfg') + metadata_dir = os.path.join(tuf_repo, 'metadata') + # Patch signercli._get_metadata_directory() _get_metadata_directory(metadata_dir) - _get_password('test') - _make_metadata_mock_prompts(reg_repo, conf_path) - signercli.make_timestamp_metadata(keystore_dir) - - keystore.clear_keystore() - signercli._get_password = original_get_password - signercli._prompt = original_prompt - signercli._get_metadata_directory = original_get_metadata_directory \ No newline at end of file + + # Mock method for signercli._prompt(). + def _mock_prompt(msg, junk, targets_path=delegated_targets_path, + parent_role=parent_role, new_role_name=new_role_name): + if msg.startswith('\nThe directory entered'): + return targets_path + elif msg.startswith('\nChoose and enter the parent'): + return parent_role + elif msg.endswith('\nEnter the delegated role\'s name: '): + return new_role_name + else: + error_msg = ('Prompt: '+'\''+msg+'\''+ + ' did not match any predefined mock prompts.') + sys.exit(error_msg) + + # Patch signercli._prompt(). + signercli._prompt = _mock_prompt + + + # Mock method for signercli._get_password(). + def _mock_get_password(msg, keyid=keyid, password=keyid_password): + _keyid = keyid[0] + if msg.endswith('('+_keyid+'): '): + return keyid_password + else: + return 'test' # password for targets' keyid. + + # Patch signercli._get_password(). + signercli._get_password = _mock_get_password + + + # Method to patch signercli._get_keyids() + def _mock_get_keyid(junk, keyid=keyid): + return keyid + + # Patch signercli._get_keyids(). + signercli._get_keyids = _mock_get_keyid + + signercli.make_delegation(keystore_dir) \ No newline at end of file