Added test_extraneous_dependencies_attack.py

This commit is contained in:
Kon 2013-03-06 08:00:03 -05:00
parent 31b97634b4
commit eeb0383376
3 changed files with 279 additions and 1 deletions

View file

@ -1138,3 +1138,81 @@ def build_timestamp_file(timestamp_keyids, metadata_directory):
signable = sign_metadata(timestamp_metadata, timestamp_keyids, timestamp_filepath)
return write_metadata_file(signable, timestamp_filepath)
def build_delegated_role_file(delegated_targets_directory, delegated_keyids,
metadata_directory, delegation_metadata_directory,
delegation_role_name):
"""
<Purpose>
Build the targets metadata file using the signing keys in 'targets_keyids'.
The generated metadata file is saved to 'metadata_directory'. The target
files located in 'targets_directory' will be tracked by the built targets
metadata.
<Arguments>
delegated_targets_directory:
The directory (absolute path) containing all the delegated target
files.
delegated_keyids:
The list of keyids to be used as the signing keys for the delegated
role file.
metadata_directory:
The metadata directory (absolute path) containing all the metadata files.
delegation_metadata_directory:
The location of the delegated role's metadata.
delegation_role_name:
The delegated role's file name ending in '.txt'. Ex: 'role1.txt'
<Exceptions>
tuf.FormatError, if any of the arguments are improperly formatted.
tuf.Error, if there was an error while building the targets file.
<Side Effects>
The targets metadata file is written to a file.
<Returns>
The path for the written targets metadata file.
"""
# Do the arguments have the correct format?
# Raise 'tuf.FormatError' if there is a mismatch.
tuf.formats.PATH_SCHEMA.check_match(delegated_targets_directory)
tuf.formats.KEYIDS_SCHEMA.check_match(delegated_keyids)
tuf.formats.PATH_SCHEMA.check_match(metadata_directory)
tuf.formats.PATH_SCHEMA.check_match(delegation_metadata_directory)
tuf.formats.NAME_SCHEMA.check_match(delegation_role_name)
# Check if 'targets_directory' and 'metadata_directory' are valid.
targets_directory = check_directory(delegated_targets_directory)
metadata_directory = check_directory(metadata_directory)
repository_directory, junk = os.path.split(metadata_directory)
repository_directory_length = len(repository_directory)
# Get the list of targets.
targets = []
for root, directories, files in os.walk(targets_directory):
for target_file in files:
# Note: '+1' in the line below is there to remove '/'.
filename = os.path.join(root, target_file)[repository_directory_length+1:]
targets.append(filename)
# Create the targets metadata object.
targets_metadata = generate_targets_metadata(repository_directory, targets)
# Sign it.
targets_filepath = os.path.join(delegation_metadata_directory,
delegation_role_name)
signable = sign_metadata(targets_metadata, delegated_keyids, targets_filepath)
return write_metadata_file(signable, targets_filepath)

View file

@ -0,0 +1,189 @@
"""
<Program Name>
test_extraneous_dependencies_attack.py
<Author>
Konstantin Andrianov
<Started>
February 19, 2012
<Copyright>
See LICENSE for licensing information.
<Purpose>
<Plan>
1) Create a delegation role.
2) Make sure a client is able to download changes made by a delegation role.
3) Make sure a client is unable to download changes made by a delegation to
to parts of repository that he has no authority over.
"""
import os
import sys
import urllib
import tempfile
import util_test_tools
import tuf.interposition
import tuf.repo.signerlib as signerlib
import tuf.repo.signercli as signercli
def create_delegation(tuf_repo, delegated_targets_path, keyid, keyid_password,
parent_role, new_role_name):
keystore_dir = os.path.join(tuf_repo, 'keystore')
metadata_dir = os.path.join(tuf_repo, 'metadata')
# Create method to patch signercli._get_metadata_directory()
def _mock_get_meta_dir(directory=metadata_dir):
return directory
# Mock method for signercli._prompt().
def _mock_prompt(msg, junk, targets_path=delegated_targets_path,
parent_role=parent_role, new_role_name=new_role_name):
if msg.startswith('\nThe directory entered'):
return targets_path
elif msg.startswith('\nChoose and enter the parent'):
return parent_role
elif msg.endswith('\nEnter the delegated role\'s name: '):
return new_role_name
else:
error_msg = ('Prompt: '+'\''+msg+'\''+
' did not match any predefined mock prompts.')
sys.exit(error_msg)
# Mock method for signercli._get_password().
def _mock_get_password(msg, keyid=keyid, password=keyid_password):
_keyid = keyid[0]
if msg.endswith('('+_keyid+'): '):
return keyid_password
else:
return 'test' # password for targets' keyid.
# Method to patch signercli._get_keyids()
def _mock_get_keyid(junk, keyid=keyid):
return keyid
# Patch signercli._get_metadata_directory()
signercli._get_metadata_directory = _mock_get_meta_dir
# Patch signercli._prompt().
signercli._prompt = _mock_prompt
# Patch signercli._get_password().
signercli._get_password = _mock_get_password
# Patch signercli._get_keyids().
signercli._get_keyids = _mock_get_keyid
signercli.make_delegation(keystore_dir)
def test():
try:
# Setup.
root_repo, url, server_proc, keyids, interpose_json = \
util_test_tools.init_repo(tuf=True)
# Implement interposition!
tuf.interposition.configure(interpose_json)
tuf.interposition.interpose()
reg_repo = os.path.join(root_repo, 'reg_repo')
tuf_repo = os.path.join(root_repo, 'tuf_repo')
keystore_dir = os.path.join(tuf_repo, 'keystore')
metadata_dir = os.path.join(tuf_repo, 'metadata')
downloads_dir = os.path.join(root_repo, 'downloads')
targets_dir = os.path.join(tuf_repo, 'targets')
# Add two files to 'reg_repo' directory: {root_repo}
role1_path = tempfile.mkdtemp(dir=reg_repo)
filepath_1 = util_test_tools.add_file_to_repository(role1_path, 'Test A')
# Update TUF repository.
util_test_tools.tuf_refresh_repo(root_repo, keyids)
rel_filepath_1 = os.path.relpath(filepath_1, reg_repo)
# Indicate which file client downloads.
url_to_file = url+'reg_repo/'+rel_filepath_1
target_1 = os.path.join(targets_dir, rel_filepath_1)
junk, role1_relpath = os.path.split(role1_path)
delegated_targets_path = os.path.join(targets_dir, role1_relpath)
delegated_role_metadata_dir = os.path.join(metadata_dir, 'targets')
# Create a key to sign a new delegated role.
key = signerlib.generate_and_save_rsa_key(keystore_dir, 'pass1')
delegated_targets_keyids = [key['keyid']]
downloaded_file = os.path.join(downloads_dir, os.path.basename(filepath_1))
# Create a delegation.
create_delegation(tuf_repo, delegated_targets_path, delegated_targets_keyids,
'pass1', 'targets', 'role1')
util_test_tools.tuf_refresh_repo(root_repo, keyids)
# END Setup.
# Perform a client download.
urllib.urlretrieve(url_to_file, downloaded_file)
# Check whether the attack succeeded by inspecting the content of the
# update. The update should contain 'Test NOT A'.
downloaded_content = util_test_tools.read_file_content(downloaded_file)
msg = 'OUCH 1'
if 'Test A' != downloaded_content:
print msg
# Modify a target that was delegated to 'role1'.
util_test_tools.modify_file_at_repository(target_1, 'Test NOT A')
# This is not correct!!!???
util_test_tools.tuf_refresh_release_timestamp(metadata_dir, keyids)
# util_test_tools.tuf_refresh_repo(root_repo, keyids)
# Rebuild the delegation role metadata.
signerlib.build_delegated_role_file(delegated_targets_path,
delegated_targets_keyids, metadata_dir,
delegated_role_metadata_dir, 'role1.txt')
# Perform another client download.
urllib.urlretrieve(url_to_file, downloaded_file)
# Check whether the attack succeeded by inspecting the content of the
# update. The update should contain 'Test NOT A'.
downloaded_content = util_test_tools.read_file_content(downloaded_file)
msg = 'OUCH 2'
if 'Test NOT A' != downloaded_content:
print msg
finally:
tuf.interposition.go_away()
# NOTE: temporary files are created and NOT removed at this point. This
# is done in order to investigate the structure manually.
# TODO: use cleanup()!
server_proc.kill()
print 'Done.'
test()

View file

@ -358,7 +358,7 @@ def init_tuf(root_repo, url):
"confined_target_dirs": [ "" ]}},
"target_paths": [ { "(.*\\.html)": "{0}" } ]}}}
junk, interpose_json = tempfile.mkstemp(dir=root_repo)
junk, interpose_json = tempfile.mkstemp(prefix='conf_', dir=root_repo)
with open(interpose_json, 'wb') as fileobj:
tuf.util.json.dump(interposition_dict, fileobj)
@ -398,3 +398,14 @@ def tuf_refresh_repo(root_repo, keyids):
# Regenerate the 'timestamp.txt' metadata file.
signerlib.build_timestamp_file(keyids, metadata_dir)
def tuf_refresh_release_timestamp(metadata_dir, keyids):
# Regenerate the 'release.txt' metadata file.
signerlib.build_release_file(keyids, metadata_dir)
# Regenerate the 'timestamp.txt' metadata file.
signerlib.build_timestamp_file(keyids, metadata_dir)