Adding tests: test_indefinite_freeze_attack.py, test_slow_retrieval_attack.py

This commit is contained in:
Kon 2013-04-02 14:02:15 -04:00
parent 4cd13c61df
commit 4a6490723e
4 changed files with 159 additions and 187 deletions

View file

@ -1133,7 +1133,7 @@ def build_timestamp_file(timestamp_keyids, metadata_directory):
release_filepath = os.path.join(metadata_directory, RELEASE_FILENAME)
timestamp_filepath = os.path.join(metadata_directory, TIMESTAMP_FILENAME)
# Generate and sign the release metadata.
# Generate and sign the timestamp metadata.
timestamp_metadata = generate_timestamp_metadata(release_filepath)
signable = sign_metadata(timestamp_metadata, timestamp_keyids, timestamp_filepath)

View file

@ -14,117 +14,130 @@
<Purpose>
Simulate an indefinite freeze attack.
In an indefinite freeze attack, attacker is able to respond to client's
requests with the same, outdated metadata without the client being aware.
"""
import os
import sys
import time
import shutil
import urllib
import tempfile
import util_test_tools
import tuf.interposition
import tuf.formats
import tuf.repo.signerlib as signerlib
from tuf.interposition import urllib_tuf
# Disable logging.
util_test_tools.disable_logging()
class IndefineteFreezeAttackError(Exception):
class IndefiniteFreezeAttackAlert(Exception):
pass
def test_replay_attack(TUF=False):
EXPIRATION = 1 # second(s)
def _remake_timestamp(metadata_dir, keyids):
"""Create timestamp metadata object. Modify expiration date. Sign and
write the metadata.
"""
release_filepath = os.path.join(metadata_dir, 'release.txt')
timestamp_filepath = os.path.join(metadata_dir, 'timestamp.txt')
timestamp_metadata = signerlib.generate_timestamp_metadata(release_filepath)
timestamp_metadata['signed']['expires'] = \
tuf.formats.format_time(time.time() + EXPIRATION)
signable = \
signerlib.sign_metadata(timestamp_metadata, keyids, timestamp_filepath)
signerlib.write_metadata_file(signable, timestamp_filepath)
def _download(url, filename, tuf=False):
if tuf:
urllib_tuf.urlretrieve(url, filename)
else:
urllib.urlretrieve(url, filename)
def test_indefinite_freeze_attack(TUF=False):
"""
<Arguments>
TUF:
If set to 'False' all directories that start with 'tuf_' are ignored,
indicating that tuf is not implemented.
<Purpose>
The idea here is to expire timestamp metadata so that the attacker
"""
ERROR_MSG = '\tIndefinite Freeze Attack was Successful!\n\n'
try:
# Setup.
root_repo, url, server_proc, keyids, interpose_json = \
util_test_tools.init_repo(tuf=TUF)
root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF)
reg_repo = os.path.join(root_repo, 'reg_repo')
tuf_repo = os.path.join(root_repo, 'tuf_repo')
metadata_dir = os.path.join(tuf_repo, 'metadata')
downloads = os.path.join(root_repo, 'downloads')
tuf_targets = os.path.join(tuf_repo, 'targets')
# Add file to 'repo' directory: {root_repo}
filepath = util_test_tools.add_file_to_repository(reg_repo, 'Test A')
file_basename = os.path.basename(filepath)
url_to_repo = url+'reg_repo/'+file_basename
downloaded_file = os.path.join(downloads, file_basename)
# Attacker saves the original file into 'evil_dir'.
evil_dir = tempfile.mkdtemp(dir=root_repo)
vulnerable_file = os.path.join(evil_dir, file_basename)
shutil.copy(filepath, evil_dir)
# Refresh the tuf repository and apply tuf interpose.
if TUF:
util_test_tools.tuf_refresh_repo(root_repo, keyids)
tuf.interposition.configure(interpose_json)
tuf.interposition.interpose()
print 'TUF ...'
# End Setup.
# Client performs initial update.
urllib.urlretrieve(url_to_repo, downloaded_file)
# Downloads are stored in the same directory '{root_repo}/downloads/'
# for regular and tuf clients.
downloaded_content = util_test_tools.read_file_content(downloaded_file)
msg = '[Initial Updata] Failed to download the file.'
if 'Test A' != downloaded_content:
raise TestSetupError(msg)
# Developer patches the file and updates the repository.
util_test_tools.modify_file_at_repository(filepath, 'Test NOT A')
# Updating tuf repository. This will copy files from regular repository
# into tuf repository and refresh the metad
if TUF:
# Update TUF metadata before attacker modifies anything.
util_test_tools.tuf_refresh_repo(root_repo, keyids)
# Client downloads the patched file.
urllib.urlretrieve(url_to_repo, downloaded_file)
# Modify the url. Remember that the interposition will intercept
# urls that have 'localhost:9999' hostname, which was specified in
# the json interposition configuration file. Look for 'hostname'
# in 'util_test_tools.py'. Further, the 'file_basename' is the target
# path relative to 'targets_dir'.
url_to_repo = 'http://localhost:9999/'+file_basename
# Content of the downloaded file.
downloaded_content = util_test_tools.read_file_content(downloaded_file)
msg = '[Update] Failed to update the file.'
if 'Test NOT A' != downloaded_content:
raise TestSetupError(msg)
# Make timestamp metadata with close expiration date (2s).
_remake_timestamp(metadata_dir, keyids)
# Attacker tries to be clever, he manages to modifies regular and tuf
# targets directory by replacing a patched file with an old one.
if os.path.isdir(tuf_targets):
target = os.path.join(tuf_targets, file_basename)
util_test_tools.delete_file_at_repository(target)
shutil.copy(vulnerable_file, tuf_targets)
# Verify that 'target' is an old, un-patched file.
target = os.path.join(tuf_targets, file_basename)
target_content = util_test_tools.read_file_content(target)
msg = "The 'target' file contains new data!"
if 'Test A' != target_content:
raise TestSetupError(msg)
# Client performs initial download.
try:
_download(url=url_to_repo, filename=downloaded_file, tuf=TUF)
except tuf.ExpiredMetadataError:
msg = ('Metadata has expired too soon, extend expiration period. '+
'Current expiration is set to: '+repr(EXPIRATION)+' second(s).')
sys.exit(msg)
# Expire timestamp.
time.sleep(EXPIRATION)
# Try downloading again, this should raise an error.
try:
_download(url=url_to_repo, filename=downloaded_file, tuf=TUF)
except tuf.ExpiredMetadataError, error:
pass
else:
util_test_tools.delete_file_at_repository(filepath)
shutil.copy(vulnerable_file, reg_repo)
raise IndefiniteFreezeAttackAlert(ERROR_MSG)
# Client downloads the file once time.
urllib.urlretrieve(url_to_repo, downloaded_file)
# Check whether the attack succeeded by inspecting the content of the
# update. The update should contain 'Test NOT A'.
downloaded_content = util_test_tools.read_file_content(downloaded_file)
msg = 'Replay attack was successful!\n'
if 'Test NOT A' != downloaded_content:
raise ReplayAttackError(msg)
finally:
tuf.interposition.go_away()
util_test_tools.cleanup(root_repo, server_proc)
@ -132,11 +145,11 @@ def test_replay_attack(TUF=False):
try:
test_replay_attack(TUF=False)
except ReplayAttackError, error:
test_indefinite_freeze_attack(TUF=False)
except IndefiniteFreezeAttackAlert, error:
print error
try:
test_replay_attack(TUF=True)
except ReplayAttackError, error:
test_indefinite_freeze_attack(TUF=True)
except IndefiniteFreezeAttackAlert, error:
print error

View file

@ -40,35 +40,70 @@
import subprocess
import util_test_tools
from tuf.interposition import urllib_tuf
# Disable logging.
util_test_tools.disable_logging()
def test():
class SlowRetrievalAttackAlert(Exception):
pass
def download_using_urlopen(url, tuf=False):
if tuf:
return urllib_tuf.urlopen(url)
else:
return urllib.urlopen(url)
def test_slow_retrieval_attack(TUF=True):
ERROR_MSG = '\tSlow Retrieval Attack was Successful!\n\n'
# Launch the server.
port = random.randint(30000, 45000)
print port
command = ['python', 'slow_retrieval_server.py', str(port)]
server_process = subprocess.Popen(command, stderr=subprocess.PIPE)
time.sleep(.1)
try:
root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=True)
# Setup.
root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF, port=port)
print 'root_repo: '+root_repo
reg_repo = os.path.join(root_repo, 'reg_repo')
# Make a file in 'reg_repo'.
filepath = util_test_tools.add_file_to_repository(reg_repo, 'Test')
downloads = os.path.join(root_repo, 'downloads')
# Path of the file relative to 'root_repo'.
relative_filepath = os.path.relpath(filepath)
# Add file to 'repo' directory: {root_repo}
filepath = util_test_tools.add_file_to_repository(reg_repo, 'A'*10)
file_basename = os.path.basename(filepath)
url_to_file = url+'reg_repo/'+file_basename
downloaded_file = os.path.join(downloads, file_basename)
if TUF:
print 'TUF ...'
tuf_repo = os.path.join(root_repo, 'tuf_repo')
# Update TUF metadata before attacker modifies anything.
util_test_tools.tuf_refresh_repo(root_repo, keyids)
# Modify the url. Remember that the interposition will intercept
# urls that have 'localhost:9999' hostname, which was specified in
# the json interposition configuration file. Look for 'hostname'
# in 'util_test_tools.py'. Further, the 'file_basename' is the target
# path relative to 'targets_dir'.
url_to_file = 'http://localhost:9999/'+file_basename
# Tailor the url.
url_to_file = 'http://localhost:'+str(port)+'/'+relative_filepath
# Download the content of the file using the server.
file_content = urllib.urlopen(url_to_file)
# NOTE: if TUF is enabled the metadata files will be downloaded first. This
# WILL take a long time.
file_content = download_using_urlopen(url_to_file, tuf=TUF)
print file_content.read()
@ -81,97 +116,4 @@ def test():
util_test_tools.cleanup(root_repo, server_proc)
test()
'''
import os
import shutil
import urllib
import tempfile
import util_test_tools
from tuf.interposition import urllib_tuf
# Disable logging.
util_test_tools.disable_logging()
class TestSetupError(Exception):
pass
class SlowRetrievalAttack(Exception):
pass
def download(url, filename, tuf=False):
if tuf:
urllib_tuf.urlretrieve(url, filename)
else:
urllib.urlretrieve(url, filename)
def test_arbitrary_package_attack(TUF=False):
"""
<Arguments>
TUF:
If set to 'False' all directories that start with 'tuf_' are ignored,
indicating that tuf is not implemented.
<Purpose>
Illustrate endless data attack vulnerability.
"""
try:
# Setup.
root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF)
reg_repo = os.path.join(root_repo, 'reg_repo')
tuf_repo = os.path.join(root_repo, 'tuf_repo')
downloads = os.path.join(root_repo, 'downloads')
tuf_targets = os.path.join(tuf_repo, 'targets')
# Add a file to 'repo' directory: {root_repo}
filepath = util_test_tools.add_file_to_repository(reg_repo, 'Test A')
file_basename = os.path.basename(filepath)
url_to_repo = url+'reg_repo/'+file_basename
downloaded_file = os.path.join(downloads, file_basename)
# Refresh the tuf repository and apply tuf interpose.
if TUF:
# Update TUF metadata before attacker modifies anything.
util_test_tools.tuf_refresh_repo(root_repo, keyids)
# End Setup.
# Client downloads (tries to download) the file.
download(url=url_to_repo, filename=downloaded_file, tuf=TUF)
# Check whether the attack succeeded by inspecting the content of the
# update. The update should contain 'Test A'.
downloaded_content = util_test_tools.read_file_content(downloaded_file)
msg = 'Slow Retrieval Attack was successful!\n'
print downloaded_content
if 'Test A' != downloaded_content:
raise SlowRetrievalAttack(msg)
finally:
util_test_tools.cleanup(root_repo, server_proc)
try:
test_arbitrary_package_attack(TUF=False)
except SlowRetrievalAttack, error:
print error
try:
test_arbitrary_package_attack(TUF=True)
except SlowRetrievalAttack, error:
print error
'''
test_slow_retrieval_attack()

View file

@ -155,7 +155,7 @@ def disable_logging():
def init_repo(tuf=False):
def init_repo(tuf=False, port=None):
# Temp root directory for regular and tuf repositories.
# WARNING: tuf client stores files in '{root_repo}/downloads/' directory!
# Make sure regular download are NOT stored in the that directory when
@ -166,11 +166,12 @@ def init_repo(tuf=False):
root_repo = tempfile.mkdtemp(dir=os.getcwd())
os.mkdir(os.path.join(root_repo, 'reg_repo'))
os.mkdir(os.path.join(root_repo, 'downloads'))
# Start a simple server pointing to the repository directory.
port = random.randint(30000, 45000)
command = ['python', '-m', 'SimpleHTTPServer', str(port)]
server_proc = subprocess.Popen(command, stderr=subprocess.PIPE)
server_proc = None
if port is None:
# Start a simple server pointing to the repository directory.
port = random.randint(30000, 45000)
command = ['python', '-m', 'SimpleHTTPServer', str(port)]
server_proc = subprocess.Popen(command, stderr=subprocess.PIPE)
# Tailor url for the repository. In order to download a 'file.txt'
# from 'reg_repo' do: url+'reg_repo/file.txt'
@ -193,10 +194,15 @@ def init_repo(tuf=False):
def cleanup(root_repo, server_process):
if server_process.returncode is None:
server_process.kill()
if server_process is not None:
if server_process.returncode is None:
server_process.kill()
print 'Server terminated.\n'
# Clear the keystore.
keystore.clear_keystore()
# Removing repository directory.
try:
shutil.rmtree(root_repo)
@ -516,6 +522,11 @@ def create_delegation(tuf_repo, delegated_targets_path, keyid, keyid_password,
keystore_dir = os.path.join(tuf_repo, 'keystore')
metadata_dir = os.path.join(tuf_repo, 'metadata')
original_get_metadata_directory = signercli._get_metadata_directory
original_prompt = signercli._prompt
original_get_password = signercli._get_password
original_get_keyids = signercli._get_keyids
# Patch signercli._get_metadata_directory()
_get_metadata_directory(metadata_dir)
@ -557,4 +568,10 @@ def _mock_get_keyid(junk, keyid=keyid):
# Patch signercli._get_keyids().
signercli._get_keyids = _mock_get_keyid
signercli.make_delegation(keystore_dir)
signercli.make_delegation(keystore_dir)
keystore.clear_keystore()
signercli._get_keyids = original_get_keyids
signercli._get_password = original_get_password
signercli._prompt = original_prompt
signercli._get_metadata_directory = original_get_metadata_directory