mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Merge pull request #202 from vladimir-v-diaz/develop
Refactor test_extraneous_dependencies_attack.py.
This commit is contained in:
commit
77fd5e8efa
1 changed files with 177 additions and 191 deletions
|
|
@ -5,10 +5,17 @@
|
|||
test_extraneous_dependencies_attack.py
|
||||
|
||||
<Author>
|
||||
Zane Fisher
|
||||
Zane Fisher.
|
||||
|
||||
<Started>
|
||||
August 19, 2013
|
||||
|
||||
April 6, 2014.
|
||||
Refactored to use the 'unittest' module (test conditions in code, rather
|
||||
than verifying text output), use pre-generated repository files, and
|
||||
discontinue use of the old repository tools. Modify the previous scenario
|
||||
simulated for the mix-and-match attack. The metadata that specified the
|
||||
dependencies of a project modified (previously a text file.) -vladimir.v.diaz
|
||||
|
||||
<Copyright>
|
||||
See LICENSE for licensing information.
|
||||
|
|
@ -20,20 +27,7 @@
|
|||
target dependency even if it is found on the repository. Valid targets are
|
||||
listed and verified by TUF metadata, such as 'targets.txt'.
|
||||
|
||||
Target dependencies listed in the file are comma-separated.
|
||||
|
||||
Note: The interposition provided by 'tuf.interposition' is used to intercept
|
||||
all calls made by urllib/urillib2 to certain hostnames specified in
|
||||
the interposition configuration file. Look up interposition.py for more
|
||||
information and illustration of a sample contents of the interposition
|
||||
configuration file. Interposition was meant to make TUF integration with an
|
||||
existing software updater an easy process. This allows for more flexibility
|
||||
to the existing software updater. However, if you are planning to solely use
|
||||
TUF there should be no need for interposition, all necessary calls will be
|
||||
generated from within TUF.
|
||||
|
||||
There is no difference between 'updates' and 'target' files.
|
||||
|
||||
"""
|
||||
|
||||
# Help with Python 3 compatability, where the print statement is a function, an
|
||||
|
|
@ -45,198 +39,190 @@
|
|||
|
||||
import os
|
||||
import urllib
|
||||
import tempfile
|
||||
import random
|
||||
import time
|
||||
import shutil
|
||||
import json
|
||||
import subprocess
|
||||
import unittest
|
||||
import logging
|
||||
|
||||
import tuf
|
||||
import tuf.interposition
|
||||
import tuf.tests.util_test_tools as util_test_tools
|
||||
import tuf.formats
|
||||
import tuf.util
|
||||
import tuf.log
|
||||
import tuf.client.updater as updater
|
||||
import tuf.tests.unittest_toolbox as unittest_toolbox
|
||||
|
||||
|
||||
class ExtraneousDependencyAlert(Exception):
|
||||
pass
|
||||
logger = logging.getLogger('tuf.test_extraneous_dependencies_attack')
|
||||
|
||||
|
||||
|
||||
# Interpret anything following 'requires:' in the contents of the file it
|
||||
# downloads as a comma-separated list of dependent files from the same
|
||||
# repository.
|
||||
def _download(url, filename, directory, using_tuf=False):
|
||||
destination = os.path.join(directory, filename)
|
||||
if using_tuf:
|
||||
tuf.interposition.urllib_tuf.urlretrieve(url, destination)
|
||||
else:
|
||||
urllib.urlretrieve(url, destination)
|
||||
class TestExtraneousDependenciesAttack(unittest_toolbox.Modified_TestCase):
|
||||
|
||||
file_contents = util_test_tools.read_file_content(destination)
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
# setUpClass() is called before any of the test cases are executed.
|
||||
|
||||
# Create a temporary directory to store the repository, metadata, and target
|
||||
# files. 'temporary_directory' must be deleted in TearDownModule() so that
|
||||
# temporary files are always removed, even when exceptions occur.
|
||||
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
|
||||
|
||||
# Launch a SimpleHTTPServer (serves files in the current directory).
|
||||
# Test cases will request metadata and target files that have been
|
||||
# pre-generated in 'tuf/tests/repository_data', which will be served by the
|
||||
# SimpleHTTPServer launched here. The test cases of this unit test assume
|
||||
# the pre-generated metadata files have a specific structure, such
|
||||
# as a delegated role 'targets/role1', three target files, five key files,
|
||||
# etc.
|
||||
cls.SERVER_PORT = random.randint(30000, 45000)
|
||||
command = ['python', 'simple_server.py', str(cls.SERVER_PORT)]
|
||||
cls.server_process = subprocess.Popen(command, stderr=subprocess.PIPE)
|
||||
logger.info('Server process started.')
|
||||
logger.info('Server process id: '+str(cls.server_process.pid))
|
||||
logger.info('Serving on port: '+str(cls.SERVER_PORT))
|
||||
cls.url = 'http://localhost:'+str(cls.SERVER_PORT) + os.path.sep
|
||||
|
||||
# Parse the list of required files (if it exists) and download them.
|
||||
if file_contents.find('requires:') != -1:
|
||||
required_files =\
|
||||
file_contents[file_contents.find('requires:') + 9:].split(',')
|
||||
for required_filename in required_files:
|
||||
required_file_url = os.path.dirname(url)+os.sep+required_filename
|
||||
_download(required_file_url, required_filename, directory, using_tuf)
|
||||
# NOTE: Following error is raised if a delay is not applied:
|
||||
# <urlopen error [Errno 111] Connection refused>
|
||||
time.sleep(.2)
|
||||
|
||||
|
||||
|
||||
|
||||
def test_extraneous_dependency_attack(using_tuf=False, modify_metadata=False):
|
||||
"""
|
||||
<Purpose>
|
||||
Illustrate extraneous dependency attack vulnerability.
|
||||
|
||||
<Arguments>
|
||||
using_tuf:
|
||||
If set to 'False' all directories that start with 'tuf_' are ignored,
|
||||
indicating that tuf is not implemented.
|
||||
|
||||
"""
|
||||
|
||||
ERROR_MSG = 'Extraneous Dependency Attack was Successful!'
|
||||
|
||||
try:
|
||||
# Setup.
|
||||
root_repo, url, server_proc, keyids = util_test_tools.init_repo(using_tuf)
|
||||
reg_repo = os.path.join(root_repo, 'reg_repo')
|
||||
tuf_repo = os.path.join(root_repo, 'tuf_repo')
|
||||
downloads = os.path.join(root_repo, 'downloads')
|
||||
targets_dir = os.path.join(tuf_repo, 'targets')
|
||||
|
||||
# Add files to 'repo' directory: {root_repo}.
|
||||
good_dependency_filepath = util_test_tools.add_file_to_repository(reg_repo,
|
||||
'the file you need')
|
||||
good_dependency_basename = os.path.basename(good_dependency_filepath)
|
||||
|
||||
bad_dependency_filepath = util_test_tools.add_file_to_repository(reg_repo,
|
||||
'the file you don\'t need')
|
||||
bad_dependency_basename = os.path.basename(bad_dependency_filepath)
|
||||
|
||||
# The dependent file lists the good dependency.
|
||||
dependent_filepath = util_test_tools.add_file_to_repository(reg_repo,
|
||||
'requires:'+good_dependency_basename)
|
||||
dependent_basename = os.path.basename(dependent_filepath)
|
||||
|
||||
url_to_repo = url+'reg_repo/'+dependent_basename
|
||||
|
||||
# List the bad dependency first. If an attacker modifies a target by
|
||||
# simply appending the file contents, tuf.download will ignore the appended
|
||||
# data, downloading only as much data as the TUF metadata says the target
|
||||
# should contain.
|
||||
modified_dependency_list = bad_dependency_basename+','+\
|
||||
good_dependency_basename
|
||||
|
||||
if using_tuf:
|
||||
# Update TUF metadata before attacker modifies anything.
|
||||
util_test_tools.tuf_refresh_repo(root_repo, keyids)
|
||||
|
||||
# Modify the url. Remember that the interposition will intercept
|
||||
# urls that have 'localhost:9999' hostname, which was specified in
|
||||
# the json interposition configuration file. Look for 'hostname'
|
||||
# in 'util_test_tools.py'. Further, the 'file_basename' is the target
|
||||
# path relative to 'targets_dir'.
|
||||
url_to_repo = 'http://localhost:9999/'+dependent_basename
|
||||
|
||||
# Attacker modifies the depenent file in the targets repository, adding
|
||||
# the bad dependency to its list.
|
||||
dependent_target_filepath = os.path.join(targets_dir, dependent_basename)
|
||||
util_test_tools.modify_file_at_repository(dependent_target_filepath,
|
||||
'requires:'+modified_dependency_list)
|
||||
|
||||
if modify_metadata:
|
||||
|
||||
# Modify targets metadata to reflect the change to the target file.
|
||||
targets_metadata_filepath = os.path.join(tuf_repo, 'metadata',
|
||||
'targets.txt')
|
||||
util_test_tools.update_target_in_metadata(dependent_target_filepath,
|
||||
targets_metadata_filepath)
|
||||
|
||||
# Modify release metadata to reflect the change to targets metadata.
|
||||
release_metadata_filepath = os.path.join(tuf_repo, 'metadata',
|
||||
'release.txt')
|
||||
util_test_tools.update_role_in_metadata(targets_metadata_filepath,
|
||||
release_metadata_filepath)
|
||||
|
||||
# Modify timestamp metadata to reflect the change to release metadata.
|
||||
timestamp_metadata_filepath = os.path.join(tuf_repo, 'metadata',
|
||||
'timestamp.txt')
|
||||
util_test_tools.update_role_in_metadata(release_metadata_filepath,
|
||||
timestamp_metadata_filepath)
|
||||
|
||||
|
||||
|
||||
|
||||
# Attacker modifies the depenent file in the regular repository, adding
|
||||
# the bad dependency to its list.
|
||||
util_test_tools.modify_file_at_repository(dependent_filepath,
|
||||
'requires:'+modified_dependency_list)
|
||||
|
||||
# End of Setup.
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
# tearDownModule() is called after all the test cases have run.
|
||||
# http://docs.python.org/2/library/unittest.html#class-and-module-fixtures
|
||||
|
||||
# Remove the temporary repository directory, which should contain all the
|
||||
# metadata, targets, and key files generated of all the test cases.
|
||||
shutil.rmtree(cls.temporary_directory)
|
||||
|
||||
unittest_toolbox.Modified_TestCase.clear_toolbox()
|
||||
|
||||
# Kill the SimpleHTTPServer process.
|
||||
if cls.server_process.returncode is None:
|
||||
logger.info('Server process '+str(cls.server_process.pid)+' terminated.')
|
||||
cls.server_process.kill()
|
||||
|
||||
|
||||
|
||||
def setUp(self):
|
||||
# We are inheriting from custom class.
|
||||
unittest_toolbox.Modified_TestCase.setUp(self)
|
||||
|
||||
# Copy the original repository files provided in the test folder so that
|
||||
# any modifications made to repository files are restricted to the copies.
|
||||
# The 'repository_data' directory is expected to exist in 'tuf/tests/'.
|
||||
original_repository_files = os.path.join(os.getcwd(), os.pardir,
|
||||
'repository_data')
|
||||
temporary_repository_root = \
|
||||
self.make_temp_directory(directory=self.temporary_directory)
|
||||
|
||||
# The original repository, keystore, and client directories will be copied
|
||||
# for each test case.
|
||||
original_repository = os.path.join(original_repository_files, 'repository')
|
||||
original_client = os.path.join(original_repository_files, 'client')
|
||||
original_keystore = os.path.join(original_repository_files, 'keystore')
|
||||
|
||||
# Save references to the often-needed client repository directories.
|
||||
# Test cases need these references to access metadata and target files.
|
||||
self.repository_directory = \
|
||||
os.path.join(temporary_repository_root, 'repository')
|
||||
self.client_directory = os.path.join(temporary_repository_root, 'client')
|
||||
self.keystore_directory = os.path.join(temporary_repository_root, 'keystore')
|
||||
|
||||
# Copy the original 'repository', 'client', and 'keystore' directories
|
||||
# to the temporary repository the test cases can use.
|
||||
shutil.copytree(original_repository, self.repository_directory)
|
||||
shutil.copytree(original_client, self.client_directory)
|
||||
shutil.copytree(original_keystore, self.keystore_directory)
|
||||
|
||||
# Set the url prefix required by the 'tuf/client/updater.py' updater.
|
||||
# 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'.
|
||||
repository_basepath = self.repository_directory[len(os.getcwd()):]
|
||||
url_prefix = \
|
||||
'http://localhost:' + str(self.SERVER_PORT) + repository_basepath
|
||||
|
||||
# Setting 'tuf.conf.repository_directory' with the temporary client
|
||||
# directory copied from the original repository files.
|
||||
tuf.conf.repository_directory = self.client_directory
|
||||
self.repository_mirrors = {'mirror1': {'url_prefix': url_prefix,
|
||||
'metadata_path': 'metadata',
|
||||
'targets_path': 'targets',
|
||||
'confined_target_dirs': ['']}}
|
||||
|
||||
# Create the repository instance. The test cases will use this client
|
||||
# updater to refresh metadata, fetch target files, etc.
|
||||
self.repository_updater = updater.Updater('test_repository',
|
||||
self.repository_mirrors)
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
# Modified_TestCase.tearDown() automatically deletes temporary files and
|
||||
# directories that may have been created during each test case.
|
||||
unittest_toolbox.Modified_TestCase.tearDown(self)
|
||||
|
||||
|
||||
|
||||
def test_with_tuf(self):
|
||||
# An attacker tries to trick a client into installing an extraneous target
|
||||
# file (a valid file on the repository, in this case) by listing it in the
|
||||
# project's metadata file. For the purposes of test_with_tuf(),
|
||||
# 'targets/role1.json' is treated as the metadata file that indicates all
|
||||
# the files needed to install/update the 'role1' project. The attacker
|
||||
# simply adds the extraneous target file to 'role1.json', which the TUF
|
||||
# client should reject as untrusted.
|
||||
role1_filepath = os.path.join(self.repository_directory, 'metadata',
|
||||
'targets', 'role1.json')
|
||||
file1_filepath = os.path.join(self.repository_directory, 'targets',
|
||||
'file1.txt')
|
||||
length, hashes = tuf.util.get_file_details(file1_filepath)
|
||||
|
||||
role1_metadata = tuf.util.load_json_file(role1_filepath)
|
||||
role1_metadata['signed']['targets']['/file2.txt'] = {}
|
||||
role1_metadata['signed']['targets']['/file2.txt']['hashes'] = hashes
|
||||
role1_metadata['signed']['targets']['/file2.txt']['length'] = length
|
||||
|
||||
tuf.formats.check_signable_object_format(role1_metadata)
|
||||
|
||||
with open(role1_filepath, 'wb') as file_object:
|
||||
json.dump(role1_metadata, file_object, indent=1, sort_keys=True)
|
||||
|
||||
# Un-install the metadata of the top-level roles so that the client can
|
||||
# download and detect the invalid 'role1.json'.
|
||||
os.remove(os.path.join(self.client_directory, 'metadata', 'current',
|
||||
'snapshot.json'))
|
||||
os.remove(os.path.join(self.client_directory, 'metadata', 'current',
|
||||
'targets.json'))
|
||||
os.remove(os.path.join(self.client_directory, 'metadata', 'current',
|
||||
'timestamp.json'))
|
||||
os.remove(os.path.join(self.client_directory, 'metadata', 'current',
|
||||
'targets', 'role1.json'))
|
||||
|
||||
# Verify that the TUF client rejects the invalid metadata and refuses to
|
||||
# continue the update process.
|
||||
self.repository_updater.refresh()
|
||||
|
||||
try:
|
||||
# Client downloads (tries to download) the file.
|
||||
_download(url_to_repo, dependent_basename, downloads, using_tuf)
|
||||
|
||||
except tuf.NoWorkingMirrorError, error:
|
||||
# We only set up one mirror, so if it fails, we expect a
|
||||
# NoWorkingMirrorError. If TUF has worked as intended, the mirror error
|
||||
# contained within should be a BadHashError or a BadSignatureError,
|
||||
# depending on whether the metadata was modified.
|
||||
if modify_metadata:
|
||||
mirror_error = \
|
||||
error.mirror_errors[url+'tuf_repo/metadata/timestamp.txt']
|
||||
|
||||
assert isinstance(mirror_error, tuf.BadSignatureError)
|
||||
|
||||
else:
|
||||
mirror_error = \
|
||||
error.mirror_errors[url+'tuf_repo/targets/'+dependent_basename]
|
||||
|
||||
assert isinstance(mirror_error, tuf.BadHashError)
|
||||
self.repository_updater.targets_of_role('targets/role1')
|
||||
|
||||
# Verify that the specific 'tuf.BadHashError' exception is raised by each
|
||||
# mirror.
|
||||
except tuf.NoWorkingMirrorError, exception:
|
||||
for mirror_url, mirror_error in exception.mirror_errors.iteritems():
|
||||
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
|
||||
url_file = os.path.join(url_prefix, 'metadata', 'targets', 'role1.json')
|
||||
|
||||
# Verify that 'role1.json' is the culprit.
|
||||
self.assertEqual(url_file, mirror_url)
|
||||
self.assertTrue(isinstance(mirror_error, tuf.BadHashError))
|
||||
|
||||
else:
|
||||
# Check if the legitimate dependency was downloaded.
|
||||
if not(os.path.exists(os.path.join(downloads, good_dependency_basename))):
|
||||
raise tuf.DownloadError
|
||||
|
||||
# Check if the extraneous dependency was downloaded.
|
||||
if os.path.exists(os.path.join(downloads, bad_dependency_basename)):
|
||||
raise ExtraneousDependencyAlert(ERROR_MSG)
|
||||
|
||||
finally:
|
||||
util_test_tools.cleanup(root_repo, server_proc)
|
||||
self.fail('TUF did not prevent an extraneous dependencies attack.')
|
||||
|
||||
|
||||
|
||||
print('Attempting extraneous dependency attack without TUF:')
|
||||
try:
|
||||
test_extraneous_dependency_attack(using_tuf=False)
|
||||
|
||||
except ExtraneousDependencyAlert, error:
|
||||
print(error)
|
||||
|
||||
else:
|
||||
print('Extraneous dependency attack failed.')
|
||||
print()
|
||||
|
||||
print('Attempting extraneous dependency attack with TUF:')
|
||||
try:
|
||||
test_extraneous_dependency_attack(using_tuf=True, modify_metadata=False)
|
||||
|
||||
except ExtraneousDependencyAlert, error:
|
||||
print(error)
|
||||
|
||||
else:
|
||||
print('Extraneous dependency attack failed.')
|
||||
print()
|
||||
|
||||
print('Attempting extraneous dependency attack with TUF'+\
|
||||
' (and tampering with metadata):')
|
||||
try:
|
||||
test_extraneous_dependency_attack(using_tuf=True, modify_metadata=True)
|
||||
|
||||
except ExtraneousDependencyAlert, error:
|
||||
print(error)
|
||||
|
||||
else:
|
||||
print('Extraneous dependency attack failed.')
|
||||
print()
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
|
|||
Loading…
Reference in a new issue