mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Merge remote-tracking branch 'origin/master'
This commit is contained in:
commit
cab37422b3
9 changed files with 443 additions and 67 deletions
|
|
@ -197,13 +197,14 @@ def build_repository(project_directory):
|
|||
|
||||
# Handle the expiration time. The expiration date determines when
|
||||
# the top-level roles expire.
|
||||
message = '\nWhen would you like your certificates to expire? (mm/dd/yyyy): '
|
||||
prompt_message = \
|
||||
'\nWhen would you like your certificates to expire? (mm/dd/yyyy): '
|
||||
timeout = None
|
||||
for attempt in range(MAX_INPUT_ATTEMPTS):
|
||||
# Get the difference between the user's entered expiration date and today's
|
||||
# date. Convert and store the difference to total days till expiration.
|
||||
try:
|
||||
input_date = _prompt(message)
|
||||
input_date = _prompt(prompt_message)
|
||||
expiration_date = datetime.datetime.strptime(input_date, '%m/%d/%Y')
|
||||
time_difference = expiration_date - datetime.datetime.now()
|
||||
timeout = time_difference.days
|
||||
|
|
@ -299,11 +300,12 @@ def build_repository(project_directory):
|
|||
# Ensure the user inputs a valid threshold value.
|
||||
role_threshold = None
|
||||
for attempt in range(MAX_INPUT_ATTEMPTS):
|
||||
message = '\nEnter the desired threshold for the role '+repr(role)+': '
|
||||
prompt_message = \
|
||||
'\nEnter the desired threshold for the role '+repr(role)+': '
|
||||
|
||||
# Check for non-integers and values less than one.
|
||||
try:
|
||||
role_threshold = _prompt(message, int)
|
||||
role_threshold = _prompt(prompt_message, int)
|
||||
if not tuf.formats.THRESHOLD_SCHEMA.matches(role_threshold):
|
||||
raise ValueError
|
||||
break
|
||||
|
|
|
|||
|
|
@ -581,7 +581,7 @@ def refresh(self):
|
|||
|
||||
|
||||
|
||||
def _update_metadata(self, metadata_role, compression=None):
|
||||
def _update_metadata(self, metadata_role, fileinfo=None, compression=None):
|
||||
"""
|
||||
<Purpose>
|
||||
Download, verify, and 'install' the metadata belonging to 'metadata_role'.
|
||||
|
|
@ -595,6 +595,11 @@ def _update_metadata(self, metadata_role, compression=None):
|
|||
The name of the metadata. This is a role name and should not end
|
||||
in '.txt'. Examples: 'root', 'targets', 'targets/linux/x86'.
|
||||
|
||||
fileinfo:
|
||||
A dictionary containing length and hashes of the metadata file.
|
||||
Ex: {"hashes": {"sha256": "3a5a6ec1f353...dedce36e0"},
|
||||
"length": 1340}
|
||||
|
||||
compression:
|
||||
A string designating the compression type of 'metadata_role'.
|
||||
The 'release' metadata file may be optionally downloaded and stored in
|
||||
|
|
@ -631,6 +636,15 @@ def _update_metadata(self, metadata_role, compression=None):
|
|||
# Reference to the 'download_url_to_tempfileobj' function.
|
||||
download_file = tuf.download.download_url_to_tempfileobj
|
||||
|
||||
# Extract file length and file hashes. They will be passed as arguments
|
||||
# to 'download_file' function.
|
||||
if fileinfo is not None:
|
||||
file_length=fileinfo['length']
|
||||
file_hashes=fileinfo['hashes']
|
||||
else:
|
||||
file_length=None
|
||||
file_hashes=None
|
||||
|
||||
# Attempt a file download from each mirror until the file is downloaded and
|
||||
# verified. If the signature of the downloaded file is valid, proceed,
|
||||
# otherwise log a warning and try the next mirror. 'metadata_file_object'
|
||||
|
|
@ -642,7 +656,8 @@ def _update_metadata(self, metadata_role, compression=None):
|
|||
metadata_signable = None
|
||||
for mirror_url in get_mirrors('meta', metadata_filename.encode("utf-8"), self.mirrors):
|
||||
try:
|
||||
metadata_file_object = download_file(mirror_url)
|
||||
metadata_file_object = download_file(mirror_url, file_hashes,
|
||||
file_length)
|
||||
except tuf.DownloadError, e:
|
||||
logger.warn('Download failed from '+mirror_url+'.')
|
||||
continue
|
||||
|
|
@ -820,7 +835,8 @@ def _update_metadata_if_changed(self, metadata_role, referenced_metadata='releas
|
|||
compression = 'gzip'
|
||||
|
||||
try:
|
||||
self._update_metadata(metadata_role, compression=compression)
|
||||
self._update_metadata(metadata_role, fileinfo=new_fileinfo,
|
||||
compression=compression)
|
||||
except tuf.RepositoryError, e:
|
||||
# The current metadata we have is not current but we couldn't
|
||||
# get new metadata. We shouldn't use the old metadata anymore.
|
||||
|
|
|
|||
86
tuf/tests/statement_coverage.py
Executable file
86
tuf/tests/statement_coverage.py
Executable file
|
|
@ -0,0 +1,86 @@
|
|||
"""
|
||||
<Program Name>
|
||||
statement_coverage.py
|
||||
|
||||
<Author>
|
||||
Konstantin Andrianov
|
||||
|
||||
<Started>
|
||||
March 20, 2013.
|
||||
|
||||
<Copyright>
|
||||
See LICENSE for licensing information.
|
||||
|
||||
<Purpose>
|
||||
Measure test coverage.
|
||||
|
||||
NOTE: This script is based on third party software. In order to use this
|
||||
script install Ned Batchelder's coverage.py:
|
||||
http://nedbatchelder.com/code/coverage/
|
||||
|
||||
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
|
||||
# Try to import coverage.py. Coverage.py is a third party software.
|
||||
try:
|
||||
coverage = __import__('coverage')
|
||||
except ImportError, error:
|
||||
error_msg = ("\nIt appears that coverage.py is not installed. Install "+
|
||||
"Ned Batchelder's coverage.py "+
|
||||
"('http://nedbatchelder.com/code/coverage/') and try again.\n")
|
||||
print error_msg
|
||||
raise
|
||||
|
||||
|
||||
cov = coverage.coverage()
|
||||
cov.start()
|
||||
|
||||
try:
|
||||
current_directory = os.getcwd()
|
||||
current_directory_content = os.listdir(current_directory)
|
||||
|
||||
# Find test scripts and import them.
|
||||
test_modules = [] # Test modules.
|
||||
tested_modules = [] # Modules that are tested by the 'test_modules'.
|
||||
|
||||
for _file in current_directory_content:
|
||||
if _file.startswith('test') and _file.endswith('.py'):
|
||||
|
||||
_file = os.path.splitext(_file)[0]
|
||||
test_modules.append(_file)
|
||||
|
||||
# Import the module.
|
||||
try:
|
||||
module = __import__(_file)
|
||||
except ImportError, error:
|
||||
print 'Unable to load the module: '+_file
|
||||
raise
|
||||
|
||||
_file = '.'+_file[5:]
|
||||
tested_modules.append(_file)
|
||||
|
||||
finally:
|
||||
cov.stop()
|
||||
|
||||
|
||||
# Include quickstart.
|
||||
tested_modules.remove('.quickstart')
|
||||
tested_modules.append('quickstart')
|
||||
|
||||
# Extracting tuf modules.
|
||||
tuf_modules = {} # list of all loaded tuf modules.
|
||||
for module_name in sys.modules:
|
||||
if module_name.startswith('tuf') or module_name.startswith('quickstart'):
|
||||
tuf_modules[module_name] = sys.modules[module_name]
|
||||
|
||||
# Tested module paths.
|
||||
tested_module_paths = []
|
||||
for module_name in tuf_modules:
|
||||
for tested_module in tested_modules:
|
||||
if module_name.endswith(tested_module):
|
||||
tested_module_paths.append(tuf_modules[module_name].__file__)
|
||||
|
||||
cov.report(tested_module_paths)
|
||||
|
|
@ -1,3 +1,23 @@
|
|||
"""
|
||||
<Program Name>
|
||||
slow_retrieval_server.py
|
||||
|
||||
<Author>
|
||||
Konstantin Andrianov
|
||||
|
||||
<Started>
|
||||
March 13, 2012
|
||||
|
||||
<Copyright>
|
||||
See LICENSE for licensing information.
|
||||
|
||||
<Purpose>
|
||||
Server that throttles data by sending one byte at a time
|
||||
(specified time interval 'DELAY'). The server is used in
|
||||
test_slow_retrieval_attack.py.
|
||||
|
||||
"""
|
||||
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
|
|
@ -5,6 +25,10 @@
|
|||
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
|
||||
|
||||
|
||||
DELAY = 1
|
||||
|
||||
|
||||
|
||||
# HTTP request handler.
|
||||
class Handler(BaseHTTPRequestHandler):
|
||||
|
||||
|
|
@ -22,8 +46,7 @@ def do_GET(self):
|
|||
|
||||
# Throttle the file by sending a character every few seconds.
|
||||
for i in range(len(data)):
|
||||
print i
|
||||
time.sleep(1)
|
||||
time.sleep(DELAY)
|
||||
self.wfile.write(data[i])
|
||||
|
||||
return
|
||||
|
|
@ -42,15 +65,15 @@ def get_random_port():
|
|||
def run(port):
|
||||
server_address = ('localhost', port)
|
||||
httpd = HTTPServer(server_address, Handler)
|
||||
print('Server is active...')
|
||||
print('Slow server is active on port: '+str(port)+' ...')
|
||||
httpd.handle_request()
|
||||
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
if sys.argv[1]:
|
||||
port = int(sys.argv[1])
|
||||
else:
|
||||
port = get_random_port()
|
||||
print 'Port: '+str(port)
|
||||
run(port)
|
||||
if len(sys.argv) > 1:
|
||||
port = int(sys.argv[1])
|
||||
else:
|
||||
port = get_random_port()
|
||||
|
||||
run(port)
|
||||
|
|
@ -35,9 +35,15 @@
|
|||
util_test_tools.disable_logging()
|
||||
|
||||
|
||||
class ExtraneousDependenciesAttackAlert(Exception):
|
||||
pass
|
||||
|
||||
|
||||
|
||||
def test_extraneous_dependencies_attack():
|
||||
|
||||
ERROR_MSG = '\tExtraneous Dependencies Attack Succeeded!\n\n'
|
||||
|
||||
try:
|
||||
|
||||
# Setup.
|
||||
|
|
@ -111,43 +117,66 @@ def _make_delegation(rolename):
|
|||
_make_delegation('role2')
|
||||
|
||||
|
||||
# The attack.
|
||||
# The attacks.
|
||||
|
||||
def _write_rogue_metadata():
|
||||
# Load the keystore before rebuilding the metadata.
|
||||
tuf.repo.keystore.load_keystore_from_keyfiles(keystore_dir,
|
||||
roles['role1']['keyid'],
|
||||
roles['role1']['password'])
|
||||
|
||||
# Rebuild the delegation role metadata.
|
||||
signerlib.build_delegated_role_file(roles['role2']['targets_dir'],
|
||||
roles['role1']['keyid'], metadata_dir,
|
||||
roles['role1']['metadata_dir'],
|
||||
'role1.txt')
|
||||
|
||||
# Update release and timestamp metadata.
|
||||
util_test_tools.make_release_meta(root_repo)
|
||||
util_test_tools.make_timestamp_meta(root_repo)
|
||||
|
||||
|
||||
# Modify a target that was delegated to 'role2'.
|
||||
util_test_tools.modify_file_at_repository(roles['role2']['target_path'],
|
||||
'Test NOT B')
|
||||
|
||||
# Load the keystore before rebuilding the metadata.
|
||||
tuf.repo.keystore.load_keystore_from_keyfiles(keystore_dir,
|
||||
roles['role1']['keyid'],
|
||||
roles['role1']['password'])
|
||||
|
||||
# Rebuild the delegation role metadata.
|
||||
signerlib.build_delegated_role_file(roles['role2']['targets_dir'],
|
||||
roles['role1']['keyid'], metadata_dir,
|
||||
roles['role1']['metadata_dir'],
|
||||
'role1.txt')
|
||||
|
||||
# Update release and timestamp metadata.
|
||||
util_test_tools.make_release_meta(root_repo)
|
||||
util_test_tools.make_timestamp_meta(root_repo)
|
||||
|
||||
# Update rogue delegatee metadata.
|
||||
_write_rogue_metadata()
|
||||
|
||||
# Perform another client download.
|
||||
try:
|
||||
urllib_tuf.urlretrieve(roles['role2']['url'], roles['role2']['dest_path'])
|
||||
except tuf.MetadataNotAvailableError, e:
|
||||
raise
|
||||
pass
|
||||
else:
|
||||
raise ExtraneousDependenciesAttackAlert(ERROR_MSG)
|
||||
|
||||
|
||||
finally:
|
||||
# Add a target file to the directory delegated to 'role2' but not 'role1'.
|
||||
util_test_tools.add_file_to_repository(roles['role2']['targets_dir'], 'AAAA')
|
||||
|
||||
# Update rogue delegatee metadata.
|
||||
_write_rogue_metadata()
|
||||
|
||||
# Perform another client download.
|
||||
try:
|
||||
urllib_tuf.urlretrieve(roles['role2']['url'], roles['role2']['dest_path'])
|
||||
except tuf.MetadataNotAvailableError, e:
|
||||
pass
|
||||
else:
|
||||
raise ExtraneousDependenciesAttackAlert(ERROR_MSG)
|
||||
|
||||
|
||||
|
||||
finally:
|
||||
server_proc.kill()
|
||||
util_test_tools.cleanup(root_repo, server_proc)
|
||||
#util_test_tools.cleanup(root_repo, server_proc)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
try:
|
||||
test_extraneous_dependencies_attack()
|
||||
except tuf.MetadataNotAvailableError, error:
|
||||
print str(error)+'\n'
|
||||
else:
|
||||
print 'Extraneous Dependencies Attack Succeeded!\n'
|
||||
except ExtraneousDependenciesAttackAlert, error:
|
||||
print 'error'
|
||||
|
|
@ -149,6 +149,7 @@ def test_indefinite_freeze_attack(TUF=False):
|
|||
except IndefiniteFreezeAttackAlert, error:
|
||||
print error
|
||||
|
||||
|
||||
try:
|
||||
test_indefinite_freeze_attack(TUF=True)
|
||||
except IndefiniteFreezeAttackAlert, error:
|
||||
|
|
|
|||
197
tuf/tests/system_tests/test_mix_and_match_attack.py
Executable file
197
tuf/tests/system_tests/test_mix_and_match_attack.py
Executable file
|
|
@ -0,0 +1,197 @@
|
|||
"""
|
||||
<Program Name>
|
||||
test_mix_and_match_attack.py
|
||||
|
||||
<Author>
|
||||
Konstantin Andrianov
|
||||
|
||||
<Started>
|
||||
March 27, 2012
|
||||
|
||||
<Copyright>
|
||||
See LICENSE for licensing information.
|
||||
|
||||
<Purpose>
|
||||
Simulate slow retrieval attack. A simple client update vs. client
|
||||
update implementing TUF.
|
||||
|
||||
In the mix-and-match attack, attacker is able to trick clients into using
|
||||
combination of metadata that never existed together on the repository at
|
||||
the same time.
|
||||
|
||||
NOTE: The interposition provided by 'tuf.interposition' is used to intercept
|
||||
all calls made by urllib/urillib2 to certain network locations specified in
|
||||
the interposition configuration file. Look up interposition.py for more
|
||||
information and illustration of a sample contents of the interposition
|
||||
configuration file. Interposition was meant to make TUF integration with an
|
||||
existing software updater an easy process. This allows for more flexibility
|
||||
to the existing software updater. However, if you are planning to solely use
|
||||
TUF there should be no need for interposition, all necessary calls will be
|
||||
generated from within TUF.
|
||||
|
||||
Note: There is no difference between 'updates' and 'target' files.
|
||||
|
||||
"""
|
||||
|
||||
|
||||
import os
|
||||
import shutil
|
||||
import urllib
|
||||
import tempfile
|
||||
|
||||
import util_test_tools
|
||||
from tuf.interposition import urllib_tuf
|
||||
|
||||
|
||||
# Disable logging.
|
||||
util_test_tools.disable_logging()
|
||||
|
||||
|
||||
|
||||
class MixAndMatchAttackAlert(Exception):
|
||||
pass
|
||||
|
||||
|
||||
def _download(url, filename, tuf=False):
|
||||
if tuf:
|
||||
urllib_tuf.urlretrieve(url, filename)
|
||||
|
||||
else:
|
||||
urllib.urlretrieve(url, filename)
|
||||
|
||||
|
||||
|
||||
def test_mix_and_match_attack(TUF=False):
|
||||
"""
|
||||
Attack design:
|
||||
There are 3 stages:
|
||||
Stage 1: Consists of a usual mode of operations using tuf. Client
|
||||
downloads a target file. (Initial download)
|
||||
|
||||
Stage 2: The target file is legitimately modified and metadata correctly
|
||||
updated. Client downloads the target file again. (Patched target download)
|
||||
|
||||
Stage 3: The target file is legitimately modified and metadata correctly
|
||||
updated again. However, before client gets to download the newly patched
|
||||
target file the attacker replaces the release metadata, targets metadata
|
||||
and the target file with the ones from stage 1 (mix-and-match attack).
|
||||
Note that timestamp metadata is untouched. Further note that same would
|
||||
happen if only target metadata, and target file are reverted.
|
||||
"""
|
||||
|
||||
ERROR_MSG = '\tMix-And-Match Attack was Successful!\n\n'
|
||||
|
||||
|
||||
try:
|
||||
# Setup / Stage 1
|
||||
# ---------------
|
||||
root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF)
|
||||
reg_repo = os.path.join(root_repo, 'reg_repo')
|
||||
downloads = os.path.join(root_repo, 'downloads')
|
||||
evil_dir = tempfile.mkdtemp(dir=root_repo)
|
||||
|
||||
# Add file to 'repo' directory: {root_repo}
|
||||
filepath = util_test_tools.add_file_to_repository(reg_repo, 'A'*10)
|
||||
file_basename = os.path.basename(filepath)
|
||||
url_to_file = url+'reg_repo/'+file_basename
|
||||
downloaded_file = os.path.join(downloads, file_basename)
|
||||
|
||||
# Attacker saves the initial file.
|
||||
shutil.copy(filepath, evil_dir)
|
||||
unpatched_file = os.path.join(evil_dir, file_basename)
|
||||
|
||||
|
||||
if TUF:
|
||||
print 'TUF ...'
|
||||
tuf_repo = os.path.join(root_repo, 'tuf_repo')
|
||||
tuf_targets = os.path.join(tuf_repo, 'targets')
|
||||
metadata_dir = os.path.join(tuf_repo, 'metadata')
|
||||
release_meta_file = os.path.join(metadata_dir, 'release.txt')
|
||||
targets_meta_file = os.path.join(metadata_dir, 'targets.txt')
|
||||
target = os.path.join(tuf_targets, file_basename)
|
||||
|
||||
# Update TUF metadata before attacker modifies anything.
|
||||
util_test_tools.tuf_refresh_repo(root_repo, keyids)
|
||||
|
||||
# Attacker saves the original metadata and the target file.
|
||||
#shutil.copy(target, evil_dir)
|
||||
shutil.copy(release_meta_file, evil_dir)
|
||||
shutil.copy(targets_meta_file, evil_dir)
|
||||
#target_old = os.path.join(evil_dir, file_basename)
|
||||
release_meta_file_old = os.path.join(evil_dir, 'release.txt')
|
||||
targets_meta_file_old = os.path.join(evil_dir, 'targets.txt')
|
||||
|
||||
# Modify the url. Remember that the interposition will intercept
|
||||
# urls that have 'localhost:9999' hostname, which was specified in
|
||||
# the json interposition configuration file. Look for 'hostname'
|
||||
# in 'util_test_tools.py'. Further, the 'file_basename' is the target
|
||||
# path relative to 'targets_dir'.
|
||||
url_to_file = 'http://localhost:9999/'+file_basename
|
||||
|
||||
|
||||
# Client's initial download.
|
||||
_download(url=url_to_file, filename=downloaded_file, tuf=TUF)
|
||||
|
||||
# Stage 2
|
||||
# -------
|
||||
# Developer patches the file and updates the repository.
|
||||
util_test_tools.modify_file_at_repository(filepath, 'B'*11)
|
||||
|
||||
# Updating tuf repository. This will copy files from regular repository
|
||||
# into tuf repository and refresh the metadata
|
||||
if TUF:
|
||||
util_test_tools.tuf_refresh_repo(root_repo, keyids)
|
||||
|
||||
# Client downloads the patched file.
|
||||
_download(url=url_to_file, filename=downloaded_file, tuf=TUF)
|
||||
|
||||
downloaded_content = util_test_tools.read_file_content(downloaded_file)
|
||||
print downloaded_content
|
||||
|
||||
# Stage 3
|
||||
# -------
|
||||
# Developer patches the file and updates the repository again.
|
||||
util_test_tools.modify_file_at_repository(filepath, 'C'*10)
|
||||
|
||||
# Updating tuf repository. This will copy files from regular repository
|
||||
# into tuf repository and refresh the metadata
|
||||
if TUF:
|
||||
util_test_tools.tuf_refresh_repo(root_repo, keyids)
|
||||
|
||||
# Attacker replaces the metadata and the target file.
|
||||
shutil.copyfile(unpatched_file, target)
|
||||
shutil.copyfile(release_meta_file_old, release_meta_file)
|
||||
shutil.copyfile(targets_meta_file_old, targets_meta_file)
|
||||
|
||||
# Attacker replaces the patched file with the unpatched one.
|
||||
shutil.copyfile(unpatched_file, filepath)
|
||||
|
||||
# Client tries to downloads the newly patched file.
|
||||
_download(url=url_to_file, filename=downloaded_file, tuf=TUF)
|
||||
|
||||
# Check whether the attack succeeded by inspecting the content of the
|
||||
# update. The update should contain 'Test NOT A'.
|
||||
downloaded_content = util_test_tools.read_file_content(downloaded_file)
|
||||
if ('B'*10) != downloaded_content:
|
||||
print downloaded_content
|
||||
raise MixAndMatchAttackAlert(ERROR_MSG)
|
||||
|
||||
|
||||
finally:
|
||||
pass
|
||||
#util_test_tools.cleanup(root_repo, server_proc)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
try:
|
||||
test_mix_and_match_attack(TUF=False)
|
||||
except MixAndMatchAttackAlert, error:
|
||||
print error
|
||||
|
||||
|
||||
try:
|
||||
test_mix_and_match_attack(TUF=True)
|
||||
except MixAndMatchAttackAlert, error:
|
||||
print error
|
||||
|
|
@ -15,7 +15,11 @@
|
|||
Simulate a replay attack. A simple client update vs. client update
|
||||
implementing TUF.
|
||||
|
||||
Note: The interposition provided by 'tuf.interposition' is used to intercept
|
||||
In the replay attack an attacker is able to trick clients into installing
|
||||
software that is older than that which the client previously knew to be
|
||||
available.
|
||||
|
||||
NOTE: The interposition provided by 'tuf.interposition' is used to intercept
|
||||
all calls made by urllib/urillib2 to certain hostname specified in
|
||||
the interposition configuration file. Look up interposition.py for more
|
||||
information and illustration of a sample contents of the interposition
|
||||
|
|
@ -33,8 +37,8 @@
|
|||
import shutil
|
||||
import urllib
|
||||
import tempfile
|
||||
import util_test_tools
|
||||
|
||||
import util_test_tools
|
||||
from tuf.interposition import urllib_tuf
|
||||
|
||||
|
||||
|
|
@ -77,7 +81,7 @@ def test_replay_attack(TUF=False):
|
|||
|
||||
"""
|
||||
|
||||
ERROR_MSG = 'Replay Attack was Successful!\n'
|
||||
ERROR_MSG = '\tReplay Attack was Successful!\n\n'
|
||||
|
||||
|
||||
try:
|
||||
|
|
@ -100,6 +104,8 @@ def test_replay_attack(TUF=False):
|
|||
shutil.copy(filepath, evil_dir)
|
||||
|
||||
if TUF:
|
||||
print 'TUF ...'
|
||||
|
||||
# Update TUF metadata before attacker modifies anything.
|
||||
util_test_tools.tuf_refresh_repo(root_repo, keyids)
|
||||
|
||||
|
|
@ -126,7 +132,7 @@ def test_replay_attack(TUF=False):
|
|||
util_test_tools.modify_file_at_repository(filepath, 'Test NOT A')
|
||||
|
||||
# Updating tuf repository. This will copy files from regular repository
|
||||
# into tuf repository and refresh the metad
|
||||
# into tuf repository and refresh the metadata
|
||||
if TUF:
|
||||
util_test_tools.tuf_refresh_repo(root_repo, keyids)
|
||||
|
||||
|
|
@ -175,7 +181,6 @@ def test_replay_attack(TUF=False):
|
|||
|
||||
try:
|
||||
test_replay_attack(TUF=False)
|
||||
|
||||
except ReplayAttackAlert, error:
|
||||
print error
|
||||
|
||||
|
|
@ -183,6 +188,5 @@ def test_replay_attack(TUF=False):
|
|||
|
||||
try:
|
||||
test_replay_attack(TUF=True)
|
||||
|
||||
except ReplayAttackAlert, error:
|
||||
print error
|
||||
|
|
@ -15,7 +15,13 @@
|
|||
Simulate slow retrieval attack. A simple client update vs. client
|
||||
update implementing TUF.
|
||||
|
||||
Note: The interposition provided by 'tuf.interposition' is used to intercept
|
||||
During the slow retrieval attack, attacker is able to prevent clients from
|
||||
being aware of interference with receiving updates by responding to client
|
||||
requests so slowly that automated updates never complete.
|
||||
|
||||
NOTE: Currently TUF does not protect against slow retrieval attacks.
|
||||
|
||||
NOTE: The interposition provided by 'tuf.interposition' is used to intercept
|
||||
all calls made by urllib/urillib2 to certain network locations specified in
|
||||
the interposition configuration file. Look up interposition.py for more
|
||||
information and illustration of a sample contents of the interposition
|
||||
|
|
@ -29,17 +35,14 @@
|
|||
|
||||
"""
|
||||
|
||||
# TODO: implement slow retrieval server... And design the test.
|
||||
# Should there be a time bracket, during which the download is
|
||||
# expected to happen?
|
||||
|
||||
import os
|
||||
import time
|
||||
import urllib
|
||||
import random
|
||||
import subprocess
|
||||
import util_test_tools
|
||||
from multiprocessing import Process
|
||||
|
||||
import util_test_tools
|
||||
from tuf.interposition import urllib_tuf
|
||||
|
||||
|
||||
|
|
@ -52,29 +55,30 @@ class SlowRetrievalAttackAlert(Exception):
|
|||
pass
|
||||
|
||||
|
||||
def download_using_urlopen(url, tuf=False):
|
||||
def _download(url, filename, tuf=False):
|
||||
if tuf:
|
||||
return urllib_tuf.urlopen(url)
|
||||
urllib_tuf.urlretrieve(url, filename)
|
||||
|
||||
else:
|
||||
return urllib.urlopen(url)
|
||||
urllib.urlretrieve(url, filename)
|
||||
|
||||
|
||||
|
||||
def test_slow_retrieval_attack(TUF=True):
|
||||
def test_slow_retrieval_attack(TUF=False):
|
||||
|
||||
WAIT_TIME = 5 # Number of seconds to wait until download completes.
|
||||
ERROR_MSG = '\tSlow Retrieval Attack was Successful!\n\n'
|
||||
|
||||
# Launch the server.
|
||||
port = random.randint(30000, 45000)
|
||||
print port
|
||||
command = ['python', 'slow_retrieval_server.py', str(port)]
|
||||
server_process = subprocess.Popen(command, stderr=subprocess.PIPE)
|
||||
time.sleep(.1)
|
||||
|
||||
try:
|
||||
# Setup.
|
||||
root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF, port=port)
|
||||
print 'root_repo: '+root_repo
|
||||
root_repo, url, server_proc, keyids = \
|
||||
util_test_tools.init_repo(tuf=TUF, port=port)
|
||||
reg_repo = os.path.join(root_repo, 'reg_repo')
|
||||
downloads = os.path.join(root_repo, 'downloads')
|
||||
|
||||
|
|
@ -84,6 +88,7 @@ def test_slow_retrieval_attack(TUF=True):
|
|||
url_to_file = url+'reg_repo/'+file_basename
|
||||
downloaded_file = os.path.join(downloads, file_basename)
|
||||
|
||||
|
||||
if TUF:
|
||||
print 'TUF ...'
|
||||
tuf_repo = os.path.join(root_repo, 'tuf_repo')
|
||||
|
|
@ -99,13 +104,14 @@ def test_slow_retrieval_attack(TUF=True):
|
|||
url_to_file = 'http://localhost:9999/'+file_basename
|
||||
|
||||
|
||||
|
||||
# Download the content of the file using the server.
|
||||
# NOTE: if TUF is enabled the metadata files will be downloaded first. This
|
||||
# WILL take a long time.
|
||||
file_content = download_using_urlopen(url_to_file, tuf=TUF)
|
||||
|
||||
print file_content.read()
|
||||
# Client tries to download.
|
||||
# NOTE: if TUF is enabled the metadata files will be downloaded first.
|
||||
proc = Process(target=_download, args=(url_to_file, downloaded_file, TUF))
|
||||
proc.start()
|
||||
proc.join(WAIT_TIME)
|
||||
if proc.exitcode is None:
|
||||
proc.terminate()
|
||||
raise SlowRetrievalAttackAlert(ERROR_MSG)
|
||||
|
||||
|
||||
finally:
|
||||
|
|
@ -116,4 +122,16 @@ def test_slow_retrieval_attack(TUF=True):
|
|||
util_test_tools.cleanup(root_repo, server_proc)
|
||||
|
||||
|
||||
test_slow_retrieval_attack()
|
||||
|
||||
|
||||
|
||||
try:
|
||||
test_slow_retrieval_attack(TUF=False)
|
||||
except SlowRetrievalAttackAlert, error:
|
||||
print error
|
||||
|
||||
|
||||
try:
|
||||
test_slow_retrieval_attack(TUF=True)
|
||||
except SlowRetrievalAttackAlert, error:
|
||||
print error
|
||||
Loading…
Reference in a new issue