diff --git a/tuf/tests/statement_coverage.py b/tuf/tests/statement_coverage.py new file mode 100755 index 00000000..6c40b0d5 --- /dev/null +++ b/tuf/tests/statement_coverage.py @@ -0,0 +1,86 @@ +""" + + statement_coverage.py + + + Konstantin Andrianov + + + March 20, 2013. + + + See LICENSE for licensing information. + + + Measure test coverage. + +NOTE: This script is based on third party software. In order to use this +script install Ned Batchelder's coverage.py: +http://nedbatchelder.com/code/coverage/ + + +""" + +import os +import sys + +# Try to import coverage.py. Coverage.py is a third party software. +try: + coverage = __import__('coverage') +except ImportError, error: + error_msg = ("\nIt appears that coverage.py is not installed. Install "+ + "Ned Batchelder's coverage.py "+ + "('http://nedbatchelder.com/code/coverage/') and try again.\n") + print error_msg + raise + + +cov = coverage.coverage() +cov.start() + +try: + current_directory = os.getcwd() + current_directory_content = os.listdir(current_directory) + + # Find test scripts and import them. + test_modules = [] # Test modules. + tested_modules = [] # Modules that are tested by the 'test_modules'. + + for _file in current_directory_content: + if _file.startswith('test') and _file.endswith('.py'): + + _file = os.path.splitext(_file)[0] + test_modules.append(_file) + + # Import the module. + try: + module = __import__(_file) + except ImportError, error: + print 'Unable to load the module: '+_file + raise + + _file = '.'+_file[5:] + tested_modules.append(_file) + +finally: + cov.stop() + + +# Include quickstart. +tested_modules.remove('.quickstart') +tested_modules.append('quickstart') + +# Extracting tuf modules. +tuf_modules = {} # list of all loaded tuf modules. +for module_name in sys.modules: + if module_name.startswith('tuf') or module_name.startswith('quickstart'): + tuf_modules[module_name] = sys.modules[module_name] + +# Tested module paths. +tested_module_paths = [] +for module_name in tuf_modules: + for tested_module in tested_modules: + if module_name.endswith(tested_module): + tested_module_paths.append(tuf_modules[module_name].__file__) + +cov.report(tested_module_paths) \ No newline at end of file diff --git a/tuf/tests/system_tests/slow_retrieval_server.py b/tuf/tests/system_tests/slow_retrieval_server.py index 5b413da7..0887d119 100755 --- a/tuf/tests/system_tests/slow_retrieval_server.py +++ b/tuf/tests/system_tests/slow_retrieval_server.py @@ -1,3 +1,23 @@ +""" + + slow_retrieval_server.py + + + Konstantin Andrianov + + + March 13, 2012 + + + See LICENSE for licensing information. + + + Server that throttles data by sending one byte at a time + (specified time interval 'DELAY'). The server is used in + test_slow_retrieval_attack.py. + +""" + import os import sys import time @@ -5,6 +25,10 @@ from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer +DELAY = 1 + + + # HTTP request handler. class Handler(BaseHTTPRequestHandler): @@ -22,8 +46,7 @@ def do_GET(self): # Throttle the file by sending a character every few seconds. for i in range(len(data)): - print i - time.sleep(1) + time.sleep(DELAY) self.wfile.write(data[i]) return @@ -42,15 +65,15 @@ def get_random_port(): def run(port): server_address = ('localhost', port) httpd = HTTPServer(server_address, Handler) - print('Server is active...') + print('Slow server is active on port: '+str(port)+' ...') httpd.handle_request() - + if __name__ == '__main__': - if sys.argv[1]: - port = int(sys.argv[1]) - else: - port = get_random_port() - print 'Port: '+str(port) - run(port) + if len(sys.argv) > 1: + port = int(sys.argv[1]) + else: + port = get_random_port() + + run(port) \ No newline at end of file diff --git a/tuf/tests/system_tests/test_extraneous_dependencies_attack.py b/tuf/tests/system_tests/test_extraneous_dependencies_attack.py index 2b9a946a..57f1c583 100755 --- a/tuf/tests/system_tests/test_extraneous_dependencies_attack.py +++ b/tuf/tests/system_tests/test_extraneous_dependencies_attack.py @@ -35,9 +35,15 @@ util_test_tools.disable_logging() +class ExtraneousDependenciesAttackAlert(Exception): + pass + + def test_extraneous_dependencies_attack(): + ERROR_MSG = '\tExtraneous Dependencies Attack Succeeded!\n\n' + try: # Setup. @@ -111,43 +117,66 @@ def _make_delegation(rolename): _make_delegation('role2') - # The attack. + # The attacks. + + def _write_rogue_metadata(): + # Load the keystore before rebuilding the metadata. + tuf.repo.keystore.load_keystore_from_keyfiles(keystore_dir, + roles['role1']['keyid'], + roles['role1']['password']) + + # Rebuild the delegation role metadata. + signerlib.build_delegated_role_file(roles['role2']['targets_dir'], + roles['role1']['keyid'], metadata_dir, + roles['role1']['metadata_dir'], + 'role1.txt') + + # Update release and timestamp metadata. + util_test_tools.make_release_meta(root_repo) + util_test_tools.make_timestamp_meta(root_repo) + + # Modify a target that was delegated to 'role2'. util_test_tools.modify_file_at_repository(roles['role2']['target_path'], 'Test NOT B') - # Load the keystore before rebuilding the metadata. - tuf.repo.keystore.load_keystore_from_keyfiles(keystore_dir, - roles['role1']['keyid'], - roles['role1']['password']) - - # Rebuild the delegation role metadata. - signerlib.build_delegated_role_file(roles['role2']['targets_dir'], - roles['role1']['keyid'], metadata_dir, - roles['role1']['metadata_dir'], - 'role1.txt') - - # Update release and timestamp metadata. - util_test_tools.make_release_meta(root_repo) - util_test_tools.make_timestamp_meta(root_repo) - + # Update rogue delegatee metadata. + _write_rogue_metadata() # Perform another client download. try: urllib_tuf.urlretrieve(roles['role2']['url'], roles['role2']['dest_path']) except tuf.MetadataNotAvailableError, e: - raise + pass + else: + raise ExtraneousDependenciesAttackAlert(ERROR_MSG) - finally: + # Add a target file to the directory delegated to 'role2' but not 'role1'. + util_test_tools.add_file_to_repository(roles['role2']['targets_dir'], 'AAAA') + + # Update rogue delegatee metadata. + _write_rogue_metadata() + + # Perform another client download. + try: + urllib_tuf.urlretrieve(roles['role2']['url'], roles['role2']['dest_path']) + except tuf.MetadataNotAvailableError, e: + pass + else: + raise ExtraneousDependenciesAttackAlert(ERROR_MSG) + + + + finally: server_proc.kill() - util_test_tools.cleanup(root_repo, server_proc) + #util_test_tools.cleanup(root_repo, server_proc) + + try: test_extraneous_dependencies_attack() -except tuf.MetadataNotAvailableError, error: - print str(error)+'\n' -else: - print 'Extraneous Dependencies Attack Succeeded!\n' \ No newline at end of file +except ExtraneousDependenciesAttackAlert, error: + print 'error' \ No newline at end of file diff --git a/tuf/tests/system_tests/test_indefinite_freeze_attack.py b/tuf/tests/system_tests/test_indefinite_freeze_attack.py index 1017fb4f..43b69159 100755 --- a/tuf/tests/system_tests/test_indefinite_freeze_attack.py +++ b/tuf/tests/system_tests/test_indefinite_freeze_attack.py @@ -149,6 +149,7 @@ def test_indefinite_freeze_attack(TUF=False): except IndefiniteFreezeAttackAlert, error: print error + try: test_indefinite_freeze_attack(TUF=True) except IndefiniteFreezeAttackAlert, error: diff --git a/tuf/tests/system_tests/test_mix_and_match_attack.py b/tuf/tests/system_tests/test_mix_and_match_attack.py new file mode 100755 index 00000000..a45ca24b --- /dev/null +++ b/tuf/tests/system_tests/test_mix_and_match_attack.py @@ -0,0 +1,197 @@ +""" + + test_mix_and_match_attack.py + + + Konstantin Andrianov + + + March 27, 2012 + + + See LICENSE for licensing information. + + + Simulate slow retrieval attack. A simple client update vs. client + update implementing TUF. + + In the mix-and-match attack, attacker is able to trick clients into using + combination of metadata that never existed together on the repository at + the same time. + +NOTE: The interposition provided by 'tuf.interposition' is used to intercept +all calls made by urllib/urillib2 to certain network locations specified in +the interposition configuration file. Look up interposition.py for more +information and illustration of a sample contents of the interposition +configuration file. Interposition was meant to make TUF integration with an +existing software updater an easy process. This allows for more flexibility +to the existing software updater. However, if you are planning to solely use +TUF there should be no need for interposition, all necessary calls will be +generated from within TUF. + +Note: There is no difference between 'updates' and 'target' files. + +""" + + +import os +import shutil +import urllib +import tempfile + +import util_test_tools +from tuf.interposition import urllib_tuf + + +# Disable logging. +util_test_tools.disable_logging() + + + +class MixAndMatchAttackAlert(Exception): + pass + + +def _download(url, filename, tuf=False): + if tuf: + urllib_tuf.urlretrieve(url, filename) + + else: + urllib.urlretrieve(url, filename) + + + +def test_mix_and_match_attack(TUF=False): + """ + Attack design: + There are 3 stages: + Stage 1: Consists of a usual mode of operations using tuf. Client + downloads a target file. (Initial download) + + Stage 2: The target file is legitimately modified and metadata correctly + updated. Client downloads the target file again. (Patched target download) + + Stage 3: The target file is legitimately modified and metadata correctly + updated again. However, before client gets to download the newly patched + target file the attacker replaces the release metadata, targets metadata + and the target file with the ones from stage 1 (mix-and-match attack). + Note that timestamp metadata is untouched. Further note that same would + happen if only target metadata, and target file are reverted. + """ + + ERROR_MSG = '\tMix-And-Match Attack was Successful!\n\n' + + + try: + # Setup / Stage 1 + # --------------- + root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF) + reg_repo = os.path.join(root_repo, 'reg_repo') + downloads = os.path.join(root_repo, 'downloads') + evil_dir = tempfile.mkdtemp(dir=root_repo) + + # Add file to 'repo' directory: {root_repo} + filepath = util_test_tools.add_file_to_repository(reg_repo, 'A'*10) + file_basename = os.path.basename(filepath) + url_to_file = url+'reg_repo/'+file_basename + downloaded_file = os.path.join(downloads, file_basename) + + # Attacker saves the initial file. + shutil.copy(filepath, evil_dir) + unpatched_file = os.path.join(evil_dir, file_basename) + + + if TUF: + print 'TUF ...' + tuf_repo = os.path.join(root_repo, 'tuf_repo') + tuf_targets = os.path.join(tuf_repo, 'targets') + metadata_dir = os.path.join(tuf_repo, 'metadata') + release_meta_file = os.path.join(metadata_dir, 'release.txt') + targets_meta_file = os.path.join(metadata_dir, 'targets.txt') + target = os.path.join(tuf_targets, file_basename) + + # Update TUF metadata before attacker modifies anything. + util_test_tools.tuf_refresh_repo(root_repo, keyids) + + # Attacker saves the original metadata and the target file. + #shutil.copy(target, evil_dir) + shutil.copy(release_meta_file, evil_dir) + shutil.copy(targets_meta_file, evil_dir) + #target_old = os.path.join(evil_dir, file_basename) + release_meta_file_old = os.path.join(evil_dir, 'release.txt') + targets_meta_file_old = os.path.join(evil_dir, 'targets.txt') + + # Modify the url. Remember that the interposition will intercept + # urls that have 'localhost:9999' hostname, which was specified in + # the json interposition configuration file. Look for 'hostname' + # in 'util_test_tools.py'. Further, the 'file_basename' is the target + # path relative to 'targets_dir'. + url_to_file = 'http://localhost:9999/'+file_basename + + + # Client's initial download. + _download(url=url_to_file, filename=downloaded_file, tuf=TUF) + + # Stage 2 + # ------- + # Developer patches the file and updates the repository. + util_test_tools.modify_file_at_repository(filepath, 'B'*11) + + # Updating tuf repository. This will copy files from regular repository + # into tuf repository and refresh the metadata + if TUF: + util_test_tools.tuf_refresh_repo(root_repo, keyids) + + # Client downloads the patched file. + _download(url=url_to_file, filename=downloaded_file, tuf=TUF) + + downloaded_content = util_test_tools.read_file_content(downloaded_file) + print downloaded_content + + # Stage 3 + # ------- + # Developer patches the file and updates the repository again. + util_test_tools.modify_file_at_repository(filepath, 'C'*10) + + # Updating tuf repository. This will copy files from regular repository + # into tuf repository and refresh the metadata + if TUF: + util_test_tools.tuf_refresh_repo(root_repo, keyids) + + # Attacker replaces the metadata and the target file. + shutil.copyfile(unpatched_file, target) + shutil.copyfile(release_meta_file_old, release_meta_file) + shutil.copyfile(targets_meta_file_old, targets_meta_file) + + # Attacker replaces the patched file with the unpatched one. + shutil.copyfile(unpatched_file, filepath) + + # Client tries to downloads the newly patched file. + _download(url=url_to_file, filename=downloaded_file, tuf=TUF) + + # Check whether the attack succeeded by inspecting the content of the + # update. The update should contain 'Test NOT A'. + downloaded_content = util_test_tools.read_file_content(downloaded_file) + if ('B'*10) != downloaded_content: + print downloaded_content + raise MixAndMatchAttackAlert(ERROR_MSG) + + + finally: + pass + #util_test_tools.cleanup(root_repo, server_proc) + + + + + +try: + test_mix_and_match_attack(TUF=False) +except MixAndMatchAttackAlert, error: + print error + + +try: + test_mix_and_match_attack(TUF=True) +except MixAndMatchAttackAlert, error: + print error \ No newline at end of file diff --git a/tuf/tests/system_tests/test_replay_attack.py b/tuf/tests/system_tests/test_replay_attack.py index 1afee0f1..4250a854 100755 --- a/tuf/tests/system_tests/test_replay_attack.py +++ b/tuf/tests/system_tests/test_replay_attack.py @@ -15,7 +15,11 @@ Simulate a replay attack. A simple client update vs. client update implementing TUF. -Note: The interposition provided by 'tuf.interposition' is used to intercept + In the replay attack an attacker is able to trick clients into installing + software that is older than that which the client previously knew to be + available. + +NOTE: The interposition provided by 'tuf.interposition' is used to intercept all calls made by urllib/urillib2 to certain hostname specified in the interposition configuration file. Look up interposition.py for more information and illustration of a sample contents of the interposition @@ -33,8 +37,8 @@ import shutil import urllib import tempfile -import util_test_tools +import util_test_tools from tuf.interposition import urllib_tuf @@ -77,7 +81,7 @@ def test_replay_attack(TUF=False): """ - ERROR_MSG = 'Replay Attack was Successful!\n' + ERROR_MSG = '\tReplay Attack was Successful!\n\n' try: @@ -100,6 +104,8 @@ def test_replay_attack(TUF=False): shutil.copy(filepath, evil_dir) if TUF: + print 'TUF ...' + # Update TUF metadata before attacker modifies anything. util_test_tools.tuf_refresh_repo(root_repo, keyids) @@ -126,7 +132,7 @@ def test_replay_attack(TUF=False): util_test_tools.modify_file_at_repository(filepath, 'Test NOT A') # Updating tuf repository. This will copy files from regular repository - # into tuf repository and refresh the metad + # into tuf repository and refresh the metadata if TUF: util_test_tools.tuf_refresh_repo(root_repo, keyids) @@ -175,7 +181,6 @@ def test_replay_attack(TUF=False): try: test_replay_attack(TUF=False) - except ReplayAttackAlert, error: print error @@ -183,6 +188,5 @@ def test_replay_attack(TUF=False): try: test_replay_attack(TUF=True) - except ReplayAttackAlert, error: print error \ No newline at end of file diff --git a/tuf/tests/system_tests/test_slow_retrieval_attack.py b/tuf/tests/system_tests/test_slow_retrieval_attack.py index fa260818..d2f9f4d9 100755 --- a/tuf/tests/system_tests/test_slow_retrieval_attack.py +++ b/tuf/tests/system_tests/test_slow_retrieval_attack.py @@ -15,7 +15,13 @@ Simulate slow retrieval attack. A simple client update vs. client update implementing TUF. -Note: The interposition provided by 'tuf.interposition' is used to intercept + During the slow retrieval attack, attacker is able to prevent clients from + being aware of interference with receiving updates by responding to client + requests so slowly that automated updates never complete. + +NOTE: Currently TUF does not protect against slow retrieval attacks. + +NOTE: The interposition provided by 'tuf.interposition' is used to intercept all calls made by urllib/urillib2 to certain network locations specified in the interposition configuration file. Look up interposition.py for more information and illustration of a sample contents of the interposition @@ -29,17 +35,14 @@ """ -# TODO: implement slow retrieval server... And design the test. -# Should there be a time bracket, during which the download is -# expected to happen? - import os import time import urllib import random import subprocess -import util_test_tools +from multiprocessing import Process +import util_test_tools from tuf.interposition import urllib_tuf @@ -52,29 +55,30 @@ class SlowRetrievalAttackAlert(Exception): pass -def download_using_urlopen(url, tuf=False): +def _download(url, filename, tuf=False): if tuf: - return urllib_tuf.urlopen(url) + urllib_tuf.urlretrieve(url, filename) + else: - return urllib.urlopen(url) + urllib.urlretrieve(url, filename) -def test_slow_retrieval_attack(TUF=True): +def test_slow_retrieval_attack(TUF=False): + WAIT_TIME = 5 # Number of seconds to wait until download completes. ERROR_MSG = '\tSlow Retrieval Attack was Successful!\n\n' # Launch the server. port = random.randint(30000, 45000) - print port command = ['python', 'slow_retrieval_server.py', str(port)] server_process = subprocess.Popen(command, stderr=subprocess.PIPE) time.sleep(.1) try: # Setup. - root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF, port=port) - print 'root_repo: '+root_repo + root_repo, url, server_proc, keyids = \ + util_test_tools.init_repo(tuf=TUF, port=port) reg_repo = os.path.join(root_repo, 'reg_repo') downloads = os.path.join(root_repo, 'downloads') @@ -84,6 +88,7 @@ def test_slow_retrieval_attack(TUF=True): url_to_file = url+'reg_repo/'+file_basename downloaded_file = os.path.join(downloads, file_basename) + if TUF: print 'TUF ...' tuf_repo = os.path.join(root_repo, 'tuf_repo') @@ -99,13 +104,14 @@ def test_slow_retrieval_attack(TUF=True): url_to_file = 'http://localhost:9999/'+file_basename - - # Download the content of the file using the server. - # NOTE: if TUF is enabled the metadata files will be downloaded first. This - # WILL take a long time. - file_content = download_using_urlopen(url_to_file, tuf=TUF) - - print file_content.read() + # Client tries to download. + # NOTE: if TUF is enabled the metadata files will be downloaded first. + proc = Process(target=_download, args=(url_to_file, downloaded_file, TUF)) + proc.start() + proc.join(WAIT_TIME) + if proc.exitcode is None: + proc.terminate() + raise SlowRetrievalAttackAlert(ERROR_MSG) finally: @@ -116,4 +122,16 @@ def test_slow_retrieval_attack(TUF=True): util_test_tools.cleanup(root_repo, server_proc) -test_slow_retrieval_attack() \ No newline at end of file + + + +try: + test_slow_retrieval_attack(TUF=False) +except SlowRetrievalAttackAlert, error: + print error + + +try: + test_slow_retrieval_attack(TUF=True) +except SlowRetrievalAttackAlert, error: + print error \ No newline at end of file