mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Refactor test_endless_data_attack.py
Refactored 'test_endless_data_attack.py' to use the 'unittest' module (test conditions in code, rather than verifying text output), use pre-generated repository files, and discontinue use of the old repository tools. Minor edits to the test cases.
This commit is contained in:
parent
9cbd3f1d21
commit
b52194cdf4
1 changed files with 234 additions and 142 deletions
|
|
@ -5,30 +5,27 @@
|
|||
test_endless_data_attack.py
|
||||
|
||||
<Author>
|
||||
Konstantin Andrianov
|
||||
Konstantin Andrianov.
|
||||
|
||||
<Started>
|
||||
March 13, 2012
|
||||
March 13, 2012.
|
||||
|
||||
April 3, 2014.
|
||||
Refactored to use the 'unittest' module (test conditions in code, rather
|
||||
than verifying text output), use pre-generated repository files, and
|
||||
discontinue use of the old repository tools. Minor edits to the test cases.
|
||||
-vladimir.v.diaz
|
||||
|
||||
<Copyright>
|
||||
See LICENSE for licensing information.
|
||||
|
||||
<Purpose>
|
||||
Simulate an endless data attack. A simple client update vs. client
|
||||
update implementing TUF.
|
||||
|
||||
Note: The interposition provided by 'tuf.interposition' is used to intercept
|
||||
all calls made by urllib/urillib2 to certain hostname specified in
|
||||
the interposition configuration file. Look up interposition.py for more
|
||||
information and illustration of a sample contents of the interposition
|
||||
configuration file. Interposition was meant to make TUF integration with an
|
||||
existing software updater an easy process. This allows for more flexibility
|
||||
to the existing software updater. However, if you are planning to solely use
|
||||
TUF there should be no need for interposition, all necessary calls will be
|
||||
generated from within TUF.
|
||||
Simulate an endless data attack, where an updater client tries to download a
|
||||
target file modified by an attacker to contain a large amount of data (a TUF
|
||||
client should only download up to the file's expected length). TUF and
|
||||
non-TUF client scenarios are tested.
|
||||
|
||||
There is no difference between 'updates' and 'target' files.
|
||||
|
||||
"""
|
||||
|
||||
# Help with Python 3 compatability, where the print statement is a function, an
|
||||
|
|
@ -40,150 +37,245 @@
|
|||
|
||||
import os
|
||||
import urllib
|
||||
import tempfile
|
||||
import random
|
||||
import time
|
||||
import shutil
|
||||
import json
|
||||
import subprocess
|
||||
import unittest
|
||||
import logging
|
||||
|
||||
import tuf
|
||||
import tuf.interposition
|
||||
import tuf.tests.util_test_tools as util_test_tools
|
||||
import tuf.formats
|
||||
import tuf.util
|
||||
import tuf.log
|
||||
import tuf.client.updater as updater
|
||||
import tuf.tests.unittest_toolbox as unittest_toolbox
|
||||
|
||||
logger = logging.getLogger('tuf.test_endless_data_attack')
|
||||
|
||||
|
||||
class TestEndlessDataAttack(unittest_toolbox.Modified_TestCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
# setUpClass() is called before any of the test cases are executed.
|
||||
|
||||
# Create a temporary directory to store the repository, metadata, and target
|
||||
# files. 'temporary_directory' must be deleted in TearDownModule() so that
|
||||
# temporary files are always removed, even when exceptions occur.
|
||||
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
|
||||
|
||||
# Launch a SimpleHTTPServer (serves files in the current directory).
|
||||
# Test cases will request metadata and target files that have been
|
||||
# pre-generated in 'tuf/tests/repository_data', which will be served by the
|
||||
# SimpleHTTPServer launched here. The test cases of this unit test assume
|
||||
# the pre-generated metadata files have a specific structure, such
|
||||
# as a delegated role 'targets/role1', three target files, five key files,
|
||||
# etc.
|
||||
cls.SERVER_PORT = random.randint(30000, 45000)
|
||||
command = ['python', 'simple_server.py', str(cls.SERVER_PORT)]
|
||||
cls.server_process = subprocess.Popen(command, stderr=subprocess.PIPE)
|
||||
logger.info('Server process started.')
|
||||
logger.info('Server process id: '+str(cls.server_process.pid))
|
||||
logger.info('Serving on port: '+str(cls.SERVER_PORT))
|
||||
cls.url = 'http://localhost:'+str(cls.SERVER_PORT) + os.path.sep
|
||||
|
||||
# NOTE: Following error is raised if a delay is not applied:
|
||||
# <urlopen error [Errno 111] Connection refused>
|
||||
time.sleep(.2)
|
||||
|
||||
|
||||
|
||||
class EndlessDataAttack(Exception):
|
||||
pass
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
# tearDownModule() is called after all the test cases have run.
|
||||
# http://docs.python.org/2/library/unittest.html#class-and-module-fixtures
|
||||
|
||||
# Remove the temporary repository directory, which should contain all the
|
||||
# metadata, targets, and key files generated of all the test cases.
|
||||
shutil.rmtree(cls.temporary_directory)
|
||||
|
||||
unittest_toolbox.Modified_TestCase.clear_toolbox()
|
||||
|
||||
# Kill the SimpleHTTPServer process.
|
||||
if cls.server_process.returncode is None:
|
||||
logger.info('Server process '+str(cls.server_process.pid)+' terminated.')
|
||||
cls.server_process.kill()
|
||||
|
||||
|
||||
|
||||
def _download(url, filename, using_tuf=False):
|
||||
if using_tuf:
|
||||
tuf.interposition.urllib_tuf.urlretrieve(url, filename)
|
||||
else:
|
||||
urllib.urlretrieve(url, filename)
|
||||
def setUp(self):
|
||||
# We are inheriting from custom class.
|
||||
unittest_toolbox.Modified_TestCase.setUp(self)
|
||||
|
||||
# Copy the original repository files provided in the test folder so that
|
||||
# any modifications made to repository files are restricted to the copies.
|
||||
# The 'repository_data' directory is expected to exist in 'tuf/tests/'.
|
||||
original_repository_files = os.path.join(os.getcwd(), os.pardir,
|
||||
'repository_data')
|
||||
temporary_repository_root = \
|
||||
self.make_temp_directory(directory=self.temporary_directory)
|
||||
|
||||
# The original repository, keystore, and client directories will be copied
|
||||
# for each test case.
|
||||
original_repository = os.path.join(original_repository_files, 'repository')
|
||||
original_client = os.path.join(original_repository_files, 'client')
|
||||
|
||||
# Save references to the often-needed client repository directories.
|
||||
# Test cases need these references to access metadata and target files.
|
||||
self.repository_directory = \
|
||||
os.path.join(temporary_repository_root, 'repository')
|
||||
self.client_directory = os.path.join(temporary_repository_root, 'client')
|
||||
|
||||
# Copy the original 'repository', 'client', and 'keystore' directories
|
||||
# to the temporary repository the test cases can use.
|
||||
shutil.copytree(original_repository, self.repository_directory)
|
||||
shutil.copytree(original_client, self.client_directory)
|
||||
|
||||
# Set the url prefix required by the 'tuf/client/updater.py' updater.
|
||||
# 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'.
|
||||
repository_basepath = self.repository_directory[len(os.getcwd()):]
|
||||
url_prefix = \
|
||||
'http://localhost:' + str(self.SERVER_PORT) + repository_basepath
|
||||
|
||||
# Setting 'tuf.conf.repository_directory' with the temporary client
|
||||
# directory copied from the original repository files.
|
||||
tuf.conf.repository_directory = self.client_directory
|
||||
self.repository_mirrors = {'mirror1': {'url_prefix': url_prefix,
|
||||
'metadata_path': 'metadata',
|
||||
'targets_path': 'targets',
|
||||
'confined_target_dirs': ['']}}
|
||||
|
||||
# Create the repository instance. The test cases will use this client
|
||||
# updater to refresh metadata, fetch target files, etc.
|
||||
self.repository_updater = updater.Updater('test_repository',
|
||||
self.repository_mirrors)
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
# Modified_TestCase.tearDown() automatically deletes temporary files and
|
||||
# directories that may have been created during each test case.
|
||||
unittest_toolbox.Modified_TestCase.tearDown(self)
|
||||
|
||||
|
||||
|
||||
def test_endless_data_attack(using_tuf=False, TIMESTAMP=False):
|
||||
"""
|
||||
<Purpose>
|
||||
Illustrate endless data attack vulnerability.
|
||||
def test_without_tuf(self):
|
||||
# Verify that a target file replaced with a larger malicious version (to
|
||||
# simulate an endless data attack) is downloaded by a non-TUF client (i.e.,
|
||||
# a non-TUF client that does not verify hashes, detect mix-and-mix attacks,
|
||||
# etc.) A tuf client, on the other hand, should only download target files
|
||||
# up to their expected lengths, as explicitly specified in metadata, or
|
||||
# 'tuf/conf.py' (when retrieving 'timestamp.json' and 'root.json unsafely'.)
|
||||
|
||||
# Test: Download a valid target file from the repository.
|
||||
# Ensure the target file to be downloaded has not already been downloaded,
|
||||
# and generate its file size and digest. The file size and digest is needed
|
||||
# to verify that the malicious file was indeed downloaded.
|
||||
target_path = os.path.join(self.repository_directory, 'targets', 'file1.txt')
|
||||
client_target_path = os.path.join(self.client_directory, 'file1.txt')
|
||||
self.assertFalse(os.path.exists(client_target_path))
|
||||
length, hashes = tuf.util.get_file_details(target_path)
|
||||
fileinfo = tuf.formats.make_fileinfo(length, hashes)
|
||||
|
||||
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
|
||||
url_file = os.path.join(url_prefix, 'targets', 'file1.txt')
|
||||
urllib.urlretrieve(url_file, client_target_path)
|
||||
|
||||
self.assertTrue(os.path.exists(client_target_path))
|
||||
length, hashes = tuf.util.get_file_details(client_target_path)
|
||||
download_fileinfo = tuf.formats.make_fileinfo(length, hashes)
|
||||
self.assertEqual(fileinfo, download_fileinfo)
|
||||
|
||||
# Test: Download a target file that has been modified by an attacker with
|
||||
# extra data.
|
||||
with open(target_path, 'r+b') as file_object:
|
||||
original_content = file_object.read()
|
||||
file_object.write(original_content+('append large amount of data' * 100000))
|
||||
large_length, hashes = tuf.util.get_file_details(target_path)
|
||||
malicious_fileinfo = tuf.formats.make_fileinfo(large_length, hashes)
|
||||
|
||||
# Is the modified file actually larger?
|
||||
self.assertTrue(large_length > length)
|
||||
|
||||
urllib.urlretrieve(url_file, client_target_path)
|
||||
|
||||
length, hashes = tuf.util.get_file_details(client_target_path)
|
||||
download_fileinfo = tuf.formats.make_fileinfo(length, hashes)
|
||||
|
||||
# Verify 'download_fileinfo' is unequal to the original trusted version.
|
||||
self.assertNotEqual(download_fileinfo, fileinfo)
|
||||
|
||||
<Arguments>
|
||||
using_tuf:
|
||||
If set to 'False' all directories that start with 'tuf_' are ignored,
|
||||
indicating that tuf is not implemented.
|
||||
|
||||
"""
|
||||
|
||||
ERROR_MSG = 'Endless Data Attack was Successful!\n'
|
||||
|
||||
try:
|
||||
# Setup.
|
||||
root_repo, url, server_proc, keyids = util_test_tools.init_repo(using_tuf)
|
||||
reg_repo = os.path.join(root_repo, 'reg_repo')
|
||||
tuf_repo = os.path.join(root_repo, 'tuf_repo')
|
||||
downloads = os.path.join(root_repo, 'downloads')
|
||||
tuf_targets = os.path.join(tuf_repo, 'targets')
|
||||
|
||||
# Original data.
|
||||
INTENDED_DATA = 'Test A'
|
||||
|
||||
# Add a file to 'repo' directory: {root_repo}
|
||||
filepath = util_test_tools.add_file_to_repository(reg_repo, INTENDED_DATA)
|
||||
file_basename = os.path.basename(filepath)
|
||||
url_to_repo = url+'reg_repo/'+file_basename
|
||||
downloaded_file = os.path.join(downloads, file_basename)
|
||||
# We do not deliver truly endless data, but we will extend the original
|
||||
# file by many bytes.
|
||||
noisy_data = 'X'*100000
|
||||
# Verify 'download_fileinfo' is equal to the malicious version.
|
||||
self.assertEqual(download_fileinfo, malicious_fileinfo)
|
||||
|
||||
|
||||
if using_tuf:
|
||||
# Update TUF metadata before attacker modifies anything.
|
||||
util_test_tools.tuf_refresh_repo(root_repo, keyids)
|
||||
# Modify the url. Remember that the interposition will intercept
|
||||
# urls that have 'localhost:9999' hostname, which was specified in
|
||||
# the json interposition configuration file. Look for 'hostname'
|
||||
# in 'util_test_tools.py'. Further, the 'file_basename' is the target
|
||||
# path relative to 'targets_dir'.
|
||||
url_to_repo = 'http://localhost:9999/'+file_basename
|
||||
|
||||
# Attacker modifies the file at the targets repository.
|
||||
target = os.path.join(tuf_targets, file_basename)
|
||||
original_data = util_test_tools.read_file_content(target)
|
||||
larger_original_data = original_data + noisy_data
|
||||
util_test_tools.modify_file_at_repository(target, larger_original_data)
|
||||
def test_with_tuf(self):
|
||||
# Verify that a target file (on the remote repository) modified by an
|
||||
# attacker, to contain a large amount of extra data, is not downloaded by
|
||||
# the TUF client. First test that the valid target file is successfully
|
||||
# downloaded.
|
||||
file1_fileinfo = self.repository_updater.target('file1.txt')
|
||||
destination = os.path.join(self.client_directory)
|
||||
self.repository_updater.download_target(file1_fileinfo, destination)
|
||||
client_target_path = os.path.join(destination, 'file1.txt')
|
||||
self.assertTrue(os.path.exists(client_target_path))
|
||||
|
||||
# Verify the client's downloaded file matches the repository's.
|
||||
target_path = os.path.join(self.repository_directory, 'targets', 'file1.txt')
|
||||
length, hashes = tuf.util.get_file_details(client_target_path)
|
||||
fileinfo = tuf.formats.make_fileinfo(length, hashes)
|
||||
|
||||
length, hashes = tuf.util.get_file_details(client_target_path)
|
||||
download_fileinfo = tuf.formats.make_fileinfo(length, hashes)
|
||||
self.assertEqual(fileinfo, download_fileinfo)
|
||||
|
||||
# Attacker modifies the timestamp.txt metadata.
|
||||
if TIMESTAMP:
|
||||
metadata = os.path.join(tuf_repo, 'metadata')
|
||||
timestamp = os.path.join(metadata, 'timestamp.txt')
|
||||
original_data = util_test_tools.read_file_content(timestamp)
|
||||
larger_original_data = original_data + noisy_data
|
||||
util_test_tools.modify_file_at_repository(timestamp,
|
||||
larger_original_data)
|
||||
|
||||
# Attacker modifies the file at the regular repository.
|
||||
original_data = util_test_tools.read_file_content(filepath)
|
||||
larger_original_data = original_data + noisy_data
|
||||
util_test_tools.modify_file_at_repository(filepath, larger_original_data)
|
||||
|
||||
# End Setup.
|
||||
|
||||
|
||||
# Client downloads (tries to download) the file.
|
||||
# Modify 'file1.txt' and confirm that the TUF client only downloads up to
|
||||
# the expected file length.
|
||||
with open(target_path, 'r+b') as file_object:
|
||||
original_content = file_object.read()
|
||||
file_object.write(original_content+('append large amount of data' * 10000))
|
||||
|
||||
# Is the modified file actually larger?
|
||||
large_length, hashes = tuf.util.get_file_details(target_path)
|
||||
self.assertTrue(large_length > length)
|
||||
|
||||
os.remove(client_target_path)
|
||||
self.repository_updater.download_target(file1_fileinfo, destination)
|
||||
|
||||
# A large amount of data has been appended to the original content. The
|
||||
# extra data appended should be discarded by the client, so the downloaded
|
||||
# file size and hash should not have changed.
|
||||
length, hashes = tuf.util.get_file_details(client_target_path)
|
||||
download_fileinfo = tuf.formats.make_fileinfo(length, hashes)
|
||||
self.assertEqual(fileinfo, download_fileinfo)
|
||||
|
||||
# Test that the TUF client does not download large metadata files, as well.
|
||||
timestamp_path = os.path.join(self.repository_directory, 'metadata',
|
||||
'timestamp.json')
|
||||
|
||||
original_length, hashes = tuf.util.get_file_details(timestamp_path)
|
||||
|
||||
with open(timestamp_path, 'r+b') as file_object:
|
||||
original_content = file_object.read()
|
||||
file_object.write(original_content+('append large amount of data' * 10000))
|
||||
|
||||
modified_length, hashes = tuf.util.get_file_details(timestamp_path)
|
||||
self.assertTrue(modified_length > original_length)
|
||||
|
||||
# Does the TUF client download the upper limit of an unsafely fetched
|
||||
# 'timestamp.json'? 'timestamp.json' must not be greater than
|
||||
# 'tuf.conf.DEFAULT_TIMESTAMP_REQUIRED_LENGTH'.
|
||||
try:
|
||||
_download(url_to_repo, downloaded_file, using_tuf)
|
||||
except Exception, exception:
|
||||
# Because we are extending the true timestamp TUF metadata with invalid
|
||||
# JSON, we except to catch an error about invalid metadata JSON.
|
||||
if using_tuf and TIMESTAMP:
|
||||
endless_data_attack = False
|
||||
|
||||
for mirror_url, mirror_error in exception.mirror_errors.iteritems():
|
||||
if isinstance(mirror_error, tuf.InvalidMetadataJSONError):
|
||||
endless_data_attack = True
|
||||
break
|
||||
|
||||
# In case we did not detect what was likely an endless data attack, we
|
||||
# reraise the exception to indicate that endless data attack detection
|
||||
# failed.
|
||||
if not endless_data_attack: raise
|
||||
else: raise
|
||||
|
||||
# When we test downloading "endless" timestamp with TUF, we want to skip
|
||||
# the following test because downloading the timestamp should have failed.
|
||||
if not (using_tuf and TIMESTAMP):
|
||||
# Check whether the attack succeeded by inspecting the content of the
|
||||
# update. The update should contain 'Test A'. Technically it suffices
|
||||
# to check whether the file was downloaded or not.
|
||||
downloaded_content = util_test_tools.read_file_content(downloaded_file)
|
||||
if downloaded_content != INTENDED_DATA:
|
||||
raise EndlessDataAttack(ERROR_MSG)
|
||||
|
||||
finally:
|
||||
util_test_tools.cleanup(root_repo, server_proc)
|
||||
self.repository_updater.refresh()
|
||||
|
||||
except tuf.NoWorkingMirrorError, exception:
|
||||
for mirror_url, mirror_error in exception.mirror_errors.iteritems():
|
||||
self.assertTrue(isinstance(mirror_error, tuf.InvalidMetadataJSONError))
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
try:
|
||||
test_endless_data_attack(using_tuf=False, TIMESTAMP=False)
|
||||
except EndlessDataAttack, error:
|
||||
print('Endless data attack worked on download without TUF!')
|
||||
|
||||
try:
|
||||
test_endless_data_attack(using_tuf=True, TIMESTAMP=False)
|
||||
except EndlessDataAttack, error:
|
||||
print('Endless data attack worked on download without TUF!')
|
||||
print(str(error))
|
||||
else:
|
||||
print('Endless data attack did not work on download with TUF!')
|
||||
|
||||
try:
|
||||
# This test fails because the timestamp metadata has been extended with
|
||||
# random data from its true length, thereby resulting in invalid JSON.
|
||||
test_endless_data_attack(using_tuf=True, TIMESTAMP=True)
|
||||
except EndlessDataAttack, error:
|
||||
print('Endless data attack worked on download without TUF!')
|
||||
print(str(error))
|
||||
else:
|
||||
print('Endless data attack did not work on download with TUF!')
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
|
|||
Loading…
Reference in a new issue