mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Modified test_system_setup.py and test_rollback_attack.py.
This commit is contained in:
parent
3217690d02
commit
765aafe831
2 changed files with 182 additions and 25 deletions
108
tuf/tests/system_test/test_rollback_attack.py
Normal file → Executable file
108
tuf/tests/system_test/test_rollback_attack.py
Normal file → Executable file
|
|
@ -1,3 +1,33 @@
|
|||
"""
|
||||
<Program Name>
|
||||
test_rollback_attack.py
|
||||
|
||||
<Author>
|
||||
Konstantin Andrianov
|
||||
|
||||
<Started>
|
||||
February 22, 2012
|
||||
|
||||
<Copyright>
|
||||
See LICENSE for licensing information.
|
||||
|
||||
<Purpose>
|
||||
Simulate a rollback attack. A simple client update vs. client update
|
||||
implementing TUF.
|
||||
|
||||
Note: It's assumed that attacker does NOT have access to metadata signing
|
||||
keys. Keep them safe!
|
||||
|
||||
Note: There is no difference between 'updates' and 'target' files.
|
||||
|
||||
<Usage>
|
||||
To implement TUF use one of the following options: '-t', '--tuf', '--TUF'
|
||||
Ex. $python test_rollback_attack.py --tuf
|
||||
|
||||
To simply run the a client update without implementing TUF omit the options.
|
||||
Ex. $python test_rollback_attack.py
|
||||
"""
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import test_system_setup
|
||||
|
|
@ -6,20 +36,13 @@
|
|||
import optparse
|
||||
|
||||
|
||||
# Was the option set?
|
||||
test_system_setup.tuf_option()
|
||||
|
||||
|
||||
class TestRollbackAttack(test_system_setup.TestCase):
|
||||
# Whenever attack succeeds print following message:
|
||||
msg = 'Roleback attack succeeded!\n'
|
||||
|
||||
@staticmethod
|
||||
def _tuf_option():
|
||||
usage = 'usage: %prog [options]'
|
||||
parser = optparse.OptionParser(usage=usage)
|
||||
parser.add_option('-t', '--tuf', '--TUF', action='store_true', dest='tuf',
|
||||
default=False, help='Implement tuf.')
|
||||
option, args = parser.parse_args()
|
||||
return option.tuf
|
||||
|
||||
TestRollbackAttack.TUF = _tuf_option()
|
||||
msg = 'Rollback attack succeeded!\n'
|
||||
|
||||
|
||||
def setUp(self):
|
||||
|
|
@ -31,12 +54,26 @@ def setUp(self):
|
|||
fileobj.close()
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
test_system_setup.TestCase.tearDown(self)
|
||||
shutil.rmtree(self.evil_dir)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@unittest.skipIf(test_system_setup.TestCase.TUF is True, 'Implemented TUF!')
|
||||
@unittest.expectedFailure
|
||||
def test_rollback_on_client(self):
|
||||
"""
|
||||
<Purpose>
|
||||
Illustrate rollback attack vulnerability.
|
||||
|
||||
"""
|
||||
|
||||
# Client performs initial updates.
|
||||
old_filename1_data = self.client_download(self.filename1)
|
||||
|
||||
|
|
@ -47,25 +84,60 @@ def test_rollback_on_client(self):
|
|||
|
||||
# Client downloads the updated file 'filename1'.
|
||||
new_filename_data = self.client_download(self.filename1)
|
||||
self.assertEquals(new_data, new_filename_data)
|
||||
|
||||
# At this point the client is happy. However, an evil tyrant is prepared
|
||||
# At this point the client is happy. However, an evil tyrant has prepared
|
||||
# a little surprise.
|
||||
rel_evil_dir = os.path.basename(self.evil_dir)
|
||||
self.url = 'http://localhost:'+str(self.port)+'/'+rel_evil_dir+'/'
|
||||
self.client_download(self.filename1)
|
||||
|
||||
# Check client's downloads directory, if indeed the client has an update
|
||||
# with the old content.
|
||||
# Check whether the attack succeeded by inspecting the content of the
|
||||
# update. The update should contain 'new_data'.
|
||||
self.assertEquals(new_data, self.client_download(self.filename1), self.msg)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
@unittest.skipIf(test_system_setup.TestCase.TUF is False,
|
||||
'TUF was NOT implemented!')
|
||||
def test_rollback_using_tuf(self):
|
||||
if self.TUF = False:
|
||||
print 'TUF is NOT implement!\n'
|
||||
return
|
||||
"""
|
||||
<Purpose>
|
||||
Illustrate protection against rollback attacks.
|
||||
|
||||
"""
|
||||
|
||||
# TUF client initial update.
|
||||
self.tuf_client_download()
|
||||
targetpath1 = os.path.join(self.tuf_client_downloads_dir, self.filename1)
|
||||
self.assertTrue(os.path.exists(targetpath1))
|
||||
|
||||
tuf_client_file_content = self.read_file_content(targetpath1)
|
||||
|
||||
# Developer updates 'filename1' update and updates the TUF repository.
|
||||
new_data = 'NewData'
|
||||
self.add_or_change_file_at_repository(filename=self.filename1,
|
||||
data=new_data)
|
||||
self.refresh_tuf_repository()
|
||||
|
||||
# TUF client performs another update.
|
||||
self.tuf_client_download()
|
||||
|
||||
tuf_client_file_content = self.read_file_content(targetpath1)
|
||||
|
||||
# Attacker tries to be clever.
|
||||
rel_evil_dir = os.path.basename(self.evil_dir)
|
||||
self.url = 'http://localhost:'+str(self.port)+'/'+rel_evil_dir+'/'
|
||||
|
||||
# TUF client performs yet another update.
|
||||
self.tuf_client_download()
|
||||
|
||||
# Check whether the attack succeeded by inspecting the content of the
|
||||
# update. The update should contain 'new_data'.
|
||||
tuf_client_file_content = self.read_file_content(targetpath1)
|
||||
self.assertEquals(new_data, tuf_client_file_content, self.msg)
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
99
tuf/tests/system_test/test_system_setup.py
Normal file → Executable file
99
tuf/tests/system_test/test_system_setup.py
Normal file → Executable file
|
|
@ -48,12 +48,15 @@
|
|||
import random
|
||||
import urllib2
|
||||
import logging
|
||||
import optparse
|
||||
import tempfile
|
||||
import unittest
|
||||
import subprocess
|
||||
|
||||
import tuf.client.updater
|
||||
import tuf.repo.signerlib as signerlib
|
||||
|
||||
|
||||
# Disable/Enable logging. Comment-out to Enable logging.
|
||||
logging.getLogger('tuf')
|
||||
logging.disable(logging.CRITICAL)
|
||||
|
|
@ -76,7 +79,7 @@ class TestCase(unittest.TestCase):
|
|||
delete_file_at_repository(filename):
|
||||
Deletes a file on the repository ('repository_dir').
|
||||
|
||||
tuf_refresh()
|
||||
refresh_tuf_repository()
|
||||
Updates TUF metadata files.
|
||||
|
||||
|
||||
|
|
@ -141,6 +144,7 @@ class TestCase(unittest.TestCase):
|
|||
TUF = True
|
||||
|
||||
|
||||
|
||||
def setUp(self):
|
||||
unittest.TestCase.setUp(self)
|
||||
|
||||
|
|
@ -154,6 +158,7 @@ def setUp(self):
|
|||
self.filename1 = os.path.basename(self.filepath1)
|
||||
self.filename2 = os.path.basename(self.filepath2)
|
||||
|
||||
print
|
||||
# Start a simple server pointing to the repository directory.
|
||||
self.port = random.randint(30000, 45000)
|
||||
command = ['python', '-m', 'SimpleHTTPServer', str(self.port)]
|
||||
|
|
@ -171,6 +176,8 @@ def setUp(self):
|
|||
|
||||
|
||||
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
unittest.TestCase.tearDown(self)
|
||||
|
||||
|
|
@ -188,6 +195,7 @@ def tearDown(self):
|
|||
|
||||
|
||||
|
||||
|
||||
def add_or_change_file_at_repository(self, filename=None, data=None):
|
||||
"""
|
||||
<Purpose>
|
||||
|
|
@ -226,6 +234,8 @@ def add_or_change_file_at_repository(self, filename=None, data=None):
|
|||
|
||||
|
||||
|
||||
|
||||
|
||||
def delete_file_at_repository(self, filename):
|
||||
"""
|
||||
<Purpose>
|
||||
|
|
@ -244,6 +254,8 @@ def delete_file_at_repository(self, filename):
|
|||
|
||||
|
||||
|
||||
|
||||
|
||||
@staticmethod
|
||||
def _open_connection(url):
|
||||
try:
|
||||
|
|
@ -256,6 +268,8 @@ def _open_connection(url):
|
|||
|
||||
|
||||
|
||||
|
||||
|
||||
def client_download(self, filename):
|
||||
if not isinstance(filename, basestring):
|
||||
print 'Wrong type: ' + repr(filepath) + '\n'
|
||||
|
|
@ -264,12 +278,21 @@ def client_download(self, filename):
|
|||
return connection.read()
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def read_file_content(self, filepath):
|
||||
try:
|
||||
fileobj = open(filepath, 'rb')
|
||||
except Exception, e:
|
||||
raise
|
||||
|
||||
data = fileobj.read()
|
||||
fileobj.close()
|
||||
return data
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def tuf_setUp(self):
|
||||
|
|
@ -351,12 +374,28 @@ def tuf_setUp(self):
|
|||
# Generate the 'timestamp.txt' metadata file.
|
||||
signerlib.build_timestamp_file(self._keyid, self.tuf_metadata_dir)
|
||||
|
||||
# The repository is setup!
|
||||
|
||||
# It's time time setup tuf client.
|
||||
# Here is a mirrors dictionary that will allow a client to seek out
|
||||
# places to download the metadata and targets from.
|
||||
rel_repo_dir = os.path.basename(self.tuf_repository_dir)
|
||||
self.tuf_url = 'http://localhost:'+str(self.port)+'/'+rel_repo_dir+'/'
|
||||
self.repository_mirrors = {'mirror1':
|
||||
{'url_prefix': self.tuf_url,
|
||||
'metadata_path': 'metadata',
|
||||
'targets_path': 'targets',
|
||||
'confined_target_dirs': ['']}}
|
||||
|
||||
# Setting up client's TUF directory structure.
|
||||
# 'tuf.client.updater.py' expects the 'current' and 'previous'
|
||||
# directories to exist under client's 'metadata' directory.
|
||||
self.tuf_client_metadata_dir = os.path.join(self.tuf_client_dir,
|
||||
'metadata')
|
||||
self.tuf_client_downloads_dir = os.path.join(self.tuf_client_dir,
|
||||
'downloads')
|
||||
os.mkdir(self.tuf_client_metadata_dir)
|
||||
os.mkdir(self.tuf_client_downloads_dir)
|
||||
|
||||
# Move the metadata to the client's 'current' and 'previous' directories.
|
||||
self.client_current = os.path.join(self.tuf_client_metadata_dir,
|
||||
|
|
@ -368,6 +407,9 @@ def tuf_setUp(self):
|
|||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def tuf_tearDown(self):
|
||||
"""
|
||||
<Purpose>
|
||||
|
|
@ -375,6 +417,7 @@ def tuf_tearDown(self):
|
|||
tuf_setUp().
|
||||
|
||||
"""
|
||||
|
||||
shutil.rmtree(self.tuf_repository_dir)
|
||||
shutil.rmtree(self.tuf_client_dir)
|
||||
|
||||
|
|
@ -382,7 +425,7 @@ def tuf_tearDown(self):
|
|||
|
||||
|
||||
|
||||
def tuf_refresh(self):
|
||||
def refresh_tuf_repository(self):
|
||||
"""
|
||||
<Purpose>
|
||||
Update TUF metadata files. Call this method whenever targets files have
|
||||
|
|
@ -405,6 +448,46 @@ def tuf_refresh(self):
|
|||
|
||||
|
||||
|
||||
|
||||
|
||||
def tuf_client_download(self):
|
||||
# Adjusting configuration file (tuf.conf.py).
|
||||
tuf.conf.repository_directory = self.tuf_client_dir
|
||||
self.destination_dir = self.tuf_client_downloads_dir
|
||||
|
||||
# Instantiate an updater.
|
||||
updater = tuf.client.updater.Updater('updater', self.repository_mirrors)
|
||||
|
||||
# Update all metadata.
|
||||
updater.refresh()
|
||||
|
||||
# Get the latest information on targets.
|
||||
targets = updater.all_targets()
|
||||
|
||||
# Determine which targets have changed or new.
|
||||
updated_targets = updater.updated_targets(targets, self.destination_dir)
|
||||
|
||||
# Download new/changed targets and store them in the destination
|
||||
# directory 'destination_dir'.
|
||||
for target in updated_targets:
|
||||
updater.download_target(target, self.destination_dir)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def tuf_option():
|
||||
usage = 'usage: %prog [options]'
|
||||
parser = optparse.OptionParser(usage=usage)
|
||||
parser.add_option('-t', '--tuf', '--TUF', action='store_true', dest='tuf',
|
||||
default=False, help='Implement tuf.')
|
||||
option, args = parser.parse_args()
|
||||
TestCase.TUF = option.tuf
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class test_TestCase(TestCase):
|
||||
"""
|
||||
<Purpose>
|
||||
|
|
@ -440,7 +523,7 @@ def test_tuf_setup(self):
|
|||
self.assertTrue(os.path.isfile(target1))
|
||||
self.assertTrue(os.path.isfile(target2))
|
||||
|
||||
self.tuf_refresh()
|
||||
self.refresh_tuf_repository()
|
||||
|
||||
|
||||
|
||||
|
|
@ -449,7 +532,7 @@ def test_methods(self):
|
|||
Making sure following methods work as intended:
|
||||
- add_or_change_file_at_repository()
|
||||
- delete_file_at_repository()
|
||||
- tuf_refresh()
|
||||
- refresh_tuf_repository()
|
||||
"""
|
||||
new_file = self.add_or_change_file_at_repository()
|
||||
fileobj = open(new_file, 'rb')
|
||||
|
|
@ -467,7 +550,7 @@ def test_methods(self):
|
|||
self.assertTrue(os.path.exists(old_file))
|
||||
self.assertEquals('1234', fileobj.read())
|
||||
|
||||
self.tuf_refresh()
|
||||
self.refresh_tuf_repository()
|
||||
new_target = os.path.join(self.tuf_targets_dir,
|
||||
os.path.basename(new_file))
|
||||
self.assertTrue(os.path.exists(new_target))
|
||||
|
|
@ -479,5 +562,7 @@ def test_methods(self):
|
|||
|
||||
|
||||
|
||||
suite = unittest.TestLoader().loadTestsFromTestCase(test_TestCase)
|
||||
unittest.TextTestRunner(verbosity=2).run(suite)
|
||||
|
||||
# Uncomment to run the test of 'test_TestCase' class.
|
||||
#suite = unittest.TestLoader().loadTestsFromTestCase(test_TestCase)
|
||||
#unittest.TextTestRunner(verbosity=2).run(suite)
|
||||
Loading…
Reference in a new issue