diff --git a/tests/integration/test_arbitrary_package_attack.py b/tests/integration/test_arbitrary_package_attack.py index 8164c678..c45b2198 100755 --- a/tests/integration/test_arbitrary_package_attack.py +++ b/tests/integration/test_arbitrary_package_attack.py @@ -15,27 +15,25 @@ Simulate an arbitrary package attack. A simple client update vs. client update implementing TUF. -Note: The interposition provided by 'tuf.interposition' is used to intercept -all calls made by urllib/urillib2 to certain hostnames specified in -the interposition configuration file. Look up interposition.py for more -information and illustration of a sample contents of the interposition -configuration file. Interposition was meant to make TUF integration with an -existing software updater an easy process. This allows for more flexibility -to the existing software updater. However, if you are planning to solely use -TUF there should be no need for interposition, all necessary calls will be -generated from within TUF. + Note: The interposition provided by 'tuf.interposition' is used to intercept + all calls made by urllib/urillib2 to certain hostnames specified in + the interposition configuration file. Look up interposition.py for more + information and illustration of a sample contents of the interposition + configuration file. Interposition was meant to make TUF integration with an + existing software updater an easy process. This allows for more flexibility + to the existing software updater. However, if you are planning to solely use + TUF there should be no need for interposition, all necessary calls will be + generated from within TUF. -Note: There is no difference between 'updates' and 'target' files. + There is no difference between 'updates' and 'target' files. """ import os -import shutil import urllib -import tempfile import tuf -import tuf.interposition.urllib_tuf as urllib_tuf +import tuf.interposition import tuf.tests.util_test_tools as util_test_tools @@ -45,9 +43,9 @@ class ArbitraryPackageAlert(Exception): -def _download(url, filename, tuf=False): - if tuf: - urllib_tuf.urlretrieve(url, filename) +def _download(url, filename, using_tuf=False): + if using_tuf: + tuf.interposition.urllib_tuf.urlretrieve(url, filename) else: urllib.urlretrieve(url, filename) @@ -56,24 +54,23 @@ def _download(url, filename, tuf=False): -def test_arbitrary_package_attack(TUF=False): +def test_arbitrary_package_attack(using_tuf=False): """ - - TUF: - If set to 'False' all directories that start with 'tuf_' are ignored, - indicating that tuf is not implemented. - Illustrate arbitrary package attack vulnerability. - + + + using_tuf: + If set to 'False' all directories that start with 'tuf_' are ignored, + indicating that tuf is not implemented. """ - ERROR_MSG = 'Arbitrary Package Attack was Successful!\n' + ERROR_MSG = 'Arbitrary Package Attack was Successful!' try: # Setup. - root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF) + root_repo, url, server_proc, keyids = util_test_tools.init_repo(using_tuf) reg_repo = os.path.join(root_repo, 'reg_repo') tuf_repo = os.path.join(root_repo, 'tuf_repo') downloads = os.path.join(root_repo, 'downloads') @@ -85,7 +82,7 @@ def test_arbitrary_package_attack(TUF=False): url_to_repo = url+'reg_repo/'+file_basename downloaded_file = os.path.join(downloads, file_basename) - if TUF: + if using_tuf: # Update TUF metadata before attacker modifies anything. util_test_tools.tuf_refresh_repo(root_repo, keyids) @@ -108,13 +105,15 @@ def test_arbitrary_package_attack(TUF=False): try: # Client downloads (tries to download) the file. - _download(url=url_to_repo, filename=downloaded_file, tuf=TUF) + _download(url_to_repo, downloaded_file, using_tuf) - except tuf.DownloadError: - # If tuf.DownloadError is raised, this means that TUF has prevented - # the download of an unrecognized file. Enable the logging to see, - # what actually happened. - pass + except tuf.NoWorkingMirrorError, error: + # We only set up one mirror, so if it fails, we expect a + # NoWorkingMirrorError. If TUF has worked as intended, the mirror error + # contained within should be a BadHashError. + mirror_error = error.mirror_errors[url+'tuf_repo/targets/'+file_basename] + + assert isinstance(mirror_error, tuf.BadHashError) else: # Check whether the attack succeeded by inspecting the content of the @@ -131,17 +130,24 @@ def test_arbitrary_package_attack(TUF=False): - +print 'Attempting arbitrary package attack without TUF:' try: - test_arbitrary_package_attack(TUF=False) + test_arbitrary_package_attack(using_tuf=False) except ArbitraryPackageAlert, error: print error +else: + print 'Extraneous dependency attack failed.' +print +print 'Attempting arbitrary package attack with TUF:' try: - test_arbitrary_package_attack(TUF=True) + test_arbitrary_package_attack(using_tuf=True) except ArbitraryPackageAlert, error: print error +else: + print 'Extraneous dependency attack failed.' +print diff --git a/tests/integration/test_delegations.py b/tests/integration/test_delegations.py index 94f84917..bd373f0e 100755 --- a/tests/integration/test_delegations.py +++ b/tests/integration/test_delegations.py @@ -107,7 +107,7 @@ def setUp(self): version = version+1 expiration = tuf.formats.format_time(time.time()+86400) - root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=True) + root_repo, url, server_proc, keyids = util_test_tools.init_repo(using_tuf=True) # Server side repository. tuf_repo = os.path.join(root_repo, 'tuf_repo') @@ -331,14 +331,29 @@ def make_targets_metadata(self): self.T2_metadata =\ make_metadata(self.tuf_repo, self.signed_targets[self.T2], version, expiration) - self.T3_metadata = \ + self.T3_metadata =\ make_metadata(self.tuf_repo, self.signed_targets[self.T3], version, expiration) def test_that_initial_update_fails_with_undelegated_signing_of_targets(self): - # Expect to see a particular exception on initial update. - self.assertRaises(tuf.MetadataNotAvailableError, self.do_update) + """We expect to see ForbiddenTargetError on initial update because + delegated targets roles sign for targets that they were not delegated + to.""" + + # http://docs.python.org/2/library/unittest.html#unittest.TestCase.assertRaises + with self.assertRaises(tuf.NoWorkingMirrorError) as context_manager: + self.do_update() + + mirror_errors = context_manager.exception.mirror_errors + forbidden_target_error = False + + for mirror_url, mirror_error in mirror_errors.iteritems(): + if isinstance(mirror_error, tuf.ForbiddenTargetError): + forbidden_target_error = True + break + + self.assertEqual(forbidden_target_error, True) @@ -455,8 +470,22 @@ def make_targets_metadata(self): def test_that_initial_update_fails_with_many_roles_sharing_a_target(self): - # Expect to see a particular exception on initial update. - self.assertRaises(tuf.DownloadError, self.do_update) + """We expect to see BadHashError on initial update because the hash + metadata mismatches the target.""" + + # http://docs.python.org/2/library/unittest.html#unittest.TestCase.assertRaises + with self.assertRaises(tuf.NoWorkingMirrorError) as context_manager: + self.do_update() + + mirror_errors = context_manager.exception.mirror_errors + bad_hash_error = False + + for mirror_url, mirror_error in mirror_errors.iteritems(): + if isinstance(mirror_error, tuf.BadHashError): + bad_hash_error = True + break + + self.assertEqual(bad_hash_error, True) diff --git a/tests/integration/test_endless_data_attack.py b/tests/integration/test_endless_data_attack.py index 3f1e0ac8..b73ccf94 100755 --- a/tests/integration/test_endless_data_attack.py +++ b/tests/integration/test_endless_data_attack.py @@ -37,7 +37,7 @@ import urllib import tuf -import tuf.interposition.urllib_tuf as urllib_tuf +import tuf.interposition import tuf.tests.util_test_tools as util_test_tools @@ -47,32 +47,31 @@ class EndlessDataAttack(Exception): -def _download(url, filename, TUF=False): - if TUF: - urllib_tuf.urlretrieve(url, filename) +def _download(url, filename, using_tuf=False): + if using_tuf: + tuf.interposition.urllib_tuf.urlretrieve(url, filename) else: urllib.urlretrieve(url, filename) -def test_arbitrary_package_attack(TUF=False, TIMESTAMP=False): +def test_endless_data_attack(using_tuf=False, TIMESTAMP=False): """ - - TUF: - If set to 'False' all directories that start with 'tuf_' are ignored, - indicating that tuf is not implemented. - Illustrate endless data attack vulnerability. + + using_tuf: + If set to 'False' all directories that start with 'tuf_' are ignored, + indicating that tuf is not implemented. + """ ERROR_MSG = 'Endless Data Attack was Successful!\n' - try: # Setup. - root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF) + root_repo, url, server_proc, keyids = util_test_tools.init_repo(using_tuf) reg_repo = os.path.join(root_repo, 'reg_repo') tuf_repo = os.path.join(root_repo, 'tuf_repo') downloads = os.path.join(root_repo, 'downloads') @@ -91,7 +90,7 @@ def test_arbitrary_package_attack(TUF=False, TIMESTAMP=False): noisy_data = 'X'*100000 - if TUF: + if using_tuf: # Update TUF metadata before attacker modifies anything. util_test_tools.tuf_refresh_repo(root_repo, keyids) # Modify the url. Remember that the interposition will intercept @@ -126,11 +125,11 @@ def test_arbitrary_package_attack(TUF=False, TIMESTAMP=False): # Client downloads (tries to download) the file. try: - _download(url=url_to_repo, filename=downloaded_file, TUF=TUF) + _download(url_to_repo, downloaded_file, using_tuf) except Exception, exception: # Because we are extending the true timestamp TUF metadata with invalid # JSON, we except to catch an error about invalid metadata JSON. - if TUF and TIMESTAMP: + if using_tuf and TIMESTAMP: endless_data_attack = False for mirror_url, mirror_error in exception.mirror_errors.iteritems(): @@ -146,7 +145,7 @@ def test_arbitrary_package_attack(TUF=False, TIMESTAMP=False): # When we test downloading "endless" timestamp with TUF, we want to skip # the following test because downloading the timestamp should have failed. - if not (TUF and TIMESTAMP): + if not (using_tuf and TIMESTAMP): # Check whether the attack succeeded by inspecting the content of the # update. The update should contain 'Test A'. Technically it suffices # to check whether the file was downloaded or not. @@ -162,12 +161,12 @@ def test_arbitrary_package_attack(TUF=False, TIMESTAMP=False): try: - test_arbitrary_package_attack(TUF=False, TIMESTAMP=False) + test_endless_data_attack(using_tuf=False, TIMESTAMP=False) except EndlessDataAttack, error: print('Endless data attack worked on download without TUF!') try: - test_arbitrary_package_attack(TUF=True, TIMESTAMP=False) + test_endless_data_attack(using_tuf=True, TIMESTAMP=False) except EndlessDataAttack, error: print('Endless data attack worked on download without TUF!') print(str(error)) @@ -177,12 +176,9 @@ def test_arbitrary_package_attack(TUF=False, TIMESTAMP=False): try: # This test fails because the timestamp metadata has been extended with # random data from its true length, thereby resulting in invalid JSON. - test_arbitrary_package_attack(TUF=True, TIMESTAMP=True) + test_endless_data_attack(using_tuf=True, TIMESTAMP=True) except EndlessDataAttack, error: print('Endless data attack worked on download without TUF!') print(str(error)) else: print('Endless data attack did not work on download with TUF!') - - - diff --git a/tests/integration/test_extraneous_dependencies_attack.py b/tests/integration/test_extraneous_dependencies_attack.py index 2813bdfd..3e1c8b1c 100755 --- a/tests/integration/test_extraneous_dependencies_attack.py +++ b/tests/integration/test_extraneous_dependencies_attack.py @@ -38,12 +38,10 @@ import os -import shutil import urllib -import tempfile import tuf -import tuf.interposition.urllib_tuf as urllib_tuf +import tuf.interposition import tuf.tests.util_test_tools as util_test_tools @@ -52,63 +50,72 @@ class ExtraneousDependencyAlert(Exception): -# Interpret the contents of the file it downloads as a list of dependent -# files from the same repository. -def _download(url, filename, directory, TUF=False): +# Interpret anything following 'requires:' in the contents of the file it +# downloads as a comma-separated list of dependent files from the same repository. +def _download(url, filename, directory, using_tuf=False): destination = os.path.join(directory, filename) - if TUF: - urllib_tuf.urlretrieve(url, destination) + if using_tuf: + tuf.interposition.urllib_tuf.urlretrieve(url, destination) else: urllib.urlretrieve(url, destination) - if util_test_tools.read_file_content(destination) != '': - required_files = util_test_tools.read_file_content(destination).split(',') + file_contents = util_test_tools.read_file_content(destination) + + # Parse the list of required files (if it exists) and download them. + if file_contents.find('requires:') != -1: + required_files = file_contents[file_contents.find('requires:') + 9:].split(',') for required_filename in required_files: required_file_url = os.path.dirname(url)+os.sep+required_filename - _download(required_file_url, required_filename, directory, TUF) + _download(required_file_url, required_filename, directory, using_tuf) -def test_extraneous_dependency_attack(TUF=False): +def test_extraneous_dependency_attack(using_tuf=False): """ - Illustrate arbitrary package attack vulnerability. + Illustrate extraneous dependency attack vulnerability. - TUF: + using_tuf: If set to 'False' all directories that start with 'tuf_' are ignored, indicating that tuf is not implemented. """ - ERROR_MSG = 'Extraneous Dependency Attack was Successful!\n' - + ERROR_MSG = 'Extraneous Dependency Attack was Successful!' try: # Setup. - root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF) + root_repo, url, server_proc, keyids = util_test_tools.init_repo(using_tuf) reg_repo = os.path.join(root_repo, 'reg_repo') tuf_repo = os.path.join(root_repo, 'tuf_repo') downloads = os.path.join(root_repo, 'downloads') targets_dir = os.path.join(tuf_repo, 'targets') # Add files to 'repo' directory: {root_repo}. - good_dependency_filepath = util_test_tools.add_file_to_repository(reg_repo, '') + good_dependency_filepath = util_test_tools.add_file_to_repository(reg_repo, + 'the file you need') good_dependency_basename = os.path.basename(good_dependency_filepath) - bad_dependency_filepath = util_test_tools.add_file_to_repository(reg_repo, '') + bad_dependency_filepath = util_test_tools.add_file_to_repository(reg_repo, + 'the file you don\'t need') bad_dependency_basename = os.path.basename(bad_dependency_filepath) # The dependent file lists the good dependency. dependent_filepath = util_test_tools.add_file_to_repository(reg_repo, - good_dependency_basename) + 'requires:'+good_dependency_basename) dependent_basename = os.path.basename(dependent_filepath) url_to_repo = url+'reg_repo/'+dependent_basename - modified_dependency_list = good_dependency_basename+','+\ - bad_dependency_basename - if TUF: + # List the bad dependency first. If an attacker modifies a target by + # simply appending the file contents, tuf.download will ignore the appended + # data, downloading only as much data as the TUF metadata says the target + # should contain. + modified_dependency_list = bad_dependency_basename+','+\ + good_dependency_basename + + if using_tuf: # Update TUF metadata before attacker modifies anything. util_test_tools.tuf_refresh_repo(root_repo, keyids) @@ -119,28 +126,32 @@ def test_extraneous_dependency_attack(TUF=False): # path relative to 'targets_dir'. url_to_repo = 'http://localhost:9999/'+dependent_basename - # Attacker adds the dependency in the targets repository. - target = os.path.join(targets_dir, dependent_basename) - util_test_tools.modify_file_at_repository(target, - modified_dependency_list) + # Attacker modifies the depenent file in the targets repository, adding + # the bad dependency to its list. + dependent_target_filepath = os.path.join(targets_dir, dependent_basename) + util_test_tools.modify_file_at_repository(dependent_target_filepath, + 'requires:'+modified_dependency_list) - # Attacker adds the dependency in the regular repository. + # Attacker modifies the depenent file in the regular repository, adding + # the bad dependency to its list. util_test_tools.modify_file_at_repository(dependent_filepath, - modified_dependency_list) + 'requires:'+modified_dependency_list) # End of Setup. try: # Client downloads (tries to download) the file. - _download(url=url_to_repo, filename=dependent_basename, - directory=downloads, TUF=TUF) + _download(url_to_repo, dependent_basename, downloads, using_tuf) - except tuf.DownloadError: - # If tuf.DownloadError is raised, this means that TUF has prevented - # the download of an unrecognized file. Enable the logging to see, - # what actually happened. - pass + except tuf.NoWorkingMirrorError, error: + # We only set up one mirror, so if it fails, we expect a + # NoWorkingMirrorError. If TUF has worked as intended, the mirror error + # contained within should be a BadHashError. + mirror_error = \ + error.mirror_errors[url+'tuf_repo/targets/'+dependent_basename] + + assert isinstance(mirror_error, tuf.BadHashError) else: # Check if the legitimate dependency was downloaded. @@ -158,16 +169,20 @@ def test_extraneous_dependency_attack(TUF=False): print 'Attempting extraneous dependency attack without TUF:' try: - test_extraneous_dependency_attack(TUF=False) + test_extraneous_dependency_attack(using_tuf=False) except ExtraneousDependencyAlert, error: print error - - +else: + print 'Extraneous dependency attack failed.' +print print 'Attempting extraneous dependency attack with TUF:' try: - test_extraneous_dependency_attack(TUF=True) + test_extraneous_dependency_attack(using_tuf=True) except ExtraneousDependencyAlert, error: print error +else: + print 'Extraneous dependency attack failed.' +print diff --git a/tests/integration/test_indefinite_freeze_attack.py b/tests/integration/test_indefinite_freeze_attack.py index b422fae5..69fab6f1 100755 --- a/tests/integration/test_indefinite_freeze_attack.py +++ b/tests/integration/test_indefinite_freeze_attack.py @@ -28,7 +28,7 @@ import tuf import tuf.formats -import tuf.interposition.urllib_tuf as urllib_tuf +import tuf.interposition import tuf.repo.signerlib as signerlib import tuf.tests.util_test_tools as util_test_tools @@ -61,9 +61,9 @@ def _remake_timestamp(metadata_dir, keyids): -def _download(url, filename, tuf=False): - if tuf: - urllib_tuf.urlretrieve(url, filename) +def _download(url, filename, using_tuf=False): + if using_tuf: + tuf.interposition.urllib_tuf.urlretrieve(url, filename) else: urllib.urlretrieve(url, filename) @@ -72,10 +72,10 @@ def _download(url, filename, tuf=False): -def test_indefinite_freeze_attack(TUF=False): +def test_indefinite_freeze_attack(using_tuf=False): """ - TUF: + using_tuf: If set to 'False' all directories that start with 'tuf_' are ignored, indicating that tuf is not implemented. @@ -88,7 +88,7 @@ def test_indefinite_freeze_attack(TUF=False): try: # Setup. - root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF) + root_repo, url, server_proc, keyids = util_test_tools.init_repo(using_tuf) reg_repo = os.path.join(root_repo, 'reg_repo') tuf_repo = os.path.join(root_repo, 'tuf_repo') metadata_dir = os.path.join(tuf_repo, 'metadata') @@ -100,7 +100,7 @@ def test_indefinite_freeze_attack(TUF=False): url_to_repo = url+'reg_repo/'+file_basename downloaded_file = os.path.join(downloads, file_basename) - if TUF: + if using_tuf: print 'TUF ...' # Update TUF metadata before attacker modifies anything. @@ -117,26 +117,25 @@ def test_indefinite_freeze_attack(TUF=False): _remake_timestamp(metadata_dir, keyids) - # Client performs initial download. + # Client performs initial download. If the computer is slow, it may + # take longer time than expiration time. In this case you will see + # an ExpiredMetadataError. try: - _download(url=url_to_repo, filename=downloaded_file, tuf=TUF) - except tuf.ExpiredMetadataError: - msg = ('Metadata has expired too soon, extend expiration period. '+ - 'Current expiration is set to: '+repr(EXPIRATION)+' second(s).') - sys.exit(msg) - - # Expire timestamp. - time.sleep(EXPIRATION) - - # Try downloading again, this should raise an error. - try: - _download(url=url_to_repo, filename=downloaded_file, tuf=TUF) - except tuf.ExpiredMetadataError, error: - pass + _download(url_to_repo, downloaded_file, using_tuf) + except: + print 'Initial download failed! It may be because your machine is '+ \ + 'busy. Try again later.' else: - raise IndefiniteFreezeAttackAlert(ERROR_MSG) - + # Expire timestamp. + time.sleep(EXPIRATION) + # Try downloading again, this should raise an error. + try: + _download(url_to_repo, downloaded_file, using_tuf) + except tuf.ExpiredMetadataError, error: + print 'Caught an expiration error!' + else: + raise IndefiniteFreezeAttackAlert(ERROR_MSG) finally: util_test_tools.cleanup(root_repo, server_proc) @@ -145,12 +144,12 @@ def test_indefinite_freeze_attack(TUF=False): try: - test_indefinite_freeze_attack(TUF=False) + test_indefinite_freeze_attack(using_tuf=False) except IndefiniteFreezeAttackAlert, error: print error try: - test_indefinite_freeze_attack(TUF=True) + test_indefinite_freeze_attack(using_tuf=True) except IndefiniteFreezeAttackAlert, error: print error diff --git a/tests/integration/test_mix_and_match_attack.py b/tests/integration/test_mix_and_match_attack.py index da8e170c..89955abd 100755 --- a/tests/integration/test_mix_and_match_attack.py +++ b/tests/integration/test_mix_and_match_attack.py @@ -19,17 +19,17 @@ combination of metadata that never existed together on the repository at the same time. -NOTE: The interposition provided by 'tuf.interposition' is used to intercept -all calls made by urllib/urillib2 to certain network locations specified in -the interposition configuration file. Look up interposition.py for more -information and illustration of a sample contents of the interposition -configuration file. Interposition was meant to make TUF integration with an -existing software updater an easy process. This allows for more flexibility -to the existing software updater. However, if you are planning to solely use -TUF there should be no need for interposition, all necessary calls will be -generated from within TUF. + NOTE: The interposition provided by 'tuf.interposition' is used to intercept + all calls made by urllib/urillib2 to certain network locations specified in + the interposition configuration file. Look up interposition.py for more + information and illustration of a sample contents of the interposition + configuration file. Interposition was meant to make TUF integration with an + existing software updater an easy process. This allows for more flexibility + to the existing software updater. However, if you are planning to solely use + TUF there should be no need for interposition, all necessary calls will be + generated from within TUF. -Note: There is no difference between 'updates' and 'target' files. + There is no difference between 'updates' and 'target' files. """ @@ -38,9 +38,10 @@ import shutil import urllib import tempfile +import time import tuf -import tuf.interposition.urllib_tuf as urllib_tuf +import tuf.interposition import tuf.tests.util_test_tools as util_test_tools @@ -48,16 +49,16 @@ class MixAndMatchAttackAlert(Exception): pass -def _download(url, filename, tuf=False): - if tuf: - urllib_tuf.urlretrieve(url, filename) +def _download(url, filename, using_tuf=False): + if using_tuf: + tuf.interposition.urllib_tuf.urlretrieve(url, filename) else: urllib.urlretrieve(url, filename) -def test_mix_and_match_attack(TUF=False): +def test_mix_and_match_attack(using_tuf=False): """ Attack design: There are 3 stages: @@ -81,7 +82,7 @@ def test_mix_and_match_attack(TUF=False): try: # Setup / Stage 1 # --------------- - root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF) + root_repo, url, server_proc, keyids = util_test_tools.init_repo(using_tuf) reg_repo = os.path.join(root_repo, 'reg_repo') downloads = os.path.join(root_repo, 'downloads') evil_dir = tempfile.mkdtemp(dir=root_repo) @@ -97,7 +98,7 @@ def test_mix_and_match_attack(TUF=False): unpatched_file = os.path.join(evil_dir, file_basename) - if TUF: + if using_tuf: print 'TUF ...' tuf_repo = os.path.join(root_repo, 'tuf_repo') tuf_targets = os.path.join(tuf_repo, 'targets') @@ -125,8 +126,10 @@ def test_mix_and_match_attack(TUF=False): url_to_file = 'http://localhost:9999/'+file_basename + # Wait for some time to let program set up local http server + time.sleep(1) # Client's initial download. - _download(url=url_to_file, filename=downloaded_file, tuf=TUF) + _download(url_to_file, downloaded_file, using_tuf) # Stage 2 # ------- @@ -135,11 +138,11 @@ def test_mix_and_match_attack(TUF=False): # Updating tuf repository. This will copy files from regular repository # into tuf repository and refresh the metadata - if TUF: + if using_tuf: util_test_tools.tuf_refresh_repo(root_repo, keyids) # Client downloads the patched file. - _download(url=url_to_file, filename=downloaded_file, tuf=TUF) + _download(url_to_file, downloaded_file, using_tuf) downloaded_content = util_test_tools.read_file_content(downloaded_file) @@ -150,7 +153,7 @@ def test_mix_and_match_attack(TUF=False): # Updating tuf repository. This will copy files from regular repository # into tuf repository and refresh the metadata - if TUF: + if using_tuf: util_test_tools.tuf_refresh_repo(root_repo, keyids) # Attacker replaces the metadata and the target file. @@ -163,9 +166,11 @@ def test_mix_and_match_attack(TUF=False): # Client tries to downloads the newly patched file. try: - _download(url=url_to_file, filename=downloaded_file, tuf=TUF) - except tuf.MetadataNotAvailableError: - pass + _download(url_to_file, downloaded_file, using_tuf) + except tuf.NoWorkingMirrorError as errors: + for mirror_url, mirror_error in errors.mirror_errors.iteritems(): + if type(mirror_error) == tuf.BadHashError: + print 'Caught a Bad Hash Error!' # Check whether the attack succeeded by inspecting the content of the # update. The update should contain 'Test NOT A'. @@ -182,12 +187,12 @@ def test_mix_and_match_attack(TUF=False): try: - test_mix_and_match_attack(TUF=False) + test_mix_and_match_attack(using_tuf=False) except MixAndMatchAttackAlert, error: print error try: - test_mix_and_match_attack(TUF=True) + test_mix_and_match_attack(using_tuf=True) except MixAndMatchAttackAlert, error: print error diff --git a/tests/integration/test_replay_attack.py b/tests/integration/test_replay_attack.py index b9088264..c9655077 100755 --- a/tests/integration/test_replay_attack.py +++ b/tests/integration/test_replay_attack.py @@ -1,3 +1,5 @@ +#!/usr/bin/env python + """ test_replay_attack.py @@ -38,23 +40,20 @@ import urllib import tempfile -import tuf.interposition.urllib_tuf as urllib_tuf +import tuf.interposition import tuf.tests.util_test_tools as util_test_tools -class TestSetupError(Exception): - pass +class TestSetupError(Exception): pass +class ReplayAttackAlert(Exception): pass -class ReplayAttackAlert(Exception): - pass - -def _download(url, filename, tuf=False): - if tuf: - urllib_tuf.urlretrieve(url, filename) +def _download(url, filename, using_tuf=False): + if using_tuf: + tuf.interposition.urllib_tuf.urlretrieve(url, filename) else: urllib.urlretrieve(url, filename) @@ -63,10 +62,10 @@ def _download(url, filename, tuf=False): -def test_replay_attack(TUF=False): +def test_replay_attack(using_tuf=False): """ - TUF: + using_tuf: If set to 'False' all directories that start with 'tuf_' are ignored, indicating that tuf is not implemented. @@ -76,32 +75,35 @@ def test_replay_attack(TUF=False): """ ERROR_MSG = '\tReplay Attack was Successful!\n\n' - + FIRST_CONTENT = 'Test A' + SECOND_CONTENT = 'Test B' try: # Setup. - root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF) + root_repo, url, server_proc, keyids = \ + util_test_tools.init_repo(using_tuf=using_tuf) reg_repo = os.path.join(root_repo, 'reg_repo') tuf_repo = os.path.join(root_repo, 'tuf_repo') + tuf_repo_copy = os.path.join(root_repo, 'tuf_repo_copy') downloads = os.path.join(root_repo, 'downloads') tuf_targets = os.path.join(tuf_repo, 'targets') # Add file to 'repo' directory: {root_repo} - filepath = util_test_tools.add_file_to_repository(reg_repo, 'Test A') + filepath = util_test_tools.add_file_to_repository(reg_repo, FIRST_CONTENT) file_basename = os.path.basename(filepath) url_to_repo = url+'reg_repo/'+file_basename downloaded_file = os.path.join(downloads, file_basename) # Attacker saves the original file into 'evil_dir'. evil_dir = tempfile.mkdtemp(dir=root_repo) - vulnerable_file = os.path.join(evil_dir, file_basename) + original_file = os.path.join(evil_dir, file_basename) shutil.copy(filepath, evil_dir) - if TUF: - print 'TUF ...' - + if using_tuf: # Update TUF metadata before attacker modifies anything. util_test_tools.tuf_refresh_repo(root_repo, keyids) + # Copy the first version of the repository for replay later. + shutil.copytree(tuf_repo, tuf_repo_copy) # Modify the url. Remember that the interposition will intercept # urls that have 'localhost:9999' hostname, which was specified in @@ -109,62 +111,68 @@ def test_replay_attack(TUF=False): # in 'util_test_tools.py'. Further, the 'file_basename' is the target # path relative to 'targets_dir'. url_to_repo = 'http://localhost:9999/'+file_basename - # End of Setup. - # Client performs initial update. - _download(url=url_to_repo, filename=downloaded_file, tuf=TUF) + _download(url=url_to_repo, filename=downloaded_file, using_tuf=using_tuf) # Downloads are stored in the same directory '{root_repo}/downloads/' # for regular and tuf clients. downloaded_content = util_test_tools.read_file_content(downloaded_file) - if 'Test A' != downloaded_content: - raise TestSetupError('[Initial Updata] Failed to download the file.') + if FIRST_CONTENT != downloaded_content: + raise TestSetupError('[Initial Update] Failed to download the file.') # Developer patches the file and updates the repository. - util_test_tools.modify_file_at_repository(filepath, 'Test NOT A') + util_test_tools.modify_file_at_repository(filepath, SECOND_CONTENT) # Updating tuf repository. This will copy files from regular repository # into tuf repository and refresh the metadata - if TUF: + if using_tuf: util_test_tools.tuf_refresh_repo(root_repo, keyids) - # Client downloads the patched file. - _download(url=url_to_repo, filename=downloaded_file, tuf=TUF) + _download(url=url_to_repo, filename=downloaded_file, using_tuf=using_tuf) # Content of the downloaded file. downloaded_content = util_test_tools.read_file_content(downloaded_file) - if 'Test NOT A' != downloaded_content: + if SECOND_CONTENT != downloaded_content: raise TestSetupError('[Update] Failed to update the file.') # Attacker tries to be clever, he manages to modifies regular and tuf # targets directory by replacing a patched file with an old one. - if os.path.isdir(tuf_targets): - target = os.path.join(tuf_targets, file_basename) - util_test_tools.delete_file_at_repository(target) - shutil.copy(vulnerable_file, tuf_targets) - # Verify that 'target' is an old, un-patched file. - target = os.path.join(tuf_targets, file_basename) - target_content = util_test_tools.read_file_content(target) - if 'Test A' != target_content: - raise TestSetupError("The 'target' file contains new data!") - + if using_tuf: + # Delete the current TUF repository... + shutil.rmtree(tuf_repo) + # ...and replace it with a previous copy. + shutil.move(tuf_repo_copy, tuf_repo) else: + # Delete the current file... util_test_tools.delete_file_at_repository(filepath) - shutil.copy(vulnerable_file, reg_repo) + # ...and replace it with a previous copy. + shutil.copy(original_file, reg_repo) + try: + # Client downloads the file once more. + _download(url=url_to_repo, filename=downloaded_file, using_tuf=using_tuf) + except tuf.NoWorkingMirrorError, exception: + replayed_metadata_attack = False - # Client downloads the file once more. - _download(url=url_to_repo, filename=downloaded_file, tuf=TUF) - - # Check whether the attack succeeded by inspecting the content of the - # update. The update should contain 'Test NOT A'. - downloaded_content = util_test_tools.read_file_content(downloaded_file) - if 'Test NOT A' != downloaded_content: - raise ReplayAttackAlert(ERROR_MSG) + for mirror_url, mirror_error in exception.mirror_errors.iteritems(): + if isinstance(mirror_error, tuf.ReplayedMetadataError): + replayed_metadata_attack = True + break + # In case we did not detect what was likely a replayed metadata attack, + # we reraise the exception to indicate that replayed metadata attack + # detection failed. + if not replayed_metadata_attack: raise + else: + # Check whether the attack succeeded by inspecting the content of the + # update. The update should contain 'Test NOT A'. + downloaded_content = util_test_tools.read_file_content(downloaded_file) + # If we ended up downloading replayed content, then we failed. + if FIRST_CONTENT == downloaded_content: + raise ReplayAttackAlert(ERROR_MSG) finally: util_test_tools.cleanup(root_repo, server_proc) @@ -174,13 +182,19 @@ def test_replay_attack(TUF=False): try: - test_replay_attack(TUF=False) -except ReplayAttackAlert, error: - print error + test_replay_attack(using_tuf=False) +except ReplayAttackAlert, exception: + print('Download without TUF fell prey to replayed metadata attack.') - - -try: - test_replay_attack(TUF=True) -except ReplayAttackAlert, error: - print error + try: + test_replay_attack(using_tuf=True) + except ReplayAttackAlert, exception: + print('Download with TUF fell prey to replayed metadata attack!') + except Exception, exception: + print('Download with TUF failed due to: '+str(exception)) + else: + print('Download with TUF defended against replayed metadata attack.') +except Exception, exception: + print('Download without TUF failed due to: '+str(exception)) +else: + print('Download without TUF did NOT fail due to replayed metadata attack!') diff --git a/tests/integration/test_slow_retrieval_attack.py b/tests/integration/test_slow_retrieval_attack.py index 9d515be2..a544976f 100755 --- a/tests/integration/test_slow_retrieval_attack.py +++ b/tests/integration/test_slow_retrieval_attack.py @@ -49,7 +49,7 @@ import urllib -import tuf.interposition.urllib_tuf as urllib_tuf +import tuf.interposition import tuf.tests.util_test_tools as util_test_tools @@ -57,10 +57,10 @@ class SlowRetrievalAttackAlert(Exception): pass -def _download(url, filename, TUF=False): - if TUF: +def _download(url, filename, using_tuf=False): + if using_tuf: try: - urllib_tuf.urlretrieve(url, filename) + tuf.interposition.urllib_tuf.urlretrieve(url, filename) except tuf.NoWorkingMirrorError, exception: slow_retrieval = False for mirror_url, mirror_error in exception.mirror_errors.iteritems(): @@ -82,10 +82,10 @@ def _download(url, filename, TUF=False): -def test_slow_retrieval_attack(TUF=False, mode=None): +def test_slow_retrieval_attack(using_tuf=False, mode=None): WAIT_TIME = 60 # Number of seconds to wait until download completes. - ERROR_MSG = 'Slow retrieval attack succeeded (TUF: '+str(TUF)+', mode: '+\ + ERROR_MSG = 'Slow retrieval attack succeeded (using_tuf: '+str(using_tuf)+', mode: '+\ str(mode)+').' # Launch the server. @@ -97,7 +97,7 @@ def test_slow_retrieval_attack(TUF=False, mode=None): try: # Setup. root_repo, url, server_proc, keyids = \ - util_test_tools.init_repo(tuf=TUF, port=port) + util_test_tools.init_repo(using_tuf, port=port) reg_repo = os.path.join(root_repo, 'reg_repo') downloads = os.path.join(root_repo, 'downloads') @@ -107,8 +107,7 @@ def test_slow_retrieval_attack(TUF=False, mode=None): url_to_file = url+'reg_repo/'+file_basename downloaded_file = os.path.join(downloads, file_basename) - - if TUF: + if using_tuf: tuf_repo = os.path.join(root_repo, 'tuf_repo') # Update TUF metadata before attacker modifies anything. @@ -124,7 +123,7 @@ def test_slow_retrieval_attack(TUF=False, mode=None): # Client tries to download. # NOTE: if TUF is enabled the metadata files will be downloaded first. - proc = Process(target=_download, args=(url_to_file, downloaded_file, TUF)) + proc = Process(target=_download, args=(url_to_file, downloaded_file, using_tuf)) proc.start() proc.join(WAIT_TIME) @@ -147,25 +146,25 @@ def test_slow_retrieval_attack(TUF=False, mode=None): # mode_2: During the download process, the server blocks the download # by sending just several characters every few seconds. try: - test_slow_retrieval_attack(TUF=False, mode = "mode_1") + test_slow_retrieval_attack(using_tuf=False, mode = "mode_1") except SlowRetrievalAttackAlert, error: print(error) print() try: - test_slow_retrieval_attack(TUF=False, mode = "mode_2") + test_slow_retrieval_attack(using_tuf=False, mode = "mode_2") except SlowRetrievalAttackAlert, error: print(error) print() try: - test_slow_retrieval_attack(TUF=True, mode = "mode_1") + test_slow_retrieval_attack(using_tuf=True, mode = "mode_1") except SlowRetrievalAttackAlert, error: print(error) print() try: - test_slow_retrieval_attack(TUF=True, mode = "mode_2") + test_slow_retrieval_attack(using_tuf=True, mode = "mode_2") except SlowRetrievalAttackAlert, error: print(error) print() diff --git a/tests/unit/test_keystore.py b/tests/unit/test_keystore.py index b5b2c3f8..5eed223a 100755 --- a/tests/unit/test_keystore.py +++ b/tests/unit/test_keystore.py @@ -350,7 +350,6 @@ def tearDownModule(): tuf.repo.keystore.clear_keystore() - # Run the unit tests. if __name__ == '__main__': unittest.main() diff --git a/tests/unit/test_updater.py b/tests/unit/test_updater.py index 770e36e6..0b8385ab 100755 --- a/tests/unit/test_updater.py +++ b/tests/unit/test_updater.py @@ -898,6 +898,7 @@ def test_5_all_targets(self): # returns each filepath listed in 'self.all_role_paths' in the listed # order. self._mock_download_url_to_tempfileobj(self.all_role_paths) + setup.build_server_repository(self.server_repo_dir, self.targets_dir) # Update top-level metadata. self.Repository.refresh() diff --git a/tests/unit/test_util_test_tools.py b/tests/unit/test_util_test_tools.py index 825f4aab..0acf5131 100755 --- a/tests/unit/test_util_test_tools.py +++ b/tests/unit/test_util_test_tools.py @@ -34,7 +34,7 @@ def setUp(self): tuf.repo.keystore.clear_keystore() # Unpacking necessary parameters returned from init_repo() - essential_params = util_test_tools.init_repo(tuf=True) + essential_params = util_test_tools.init_repo(using_tuf=True) self.root_repo = essential_params[0] self.url = essential_params[1] self.server_proc = essential_params[2] diff --git a/tuf/__init__.py b/tuf/__init__.py index 503e4fcf..70324314 100755 --- a/tuf/__init__.py +++ b/tuf/__init__.py @@ -21,6 +21,8 @@ """ +import urlparse + # Import 'tuf.formats' if a module tries to import the # entire tuf package (i.e., from tuf import *). __all__ = ['formats'] @@ -78,7 +80,14 @@ class UnsupportedAlgorithmError(Error): class BadHashError(Error): """Indicate an error while checking the value a hash object.""" - pass + + def __init__(self, expected_hash, observed_hash): + self.expected_hash = expected_hash + self.observed_hash = observed_hash + + def __str__(self): + return 'Observed hash ('+str(self.observed_hash)+\ + ') != expected hash ('+str(self.expected_hash)+')' @@ -118,7 +127,12 @@ class ForbiddenTargetError(RepositoryError): class ExpiredMetadataError(Error): """Indicate that a TUF Metadata file has expired.""" - pass + + def __init__(self, expiry_time): + self.expiry_time = expiry_time # UTC + + def __str__(self): + return 'Metadata expired on '+str(self.expiry_time)+'.' @@ -134,9 +148,9 @@ def __init__(self, metadata_role, previous_version, current_version): def __str__(self): - return str(self.metadata_role)+' is older than the version currently'+\ - 'installed.\nDownloaded version: '+repr(self.previous_version)+'\n'+\ - 'Current version: '+repr(self.current_version) + return 'Downloaded '+str(self.metadata_role)+' is older ('+\ + str(self.previous_version)+') than the version currently '+\ + 'installed ('+repr(self.current_version)+').' @@ -152,7 +166,12 @@ class CryptoError(Error): class BadSignatureError(CryptoError): """Indicate that some metadata file had a bad signature.""" - pass + + def __init__(self, metadata_role_name): + self.metadata_role_name = metadata_role_name + + def __str__(self): + return str(self.metadata_role_name)+' metadata has bad signature!' @@ -278,9 +297,18 @@ def __init__(self, mirror_errors): self.mirror_errors = mirror_errors def __str__(self): - return str(self.mirror_errors) - - + all_errors = 'No working mirror was found:' + for mirror_url, mirror_error in self.mirror_errors.iteritems(): + try: + # http://docs.python.org/2/library/urlparse.html#urlparse.urlparse + mirror_url_tokens = urlparse.urlparse(mirror_url) + except: + logging.exception('Failed to parse mirror URL: '+str(mirror_url)) + mirror_netloc = mirror_url + else: + mirror_netloc = mirror_url_tokens.netloc + all_errors += '\n '+str(mirror_netloc)+': '+str(mirror_error) + return all_errors diff --git a/tuf/client/updater.py b/tuf/client/updater.py index 1d924f8c..3b875fb3 100755 --- a/tuf/client/updater.py +++ b/tuf/client/updater.py @@ -627,8 +627,7 @@ def __check_hashes(self, file_object, trusted_hashes): digest_object.update(file_object.read()) computed_hash = digest_object.hexdigest() if trusted_hash != computed_hash: - raise tuf.BadHashError('Hashes do not match! Expected '+ - trusted_hash+' got '+computed_hash) + raise tuf.BadHashError(trusted_hash, computed_hash) else: logger.info('The file\'s '+algorithm+' hash is correct: '+trusted_hash) @@ -835,7 +834,7 @@ def __verify_uncompressed_metadata_file(self, metadata_file_object, # Verify the signature on the downloaded metadata object. valid = tuf.sig.verify(metadata_signable, metadata_role) if not valid: - raise tuf.BadSignatureError() + raise tuf.BadSignatureError(metadata_role) @@ -1021,7 +1020,7 @@ def __get_file(self, filepath, verify_uncompressed_file, file_type, except Exception, exception: # Remember the error from this mirror, and "reset" the target file. - logger.exception('Download failed from '+file_mirror+'.') + logger.exception('Update failed from '+file_mirror+'.') file_mirror_errors[file_mirror] = exception file_object = None else: @@ -1030,8 +1029,8 @@ def __get_file(self, filepath, verify_uncompressed_file, file_type, if file_object: return file_object else: - logger.exception('Failed to download {0}: {1}'.format(filepath, - file_mirror_errors)) + logger.exception('Failed to update {0} from all mirrors: {1}'.format( + filepath, file_mirror_errors)) raise tuf.NoWorkingMirrorError(file_mirror_errors) @@ -1742,9 +1741,11 @@ def _ensure_not_expired(self, metadata_role): # an exception. 'expires' is in YYYY-MM-DD HH:MM:SS format, so # convert it to seconds since the epoch, which is the time format # returned by time.time() (i.e., current time), before comparing. - if tuf.formats.parse_time(expires) < time.time(): - message = 'Metadata '+repr(rolepath)+' expired on '+repr(expires)+'.' - raise tuf.ExpiredMetadataError(message) + current_time = time.time() + expiry_time = tuf.formats.parse_time(expires) + if expiry_time < current_time: + logger.error('Metadata '+repr(rolepath)+' expired on '+repr(expires)+'.') + raise tuf.ExpiredMetadataError(expires) diff --git a/tuf/log.py b/tuf/log.py index 0667e4ad..e1a44752 100755 --- a/tuf/log.py +++ b/tuf/log.py @@ -72,15 +72,16 @@ _FORMAT_STRING = '[%(asctime)s UTC] [%(name)s] [%(levelname)s]'+\ '[%(funcName)s:%(lineno)s@%(filename)s] %(message)s' - +# Ask all Formatter instances to talk GMT. +# http://docs.python.org/2/library/logging.html#logging.Formatter.formatException logging.Formatter.converter = time.gmtime formatter = logging.Formatter(_FORMAT_STRING) -# Set the handlers for the logger. The console handler is unset by default. A +# Set the handlers for the logger. The console handler is unset by default. A # module importing 'log.py' should explicitly set the console handler if -# outputting log messages to the screen is needed. Adding a console handler -# can be done with tuf.log.add_console_handler(). Logging messages to a file -# *is* set by default. +# outputting log messages to the screen is needed. Adding a console handler can +# be done with tuf.log.add_console_handler(). Logging messages to a file *is* +# set by default. console_handler = None # Set the built-in file handler. Messages will be logged to @@ -104,6 +105,54 @@ +class ConsoleFilter(logging.Filter): + def filter(self, record): + """ + + Use Vinay Sajip's recommendation from Python issue #6435 to modify a + LogRecord object. This is meant to be used with our console handler. + + http://stackoverflow.com/q/6177520 + http://stackoverflow.com/q/5875225 + http://bugs.python.org/issue6435 + http://docs.python.org/2/howto/logging-cookbook.html#filters-contextual + http://docs.python.org/2/library/logging.html#logrecord-attributes + + + record: + A logging.LogRecord object. + + + None. + + + Replaces the LogRecord exception text attribute. + + + True. + + """ + + # If this LogRecord object has an exception, then we will replace its text. + if record.exc_info: + # We place the record's cached exception text (which usually contains the + # exception traceback) with much simpler exception information. This is + # most useful for the console handler, which we do not wish to deluge + # with too much data. Assuming that this filter is not applied to the + # file logging handler, the user may always consult the file log for the + # original exception traceback. The exc_info is explained here: + # http://docs.python.org/2/library/sys.html#sys.exc_info + exc_type, exc_value, exc_traceback = record.exc_info + # Simply set the class name as the exception text. + record.exc_text = exc_type.__name__ + + # Always return True to signal that any given record must be formatted. + return True + + + + + def set_log_level(log_level=_DEFAULT_LOG_LEVEL): """ @@ -201,6 +250,7 @@ def set_console_log_level(log_level=_DEFAULT_CONSOLE_LOG_LEVEL): + def add_console_handler(log_level=_DEFAULT_CONSOLE_LOG_LEVEL): """ @@ -223,14 +273,41 @@ def add_console_handler(log_level=_DEFAULT_CONSOLE_LOG_LEVEL): """ + # Assign to the global console_handler object. + global console_handler + # Does 'log_level' have the correct format? # Raise 'tuf.FormatError' if there is a mismatch. tuf.formats.LENGTH_SCHEMA.check_match(log_level) - # Set the console handler for the logger. The built-in console handler will - # log messages to 'sys.stderr' and capture 'log_level' messages. + if not console_handler: + # Set the console handler for the logger. The built-in console handler will + # log messages to 'sys.stderr' and capture 'log_level' messages. + # NOTE: This is not thread-safe. + console_handler = logging.StreamHandler() + # Get our filter for the console handler. + console_filter = ConsoleFilter() + + console_handler.setLevel(log_level) + console_handler.setFormatter(formatter) + console_handler.addFilter(console_filter) + logger.addHandler(console_handler) + logger.debug('Added a console handler.') + else: + logger.warn('We already have a console handler.') + + + + + +def remove_console_handler(): + # Assign to the global console_handler object. global console_handler - console_handler = logging.StreamHandler() - console_handler.setLevel(log_level) - console_handler.setFormatter(formatter) - logger.addHandler(console_handler) + + if console_handler: + logger.removeHandler(console_handler) + # NOTE: This is not thread-safe. + console_handler = None + logger.debug('Removed a console handler.') + else: + logger.warn('We do not have a console handler.') diff --git a/tuf/repo/keystore.py b/tuf/repo/keystore.py index 2feb1a33..9db7d812 100755 --- a/tuf/repo/keystore.py +++ b/tuf/repo/keystore.py @@ -72,6 +72,7 @@ import tuf.util import tuf.conf + # See 'log.py' to learn how logging is handled in TUF. logger = logging.getLogger('tuf.keystore') @@ -116,7 +117,7 @@ # {keyid: key, # keyid2: key2, # ...} -_keystore = {} +keystore = {} def add_rsakey(rsakey_dict, password, keyid=None): @@ -535,6 +536,7 @@ def _generate_derived_key(password, salt=None, iterations=None): if iterations is None: iterations = _PBKDF2_ITERATIONS + def pseudorandom_function(password, salt): """ PyCrypto's PBKDF2() expects a callable function for its optional diff --git a/tuf/repo/signerlib.py b/tuf/repo/signerlib.py index 2698518f..3f44ea5e 100755 --- a/tuf/repo/signerlib.py +++ b/tuf/repo/signerlib.py @@ -37,7 +37,8 @@ json = tuf.util.import_json() -# Recommended RSA key sizes: http://www.rsa.com/rsalabs/node.asp?id=2004 +# Recommended RSA key sizes: +# http://www.emc.com/emc-plus/rsa-labs/historical/twirl-and-rsa-key-size.htm#table1 # According to the document above, revised May 6, 2003, RSA keys of # size 3072 provide security through 2031 and beyond. 2048-bit keys # are the recommended minimum and are good from the present through 2030. @@ -746,6 +747,9 @@ def generate_and_save_rsa_key(keystore_directory, password, bits: The key size, or key length, of the RSA key. + If 'bits' is unspecified, a 3072-bit RSA key is generated, which is the + key size recommended by TUF, although 2048-bit keys are accepted + (minimum key size). tuf.FormatError, if 'bits' or 'password' does not have the diff --git a/tuf/rsa_key.py b/tuf/rsa_key.py index c5ce9d52..482710e2 100755 --- a/tuf/rsa_key.py +++ b/tuf/rsa_key.py @@ -69,7 +69,8 @@ _KEY_ID_HASH_ALGORITHM = 'sha256' -# Recommended RSA key sizes: http://www.rsa.com/rsalabs/node.asp?id=2004 +# Recommended RSA key sizes: +# http://www.emc.com/emc-plus/rsa-labs/historical/twirl-and-rsa-key-size.htm#table1 # According to the document above, revised May 6, 2003, RSA keys of # size 3072 provide security through 2031 and beyond. _DEFAULT_RSA_KEY_BITS = 3072 @@ -87,16 +88,21 @@ def generate(bits=_DEFAULT_RSA_KEY_BITS): 'private': '-----BEGIN RSA PRIVATE KEY----- ...'}} The public and private keys are in PEM format and stored as strings. + + Although the crytography library called sets a 1024-bit minimum key size, + generate() enforces a minimum key size of 2048 bits. If 'bits' is + unspecified, a 3072-bit RSA key is generated, which is the key size + recommended by TUF. bits: - The key size, or key length, of the RSA key. 'bits' must be 1024, or + The key size, or key length, of the RSA key. 'bits' must be 2048, or greater, and a multiple of 256. ValueError, if an exception occurs after calling the RSA key generation - routine. 'bits' must be 1024, or greater, and a multiple of 256. - Raised by Cryptography library. + routine. 'bits' must be a multiple of 256. The 'ValueError' exception is + raised by the key generation function of the cryptography library called. tuf.FormatError, if 'bits' does not contain the correct format. @@ -121,8 +127,9 @@ def generate(bits=_DEFAULT_RSA_KEY_BITS): keytype = 'rsa' # Generate the public and private RSA keys. The PyCrypto module performs - # the actual key generation. Raise 'ValueError' if 'bits' is less than 1024 - # or not a multiple of 256. + # the actual key generation. Raise 'ValueError' if 'bits' is less than 1024 + # or not a multiple of 256, although a 2048-bit minimum is enforced by + # tuf.formats.RSAKEYBITS_SCHEMA.check_match(). rsa_key_object = Crypto.PublicKey.RSA.generate(bits) # Extract the public & private halves of the RSA key and generate their diff --git a/tuf/tests/repository_setup.py b/tuf/tests/repository_setup.py old mode 100755 new mode 100644 diff --git a/tuf/tests/unittest_toolbox.py b/tuf/tests/unittest_toolbox.py old mode 100755 new mode 100644 index 7aa2126d..ff23af49 --- a/tuf/tests/unittest_toolbox.py +++ b/tuf/tests/unittest_toolbox.py @@ -30,7 +30,6 @@ import tuf.rsa_key as rsa_key import tuf.repo.keystore as keystore - # Modify the number of iterations (from the higher default count) so the unit # tests run faster. keystore._PBKDF2_ITERATIONS = 1000 diff --git a/tuf/tests/util_test_tools.py b/tuf/tests/util_test_tools.py index d79cfeed..6992e97c 100644 --- a/tuf/tests/util_test_tools.py +++ b/tuf/tests/util_test_tools.py @@ -81,7 +81,7 @@ previous metadata files. - init_repo(tuf=True): + init_repo(using_tuf=True): Initializes the repositories (depicted in the diagram above) and starts the server process. init_repo takes one boolean argument which when True sets-up tuf repository i.e. adds all of the @@ -154,11 +154,7 @@ tuf_configurations = None -def disable_console_logging(): - tuf.log.logger.removeHandler(tuf.log.console_handler) - - -def init_repo(tuf=False, port=None): +def init_repo(using_tuf=False, port=None): # Temp root directory for regular and tuf repositories. # WARNING: tuf client stores files in '{root_repo}/downloads/' directory! # Make sure regular download are NOT stored in the that directory when @@ -188,8 +184,9 @@ def init_repo(tuf=False, port=None): time.sleep(.2) keyids = None - if tuf: - disable_console_logging() + if using_tuf: + # We remove the console handler so that tests are silent by default. + tuf.log.remove_console_handler() keyids = init_tuf(root_repo) create_interposition_config(root_repo, url)