Fix imports statements in test_arbitrary_package_attack.py

This commit is contained in:
Vladimir Diaz 2017-01-11 16:41:13 -05:00
parent e5c002695b
commit 3ebf76182d

View file

@ -20,7 +20,7 @@
<Purpose>
Simulate an arbitrary package attack, where an updater client attempts to
download a malicious file. TUF and non-TUF client scenarios are tested.
download a malicious file. TUF and non-TUF client scenarios are tested.
There is no difference between 'updates' and 'target' files.
"""
@ -48,18 +48,17 @@
import unittest
else:
import unittest2 as unittest
import unittest2 as unittest
import tuf
import tuf.formats
import tuf.ssl_crypto.util
import tuf.roledb
import tuf.ssl_crypto.keydb
import tuf.keydb
import tuf.log
import tuf.client.updater as updater
import tuf.unittest_toolbox as unittest_toolbox
from simple_settings import settings
import securesystemslib
import six
logger = logging.getLogger('tuf.test_arbitrary_package_attack')
@ -70,16 +69,16 @@ class TestArbitraryPackageAttack(unittest_toolbox.Modified_TestCase):
@classmethod
def setUpClass(cls):
# setUpClass() is called before any of the test cases are executed.
# Create a temporary directory to store the repository, metadata, and target
# files. 'temporary_directory' must be deleted in TearDownModule() so that
# temporary files are always removed, even when exceptions occur.
# temporary files are always removed, even when exceptions occur.
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
# Launch a SimpleHTTPServer (serves files in the current directory).
# Test cases will request metadata and target files that have been
# pre-generated in 'tuf/tests/repository_data', which will be served by the
# SimpleHTTPServer launched here. The test cases of this unit test assume
# SimpleHTTPServer launched here. The test cases of this unit test assume
# the pre-generated metadata files have a specific structure, such
# as a delegated role 'targets/role1', three target files, five key files,
# etc.
@ -97,15 +96,15 @@ def setUpClass(cls):
@classmethod
@classmethod
def tearDownClass(cls):
# tearDownModule() is called after all the test cases have run.
# http://docs.python.org/2/library/unittest.html#class-and-module-fixtures
# Remove the temporary repository directory, which should contain all the
# metadata, targets, and key files generated of all the test cases.
shutil.rmtree(cls.temporary_directory)
# Kill the SimpleHTTPServer process.
if cls.server_process.returncode is None:
logger.info('Server process '+str(cls.server_process.pid)+' terminated.')
@ -116,21 +115,21 @@ def tearDownClass(cls):
def setUp(self):
# We are inheriting from custom class.
unittest_toolbox.Modified_TestCase.setUp(self)
# Copy the original repository files provided in the test folder so that
# any modifications made to repository files are restricted to the copies.
# The 'repository_data' directory is expected to exist in 'tuf/tests/'.
original_repository_files = os.path.join(os.getcwd(), 'repository_data')
original_repository_files = os.path.join(os.getcwd(), 'repository_data')
temporary_repository_root = \
self.make_temp_directory(directory=self.temporary_directory)
# The original repository, keystore, and client directories will be copied
# for each test case.
# for each test case.
original_repository = os.path.join(original_repository_files, 'repository')
original_client = os.path.join(original_repository_files, 'client')
# Save references to the often-needed client repository directories.
# Test cases need these references to access metadata and target files.
# Test cases need these references to access metadata and target files.
self.repository_directory = \
os.path.join(temporary_repository_root, 'repository')
self.client_directory = os.path.join(temporary_repository_root, 'client')
@ -141,14 +140,14 @@ def setUp(self):
shutil.copytree(original_client, self.client_directory)
# Set the url prefix required by the 'tuf/client/updater.py' updater.
# 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'.
# 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'.
repository_basepath = self.repository_directory[len(os.getcwd()):]
url_prefix = \
'http://localhost:' + str(self.SERVER_PORT) + repository_basepath
# Setting 'settings.repository_directory' with the temporary client
'http://localhost:' + str(self.SERVER_PORT) + repository_basepath
# Setting 'tuf.settings.repository_directory' with the temporary client
# directory copied from the original repository files.
settings.repository_directory = self.client_directory
tuf.settings.repository_directory = self.client_directory
self.repository_mirrors = {'mirror1': {'url_prefix': url_prefix,
'metadata_path': 'metadata',
'targets_path': 'targets',
@ -164,46 +163,46 @@ def tearDown(self):
# Modified_TestCase.tearDown() automatically deletes temporary files and
# directories that may have been created during each test case.
unittest_toolbox.Modified_TestCase.tearDown(self)
# updater.Updater() populates the roledb with the name "test_repository"
# updater.Updater() populates the roledb with the name "test_repository"
tuf.roledb.clear_roledb(clear_all=True)
tuf.ssl_crypto.keydb.clear_keydb(clear_all=True)
tuf.keydb.clear_keydb(clear_all=True)
def test_without_tuf(self):
# Verify that a target file replaced with a malicious version is downloaded
# by a non-TUF client (i.e., a non-TUF client that does not verify hashes,
# detect mix-and-mix attacks, etc.) A tuf client, on the other hand, should
# detect that the downloaded target file is invalid.
# Test: Download a valid target file from the repository.
# Ensure the target file to be downloaded has not already been downloaded,
# and generate its file size and digest. The file size and digest is needed
# to check that the malicious file was indeed downloaded.
target_path = os.path.join(self.repository_directory, 'targets', 'file1.txt')
client_target_path = os.path.join(self.client_directory, 'file1.txt')
client_target_path = os.path.join(self.client_directory, 'file1.txt')
self.assertFalse(os.path.exists(client_target_path))
length, hashes = tuf.ssl_crypto.util.get_file_details(target_path)
length, hashes = securesystemslib.util.get_file_details(target_path)
fileinfo = tuf.formats.make_fileinfo(length, hashes)
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
url_file = os.path.join(url_prefix, 'targets', 'file1.txt')
six.moves.urllib.request.urlretrieve(url_file, client_target_path)
self.assertTrue(os.path.exists(client_target_path))
length, hashes = tuf.ssl_crypto.util.get_file_details(client_target_path)
length, hashes = securesystemslib.util.get_file_details(client_target_path)
download_fileinfo = tuf.formats.make_fileinfo(length, hashes)
self.assertEqual(fileinfo, download_fileinfo)
# Test: Download a target file that has been modified by an attacker.
with open(target_path, 'wt') as file_object:
file_object.write('add malicious content.')
length, hashes = tuf.ssl_crypto.util.get_file_details(target_path)
length, hashes = securesystemslib.util.get_file_details(target_path)
malicious_fileinfo = tuf.formats.make_fileinfo(length, hashes)
six.moves.urllib.request.urlretrieve(url_file, client_target_path)
length, hashes = tuf.ssl_crypto.util.get_file_details(client_target_path)
length, hashes = securesystemslib.util.get_file_details(client_target_path)
download_fileinfo = tuf.formats.make_fileinfo(length, hashes)
# Verify 'download_fileinfo' is unequal to the original trusted version.
self.assertNotEqual(download_fileinfo, fileinfo)
@ -226,32 +225,32 @@ def test_with_tuf(self):
target_path = os.path.join(self.repository_directory, 'targets', 'file1.txt')
with open(target_path, 'wt') as file_object:
file_object.write('malicious content, size 33 bytes.')
try:
self.repository_updater.download_target(file1_fileinfo, destination)
except tuf.ssl_commons.exceptions.NoWorkingMirrorError as exception:
except tuf.exceptions.NoWorkingMirrorError as exception:
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
url_file = os.path.join(url_prefix, 'targets', 'file1.txt')
# Verify that only one exception is raised for 'url_file'.
self.assertTrue(len(exception.mirror_errors), 1)
# Verify that the expected 'tuf.ssl_commons.exceptions.DownloadLengthMismatchError' exception
# Verify that the expected 'tuf.exceptions.DownloadLengthMismatchError' exception
# is raised for 'url_file'.
self.assertTrue(url_file in exception.mirror_errors)
self.assertTrue(isinstance(exception.mirror_errors[url_file],
tuf.ssl_commons.exceptions.BadHashError))
securesystemslib.exceptions.BadHashError))
else:
self.fail('TUF did not prevent an arbitrary package attack.')
def test_with_tuf_and_metadata_tampering(self):
# Test that a TUF client does not download a malicious target file, and a
# 'targets.json' metadata file that has also been modified by the attacker.
# The attacker does not attach a valid signature to 'targets.json'
# An attacker modifies 'file1.txt'.
target_path = os.path.join(self.repository_directory, 'targets', 'file1.txt')
with open(target_path, 'wt') as file_object:
@ -259,20 +258,20 @@ def test_with_tuf_and_metadata_tampering(self):
# An attacker also tries to add the malicious target's length and digest
# to its metadata file.
length, hashes = tuf.ssl_crypto.util.get_file_details(target_path)
length, hashes = securesystemslib.util.get_file_details(target_path)
metadata_path = \
os.path.join(self.repository_directory, 'metadata', 'targets.json')
metadata = tuf.ssl_crypto.util.load_json_file(metadata_path)
metadata = securesystemslib.util.load_json_file(metadata_path)
metadata['signed']['targets']['/file1.txt']['hashes'] = hashes
metadata['signed']['targets']['/file1.txt']['length'] = length
tuf.formats.check_signable_object_format(metadata)
tuf.formats.check_signable_object_format(metadata)
with open(metadata_path, 'wb') as file_object:
json.dumps(metadata, file_object, indent=1, sort_keys=True).encode('utf-8')
json.dumps(metadata, file_object, indent=1, sort_keys=True).encode('utf-8')
# Verify that the malicious 'targets.json' is not downloaded. Perform
# a refresh of top-level metadata to demonstrate that the malicious
# 'targets.json' is not downloaded.
@ -281,8 +280,8 @@ def test_with_tuf_and_metadata_tampering(self):
file1_fileinfo = self.repository_updater.get_one_valid_targetinfo('file1.txt')
destination = os.path.join(self.client_directory)
self.repository_updater.download_target(file1_fileinfo, destination)
except tuf.ssl_commons.exceptions.NoWorkingMirrorError as exception:
except tuf.exceptions.NoWorkingMirrorError as exception:
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
url_file = os.path.join(url_prefix, 'targets', 'file1.txt')
@ -292,8 +291,8 @@ def test_with_tuf_and_metadata_tampering(self):
# Verify that the specific and expected mirror exception is raised.
self.assertTrue(url_file in exception.mirror_errors)
self.assertTrue(isinstance(exception.mirror_errors[url_file],
tuf.ssl_commons.exceptions.BadHashError))
securesystemslib.exceptions.BadHashError))
else:
self.fail('TUF did not prevent an arbitrary package attack.')