diff --git a/tuf/tests/system_test/test_rollback_attack.py b/tuf/tests/system_test/test_rollback_attack.py old mode 100644 new mode 100755 index dbd69639..4d732f91 --- a/tuf/tests/system_test/test_rollback_attack.py +++ b/tuf/tests/system_test/test_rollback_attack.py @@ -1,3 +1,33 @@ +""" + + test_rollback_attack.py + + + Konstantin Andrianov + + + February 22, 2012 + + + See LICENSE for licensing information. + + + Simulate a rollback attack. A simple client update vs. client update + implementing TUF. + + Note: It's assumed that attacker does NOT have access to metadata signing + keys. Keep them safe! + + Note: There is no difference between 'updates' and 'target' files. + + + To implement TUF use one of the following options: '-t', '--tuf', '--TUF' + Ex. $python test_rollback_attack.py --tuf + + To simply run the a client update without implementing TUF omit the options. + Ex. $python test_rollback_attack.py +""" + import os import shutil import test_system_setup @@ -6,20 +36,13 @@ import optparse +# Was the option set? +test_system_setup.tuf_option() + + class TestRollbackAttack(test_system_setup.TestCase): # Whenever attack succeeds print following message: - msg = 'Roleback attack succeeded!\n' - - @staticmethod - def _tuf_option(): - usage = 'usage: %prog [options]' - parser = optparse.OptionParser(usage=usage) - parser.add_option('-t', '--tuf', '--TUF', action='store_true', dest='tuf', - default=False, help='Implement tuf.') - option, args = parser.parse_args() - return option.tuf - - TestRollbackAttack.TUF = _tuf_option() + msg = 'Rollback attack succeeded!\n' def setUp(self): @@ -31,12 +54,26 @@ def setUp(self): fileobj.close() + + + def tearDown(self): test_system_setup.TestCase.tearDown(self) shutil.rmtree(self.evil_dir) + + + + @unittest.skipIf(test_system_setup.TestCase.TUF is True, 'Implemented TUF!') + @unittest.expectedFailure def test_rollback_on_client(self): + """ + + Illustrate rollback attack vulnerability. + + """ + # Client performs initial updates. old_filename1_data = self.client_download(self.filename1) @@ -47,25 +84,60 @@ def test_rollback_on_client(self): # Client downloads the updated file 'filename1'. new_filename_data = self.client_download(self.filename1) + self.assertEquals(new_data, new_filename_data) - # At this point the client is happy. However, an evil tyrant is prepared + # At this point the client is happy. However, an evil tyrant has prepared # a little surprise. rel_evil_dir = os.path.basename(self.evil_dir) self.url = 'http://localhost:'+str(self.port)+'/'+rel_evil_dir+'/' self.client_download(self.filename1) - # Check client's downloads directory, if indeed the client has an update - # with the old content. + # Check whether the attack succeeded by inspecting the content of the + # update. The update should contain 'new_data'. self.assertEquals(new_data, self.client_download(self.filename1), self.msg) + + + + @unittest.skipIf(test_system_setup.TestCase.TUF is False, + 'TUF was NOT implemented!') def test_rollback_using_tuf(self): - if self.TUF = False: - print 'TUF is NOT implement!\n' - return + """ + + Illustrate protection against rollback attacks. + """ + + # TUF client initial update. + self.tuf_client_download() + targetpath1 = os.path.join(self.tuf_client_downloads_dir, self.filename1) + self.assertTrue(os.path.exists(targetpath1)) + + tuf_client_file_content = self.read_file_content(targetpath1) + # Developer updates 'filename1' update and updates the TUF repository. + new_data = 'NewData' + self.add_or_change_file_at_repository(filename=self.filename1, + data=new_data) + self.refresh_tuf_repository() + # TUF client performs another update. + self.tuf_client_download() + + tuf_client_file_content = self.read_file_content(targetpath1) + + # Attacker tries to be clever. + rel_evil_dir = os.path.basename(self.evil_dir) + self.url = 'http://localhost:'+str(self.port)+'/'+rel_evil_dir+'/' + + # TUF client performs yet another update. + self.tuf_client_download() + + # Check whether the attack succeeded by inspecting the content of the + # update. The update should contain 'new_data'. + tuf_client_file_content = self.read_file_content(targetpath1) + self.assertEquals(new_data, tuf_client_file_content, self.msg) diff --git a/tuf/tests/system_test/test_system_setup.py b/tuf/tests/system_test/test_system_setup.py old mode 100644 new mode 100755 index 2b984bf3..9012cb85 --- a/tuf/tests/system_test/test_system_setup.py +++ b/tuf/tests/system_test/test_system_setup.py @@ -48,12 +48,15 @@ import random import urllib2 import logging +import optparse import tempfile import unittest import subprocess +import tuf.client.updater import tuf.repo.signerlib as signerlib + # Disable/Enable logging. Comment-out to Enable logging. logging.getLogger('tuf') logging.disable(logging.CRITICAL) @@ -76,7 +79,7 @@ class TestCase(unittest.TestCase): delete_file_at_repository(filename): Deletes a file on the repository ('repository_dir'). - tuf_refresh() + refresh_tuf_repository() Updates TUF metadata files. @@ -141,6 +144,7 @@ class TestCase(unittest.TestCase): TUF = True + def setUp(self): unittest.TestCase.setUp(self) @@ -154,6 +158,7 @@ def setUp(self): self.filename1 = os.path.basename(self.filepath1) self.filename2 = os.path.basename(self.filepath2) + print # Start a simple server pointing to the repository directory. self.port = random.randint(30000, 45000) command = ['python', '-m', 'SimpleHTTPServer', str(self.port)] @@ -171,6 +176,8 @@ def setUp(self): + + def tearDown(self): unittest.TestCase.tearDown(self) @@ -188,6 +195,7 @@ def tearDown(self): + def add_or_change_file_at_repository(self, filename=None, data=None): """ @@ -226,6 +234,8 @@ def add_or_change_file_at_repository(self, filename=None, data=None): + + def delete_file_at_repository(self, filename): """ @@ -244,6 +254,8 @@ def delete_file_at_repository(self, filename): + + @staticmethod def _open_connection(url): try: @@ -256,6 +268,8 @@ def _open_connection(url): + + def client_download(self, filename): if not isinstance(filename, basestring): print 'Wrong type: ' + repr(filepath) + '\n' @@ -264,12 +278,21 @@ def client_download(self, filename): return connection.read() + + + def read_file_content(self, filepath): try: fileobj = open(filepath, 'rb') except Exception, e: raise + data = fileobj.read() + fileobj.close() + return data + + + def tuf_setUp(self): @@ -351,12 +374,28 @@ def tuf_setUp(self): # Generate the 'timestamp.txt' metadata file. signerlib.build_timestamp_file(self._keyid, self.tuf_metadata_dir) + # The repository is setup! + + # It's time time setup tuf client. + # Here is a mirrors dictionary that will allow a client to seek out + # places to download the metadata and targets from. + rel_repo_dir = os.path.basename(self.tuf_repository_dir) + self.tuf_url = 'http://localhost:'+str(self.port)+'/'+rel_repo_dir+'/' + self.repository_mirrors = {'mirror1': + {'url_prefix': self.tuf_url, + 'metadata_path': 'metadata', + 'targets_path': 'targets', + 'confined_target_dirs': ['']}} + # Setting up client's TUF directory structure. # 'tuf.client.updater.py' expects the 'current' and 'previous' # directories to exist under client's 'metadata' directory. self.tuf_client_metadata_dir = os.path.join(self.tuf_client_dir, 'metadata') + self.tuf_client_downloads_dir = os.path.join(self.tuf_client_dir, + 'downloads') os.mkdir(self.tuf_client_metadata_dir) + os.mkdir(self.tuf_client_downloads_dir) # Move the metadata to the client's 'current' and 'previous' directories. self.client_current = os.path.join(self.tuf_client_metadata_dir, @@ -368,6 +407,9 @@ def tuf_setUp(self): + + + def tuf_tearDown(self): """ @@ -375,6 +417,7 @@ def tuf_tearDown(self): tuf_setUp(). """ + shutil.rmtree(self.tuf_repository_dir) shutil.rmtree(self.tuf_client_dir) @@ -382,7 +425,7 @@ def tuf_tearDown(self): - def tuf_refresh(self): + def refresh_tuf_repository(self): """ Update TUF metadata files. Call this method whenever targets files have @@ -405,6 +448,46 @@ def tuf_refresh(self): + + + def tuf_client_download(self): + # Adjusting configuration file (tuf.conf.py). + tuf.conf.repository_directory = self.tuf_client_dir + self.destination_dir = self.tuf_client_downloads_dir + + # Instantiate an updater. + updater = tuf.client.updater.Updater('updater', self.repository_mirrors) + + # Update all metadata. + updater.refresh() + + # Get the latest information on targets. + targets = updater.all_targets() + + # Determine which targets have changed or new. + updated_targets = updater.updated_targets(targets, self.destination_dir) + + # Download new/changed targets and store them in the destination + # directory 'destination_dir'. + for target in updated_targets: + updater.download_target(target, self.destination_dir) + + + + + +def tuf_option(): + usage = 'usage: %prog [options]' + parser = optparse.OptionParser(usage=usage) + parser.add_option('-t', '--tuf', '--TUF', action='store_true', dest='tuf', + default=False, help='Implement tuf.') + option, args = parser.parse_args() + TestCase.TUF = option.tuf + + + + + class test_TestCase(TestCase): """ @@ -440,7 +523,7 @@ def test_tuf_setup(self): self.assertTrue(os.path.isfile(target1)) self.assertTrue(os.path.isfile(target2)) - self.tuf_refresh() + self.refresh_tuf_repository() @@ -449,7 +532,7 @@ def test_methods(self): Making sure following methods work as intended: - add_or_change_file_at_repository() - delete_file_at_repository() - - tuf_refresh() + - refresh_tuf_repository() """ new_file = self.add_or_change_file_at_repository() fileobj = open(new_file, 'rb') @@ -467,7 +550,7 @@ def test_methods(self): self.assertTrue(os.path.exists(old_file)) self.assertEquals('1234', fileobj.read()) - self.tuf_refresh() + self.refresh_tuf_repository() new_target = os.path.join(self.tuf_targets_dir, os.path.basename(new_file)) self.assertTrue(os.path.exists(new_target)) @@ -479,5 +562,7 @@ def test_methods(self): -suite = unittest.TestLoader().loadTestsFromTestCase(test_TestCase) -unittest.TextTestRunner(verbosity=2).run(suite) \ No newline at end of file + +# Uncomment to run the test of 'test_TestCase' class. +#suite = unittest.TestLoader().loadTestsFromTestCase(test_TestCase) +#unittest.TextTestRunner(verbosity=2).run(suite) \ No newline at end of file