mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Completed test_extraneous_dependencies_attack.py, changes to updater.py need to be reviewed.
This commit is contained in:
parent
83fd9bed24
commit
3b253d6496
3 changed files with 144 additions and 156 deletions
|
|
@ -696,6 +696,8 @@ def _update_metadata(self, metadata_role, compression=None):
|
|||
metadata_filename)
|
||||
previous_filepath = os.path.abspath(previous_filepath)
|
||||
if os.path.exists(current_filepath):
|
||||
# Previous metadata might not exist, say when delegations are added.
|
||||
tuf.util.ensure_parent_dir(previous_filepath)
|
||||
shutil.move(current_filepath, previous_filepath)
|
||||
|
||||
# Next, move the verified updated metadata file to the 'current' directory.
|
||||
|
|
@ -1054,6 +1056,7 @@ def _move_current_to_previous(self, metadata_role):
|
|||
|
||||
# Move the current path to the previous path.
|
||||
if os.path.exists(current_filepath):
|
||||
tuf.util.ensure_parent_dir(previous_filepath)
|
||||
os.rename(current_filepath, previous_filepath)
|
||||
|
||||
|
||||
|
|
@ -1457,6 +1460,8 @@ def target(self, target_filepath):
|
|||
elif len(target) == 1:
|
||||
if target[0]['fileinfo'] == fileinfo:
|
||||
continue
|
||||
# TODO: What if an existing file, that is listed in the targets
|
||||
# metadata, gets delegated? This needs to be looked at.
|
||||
# Okay, we have a matching filepath but a different fileinfo
|
||||
# for the duplicate. Which one is the client expecting?
|
||||
# And why would the metadata list two different versions of the
|
||||
|
|
@ -1464,7 +1469,7 @@ def target(self, target_filepath):
|
|||
else:
|
||||
message = 'Found multiple '+repr(target_filepath)+'.'
|
||||
logger.error(message)
|
||||
raise tuf.RespositoryError(message)
|
||||
#raise tuf.RepositoryError(message)
|
||||
|
||||
# Riase an exception if the target information could not be retrieved.
|
||||
if len(target) == 0:
|
||||
|
|
|
|||
|
|
@ -27,6 +27,7 @@
|
|||
import tempfile
|
||||
|
||||
import util_test_tools
|
||||
import tuf.repo.keystore
|
||||
import tuf.repo.signerlib as signerlib
|
||||
import tuf.repo.signercli as signercli
|
||||
from tuf.interposition import urllib_tuf
|
||||
|
|
@ -36,65 +37,8 @@
|
|||
util_test_tools.disable_logging()
|
||||
|
||||
|
||||
def create_delegation(tuf_repo, delegated_targets_path, keyid, keyid_password,
|
||||
parent_role, new_role_name):
|
||||
keystore_dir = os.path.join(tuf_repo, 'keystore')
|
||||
metadata_dir = os.path.join(tuf_repo, 'metadata')
|
||||
|
||||
# Create method to patch signercli._get_metadata_directory()
|
||||
def _mock_get_meta_dir(directory=metadata_dir):
|
||||
return directory
|
||||
|
||||
|
||||
# Mock method for signercli._prompt().
|
||||
def _mock_prompt(msg, junk, targets_path=delegated_targets_path,
|
||||
parent_role=parent_role, new_role_name=new_role_name):
|
||||
if msg.startswith('\nThe directory entered'):
|
||||
return targets_path
|
||||
elif msg.startswith('\nChoose and enter the parent'):
|
||||
return parent_role
|
||||
elif msg.endswith('\nEnter the delegated role\'s name: '):
|
||||
return new_role_name
|
||||
else:
|
||||
error_msg = ('Prompt: '+'\''+msg+'\''+
|
||||
' did not match any predefined mock prompts.')
|
||||
sys.exit(error_msg)
|
||||
|
||||
|
||||
# Mock method for signercli._get_password().
|
||||
def _mock_get_password(msg, keyid=keyid, password=keyid_password):
|
||||
_keyid = keyid[0]
|
||||
if msg.endswith('('+_keyid+'): '):
|
||||
return keyid_password
|
||||
else:
|
||||
return 'test' # password for targets' keyid.
|
||||
|
||||
|
||||
# Method to patch signercli._get_keyids()
|
||||
def _mock_get_keyid(junk, keyid=keyid):
|
||||
return keyid
|
||||
|
||||
|
||||
# Patch signercli._get_metadata_directory()
|
||||
signercli._get_metadata_directory = _mock_get_meta_dir
|
||||
|
||||
# Patch signercli._prompt().
|
||||
signercli._prompt = _mock_prompt
|
||||
|
||||
# Patch signercli._get_password().
|
||||
signercli._get_password = _mock_get_password
|
||||
|
||||
# Patch signercli._get_keyids().
|
||||
signercli._get_keyids = _mock_get_keyid
|
||||
|
||||
signercli.make_delegation(keystore_dir)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def test():
|
||||
def test_extraneous_dependencies_attack():
|
||||
|
||||
try:
|
||||
|
||||
|
|
@ -107,95 +51,113 @@ def test():
|
|||
downloads_dir = os.path.join(root_repo, 'downloads')
|
||||
targets_dir = os.path.join(tuf_repo, 'targets')
|
||||
|
||||
# 'roles' holds information about delegated roles.
|
||||
roles = {'role1':{'password':['pass1']},
|
||||
'role2':{'password':['pass2']}}
|
||||
|
||||
# Add files to 'reg_repo' directory: {root_repo}
|
||||
role1_path = tempfile.mkdtemp(dir=reg_repo)
|
||||
filepath_1 = util_test_tools.add_file_to_repository(role1_path, 'Test A')
|
||||
roles['role1']['filepath'] = \
|
||||
util_test_tools.add_file_to_repository(role1_path, 'Test A')
|
||||
|
||||
role2_path = tempfile.mkdtemp(dir=reg_repo)
|
||||
roles['role2']['filepath'] = \
|
||||
util_test_tools.add_file_to_repository(role2_path, 'Test B')
|
||||
|
||||
# Update TUF repository.
|
||||
util_test_tools.make_targets_meta(root_repo)
|
||||
util_test_tools.make_release_meta(root_repo)
|
||||
util_test_tools.make_timestamp_meta(root_repo)
|
||||
|
||||
# Indicate which file client downloads.
|
||||
rel_filepath_1 = os.path.relpath(filepath_1, reg_repo)
|
||||
url_to_file = url+'reg_repo/'+rel_filepath_1
|
||||
target_1 = os.path.join(targets_dir, rel_filepath_1)
|
||||
junk, role1_relpath = os.path.split(role1_path)
|
||||
delegated_targets_path = os.path.join(targets_dir, role1_relpath)
|
||||
delegated_role_metadata_dir = os.path.join(metadata_dir, 'targets')
|
||||
|
||||
# Create a key to sign a new delegated role.
|
||||
key = signerlib.generate_and_save_rsa_key(keystore_dir, 'pass1')
|
||||
delegated_targets_keyids = [key['keyid']]
|
||||
downloaded_file = os.path.join(downloads_dir, os.path.basename(filepath_1))
|
||||
def _make_delegation(rolename):
|
||||
# Indicate which file client downloads.
|
||||
rel_filepath = os.path.relpath(roles[rolename]['filepath'], reg_repo)
|
||||
roles[rolename]['target_path'] = os.path.join(targets_dir, rel_filepath)
|
||||
rolepath, file_basename = os.path.split(roles[rolename]['filepath'])
|
||||
junk, role_relpath = os.path.split(rolepath)
|
||||
roles[rolename]['targets_dir'] = os.path.join(targets_dir, role_relpath)
|
||||
roles[rolename]['metadata_dir'] = os.path.join(metadata_dir, 'targets')
|
||||
|
||||
# Create a delegation.
|
||||
create_delegation(tuf_repo, delegated_targets_path, delegated_targets_keyids,
|
||||
'pass1', 'targets', 'role1')
|
||||
# Create a key to sign a new delegated role.
|
||||
password = roles[rolename]['password'][0]
|
||||
key = signerlib.generate_and_save_rsa_key(keystore_dir, password)
|
||||
roles[rolename]['keyid'] = [key['keyid']]
|
||||
roles[rolename]['dest_path'] = os.path.join(downloads_dir, file_basename)
|
||||
|
||||
# Update TUF repository.
|
||||
util_test_tools.make_targets_meta(root_repo)
|
||||
util_test_tools.make_release_meta(root_repo)
|
||||
util_test_tools.make_timestamp_meta(root_repo)
|
||||
# Create delegation one.
|
||||
util_test_tools.create_delegation(tuf_repo,
|
||||
roles[rolename]['targets_dir'],
|
||||
roles[rolename]['keyid'], password,
|
||||
'targets', rolename)
|
||||
|
||||
# Modify the url. Remember that the interposition will intercept
|
||||
# urls that have 'localhost:9999' hostname, which was specified in
|
||||
# the json interposition configuration file. Look for 'hostname'
|
||||
# in 'util_test_tools.py'. Further, the 'file_basename' is the target
|
||||
# path relative to 'targets_dir'.
|
||||
url_to_file = 'http://localhost:9999/'+rel_filepath_1
|
||||
# Update TUF repository.
|
||||
# util_test_tools.make_targets_meta(root_repo)
|
||||
util_test_tools.make_release_meta(root_repo)
|
||||
util_test_tools.make_timestamp_meta(root_repo)
|
||||
|
||||
# END Setup.
|
||||
# Modify the url. Remember that the interposition will intercept
|
||||
# urls that have 'localhost:9999' hostname, which was specified in
|
||||
# the json interposition configuration file. Look for 'hostname'
|
||||
# in 'util_test_tools.py'. Further, the 'file_basename' is the target
|
||||
# path relative to 'targets_dir'.
|
||||
roles[rolename]['url'] = 'http://localhost:9999/'+rel_filepath
|
||||
|
||||
# Perform a client download.
|
||||
urllib_tuf.urlretrieve(roles[rolename]['url'],
|
||||
roles[rolename]['dest_path'])
|
||||
|
||||
|
||||
# Perform a client download.
|
||||
urllib_tuf.urlretrieve(url_to_file, downloaded_file)
|
||||
_make_delegation('role1')
|
||||
_make_delegation('role2')
|
||||
|
||||
# The update should contain 'Test NOT A'.
|
||||
downloaded_content = util_test_tools.read_file_content(downloaded_file)
|
||||
downloaded_content = \
|
||||
util_test_tools.read_file_content(roles['role1']['dest_path'])
|
||||
msg = 'OUCH 1'
|
||||
if 'Test A' != downloaded_content:
|
||||
print msg
|
||||
|
||||
downloaded_content = \
|
||||
util_test_tools.read_file_content(roles['role2']['dest_path'])
|
||||
msg = 'OUCH 1'
|
||||
if 'Test B' != downloaded_content:
|
||||
print msg
|
||||
|
||||
# Modify a target that was delegated to 'role1'.
|
||||
util_test_tools.modify_file_at_repository(target_1, 'Test NOT A')
|
||||
|
||||
# This is not correct!!!???
|
||||
# TODO: Use signercli's make_release_file() and make_timestamp().
|
||||
#util_test_tools.tuf_refresh_release_timestamp(metadata_dir, keyids)
|
||||
# Modify a target that was delegated to 'role2'.
|
||||
util_test_tools.modify_file_at_repository(roles['role2']['target_path'],
|
||||
'Test NOT B')
|
||||
|
||||
# util_test_tools.tuf_refresh_repo(root_repo, keyids)
|
||||
# Load the keystore before you rebuild the metadata.
|
||||
tuf.repo.keystore.load_keystore_from_keyfiles(keystore_dir,
|
||||
roles['role1']['keyid'],
|
||||
roles['role1']['password'])
|
||||
|
||||
# Rebuild the delegation role metadata.
|
||||
signerlib.build_delegated_role_file(delegated_targets_path,
|
||||
delegated_targets_keyids, metadata_dir,
|
||||
delegated_role_metadata_dir, 'role1.txt')
|
||||
signerlib.build_delegated_role_file(roles['role2']['targets_dir'],
|
||||
roles['role1']['keyid'], metadata_dir,
|
||||
roles['role1']['metadata_dir'],
|
||||
'role1.txt')
|
||||
|
||||
# Update release and timestamp metadata.
|
||||
util_test_tools.make_release_meta(root_repo)
|
||||
util_test_tools.make_timestamp_meta(root_repo)
|
||||
|
||||
|
||||
# Perform another client download.
|
||||
urllib.urlretrieve(url_to_file, downloaded_file)
|
||||
|
||||
# The update should contain 'Test NOT A'.
|
||||
downloaded_content = util_test_tools.read_file_content(downloaded_file)
|
||||
msg = 'OUCH 2'
|
||||
if 'Test NOT A' != downloaded_content:
|
||||
print 'Test NOT A != '+downloaded_content
|
||||
print msg
|
||||
try:
|
||||
urllib_tuf.urlretrieve(roles['role2']['url'], roles['role2']['dest_path'])
|
||||
except tuf.MetadataNotAvailableError, e:
|
||||
raise
|
||||
|
||||
|
||||
finally:
|
||||
# NOTE: temporary files are created and NOT removed at this point. This
|
||||
# is done in order to investigate the structure manually.
|
||||
# TODO: use cleanup()!
|
||||
server_proc.kill()
|
||||
|
||||
print 'Done.'
|
||||
util_test_tools.cleanup(root_repo, server_proc)
|
||||
|
||||
|
||||
|
||||
|
||||
test()
|
||||
try:
|
||||
test_extraneous_dependencies_attack()
|
||||
except tuf.MetadataNotAvailableError, error:
|
||||
print error
|
||||
|
|
@ -429,6 +429,9 @@ def tuf_refresh_and_download():
|
|||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def _get_metadata_directory(metadata_dir):
|
||||
def _mock_get_meta_dir(directory=metadata_dir):
|
||||
return directory
|
||||
|
|
@ -463,8 +466,7 @@ def _mock_get_password(msg, password=password):
|
|||
|
||||
|
||||
|
||||
|
||||
def make_targets_meta(root_repo):
|
||||
def _make_role_metadata_wrapper(root_repo, func):
|
||||
original_get_metadata_directory = signercli._get_metadata_directory
|
||||
original_prompt = signercli._prompt
|
||||
original_get_password = signercli._get_password
|
||||
|
|
@ -476,14 +478,17 @@ def make_targets_meta(root_repo):
|
|||
keystore_dir = os.path.join(tuf_repo, 'keystore')
|
||||
conf_path = os.path.join(metadata_dir, 'config.cfg')
|
||||
|
||||
shutil.rmtree(targets_dir)
|
||||
shutil.copytree(reg_repo, targets_dir)
|
||||
|
||||
_get_metadata_directory(metadata_dir)
|
||||
_get_password('test')
|
||||
_make_metadata_mock_prompts(targets_dir, conf_path)
|
||||
|
||||
signercli.make_targets_metadata(keystore_dir)
|
||||
if func.__name__ == 'make_targets_metadata':
|
||||
shutil.rmtree(targets_dir)
|
||||
shutil.copytree(reg_repo, targets_dir)
|
||||
_make_metadata_mock_prompts(targets_dir, conf_path)
|
||||
else:
|
||||
_make_metadata_mock_prompts(reg_repo, conf_path)
|
||||
|
||||
func(keystore_dir)
|
||||
|
||||
keystore.clear_keystore()
|
||||
signercli._get_password = original_get_password
|
||||
|
|
@ -492,48 +497,64 @@ def make_targets_meta(root_repo):
|
|||
|
||||
|
||||
|
||||
def make_targets_meta(root_repo):
|
||||
_make_role_metadata_wrapper(root_repo, signercli.make_targets_metadata)
|
||||
|
||||
|
||||
def make_release_meta(root_repo):
|
||||
original_get_metadata_directory = signercli._get_metadata_directory
|
||||
original_prompt = signercli._prompt
|
||||
original_get_password = signercli._get_password
|
||||
|
||||
tuf_repo = os.path.join(root_repo, 'tuf_repo')
|
||||
reg_repo = os.path.join(root_repo, 'reg_repo')
|
||||
metadata_dir = os.path.join(tuf_repo, 'metadata')
|
||||
keystore_dir = os.path.join(tuf_repo, 'keystore')
|
||||
conf_path = os.path.join(metadata_dir, 'config.cfg')
|
||||
|
||||
_get_metadata_directory(metadata_dir)
|
||||
_get_password('test')
|
||||
_make_metadata_mock_prompts(reg_repo, conf_path)
|
||||
|
||||
signercli.make_release_metadata(keystore_dir)
|
||||
|
||||
keystore.clear_keystore()
|
||||
signercli._get_password = original_get_password
|
||||
signercli._prompt = original_prompt
|
||||
signercli._get_metadata_directory = original_get_metadata_directory
|
||||
|
||||
_make_role_metadata_wrapper(root_repo, signercli.make_release_metadata)
|
||||
|
||||
|
||||
def make_timestamp_meta(root_repo):
|
||||
original_get_metadata_directory = signercli._get_metadata_directory
|
||||
original_prompt = signercli._prompt
|
||||
original_get_password = signercli._get_password
|
||||
_make_role_metadata_wrapper(root_repo, signercli.make_timestamp_metadata)
|
||||
|
||||
tuf_repo = os.path.join(root_repo, 'tuf_repo')
|
||||
reg_repo = os.path.join(root_repo, 'reg_repo')
|
||||
metadata_dir = os.path.join(tuf_repo, 'metadata')
|
||||
|
||||
|
||||
|
||||
def create_delegation(tuf_repo, delegated_targets_path, keyid, keyid_password,
|
||||
parent_role, new_role_name):
|
||||
keystore_dir = os.path.join(tuf_repo, 'keystore')
|
||||
conf_path = os.path.join(metadata_dir, 'config.cfg')
|
||||
metadata_dir = os.path.join(tuf_repo, 'metadata')
|
||||
|
||||
# Patch signercli._get_metadata_directory()
|
||||
_get_metadata_directory(metadata_dir)
|
||||
_get_password('test')
|
||||
_make_metadata_mock_prompts(reg_repo, conf_path)
|
||||
|
||||
signercli.make_timestamp_metadata(keystore_dir)
|
||||
|
||||
keystore.clear_keystore()
|
||||
signercli._get_password = original_get_password
|
||||
signercli._prompt = original_prompt
|
||||
signercli._get_metadata_directory = original_get_metadata_directory
|
||||
|
||||
# Mock method for signercli._prompt().
|
||||
def _mock_prompt(msg, junk, targets_path=delegated_targets_path,
|
||||
parent_role=parent_role, new_role_name=new_role_name):
|
||||
if msg.startswith('\nThe directory entered'):
|
||||
return targets_path
|
||||
elif msg.startswith('\nChoose and enter the parent'):
|
||||
return parent_role
|
||||
elif msg.endswith('\nEnter the delegated role\'s name: '):
|
||||
return new_role_name
|
||||
else:
|
||||
error_msg = ('Prompt: '+'\''+msg+'\''+
|
||||
' did not match any predefined mock prompts.')
|
||||
sys.exit(error_msg)
|
||||
|
||||
# Patch signercli._prompt().
|
||||
signercli._prompt = _mock_prompt
|
||||
|
||||
|
||||
# Mock method for signercli._get_password().
|
||||
def _mock_get_password(msg, keyid=keyid, password=keyid_password):
|
||||
_keyid = keyid[0]
|
||||
if msg.endswith('('+_keyid+'): '):
|
||||
return keyid_password
|
||||
else:
|
||||
return 'test' # password for targets' keyid.
|
||||
|
||||
# Patch signercli._get_password().
|
||||
signercli._get_password = _mock_get_password
|
||||
|
||||
|
||||
# Method to patch signercli._get_keyids()
|
||||
def _mock_get_keyid(junk, keyid=keyid):
|
||||
return keyid
|
||||
|
||||
# Patch signercli._get_keyids().
|
||||
signercli._get_keyids = _mock_get_keyid
|
||||
|
||||
signercli.make_delegation(keystore_dir)
|
||||
Loading…
Reference in a new issue