mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Rewrote extraneous dependencies test, based on arbitrary package test.
This commit is contained in:
parent
794ba94b2d
commit
a990059329
1 changed files with 109 additions and 127 deletions
236
tuf/tests/system_tests/test_extraneous_dependencies_attack.py
Executable file → Normal file
236
tuf/tests/system_tests/test_extraneous_dependencies_attack.py
Executable file → Normal file
|
|
@ -1,190 +1,172 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
"""
|
||||
<Program Name>
|
||||
test_extraneous_dependencies_attack.py
|
||||
|
||||
<Author>
|
||||
Konstantin Andrianov
|
||||
Zane Fisher
|
||||
|
||||
<Started>
|
||||
February 19, 2012
|
||||
August 19, 2013
|
||||
|
||||
<Copyright>
|
||||
See LICENSE for licensing information.
|
||||
|
||||
<Purpose>
|
||||
Simulate an extraneous dependencies attack.
|
||||
Simulate an extraneous dependencies attack. The client attempts to download
|
||||
a file with one legitimate dependency, and one extraneous dependency.
|
||||
|
||||
In an extraneous dependencies attack, attacker is able to cause clients to
|
||||
download software dependencies that are not the intended dependencies.
|
||||
Note: The interposition provided by 'tuf.interposition' is used to intercept
|
||||
all calls made by urllib/urillib2 to certain hostnames specified in
|
||||
the interposition configuration file. Look up interposition.py for more
|
||||
information and illustration of a sample contents of the interposition
|
||||
configuration file. Interposition was meant to make TUF integration with an
|
||||
existing software updater an easy process. This allows for more flexibility
|
||||
to the existing software updater. However, if you are planning to solely use
|
||||
TUF there should be no need for interposition, all necessary calls will be
|
||||
generated from within TUF.
|
||||
|
||||
Note: There is no difference between 'updates' and 'target' files.
|
||||
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import shutil
|
||||
import urllib
|
||||
import tempfile
|
||||
import time
|
||||
|
||||
import util_test_tools
|
||||
import tuf.repo.keystore
|
||||
import tuf.repo.signerlib as signerlib
|
||||
import tuf.repo.signercli as signercli
|
||||
|
||||
import tuf
|
||||
from tuf.interposition import urllib_tuf
|
||||
|
||||
|
||||
|
||||
# Disable logging.
|
||||
util_test_tools.disable_logging()
|
||||
|
||||
version = 1
|
||||
|
||||
class ExtraneousDependenciesAttackAlert(Exception):
|
||||
|
||||
class ExtraneousDependencyAlert(Exception):
|
||||
pass
|
||||
|
||||
|
||||
|
||||
def test_extraneous_dependencies_attack():
|
||||
# Interprets the contents of the file it downloads as a list of dependent
|
||||
# files from the same repository
|
||||
def _download(url, filename, directory, tuf=False):
|
||||
destination = os.path.join(directory, filename)
|
||||
#print 'downloading to '+destination
|
||||
#print 'from '+url
|
||||
if tuf:
|
||||
urllib_tuf.urlretrieve(url, destination)
|
||||
else:
|
||||
urllib.urlretrieve(url, destination)
|
||||
|
||||
#print 'DDDDDD '+util_test_tools.read_file_content(destination)
|
||||
if util_test_tools.read_file_content(destination) != '':
|
||||
required_files = util_test_tools.read_file_content(destination).split(',')
|
||||
for required_filename in required_files:
|
||||
#print 'requires '+required_filename
|
||||
required_file_url = os.path.dirname(url)+'/'+required_filename
|
||||
_download(required_file_url, required_filename, directory, tuf)
|
||||
|
||||
|
||||
|
||||
def test_extraneous_dependency_attack(TUF=False):
|
||||
"""
|
||||
<Arguments>
|
||||
TUF:
|
||||
If set to 'False' all directories that start with 'tuf_' are ignored,
|
||||
indicating that tuf is not implemented.
|
||||
|
||||
<Purpose>
|
||||
Illustrate arbitrary package attack vulnerability.
|
||||
|
||||
"""
|
||||
|
||||
ERROR_MSG = 'Extraneous Dependency Attack was Successful!\n'
|
||||
|
||||
ERROR_MSG = '\tExtraneous Dependencies Attack Succeeded!\n\n'
|
||||
|
||||
try:
|
||||
|
||||
# Setup.
|
||||
root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=True)
|
||||
root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF)
|
||||
reg_repo = os.path.join(root_repo, 'reg_repo')
|
||||
tuf_repo = os.path.join(root_repo, 'tuf_repo')
|
||||
keystore_dir = os.path.join(tuf_repo, 'keystore')
|
||||
metadata_dir = os.path.join(tuf_repo, 'metadata')
|
||||
downloads_dir = os.path.join(root_repo, 'downloads')
|
||||
downloads = os.path.join(root_repo, 'downloads')
|
||||
targets_dir = os.path.join(tuf_repo, 'targets')
|
||||
|
||||
# 'roles' holds information about delegated roles.
|
||||
roles = {'role1':{'password':['pass1']},
|
||||
'role2':{'password':['pass2']}}
|
||||
# Add files to 'repo' directory: {root_repo}
|
||||
good_dependency_filepath = util_test_tools.add_file_to_repository(reg_repo, '')
|
||||
good_dependency_basename = os.path.basename(good_dependency_filepath)
|
||||
|
||||
# Add files to 'reg_repo' directory: {root_repo}
|
||||
role1_path = tempfile.mkdtemp(dir=reg_repo)
|
||||
roles['role1']['filepath'] = \
|
||||
util_test_tools.add_file_to_repository(role1_path, 'Test A')
|
||||
bad_dependency_filepath = util_test_tools.add_file_to_repository(reg_repo, '')
|
||||
bad_dependency_basename = os.path.basename(bad_dependency_filepath)
|
||||
|
||||
role2_path = tempfile.mkdtemp(dir=reg_repo)
|
||||
roles['role2']['filepath'] = \
|
||||
util_test_tools.add_file_to_repository(role2_path, 'Test B')
|
||||
# The dependent file lists the good dependency
|
||||
dependent_filepath = util_test_tools.add_file_to_repository(reg_repo, good_dependency_basename)
|
||||
dependent_basename = os.path.basename(dependent_filepath)
|
||||
|
||||
# Update TUF repository.
|
||||
util_test_tools.make_targets_meta(root_repo)
|
||||
util_test_tools.make_release_meta(root_repo)
|
||||
util_test_tools.make_timestamp_meta(root_repo)
|
||||
url_to_repo = url+'reg_repo/'+dependent_basename
|
||||
#downloaded_file = os.path.join(downloads, dependent_basename)
|
||||
modified_dependency_list = good_dependency_basename+','+bad_dependency_basename
|
||||
|
||||
|
||||
def _make_delegation(rolename):
|
||||
expiration_date = tuf.formats.format_time(time.time()+86400)
|
||||
expiration_date = expiration_date[0:expiration_date.rfind(' UTC')]
|
||||
# Indicate which file client downloads.
|
||||
rel_filepath = os.path.relpath(roles[rolename]['filepath'], reg_repo)
|
||||
roles[rolename]['target_path'] = os.path.join(targets_dir, rel_filepath)
|
||||
rolepath, file_basename = os.path.split(roles[rolename]['filepath'])
|
||||
junk, role_relpath = os.path.split(rolepath)
|
||||
roles[rolename]['targets_dir'] = os.path.join(targets_dir, role_relpath)
|
||||
roles[rolename]['metadata_dir'] = os.path.join(metadata_dir, 'targets')
|
||||
|
||||
# Create a key to sign a new delegated role.
|
||||
password = roles[rolename]['password'][0]
|
||||
key = signerlib.generate_and_save_rsa_key(keystore_dir, password)
|
||||
roles[rolename]['keyid'] = [key['keyid']]
|
||||
roles[rolename]['dest_path'] = os.path.join(downloads_dir, file_basename)
|
||||
|
||||
# Create delegation one.
|
||||
util_test_tools.create_delegation(tuf_repo,
|
||||
roles[rolename]['targets_dir'],
|
||||
roles[rolename]['keyid'], password,
|
||||
'targets', rolename, expiration_date)
|
||||
|
||||
# Update TUF repository.
|
||||
# util_test_tools.make_targets_meta(root_repo)
|
||||
util_test_tools.make_release_meta(root_repo)
|
||||
util_test_tools.make_timestamp_meta(root_repo)
|
||||
if TUF:
|
||||
# Update TUF metadata before attacker modifies anything.
|
||||
util_test_tools.tuf_refresh_repo(root_repo, keyids)
|
||||
|
||||
# Modify the url. Remember that the interposition will intercept
|
||||
# urls that have 'localhost:9999' hostname, which was specified in
|
||||
# the json interposition configuration file. Look for 'hostname'
|
||||
# in 'util_test_tools.py'. Further, the 'file_basename' is the target
|
||||
# path relative to 'targets_dir'.
|
||||
roles[rolename]['url'] = 'http://localhost:9999/'+rel_filepath
|
||||
url_to_repo = 'http://localhost:9999/'+dependent_basename
|
||||
|
||||
# Perform a client download.
|
||||
urllib_tuf.urlretrieve(roles[rolename]['url'],
|
||||
roles[rolename]['dest_path'])
|
||||
# Attacker adds the dependency in the targets repository.
|
||||
target = os.path.join(targets_dir, dependent_basename)
|
||||
util_test_tools.modify_file_at_repository(target, modified_dependency_list)
|
||||
|
||||
# Attacker adds the dependency in the regular repository.
|
||||
util_test_tools.modify_file_at_repository(dependent_filepath, modified_dependency_list)
|
||||
|
||||
# End of Setup.
|
||||
|
||||
|
||||
_make_delegation('role1')
|
||||
_make_delegation('role2')
|
||||
|
||||
|
||||
# The attacks.
|
||||
|
||||
def _write_rogue_metadata():
|
||||
global version
|
||||
version = version+1
|
||||
expiration_date = tuf.formats.format_time(time.time()+86400)
|
||||
# Load the keystore before rebuilding the metadata.
|
||||
tuf.repo.keystore.load_keystore_from_keyfiles(keystore_dir,
|
||||
roles['role1']['keyid'],
|
||||
roles['role1']['password'])
|
||||
|
||||
# Rebuild the delegation role metadata.
|
||||
signerlib.build_delegated_role_file(roles['role2']['targets_dir'],
|
||||
roles['role1']['keyid'], metadata_dir,
|
||||
roles['role1']['metadata_dir'],
|
||||
'role1.txt', version, expiration_date)
|
||||
|
||||
# Update release and timestamp metadata.
|
||||
util_test_tools.make_release_meta(root_repo)
|
||||
util_test_tools.make_timestamp_meta(root_repo)
|
||||
|
||||
|
||||
# Modify a target that was delegated to 'role2'.
|
||||
util_test_tools.modify_file_at_repository(roles['role2']['target_path'],
|
||||
'Test NOT B')
|
||||
|
||||
# Update rogue delegatee metadata.
|
||||
_write_rogue_metadata()
|
||||
|
||||
# Perform another client download.
|
||||
try:
|
||||
urllib_tuf.urlretrieve(roles['role2']['url'], roles['role2']['dest_path'])
|
||||
except tuf.MetadataNotAvailableError, e:
|
||||
# Client downloads (tries to download) the file.
|
||||
_download(url=url_to_repo, filename=dependent_basename, directory=downloads, tuf=TUF)
|
||||
|
||||
except tuf.DownloadError:
|
||||
# If tuf.DownloadError is raised, this means that TUF has prevented
|
||||
# the download of an unrecognized file. Enable the logging to see,
|
||||
# what actually happened.
|
||||
pass
|
||||
|
||||
else:
|
||||
raise ExtraneousDependenciesAttackAlert(ERROR_MSG)
|
||||
|
||||
|
||||
# Add a target file to the directory delegated to 'role2' but not 'role1'.
|
||||
util_test_tools.add_file_to_repository(roles['role2']['targets_dir'], 'AAAA')
|
||||
|
||||
# Update rogue delegatee metadata.
|
||||
_write_rogue_metadata()
|
||||
|
||||
# Perform another client download.
|
||||
try:
|
||||
urllib_tuf.urlretrieve(roles['role2']['url'], roles['role2']['dest_path'])
|
||||
except tuf.MetadataNotAvailableError, e:
|
||||
pass
|
||||
else:
|
||||
raise ExtraneousDependenciesAttackAlert(ERROR_MSG)
|
||||
# Check if the legitimate dependency was downloaded
|
||||
if not(os.path.exists(os.path.join(downloads, good_dependency_basename))):
|
||||
raise tuf.DownloadError
|
||||
|
||||
# Check if the extraneous dependency was downloaded
|
||||
if os.path.exists(os.path.join(downloads, bad_dependency_basename)):
|
||||
raise ExtraneousDependencyAlert(ERROR_MSG)
|
||||
|
||||
|
||||
finally:
|
||||
server_proc.kill()
|
||||
#util_test_tools.cleanup(root_repo, server_proc)
|
||||
|
||||
|
||||
util_test_tools.cleanup(root_repo, server_proc)
|
||||
|
||||
|
||||
|
||||
print 'Attempting extraneous dependency attack without TUF:'
|
||||
try:
|
||||
test_extraneous_dependencies_attack()
|
||||
except ExtraneousDependenciesAttackAlert, error:
|
||||
print 'error'
|
||||
test_extraneous_dependency_attack(TUF=False)
|
||||
|
||||
except ExtraneousDependencyAlert, error:
|
||||
print error
|
||||
|
||||
|
||||
|
||||
print 'Attempting extraneous dependency attack with TUF:'
|
||||
try:
|
||||
test_extraneous_dependency_attack(TUF=True)
|
||||
|
||||
except ExtraneousDependencyAlert, error:
|
||||
print error
|
||||
|
|
|
|||
Loading…
Reference in a new issue