Added mix-and-match attack test and statement_coverage (based on coverage.py).

This commit is contained in:
Kon 2013-04-13 12:56:10 -04:00
parent 753f318990
commit f46b1f71ea
7 changed files with 418 additions and 60 deletions

86
tuf/tests/statement_coverage.py Executable file
View file

@ -0,0 +1,86 @@
"""
<Program Name>
statement_coverage.py
<Author>
Konstantin Andrianov
<Started>
March 20, 2013.
<Copyright>
See LICENSE for licensing information.
<Purpose>
Measure test coverage.
NOTE: This script is based on third party software. In order to use this
script install Ned Batchelder's coverage.py:
http://nedbatchelder.com/code/coverage/
"""
import os
import sys
# Try to import coverage.py. Coverage.py is a third party software.
try:
coverage = __import__('coverage')
except ImportError, error:
error_msg = ("\nIt appears that coverage.py is not installed. Install "+
"Ned Batchelder's coverage.py "+
"('http://nedbatchelder.com/code/coverage/') and try again.\n")
print error_msg
raise
cov = coverage.coverage()
cov.start()
try:
current_directory = os.getcwd()
current_directory_content = os.listdir(current_directory)
# Find test scripts and import them.
test_modules = [] # Test modules.
tested_modules = [] # Modules that are tested by the 'test_modules'.
for _file in current_directory_content:
if _file.startswith('test') and _file.endswith('.py'):
_file = os.path.splitext(_file)[0]
test_modules.append(_file)
# Import the module.
try:
module = __import__(_file)
except ImportError, error:
print 'Unable to load the module: '+_file
raise
_file = '.'+_file[5:]
tested_modules.append(_file)
finally:
cov.stop()
# Include quickstart.
tested_modules.remove('.quickstart')
tested_modules.append('quickstart')
# Extracting tuf modules.
tuf_modules = {} # list of all loaded tuf modules.
for module_name in sys.modules:
if module_name.startswith('tuf') or module_name.startswith('quickstart'):
tuf_modules[module_name] = sys.modules[module_name]
# Tested module paths.
tested_module_paths = []
for module_name in tuf_modules:
for tested_module in tested_modules:
if module_name.endswith(tested_module):
tested_module_paths.append(tuf_modules[module_name].__file__)
cov.report(tested_module_paths)

View file

@ -1,3 +1,23 @@
"""
<Program Name>
slow_retrieval_server.py
<Author>
Konstantin Andrianov
<Started>
March 13, 2012
<Copyright>
See LICENSE for licensing information.
<Purpose>
Server that throttles data by sending one byte at a time
(specified time interval 'DELAY'). The server is used in
test_slow_retrieval_attack.py.
"""
import os
import sys
import time
@ -5,6 +25,10 @@
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
DELAY = 1
# HTTP request handler.
class Handler(BaseHTTPRequestHandler):
@ -22,8 +46,7 @@ def do_GET(self):
# Throttle the file by sending a character every few seconds.
for i in range(len(data)):
print i
time.sleep(1)
time.sleep(DELAY)
self.wfile.write(data[i])
return
@ -42,15 +65,15 @@ def get_random_port():
def run(port):
server_address = ('localhost', port)
httpd = HTTPServer(server_address, Handler)
print('Server is active...')
print('Slow server is active on port: '+str(port)+' ...')
httpd.handle_request()
if __name__ == '__main__':
if sys.argv[1]:
port = int(sys.argv[1])
else:
port = get_random_port()
print 'Port: '+str(port)
run(port)
if len(sys.argv) > 1:
port = int(sys.argv[1])
else:
port = get_random_port()
run(port)

View file

@ -35,9 +35,15 @@
util_test_tools.disable_logging()
class ExtraneousDependenciesAttackAlert(Exception):
pass
def test_extraneous_dependencies_attack():
ERROR_MSG = '\tExtraneous Dependencies Attack Succeeded!\n\n'
try:
# Setup.
@ -111,43 +117,66 @@ def _make_delegation(rolename):
_make_delegation('role2')
# The attack.
# The attacks.
def _write_rogue_metadata():
# Load the keystore before rebuilding the metadata.
tuf.repo.keystore.load_keystore_from_keyfiles(keystore_dir,
roles['role1']['keyid'],
roles['role1']['password'])
# Rebuild the delegation role metadata.
signerlib.build_delegated_role_file(roles['role2']['targets_dir'],
roles['role1']['keyid'], metadata_dir,
roles['role1']['metadata_dir'],
'role1.txt')
# Update release and timestamp metadata.
util_test_tools.make_release_meta(root_repo)
util_test_tools.make_timestamp_meta(root_repo)
# Modify a target that was delegated to 'role2'.
util_test_tools.modify_file_at_repository(roles['role2']['target_path'],
'Test NOT B')
# Load the keystore before rebuilding the metadata.
tuf.repo.keystore.load_keystore_from_keyfiles(keystore_dir,
roles['role1']['keyid'],
roles['role1']['password'])
# Rebuild the delegation role metadata.
signerlib.build_delegated_role_file(roles['role2']['targets_dir'],
roles['role1']['keyid'], metadata_dir,
roles['role1']['metadata_dir'],
'role1.txt')
# Update release and timestamp metadata.
util_test_tools.make_release_meta(root_repo)
util_test_tools.make_timestamp_meta(root_repo)
# Update rogue delegatee metadata.
_write_rogue_metadata()
# Perform another client download.
try:
urllib_tuf.urlretrieve(roles['role2']['url'], roles['role2']['dest_path'])
except tuf.MetadataNotAvailableError, e:
raise
pass
else:
raise ExtraneousDependenciesAttackAlert(ERROR_MSG)
finally:
# Add a target file to the directory delegated to 'role2' but not 'role1'.
util_test_tools.add_file_to_repository(roles['role2']['targets_dir'], 'AAAA')
# Update rogue delegatee metadata.
_write_rogue_metadata()
# Perform another client download.
try:
urllib_tuf.urlretrieve(roles['role2']['url'], roles['role2']['dest_path'])
except tuf.MetadataNotAvailableError, e:
pass
else:
raise ExtraneousDependenciesAttackAlert(ERROR_MSG)
finally:
server_proc.kill()
util_test_tools.cleanup(root_repo, server_proc)
#util_test_tools.cleanup(root_repo, server_proc)
try:
test_extraneous_dependencies_attack()
except tuf.MetadataNotAvailableError, error:
print str(error)+'\n'
else:
print 'Extraneous Dependencies Attack Succeeded!\n'
except ExtraneousDependenciesAttackAlert, error:
print 'error'

View file

@ -149,6 +149,7 @@ def test_indefinite_freeze_attack(TUF=False):
except IndefiniteFreezeAttackAlert, error:
print error
try:
test_indefinite_freeze_attack(TUF=True)
except IndefiniteFreezeAttackAlert, error:

View file

@ -0,0 +1,197 @@
"""
<Program Name>
test_mix_and_match_attack.py
<Author>
Konstantin Andrianov
<Started>
March 27, 2012
<Copyright>
See LICENSE for licensing information.
<Purpose>
Simulate slow retrieval attack. A simple client update vs. client
update implementing TUF.
In the mix-and-match attack, attacker is able to trick clients into using
combination of metadata that never existed together on the repository at
the same time.
NOTE: The interposition provided by 'tuf.interposition' is used to intercept
all calls made by urllib/urillib2 to certain network locations specified in
the interposition configuration file. Look up interposition.py for more
information and illustration of a sample contents of the interposition
configuration file. Interposition was meant to make TUF integration with an
existing software updater an easy process. This allows for more flexibility
to the existing software updater. However, if you are planning to solely use
TUF there should be no need for interposition, all necessary calls will be
generated from within TUF.
Note: There is no difference between 'updates' and 'target' files.
"""
import os
import shutil
import urllib
import tempfile
import util_test_tools
from tuf.interposition import urllib_tuf
# Disable logging.
util_test_tools.disable_logging()
class MixAndMatchAttackAlert(Exception):
pass
def _download(url, filename, tuf=False):
if tuf:
urllib_tuf.urlretrieve(url, filename)
else:
urllib.urlretrieve(url, filename)
def test_mix_and_match_attack(TUF=False):
"""
Attack design:
There are 3 stages:
Stage 1: Consists of a usual mode of operations using tuf. Client
downloads a target file. (Initial download)
Stage 2: The target file is legitimately modified and metadata correctly
updated. Client downloads the target file again. (Patched target download)
Stage 3: The target file is legitimately modified and metadata correctly
updated again. However, before client gets to download the newly patched
target file the attacker replaces the release metadata, targets metadata
and the target file with the ones from stage 1 (mix-and-match attack).
Note that timestamp metadata is untouched. Further note that same would
happen if only target metadata, and target file are reverted.
"""
ERROR_MSG = '\tMix-And-Match Attack was Successful!\n\n'
try:
# Setup / Stage 1
# ---------------
root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF)
reg_repo = os.path.join(root_repo, 'reg_repo')
downloads = os.path.join(root_repo, 'downloads')
evil_dir = tempfile.mkdtemp(dir=root_repo)
# Add file to 'repo' directory: {root_repo}
filepath = util_test_tools.add_file_to_repository(reg_repo, 'A'*10)
file_basename = os.path.basename(filepath)
url_to_file = url+'reg_repo/'+file_basename
downloaded_file = os.path.join(downloads, file_basename)
# Attacker saves the initial file.
shutil.copy(filepath, evil_dir)
unpatched_file = os.path.join(evil_dir, file_basename)
if TUF:
print 'TUF ...'
tuf_repo = os.path.join(root_repo, 'tuf_repo')
tuf_targets = os.path.join(tuf_repo, 'targets')
metadata_dir = os.path.join(tuf_repo, 'metadata')
release_meta_file = os.path.join(metadata_dir, 'release.txt')
targets_meta_file = os.path.join(metadata_dir, 'targets.txt')
target = os.path.join(tuf_targets, file_basename)
# Update TUF metadata before attacker modifies anything.
util_test_tools.tuf_refresh_repo(root_repo, keyids)
# Attacker saves the original metadata and the target file.
#shutil.copy(target, evil_dir)
shutil.copy(release_meta_file, evil_dir)
shutil.copy(targets_meta_file, evil_dir)
#target_old = os.path.join(evil_dir, file_basename)
release_meta_file_old = os.path.join(evil_dir, 'release.txt')
targets_meta_file_old = os.path.join(evil_dir, 'targets.txt')
# Modify the url. Remember that the interposition will intercept
# urls that have 'localhost:9999' hostname, which was specified in
# the json interposition configuration file. Look for 'hostname'
# in 'util_test_tools.py'. Further, the 'file_basename' is the target
# path relative to 'targets_dir'.
url_to_file = 'http://localhost:9999/'+file_basename
# Client's initial download.
_download(url=url_to_file, filename=downloaded_file, tuf=TUF)
# Stage 2
# -------
# Developer patches the file and updates the repository.
util_test_tools.modify_file_at_repository(filepath, 'B'*11)
# Updating tuf repository. This will copy files from regular repository
# into tuf repository and refresh the metadata
if TUF:
util_test_tools.tuf_refresh_repo(root_repo, keyids)
# Client downloads the patched file.
_download(url=url_to_file, filename=downloaded_file, tuf=TUF)
downloaded_content = util_test_tools.read_file_content(downloaded_file)
print downloaded_content
# Stage 3
# -------
# Developer patches the file and updates the repository again.
util_test_tools.modify_file_at_repository(filepath, 'C'*10)
# Updating tuf repository. This will copy files from regular repository
# into tuf repository and refresh the metadata
if TUF:
util_test_tools.tuf_refresh_repo(root_repo, keyids)
# Attacker replaces the metadata and the target file.
shutil.copyfile(unpatched_file, target)
shutil.copyfile(release_meta_file_old, release_meta_file)
shutil.copyfile(targets_meta_file_old, targets_meta_file)
# Attacker replaces the patched file with the unpatched one.
shutil.copyfile(unpatched_file, filepath)
# Client tries to downloads the newly patched file.
_download(url=url_to_file, filename=downloaded_file, tuf=TUF)
# Check whether the attack succeeded by inspecting the content of the
# update. The update should contain 'Test NOT A'.
downloaded_content = util_test_tools.read_file_content(downloaded_file)
if ('B'*10) != downloaded_content:
print downloaded_content
raise MixAndMatchAttackAlert(ERROR_MSG)
finally:
pass
#util_test_tools.cleanup(root_repo, server_proc)
try:
test_mix_and_match_attack(TUF=False)
except MixAndMatchAttackAlert, error:
print error
try:
test_mix_and_match_attack(TUF=True)
except MixAndMatchAttackAlert, error:
print error

View file

@ -15,7 +15,11 @@
Simulate a replay attack. A simple client update vs. client update
implementing TUF.
Note: The interposition provided by 'tuf.interposition' is used to intercept
In the replay attack an attacker is able to trick clients into installing
software that is older than that which the client previously knew to be
available.
NOTE: The interposition provided by 'tuf.interposition' is used to intercept
all calls made by urllib/urillib2 to certain hostname specified in
the interposition configuration file. Look up interposition.py for more
information and illustration of a sample contents of the interposition
@ -33,8 +37,8 @@
import shutil
import urllib
import tempfile
import util_test_tools
import util_test_tools
from tuf.interposition import urllib_tuf
@ -77,7 +81,7 @@ def test_replay_attack(TUF=False):
"""
ERROR_MSG = 'Replay Attack was Successful!\n'
ERROR_MSG = '\tReplay Attack was Successful!\n\n'
try:
@ -100,6 +104,8 @@ def test_replay_attack(TUF=False):
shutil.copy(filepath, evil_dir)
if TUF:
print 'TUF ...'
# Update TUF metadata before attacker modifies anything.
util_test_tools.tuf_refresh_repo(root_repo, keyids)
@ -126,7 +132,7 @@ def test_replay_attack(TUF=False):
util_test_tools.modify_file_at_repository(filepath, 'Test NOT A')
# Updating tuf repository. This will copy files from regular repository
# into tuf repository and refresh the metad
# into tuf repository and refresh the metadata
if TUF:
util_test_tools.tuf_refresh_repo(root_repo, keyids)
@ -175,7 +181,6 @@ def test_replay_attack(TUF=False):
try:
test_replay_attack(TUF=False)
except ReplayAttackAlert, error:
print error
@ -183,6 +188,5 @@ def test_replay_attack(TUF=False):
try:
test_replay_attack(TUF=True)
except ReplayAttackAlert, error:
print error

View file

@ -15,7 +15,13 @@
Simulate slow retrieval attack. A simple client update vs. client
update implementing TUF.
Note: The interposition provided by 'tuf.interposition' is used to intercept
During the slow retrieval attack, attacker is able to prevent clients from
being aware of interference with receiving updates by responding to client
requests so slowly that automated updates never complete.
NOTE: Currently TUF does not protect against slow retrieval attacks.
NOTE: The interposition provided by 'tuf.interposition' is used to intercept
all calls made by urllib/urillib2 to certain network locations specified in
the interposition configuration file. Look up interposition.py for more
information and illustration of a sample contents of the interposition
@ -29,17 +35,14 @@
"""
# TODO: implement slow retrieval server... And design the test.
# Should there be a time bracket, during which the download is
# expected to happen?
import os
import time
import urllib
import random
import subprocess
import util_test_tools
from multiprocessing import Process
import util_test_tools
from tuf.interposition import urllib_tuf
@ -52,29 +55,30 @@ class SlowRetrievalAttackAlert(Exception):
pass
def download_using_urlopen(url, tuf=False):
def _download(url, filename, tuf=False):
if tuf:
return urllib_tuf.urlopen(url)
urllib_tuf.urlretrieve(url, filename)
else:
return urllib.urlopen(url)
urllib.urlretrieve(url, filename)
def test_slow_retrieval_attack(TUF=True):
def test_slow_retrieval_attack(TUF=False):
WAIT_TIME = 5 # Number of seconds to wait until download completes.
ERROR_MSG = '\tSlow Retrieval Attack was Successful!\n\n'
# Launch the server.
port = random.randint(30000, 45000)
print port
command = ['python', 'slow_retrieval_server.py', str(port)]
server_process = subprocess.Popen(command, stderr=subprocess.PIPE)
time.sleep(.1)
try:
# Setup.
root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF, port=port)
print 'root_repo: '+root_repo
root_repo, url, server_proc, keyids = \
util_test_tools.init_repo(tuf=TUF, port=port)
reg_repo = os.path.join(root_repo, 'reg_repo')
downloads = os.path.join(root_repo, 'downloads')
@ -84,6 +88,7 @@ def test_slow_retrieval_attack(TUF=True):
url_to_file = url+'reg_repo/'+file_basename
downloaded_file = os.path.join(downloads, file_basename)
if TUF:
print 'TUF ...'
tuf_repo = os.path.join(root_repo, 'tuf_repo')
@ -99,13 +104,14 @@ def test_slow_retrieval_attack(TUF=True):
url_to_file = 'http://localhost:9999/'+file_basename
# Download the content of the file using the server.
# NOTE: if TUF is enabled the metadata files will be downloaded first. This
# WILL take a long time.
file_content = download_using_urlopen(url_to_file, tuf=TUF)
print file_content.read()
# Client tries to download.
# NOTE: if TUF is enabled the metadata files will be downloaded first.
proc = Process(target=_download, args=(url_to_file, downloaded_file, TUF))
proc.start()
proc.join(WAIT_TIME)
if proc.exitcode is None:
proc.terminate()
raise SlowRetrievalAttackAlert(ERROR_MSG)
finally:
@ -116,4 +122,16 @@ def test_slow_retrieval_attack(TUF=True):
util_test_tools.cleanup(root_repo, server_proc)
test_slow_retrieval_attack()
try:
test_slow_retrieval_attack(TUF=False)
except SlowRetrievalAttackAlert, error:
print error
try:
test_slow_retrieval_attack(TUF=True)
except SlowRetrievalAttackAlert, error:
print error