From 4a6490723ecbb9749e01ecca47dee5c54ebb3aef Mon Sep 17 00:00:00 2001 From: Kon Date: Tue, 2 Apr 2013 14:02:15 -0400 Subject: [PATCH] Adding tests: test_indefinite_freeze_attack.py, test_slow_retrieval_attack.py --- tuf/repo/signerlib.py | 2 +- .../test_indefinite_freeze_attack.py | 155 ++++++++++-------- .../test_slow_retrieval_attack.py | 154 ++++++----------- tuf/tests/system_tests/util_test_tools.py | 35 +++- 4 files changed, 159 insertions(+), 187 deletions(-) diff --git a/tuf/repo/signerlib.py b/tuf/repo/signerlib.py index 2ed337a3..2fd2b428 100755 --- a/tuf/repo/signerlib.py +++ b/tuf/repo/signerlib.py @@ -1133,7 +1133,7 @@ def build_timestamp_file(timestamp_keyids, metadata_directory): release_filepath = os.path.join(metadata_directory, RELEASE_FILENAME) timestamp_filepath = os.path.join(metadata_directory, TIMESTAMP_FILENAME) - # Generate and sign the release metadata. + # Generate and sign the timestamp metadata. timestamp_metadata = generate_timestamp_metadata(release_filepath) signable = sign_metadata(timestamp_metadata, timestamp_keyids, timestamp_filepath) diff --git a/tuf/tests/system_tests/test_indefinite_freeze_attack.py b/tuf/tests/system_tests/test_indefinite_freeze_attack.py index a5e29667..1017fb4f 100755 --- a/tuf/tests/system_tests/test_indefinite_freeze_attack.py +++ b/tuf/tests/system_tests/test_indefinite_freeze_attack.py @@ -14,117 +14,130 @@ Simulate an indefinite freeze attack. + In an indefinite freeze attack, attacker is able to respond to client's + requests with the same, outdated metadata without the client being aware. + """ import os +import sys +import time import shutil import urllib import tempfile import util_test_tools -import tuf.interposition +import tuf.formats +import tuf.repo.signerlib as signerlib +from tuf.interposition import urllib_tuf + + +# Disable logging. +util_test_tools.disable_logging() -class IndefineteFreezeAttackError(Exception): +class IndefiniteFreezeAttackAlert(Exception): pass -def test_replay_attack(TUF=False): + +EXPIRATION = 1 # second(s) + + + +def _remake_timestamp(metadata_dir, keyids): + """Create timestamp metadata object. Modify expiration date. Sign and + write the metadata. + """ + release_filepath = os.path.join(metadata_dir, 'release.txt') + timestamp_filepath = os.path.join(metadata_dir, 'timestamp.txt') + timestamp_metadata = signerlib.generate_timestamp_metadata(release_filepath) + timestamp_metadata['signed']['expires'] = \ + tuf.formats.format_time(time.time() + EXPIRATION) + signable = \ + signerlib.sign_metadata(timestamp_metadata, keyids, timestamp_filepath) + signerlib.write_metadata_file(signable, timestamp_filepath) + + + +def _download(url, filename, tuf=False): + if tuf: + urllib_tuf.urlretrieve(url, filename) + + else: + urllib.urlretrieve(url, filename) + + + + + +def test_indefinite_freeze_attack(TUF=False): """ TUF: If set to 'False' all directories that start with 'tuf_' are ignored, indicating that tuf is not implemented. - - + The idea here is to expire timestamp metadata so that the attacker """ + ERROR_MSG = '\tIndefinite Freeze Attack was Successful!\n\n' + + try: # Setup. - root_repo, url, server_proc, keyids, interpose_json = \ - util_test_tools.init_repo(tuf=TUF) + root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF) reg_repo = os.path.join(root_repo, 'reg_repo') tuf_repo = os.path.join(root_repo, 'tuf_repo') + metadata_dir = os.path.join(tuf_repo, 'metadata') downloads = os.path.join(root_repo, 'downloads') - tuf_targets = os.path.join(tuf_repo, 'targets') - + # Add file to 'repo' directory: {root_repo} filepath = util_test_tools.add_file_to_repository(reg_repo, 'Test A') file_basename = os.path.basename(filepath) url_to_repo = url+'reg_repo/'+file_basename downloaded_file = os.path.join(downloads, file_basename) - # Attacker saves the original file into 'evil_dir'. - evil_dir = tempfile.mkdtemp(dir=root_repo) - vulnerable_file = os.path.join(evil_dir, file_basename) - shutil.copy(filepath, evil_dir) - - # Refresh the tuf repository and apply tuf interpose. if TUF: - util_test_tools.tuf_refresh_repo(root_repo, keyids) - tuf.interposition.configure(interpose_json) - tuf.interposition.interpose() + print 'TUF ...' - # End Setup. - - # Client performs initial update. - urllib.urlretrieve(url_to_repo, downloaded_file) - - # Downloads are stored in the same directory '{root_repo}/downloads/' - # for regular and tuf clients. - downloaded_content = util_test_tools.read_file_content(downloaded_file) - msg = '[Initial Updata] Failed to download the file.' - if 'Test A' != downloaded_content: - raise TestSetupError(msg) - - # Developer patches the file and updates the repository. - util_test_tools.modify_file_at_repository(filepath, 'Test NOT A') - - # Updating tuf repository. This will copy files from regular repository - # into tuf repository and refresh the metad - if TUF: + # Update TUF metadata before attacker modifies anything. util_test_tools.tuf_refresh_repo(root_repo, keyids) - # Client downloads the patched file. - urllib.urlretrieve(url_to_repo, downloaded_file) + # Modify the url. Remember that the interposition will intercept + # urls that have 'localhost:9999' hostname, which was specified in + # the json interposition configuration file. Look for 'hostname' + # in 'util_test_tools.py'. Further, the 'file_basename' is the target + # path relative to 'targets_dir'. + url_to_repo = 'http://localhost:9999/'+file_basename - # Content of the downloaded file. - downloaded_content = util_test_tools.read_file_content(downloaded_file) - msg = '[Update] Failed to update the file.' - if 'Test NOT A' != downloaded_content: - raise TestSetupError(msg) + # Make timestamp metadata with close expiration date (2s). + _remake_timestamp(metadata_dir, keyids) - # Attacker tries to be clever, he manages to modifies regular and tuf - # targets directory by replacing a patched file with an old one. - if os.path.isdir(tuf_targets): - target = os.path.join(tuf_targets, file_basename) - util_test_tools.delete_file_at_repository(target) - shutil.copy(vulnerable_file, tuf_targets) - # Verify that 'target' is an old, un-patched file. - target = os.path.join(tuf_targets, file_basename) - target_content = util_test_tools.read_file_content(target) - msg = "The 'target' file contains new data!" - if 'Test A' != target_content: - raise TestSetupError(msg) + + # Client performs initial download. + try: + _download(url=url_to_repo, filename=downloaded_file, tuf=TUF) + except tuf.ExpiredMetadataError: + msg = ('Metadata has expired too soon, extend expiration period. '+ + 'Current expiration is set to: '+repr(EXPIRATION)+' second(s).') + sys.exit(msg) + + # Expire timestamp. + time.sleep(EXPIRATION) + + # Try downloading again, this should raise an error. + try: + _download(url=url_to_repo, filename=downloaded_file, tuf=TUF) + except tuf.ExpiredMetadataError, error: + pass else: - util_test_tools.delete_file_at_repository(filepath) - shutil.copy(vulnerable_file, reg_repo) + raise IndefiniteFreezeAttackAlert(ERROR_MSG) - # Client downloads the file once time. - urllib.urlretrieve(url_to_repo, downloaded_file) - - # Check whether the attack succeeded by inspecting the content of the - # update. The update should contain 'Test NOT A'. - downloaded_content = util_test_tools.read_file_content(downloaded_file) - msg = 'Replay attack was successful!\n' - if 'Test NOT A' != downloaded_content: - raise ReplayAttackError(msg) finally: - tuf.interposition.go_away() util_test_tools.cleanup(root_repo, server_proc) @@ -132,11 +145,11 @@ def test_replay_attack(TUF=False): try: - test_replay_attack(TUF=False) -except ReplayAttackError, error: + test_indefinite_freeze_attack(TUF=False) +except IndefiniteFreezeAttackAlert, error: print error try: - test_replay_attack(TUF=True) -except ReplayAttackError, error: + test_indefinite_freeze_attack(TUF=True) +except IndefiniteFreezeAttackAlert, error: print error \ No newline at end of file diff --git a/tuf/tests/system_tests/test_slow_retrieval_attack.py b/tuf/tests/system_tests/test_slow_retrieval_attack.py index c15de348..fa260818 100755 --- a/tuf/tests/system_tests/test_slow_retrieval_attack.py +++ b/tuf/tests/system_tests/test_slow_retrieval_attack.py @@ -40,35 +40,70 @@ import subprocess import util_test_tools +from tuf.interposition import urllib_tuf + + # Disable logging. util_test_tools.disable_logging() -def test(): + +class SlowRetrievalAttackAlert(Exception): + pass + + +def download_using_urlopen(url, tuf=False): + if tuf: + return urllib_tuf.urlopen(url) + else: + return urllib.urlopen(url) + + + +def test_slow_retrieval_attack(TUF=True): + + ERROR_MSG = '\tSlow Retrieval Attack was Successful!\n\n' # Launch the server. port = random.randint(30000, 45000) + print port command = ['python', 'slow_retrieval_server.py', str(port)] server_process = subprocess.Popen(command, stderr=subprocess.PIPE) time.sleep(.1) try: - - root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=True) - + # Setup. + root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF, port=port) + print 'root_repo: '+root_repo reg_repo = os.path.join(root_repo, 'reg_repo') - - # Make a file in 'reg_repo'. - filepath = util_test_tools.add_file_to_repository(reg_repo, 'Test') + downloads = os.path.join(root_repo, 'downloads') - # Path of the file relative to 'root_repo'. - relative_filepath = os.path.relpath(filepath) + # Add file to 'repo' directory: {root_repo} + filepath = util_test_tools.add_file_to_repository(reg_repo, 'A'*10) + file_basename = os.path.basename(filepath) + url_to_file = url+'reg_repo/'+file_basename + downloaded_file = os.path.join(downloads, file_basename) + + if TUF: + print 'TUF ...' + tuf_repo = os.path.join(root_repo, 'tuf_repo') + + # Update TUF metadata before attacker modifies anything. + util_test_tools.tuf_refresh_repo(root_repo, keyids) + + # Modify the url. Remember that the interposition will intercept + # urls that have 'localhost:9999' hostname, which was specified in + # the json interposition configuration file. Look for 'hostname' + # in 'util_test_tools.py'. Further, the 'file_basename' is the target + # path relative to 'targets_dir'. + url_to_file = 'http://localhost:9999/'+file_basename + - # Tailor the url. - url_to_file = 'http://localhost:'+str(port)+'/'+relative_filepath # Download the content of the file using the server. - file_content = urllib.urlopen(url_to_file) + # NOTE: if TUF is enabled the metadata files will be downloaded first. This + # WILL take a long time. + file_content = download_using_urlopen(url_to_file, tuf=TUF) print file_content.read() @@ -81,97 +116,4 @@ def test(): util_test_tools.cleanup(root_repo, server_proc) -test() - - - -''' -import os -import shutil -import urllib -import tempfile -import util_test_tools - -from tuf.interposition import urllib_tuf - -# Disable logging. -util_test_tools.disable_logging() - - -class TestSetupError(Exception): - pass - -class SlowRetrievalAttack(Exception): - pass - - - -def download(url, filename, tuf=False): - if tuf: - urllib_tuf.urlretrieve(url, filename) - else: - urllib.urlretrieve(url, filename) - - - -def test_arbitrary_package_attack(TUF=False): - """ - - TUF: - If set to 'False' all directories that start with 'tuf_' are ignored, - indicating that tuf is not implemented. - - - Illustrate endless data attack vulnerability. - - """ - - try: - # Setup. - root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF) - reg_repo = os.path.join(root_repo, 'reg_repo') - tuf_repo = os.path.join(root_repo, 'tuf_repo') - downloads = os.path.join(root_repo, 'downloads') - tuf_targets = os.path.join(tuf_repo, 'targets') - - # Add a file to 'repo' directory: {root_repo} - filepath = util_test_tools.add_file_to_repository(reg_repo, 'Test A') - file_basename = os.path.basename(filepath) - url_to_repo = url+'reg_repo/'+file_basename - downloaded_file = os.path.join(downloads, file_basename) - - # Refresh the tuf repository and apply tuf interpose. - if TUF: - # Update TUF metadata before attacker modifies anything. - util_test_tools.tuf_refresh_repo(root_repo, keyids) - - # End Setup. - - # Client downloads (tries to download) the file. - download(url=url_to_repo, filename=downloaded_file, tuf=TUF) - - # Check whether the attack succeeded by inspecting the content of the - # update. The update should contain 'Test A'. - downloaded_content = util_test_tools.read_file_content(downloaded_file) - msg = 'Slow Retrieval Attack was successful!\n' - print downloaded_content - if 'Test A' != downloaded_content: - raise SlowRetrievalAttack(msg) - - finally: - util_test_tools.cleanup(root_repo, server_proc) - - - - -try: - test_arbitrary_package_attack(TUF=False) -except SlowRetrievalAttack, error: - print error - -try: - test_arbitrary_package_attack(TUF=True) -except SlowRetrievalAttack, error: - print error - -''' \ No newline at end of file +test_slow_retrieval_attack() \ No newline at end of file diff --git a/tuf/tests/system_tests/util_test_tools.py b/tuf/tests/system_tests/util_test_tools.py index b622a782..b5d8838b 100755 --- a/tuf/tests/system_tests/util_test_tools.py +++ b/tuf/tests/system_tests/util_test_tools.py @@ -155,7 +155,7 @@ def disable_logging(): -def init_repo(tuf=False): +def init_repo(tuf=False, port=None): # Temp root directory for regular and tuf repositories. # WARNING: tuf client stores files in '{root_repo}/downloads/' directory! # Make sure regular download are NOT stored in the that directory when @@ -166,11 +166,12 @@ def init_repo(tuf=False): root_repo = tempfile.mkdtemp(dir=os.getcwd()) os.mkdir(os.path.join(root_repo, 'reg_repo')) os.mkdir(os.path.join(root_repo, 'downloads')) - - # Start a simple server pointing to the repository directory. - port = random.randint(30000, 45000) - command = ['python', '-m', 'SimpleHTTPServer', str(port)] - server_proc = subprocess.Popen(command, stderr=subprocess.PIPE) + server_proc = None + if port is None: + # Start a simple server pointing to the repository directory. + port = random.randint(30000, 45000) + command = ['python', '-m', 'SimpleHTTPServer', str(port)] + server_proc = subprocess.Popen(command, stderr=subprocess.PIPE) # Tailor url for the repository. In order to download a 'file.txt' # from 'reg_repo' do: url+'reg_repo/file.txt' @@ -193,10 +194,15 @@ def init_repo(tuf=False): def cleanup(root_repo, server_process): - if server_process.returncode is None: - server_process.kill() + if server_process is not None: + if server_process.returncode is None: + server_process.kill() + print 'Server terminated.\n' + # Clear the keystore. + keystore.clear_keystore() + # Removing repository directory. try: shutil.rmtree(root_repo) @@ -516,6 +522,11 @@ def create_delegation(tuf_repo, delegated_targets_path, keyid, keyid_password, keystore_dir = os.path.join(tuf_repo, 'keystore') metadata_dir = os.path.join(tuf_repo, 'metadata') + original_get_metadata_directory = signercli._get_metadata_directory + original_prompt = signercli._prompt + original_get_password = signercli._get_password + original_get_keyids = signercli._get_keyids + # Patch signercli._get_metadata_directory() _get_metadata_directory(metadata_dir) @@ -557,4 +568,10 @@ def _mock_get_keyid(junk, keyid=keyid): # Patch signercli._get_keyids(). signercli._get_keyids = _mock_get_keyid - signercli.make_delegation(keystore_dir) \ No newline at end of file + signercli.make_delegation(keystore_dir) + + keystore.clear_keystore() + signercli._get_keyids = original_get_keyids + signercli._get_password = original_get_password + signercli._prompt = original_prompt + signercli._get_metadata_directory = original_get_metadata_directory \ No newline at end of file