Resolve conflicts from demo 2 updates

This commit is contained in:
vladdd 2013-09-23 13:38:18 -04:00
commit 7d691011e5
20 changed files with 452 additions and 274 deletions

View file

@ -15,27 +15,25 @@
Simulate an arbitrary package attack. A simple client update vs. client
update implementing TUF.
Note: The interposition provided by 'tuf.interposition' is used to intercept
all calls made by urllib/urillib2 to certain hostnames specified in
the interposition configuration file. Look up interposition.py for more
information and illustration of a sample contents of the interposition
configuration file. Interposition was meant to make TUF integration with an
existing software updater an easy process. This allows for more flexibility
to the existing software updater. However, if you are planning to solely use
TUF there should be no need for interposition, all necessary calls will be
generated from within TUF.
Note: The interposition provided by 'tuf.interposition' is used to intercept
all calls made by urllib/urillib2 to certain hostnames specified in
the interposition configuration file. Look up interposition.py for more
information and illustration of a sample contents of the interposition
configuration file. Interposition was meant to make TUF integration with an
existing software updater an easy process. This allows for more flexibility
to the existing software updater. However, if you are planning to solely use
TUF there should be no need for interposition, all necessary calls will be
generated from within TUF.
Note: There is no difference between 'updates' and 'target' files.
There is no difference between 'updates' and 'target' files.
"""
import os
import shutil
import urllib
import tempfile
import tuf
import tuf.interposition.urllib_tuf as urllib_tuf
import tuf.interposition
import tuf.tests.util_test_tools as util_test_tools
@ -45,9 +43,9 @@ class ArbitraryPackageAlert(Exception):
def _download(url, filename, tuf=False):
if tuf:
urllib_tuf.urlretrieve(url, filename)
def _download(url, filename, using_tuf=False):
if using_tuf:
tuf.interposition.urllib_tuf.urlretrieve(url, filename)
else:
urllib.urlretrieve(url, filename)
@ -56,24 +54,23 @@ def _download(url, filename, tuf=False):
def test_arbitrary_package_attack(TUF=False):
def test_arbitrary_package_attack(using_tuf=False):
"""
<Arguments>
TUF:
If set to 'False' all directories that start with 'tuf_' are ignored,
indicating that tuf is not implemented.
<Purpose>
Illustrate arbitrary package attack vulnerability.
<Arguments>
using_tuf:
If set to 'False' all directories that start with 'tuf_' are ignored,
indicating that tuf is not implemented.
"""
ERROR_MSG = 'Arbitrary Package Attack was Successful!\n'
ERROR_MSG = 'Arbitrary Package Attack was Successful!'
try:
# Setup.
root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF)
root_repo, url, server_proc, keyids = util_test_tools.init_repo(using_tuf)
reg_repo = os.path.join(root_repo, 'reg_repo')
tuf_repo = os.path.join(root_repo, 'tuf_repo')
downloads = os.path.join(root_repo, 'downloads')
@ -85,7 +82,7 @@ def test_arbitrary_package_attack(TUF=False):
url_to_repo = url+'reg_repo/'+file_basename
downloaded_file = os.path.join(downloads, file_basename)
if TUF:
if using_tuf:
# Update TUF metadata before attacker modifies anything.
util_test_tools.tuf_refresh_repo(root_repo, keyids)
@ -108,13 +105,15 @@ def test_arbitrary_package_attack(TUF=False):
try:
# Client downloads (tries to download) the file.
_download(url=url_to_repo, filename=downloaded_file, tuf=TUF)
_download(url_to_repo, downloaded_file, using_tuf)
except tuf.DownloadError:
# If tuf.DownloadError is raised, this means that TUF has prevented
# the download of an unrecognized file. Enable the logging to see,
# what actually happened.
pass
except tuf.NoWorkingMirrorError, error:
# We only set up one mirror, so if it fails, we expect a
# NoWorkingMirrorError. If TUF has worked as intended, the mirror error
# contained within should be a BadHashError.
mirror_error = error.mirror_errors[url+'tuf_repo/targets/'+file_basename]
assert isinstance(mirror_error, tuf.BadHashError)
else:
# Check whether the attack succeeded by inspecting the content of the
@ -131,17 +130,24 @@ def test_arbitrary_package_attack(TUF=False):
print 'Attempting arbitrary package attack without TUF:'
try:
test_arbitrary_package_attack(TUF=False)
test_arbitrary_package_attack(using_tuf=False)
except ArbitraryPackageAlert, error:
print error
else:
print 'Extraneous dependency attack failed.'
print
print 'Attempting arbitrary package attack with TUF:'
try:
test_arbitrary_package_attack(TUF=True)
test_arbitrary_package_attack(using_tuf=True)
except ArbitraryPackageAlert, error:
print error
else:
print 'Extraneous dependency attack failed.'
print

View file

@ -107,7 +107,7 @@ def setUp(self):
version = version+1
expiration = tuf.formats.format_time(time.time()+86400)
root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=True)
root_repo, url, server_proc, keyids = util_test_tools.init_repo(using_tuf=True)
# Server side repository.
tuf_repo = os.path.join(root_repo, 'tuf_repo')
@ -331,14 +331,29 @@ def make_targets_metadata(self):
self.T2_metadata =\
make_metadata(self.tuf_repo, self.signed_targets[self.T2],
version, expiration)
self.T3_metadata = \
self.T3_metadata =\
make_metadata(self.tuf_repo, self.signed_targets[self.T3],
version, expiration)
def test_that_initial_update_fails_with_undelegated_signing_of_targets(self):
# Expect to see a particular exception on initial update.
self.assertRaises(tuf.MetadataNotAvailableError, self.do_update)
"""We expect to see ForbiddenTargetError on initial update because
delegated targets roles sign for targets that they were not delegated
to."""
# http://docs.python.org/2/library/unittest.html#unittest.TestCase.assertRaises
with self.assertRaises(tuf.NoWorkingMirrorError) as context_manager:
self.do_update()
mirror_errors = context_manager.exception.mirror_errors
forbidden_target_error = False
for mirror_url, mirror_error in mirror_errors.iteritems():
if isinstance(mirror_error, tuf.ForbiddenTargetError):
forbidden_target_error = True
break
self.assertEqual(forbidden_target_error, True)
@ -455,8 +470,22 @@ def make_targets_metadata(self):
def test_that_initial_update_fails_with_many_roles_sharing_a_target(self):
# Expect to see a particular exception on initial update.
self.assertRaises(tuf.DownloadError, self.do_update)
"""We expect to see BadHashError on initial update because the hash
metadata mismatches the target."""
# http://docs.python.org/2/library/unittest.html#unittest.TestCase.assertRaises
with self.assertRaises(tuf.NoWorkingMirrorError) as context_manager:
self.do_update()
mirror_errors = context_manager.exception.mirror_errors
bad_hash_error = False
for mirror_url, mirror_error in mirror_errors.iteritems():
if isinstance(mirror_error, tuf.BadHashError):
bad_hash_error = True
break
self.assertEqual(bad_hash_error, True)

View file

@ -37,7 +37,7 @@
import urllib
import tuf
import tuf.interposition.urllib_tuf as urllib_tuf
import tuf.interposition
import tuf.tests.util_test_tools as util_test_tools
@ -47,32 +47,31 @@ class EndlessDataAttack(Exception):
def _download(url, filename, TUF=False):
if TUF:
urllib_tuf.urlretrieve(url, filename)
def _download(url, filename, using_tuf=False):
if using_tuf:
tuf.interposition.urllib_tuf.urlretrieve(url, filename)
else:
urllib.urlretrieve(url, filename)
def test_arbitrary_package_attack(TUF=False, TIMESTAMP=False):
def test_endless_data_attack(using_tuf=False, TIMESTAMP=False):
"""
<Arguments>
TUF:
If set to 'False' all directories that start with 'tuf_' are ignored,
indicating that tuf is not implemented.
<Purpose>
Illustrate endless data attack vulnerability.
<Arguments>
using_tuf:
If set to 'False' all directories that start with 'tuf_' are ignored,
indicating that tuf is not implemented.
"""
ERROR_MSG = 'Endless Data Attack was Successful!\n'
try:
# Setup.
root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF)
root_repo, url, server_proc, keyids = util_test_tools.init_repo(using_tuf)
reg_repo = os.path.join(root_repo, 'reg_repo')
tuf_repo = os.path.join(root_repo, 'tuf_repo')
downloads = os.path.join(root_repo, 'downloads')
@ -91,7 +90,7 @@ def test_arbitrary_package_attack(TUF=False, TIMESTAMP=False):
noisy_data = 'X'*100000
if TUF:
if using_tuf:
# Update TUF metadata before attacker modifies anything.
util_test_tools.tuf_refresh_repo(root_repo, keyids)
# Modify the url. Remember that the interposition will intercept
@ -126,11 +125,11 @@ def test_arbitrary_package_attack(TUF=False, TIMESTAMP=False):
# Client downloads (tries to download) the file.
try:
_download(url=url_to_repo, filename=downloaded_file, TUF=TUF)
_download(url_to_repo, downloaded_file, using_tuf)
except Exception, exception:
# Because we are extending the true timestamp TUF metadata with invalid
# JSON, we except to catch an error about invalid metadata JSON.
if TUF and TIMESTAMP:
if using_tuf and TIMESTAMP:
endless_data_attack = False
for mirror_url, mirror_error in exception.mirror_errors.iteritems():
@ -146,7 +145,7 @@ def test_arbitrary_package_attack(TUF=False, TIMESTAMP=False):
# When we test downloading "endless" timestamp with TUF, we want to skip
# the following test because downloading the timestamp should have failed.
if not (TUF and TIMESTAMP):
if not (using_tuf and TIMESTAMP):
# Check whether the attack succeeded by inspecting the content of the
# update. The update should contain 'Test A'. Technically it suffices
# to check whether the file was downloaded or not.
@ -162,12 +161,12 @@ def test_arbitrary_package_attack(TUF=False, TIMESTAMP=False):
try:
test_arbitrary_package_attack(TUF=False, TIMESTAMP=False)
test_endless_data_attack(using_tuf=False, TIMESTAMP=False)
except EndlessDataAttack, error:
print('Endless data attack worked on download without TUF!')
try:
test_arbitrary_package_attack(TUF=True, TIMESTAMP=False)
test_endless_data_attack(using_tuf=True, TIMESTAMP=False)
except EndlessDataAttack, error:
print('Endless data attack worked on download without TUF!')
print(str(error))
@ -177,12 +176,9 @@ def test_arbitrary_package_attack(TUF=False, TIMESTAMP=False):
try:
# This test fails because the timestamp metadata has been extended with
# random data from its true length, thereby resulting in invalid JSON.
test_arbitrary_package_attack(TUF=True, TIMESTAMP=True)
test_endless_data_attack(using_tuf=True, TIMESTAMP=True)
except EndlessDataAttack, error:
print('Endless data attack worked on download without TUF!')
print(str(error))
else:
print('Endless data attack did not work on download with TUF!')

View file

@ -38,12 +38,10 @@
import os
import shutil
import urllib
import tempfile
import tuf
import tuf.interposition.urllib_tuf as urllib_tuf
import tuf.interposition
import tuf.tests.util_test_tools as util_test_tools
@ -52,63 +50,72 @@ class ExtraneousDependencyAlert(Exception):
# Interpret the contents of the file it downloads as a list of dependent
# files from the same repository.
def _download(url, filename, directory, TUF=False):
# Interpret anything following 'requires:' in the contents of the file it
# downloads as a comma-separated list of dependent files from the same repository.
def _download(url, filename, directory, using_tuf=False):
destination = os.path.join(directory, filename)
if TUF:
urllib_tuf.urlretrieve(url, destination)
if using_tuf:
tuf.interposition.urllib_tuf.urlretrieve(url, destination)
else:
urllib.urlretrieve(url, destination)
if util_test_tools.read_file_content(destination) != '':
required_files = util_test_tools.read_file_content(destination).split(',')
file_contents = util_test_tools.read_file_content(destination)
# Parse the list of required files (if it exists) and download them.
if file_contents.find('requires:') != -1:
required_files = file_contents[file_contents.find('requires:') + 9:].split(',')
for required_filename in required_files:
required_file_url = os.path.dirname(url)+os.sep+required_filename
_download(required_file_url, required_filename, directory, TUF)
_download(required_file_url, required_filename, directory, using_tuf)
def test_extraneous_dependency_attack(TUF=False):
def test_extraneous_dependency_attack(using_tuf=False):
"""
<Purpose>
Illustrate arbitrary package attack vulnerability.
Illustrate extraneous dependency attack vulnerability.
<Arguments>
TUF:
using_tuf:
If set to 'False' all directories that start with 'tuf_' are ignored,
indicating that tuf is not implemented.
"""
ERROR_MSG = 'Extraneous Dependency Attack was Successful!\n'
ERROR_MSG = 'Extraneous Dependency Attack was Successful!'
try:
# Setup.
root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF)
root_repo, url, server_proc, keyids = util_test_tools.init_repo(using_tuf)
reg_repo = os.path.join(root_repo, 'reg_repo')
tuf_repo = os.path.join(root_repo, 'tuf_repo')
downloads = os.path.join(root_repo, 'downloads')
targets_dir = os.path.join(tuf_repo, 'targets')
# Add files to 'repo' directory: {root_repo}.
good_dependency_filepath = util_test_tools.add_file_to_repository(reg_repo, '')
good_dependency_filepath = util_test_tools.add_file_to_repository(reg_repo,
'the file you need')
good_dependency_basename = os.path.basename(good_dependency_filepath)
bad_dependency_filepath = util_test_tools.add_file_to_repository(reg_repo, '')
bad_dependency_filepath = util_test_tools.add_file_to_repository(reg_repo,
'the file you don\'t need')
bad_dependency_basename = os.path.basename(bad_dependency_filepath)
# The dependent file lists the good dependency.
dependent_filepath = util_test_tools.add_file_to_repository(reg_repo,
good_dependency_basename)
'requires:'+good_dependency_basename)
dependent_basename = os.path.basename(dependent_filepath)
url_to_repo = url+'reg_repo/'+dependent_basename
modified_dependency_list = good_dependency_basename+','+\
bad_dependency_basename
if TUF:
# List the bad dependency first. If an attacker modifies a target by
# simply appending the file contents, tuf.download will ignore the appended
# data, downloading only as much data as the TUF metadata says the target
# should contain.
modified_dependency_list = bad_dependency_basename+','+\
good_dependency_basename
if using_tuf:
# Update TUF metadata before attacker modifies anything.
util_test_tools.tuf_refresh_repo(root_repo, keyids)
@ -119,28 +126,32 @@ def test_extraneous_dependency_attack(TUF=False):
# path relative to 'targets_dir'.
url_to_repo = 'http://localhost:9999/'+dependent_basename
# Attacker adds the dependency in the targets repository.
target = os.path.join(targets_dir, dependent_basename)
util_test_tools.modify_file_at_repository(target,
modified_dependency_list)
# Attacker modifies the depenent file in the targets repository, adding
# the bad dependency to its list.
dependent_target_filepath = os.path.join(targets_dir, dependent_basename)
util_test_tools.modify_file_at_repository(dependent_target_filepath,
'requires:'+modified_dependency_list)
# Attacker adds the dependency in the regular repository.
# Attacker modifies the depenent file in the regular repository, adding
# the bad dependency to its list.
util_test_tools.modify_file_at_repository(dependent_filepath,
modified_dependency_list)
'requires:'+modified_dependency_list)
# End of Setup.
try:
# Client downloads (tries to download) the file.
_download(url=url_to_repo, filename=dependent_basename,
directory=downloads, TUF=TUF)
_download(url_to_repo, dependent_basename, downloads, using_tuf)
except tuf.DownloadError:
# If tuf.DownloadError is raised, this means that TUF has prevented
# the download of an unrecognized file. Enable the logging to see,
# what actually happened.
pass
except tuf.NoWorkingMirrorError, error:
# We only set up one mirror, so if it fails, we expect a
# NoWorkingMirrorError. If TUF has worked as intended, the mirror error
# contained within should be a BadHashError.
mirror_error = \
error.mirror_errors[url+'tuf_repo/targets/'+dependent_basename]
assert isinstance(mirror_error, tuf.BadHashError)
else:
# Check if the legitimate dependency was downloaded.
@ -158,16 +169,20 @@ def test_extraneous_dependency_attack(TUF=False):
print 'Attempting extraneous dependency attack without TUF:'
try:
test_extraneous_dependency_attack(TUF=False)
test_extraneous_dependency_attack(using_tuf=False)
except ExtraneousDependencyAlert, error:
print error
else:
print 'Extraneous dependency attack failed.'
print
print 'Attempting extraneous dependency attack with TUF:'
try:
test_extraneous_dependency_attack(TUF=True)
test_extraneous_dependency_attack(using_tuf=True)
except ExtraneousDependencyAlert, error:
print error
else:
print 'Extraneous dependency attack failed.'
print

View file

@ -28,7 +28,7 @@
import tuf
import tuf.formats
import tuf.interposition.urllib_tuf as urllib_tuf
import tuf.interposition
import tuf.repo.signerlib as signerlib
import tuf.tests.util_test_tools as util_test_tools
@ -61,9 +61,9 @@ def _remake_timestamp(metadata_dir, keyids):
def _download(url, filename, tuf=False):
if tuf:
urllib_tuf.urlretrieve(url, filename)
def _download(url, filename, using_tuf=False):
if using_tuf:
tuf.interposition.urllib_tuf.urlretrieve(url, filename)
else:
urllib.urlretrieve(url, filename)
@ -72,10 +72,10 @@ def _download(url, filename, tuf=False):
def test_indefinite_freeze_attack(TUF=False):
def test_indefinite_freeze_attack(using_tuf=False):
"""
<Arguments>
TUF:
using_tuf:
If set to 'False' all directories that start with 'tuf_' are ignored,
indicating that tuf is not implemented.
@ -88,7 +88,7 @@ def test_indefinite_freeze_attack(TUF=False):
try:
# Setup.
root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF)
root_repo, url, server_proc, keyids = util_test_tools.init_repo(using_tuf)
reg_repo = os.path.join(root_repo, 'reg_repo')
tuf_repo = os.path.join(root_repo, 'tuf_repo')
metadata_dir = os.path.join(tuf_repo, 'metadata')
@ -100,7 +100,7 @@ def test_indefinite_freeze_attack(TUF=False):
url_to_repo = url+'reg_repo/'+file_basename
downloaded_file = os.path.join(downloads, file_basename)
if TUF:
if using_tuf:
print 'TUF ...'
# Update TUF metadata before attacker modifies anything.
@ -117,26 +117,25 @@ def test_indefinite_freeze_attack(TUF=False):
_remake_timestamp(metadata_dir, keyids)
# Client performs initial download.
# Client performs initial download. If the computer is slow, it may
# take longer time than expiration time. In this case you will see
# an ExpiredMetadataError.
try:
_download(url=url_to_repo, filename=downloaded_file, tuf=TUF)
except tuf.ExpiredMetadataError:
msg = ('Metadata has expired too soon, extend expiration period. '+
'Current expiration is set to: '+repr(EXPIRATION)+' second(s).')
sys.exit(msg)
# Expire timestamp.
time.sleep(EXPIRATION)
# Try downloading again, this should raise an error.
try:
_download(url=url_to_repo, filename=downloaded_file, tuf=TUF)
except tuf.ExpiredMetadataError, error:
pass
_download(url_to_repo, downloaded_file, using_tuf)
except:
print 'Initial download failed! It may be because your machine is '+ \
'busy. Try again later.'
else:
raise IndefiniteFreezeAttackAlert(ERROR_MSG)
# Expire timestamp.
time.sleep(EXPIRATION)
# Try downloading again, this should raise an error.
try:
_download(url_to_repo, downloaded_file, using_tuf)
except tuf.ExpiredMetadataError, error:
print 'Caught an expiration error!'
else:
raise IndefiniteFreezeAttackAlert(ERROR_MSG)
finally:
util_test_tools.cleanup(root_repo, server_proc)
@ -145,12 +144,12 @@ def test_indefinite_freeze_attack(TUF=False):
try:
test_indefinite_freeze_attack(TUF=False)
test_indefinite_freeze_attack(using_tuf=False)
except IndefiniteFreezeAttackAlert, error:
print error
try:
test_indefinite_freeze_attack(TUF=True)
test_indefinite_freeze_attack(using_tuf=True)
except IndefiniteFreezeAttackAlert, error:
print error

View file

@ -19,17 +19,17 @@
combination of metadata that never existed together on the repository at
the same time.
NOTE: The interposition provided by 'tuf.interposition' is used to intercept
all calls made by urllib/urillib2 to certain network locations specified in
the interposition configuration file. Look up interposition.py for more
information and illustration of a sample contents of the interposition
configuration file. Interposition was meant to make TUF integration with an
existing software updater an easy process. This allows for more flexibility
to the existing software updater. However, if you are planning to solely use
TUF there should be no need for interposition, all necessary calls will be
generated from within TUF.
NOTE: The interposition provided by 'tuf.interposition' is used to intercept
all calls made by urllib/urillib2 to certain network locations specified in
the interposition configuration file. Look up interposition.py for more
information and illustration of a sample contents of the interposition
configuration file. Interposition was meant to make TUF integration with an
existing software updater an easy process. This allows for more flexibility
to the existing software updater. However, if you are planning to solely use
TUF there should be no need for interposition, all necessary calls will be
generated from within TUF.
Note: There is no difference between 'updates' and 'target' files.
There is no difference between 'updates' and 'target' files.
"""
@ -38,9 +38,10 @@
import shutil
import urllib
import tempfile
import time
import tuf
import tuf.interposition.urllib_tuf as urllib_tuf
import tuf.interposition
import tuf.tests.util_test_tools as util_test_tools
@ -48,16 +49,16 @@ class MixAndMatchAttackAlert(Exception):
pass
def _download(url, filename, tuf=False):
if tuf:
urllib_tuf.urlretrieve(url, filename)
def _download(url, filename, using_tuf=False):
if using_tuf:
tuf.interposition.urllib_tuf.urlretrieve(url, filename)
else:
urllib.urlretrieve(url, filename)
def test_mix_and_match_attack(TUF=False):
def test_mix_and_match_attack(using_tuf=False):
"""
Attack design:
There are 3 stages:
@ -81,7 +82,7 @@ def test_mix_and_match_attack(TUF=False):
try:
# Setup / Stage 1
# ---------------
root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF)
root_repo, url, server_proc, keyids = util_test_tools.init_repo(using_tuf)
reg_repo = os.path.join(root_repo, 'reg_repo')
downloads = os.path.join(root_repo, 'downloads')
evil_dir = tempfile.mkdtemp(dir=root_repo)
@ -97,7 +98,7 @@ def test_mix_and_match_attack(TUF=False):
unpatched_file = os.path.join(evil_dir, file_basename)
if TUF:
if using_tuf:
print 'TUF ...'
tuf_repo = os.path.join(root_repo, 'tuf_repo')
tuf_targets = os.path.join(tuf_repo, 'targets')
@ -125,8 +126,10 @@ def test_mix_and_match_attack(TUF=False):
url_to_file = 'http://localhost:9999/'+file_basename
# Wait for some time to let program set up local http server
time.sleep(1)
# Client's initial download.
_download(url=url_to_file, filename=downloaded_file, tuf=TUF)
_download(url_to_file, downloaded_file, using_tuf)
# Stage 2
# -------
@ -135,11 +138,11 @@ def test_mix_and_match_attack(TUF=False):
# Updating tuf repository. This will copy files from regular repository
# into tuf repository and refresh the metadata
if TUF:
if using_tuf:
util_test_tools.tuf_refresh_repo(root_repo, keyids)
# Client downloads the patched file.
_download(url=url_to_file, filename=downloaded_file, tuf=TUF)
_download(url_to_file, downloaded_file, using_tuf)
downloaded_content = util_test_tools.read_file_content(downloaded_file)
@ -150,7 +153,7 @@ def test_mix_and_match_attack(TUF=False):
# Updating tuf repository. This will copy files from regular repository
# into tuf repository and refresh the metadata
if TUF:
if using_tuf:
util_test_tools.tuf_refresh_repo(root_repo, keyids)
# Attacker replaces the metadata and the target file.
@ -163,9 +166,11 @@ def test_mix_and_match_attack(TUF=False):
# Client tries to downloads the newly patched file.
try:
_download(url=url_to_file, filename=downloaded_file, tuf=TUF)
except tuf.MetadataNotAvailableError:
pass
_download(url_to_file, downloaded_file, using_tuf)
except tuf.NoWorkingMirrorError as errors:
for mirror_url, mirror_error in errors.mirror_errors.iteritems():
if type(mirror_error) == tuf.BadHashError:
print 'Caught a Bad Hash Error!'
# Check whether the attack succeeded by inspecting the content of the
# update. The update should contain 'Test NOT A'.
@ -182,12 +187,12 @@ def test_mix_and_match_attack(TUF=False):
try:
test_mix_and_match_attack(TUF=False)
test_mix_and_match_attack(using_tuf=False)
except MixAndMatchAttackAlert, error:
print error
try:
test_mix_and_match_attack(TUF=True)
test_mix_and_match_attack(using_tuf=True)
except MixAndMatchAttackAlert, error:
print error

View file

@ -1,3 +1,5 @@
#!/usr/bin/env python
"""
<Program Name>
test_replay_attack.py
@ -38,23 +40,20 @@
import urllib
import tempfile
import tuf.interposition.urllib_tuf as urllib_tuf
import tuf.interposition
import tuf.tests.util_test_tools as util_test_tools
class TestSetupError(Exception):
pass
class TestSetupError(Exception): pass
class ReplayAttackAlert(Exception): pass
class ReplayAttackAlert(Exception):
pass
def _download(url, filename, tuf=False):
if tuf:
urllib_tuf.urlretrieve(url, filename)
def _download(url, filename, using_tuf=False):
if using_tuf:
tuf.interposition.urllib_tuf.urlretrieve(url, filename)
else:
urllib.urlretrieve(url, filename)
@ -63,10 +62,10 @@ def _download(url, filename, tuf=False):
def test_replay_attack(TUF=False):
def test_replay_attack(using_tuf=False):
"""
<Arguments>
TUF:
using_tuf:
If set to 'False' all directories that start with 'tuf_' are ignored,
indicating that tuf is not implemented.
@ -76,32 +75,35 @@ def test_replay_attack(TUF=False):
"""
ERROR_MSG = '\tReplay Attack was Successful!\n\n'
FIRST_CONTENT = 'Test A'
SECOND_CONTENT = 'Test B'
try:
# Setup.
root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF)
root_repo, url, server_proc, keyids = \
util_test_tools.init_repo(using_tuf=using_tuf)
reg_repo = os.path.join(root_repo, 'reg_repo')
tuf_repo = os.path.join(root_repo, 'tuf_repo')
tuf_repo_copy = os.path.join(root_repo, 'tuf_repo_copy')
downloads = os.path.join(root_repo, 'downloads')
tuf_targets = os.path.join(tuf_repo, 'targets')
# Add file to 'repo' directory: {root_repo}
filepath = util_test_tools.add_file_to_repository(reg_repo, 'Test A')
filepath = util_test_tools.add_file_to_repository(reg_repo, FIRST_CONTENT)
file_basename = os.path.basename(filepath)
url_to_repo = url+'reg_repo/'+file_basename
downloaded_file = os.path.join(downloads, file_basename)
# Attacker saves the original file into 'evil_dir'.
evil_dir = tempfile.mkdtemp(dir=root_repo)
vulnerable_file = os.path.join(evil_dir, file_basename)
original_file = os.path.join(evil_dir, file_basename)
shutil.copy(filepath, evil_dir)
if TUF:
print 'TUF ...'
if using_tuf:
# Update TUF metadata before attacker modifies anything.
util_test_tools.tuf_refresh_repo(root_repo, keyids)
# Copy the first version of the repository for replay later.
shutil.copytree(tuf_repo, tuf_repo_copy)
# Modify the url. Remember that the interposition will intercept
# urls that have 'localhost:9999' hostname, which was specified in
@ -109,62 +111,68 @@ def test_replay_attack(TUF=False):
# in 'util_test_tools.py'. Further, the 'file_basename' is the target
# path relative to 'targets_dir'.
url_to_repo = 'http://localhost:9999/'+file_basename
# End of Setup.
# Client performs initial update.
_download(url=url_to_repo, filename=downloaded_file, tuf=TUF)
_download(url=url_to_repo, filename=downloaded_file, using_tuf=using_tuf)
# Downloads are stored in the same directory '{root_repo}/downloads/'
# for regular and tuf clients.
downloaded_content = util_test_tools.read_file_content(downloaded_file)
if 'Test A' != downloaded_content:
raise TestSetupError('[Initial Updata] Failed to download the file.')
if FIRST_CONTENT != downloaded_content:
raise TestSetupError('[Initial Update] Failed to download the file.')
# Developer patches the file and updates the repository.
util_test_tools.modify_file_at_repository(filepath, 'Test NOT A')
util_test_tools.modify_file_at_repository(filepath, SECOND_CONTENT)
# Updating tuf repository. This will copy files from regular repository
# into tuf repository and refresh the metadata
if TUF:
if using_tuf:
util_test_tools.tuf_refresh_repo(root_repo, keyids)
# Client downloads the patched file.
_download(url=url_to_repo, filename=downloaded_file, tuf=TUF)
_download(url=url_to_repo, filename=downloaded_file, using_tuf=using_tuf)
# Content of the downloaded file.
downloaded_content = util_test_tools.read_file_content(downloaded_file)
if 'Test NOT A' != downloaded_content:
if SECOND_CONTENT != downloaded_content:
raise TestSetupError('[Update] Failed to update the file.')
# Attacker tries to be clever, he manages to modifies regular and tuf
# targets directory by replacing a patched file with an old one.
if os.path.isdir(tuf_targets):
target = os.path.join(tuf_targets, file_basename)
util_test_tools.delete_file_at_repository(target)
shutil.copy(vulnerable_file, tuf_targets)
# Verify that 'target' is an old, un-patched file.
target = os.path.join(tuf_targets, file_basename)
target_content = util_test_tools.read_file_content(target)
if 'Test A' != target_content:
raise TestSetupError("The 'target' file contains new data!")
if using_tuf:
# Delete the current TUF repository...
shutil.rmtree(tuf_repo)
# ...and replace it with a previous copy.
shutil.move(tuf_repo_copy, tuf_repo)
else:
# Delete the current file...
util_test_tools.delete_file_at_repository(filepath)
shutil.copy(vulnerable_file, reg_repo)
# ...and replace it with a previous copy.
shutil.copy(original_file, reg_repo)
try:
# Client downloads the file once more.
_download(url=url_to_repo, filename=downloaded_file, using_tuf=using_tuf)
except tuf.NoWorkingMirrorError, exception:
replayed_metadata_attack = False
# Client downloads the file once more.
_download(url=url_to_repo, filename=downloaded_file, tuf=TUF)
# Check whether the attack succeeded by inspecting the content of the
# update. The update should contain 'Test NOT A'.
downloaded_content = util_test_tools.read_file_content(downloaded_file)
if 'Test NOT A' != downloaded_content:
raise ReplayAttackAlert(ERROR_MSG)
for mirror_url, mirror_error in exception.mirror_errors.iteritems():
if isinstance(mirror_error, tuf.ReplayedMetadataError):
replayed_metadata_attack = True
break
# In case we did not detect what was likely a replayed metadata attack,
# we reraise the exception to indicate that replayed metadata attack
# detection failed.
if not replayed_metadata_attack: raise
else:
# Check whether the attack succeeded by inspecting the content of the
# update. The update should contain 'Test NOT A'.
downloaded_content = util_test_tools.read_file_content(downloaded_file)
# If we ended up downloading replayed content, then we failed.
if FIRST_CONTENT == downloaded_content:
raise ReplayAttackAlert(ERROR_MSG)
finally:
util_test_tools.cleanup(root_repo, server_proc)
@ -174,13 +182,19 @@ def test_replay_attack(TUF=False):
try:
test_replay_attack(TUF=False)
except ReplayAttackAlert, error:
print error
test_replay_attack(using_tuf=False)
except ReplayAttackAlert, exception:
print('Download without TUF fell prey to replayed metadata attack.')
try:
test_replay_attack(TUF=True)
except ReplayAttackAlert, error:
print error
try:
test_replay_attack(using_tuf=True)
except ReplayAttackAlert, exception:
print('Download with TUF fell prey to replayed metadata attack!')
except Exception, exception:
print('Download with TUF failed due to: '+str(exception))
else:
print('Download with TUF defended against replayed metadata attack.')
except Exception, exception:
print('Download without TUF failed due to: '+str(exception))
else:
print('Download without TUF did NOT fail due to replayed metadata attack!')

View file

@ -49,7 +49,7 @@
import urllib
import tuf.interposition.urllib_tuf as urllib_tuf
import tuf.interposition
import tuf.tests.util_test_tools as util_test_tools
@ -57,10 +57,10 @@ class SlowRetrievalAttackAlert(Exception):
pass
def _download(url, filename, TUF=False):
if TUF:
def _download(url, filename, using_tuf=False):
if using_tuf:
try:
urllib_tuf.urlretrieve(url, filename)
tuf.interposition.urllib_tuf.urlretrieve(url, filename)
except tuf.NoWorkingMirrorError, exception:
slow_retrieval = False
for mirror_url, mirror_error in exception.mirror_errors.iteritems():
@ -82,10 +82,10 @@ def _download(url, filename, TUF=False):
def test_slow_retrieval_attack(TUF=False, mode=None):
def test_slow_retrieval_attack(using_tuf=False, mode=None):
WAIT_TIME = 60 # Number of seconds to wait until download completes.
ERROR_MSG = 'Slow retrieval attack succeeded (TUF: '+str(TUF)+', mode: '+\
ERROR_MSG = 'Slow retrieval attack succeeded (using_tuf: '+str(using_tuf)+', mode: '+\
str(mode)+').'
# Launch the server.
@ -97,7 +97,7 @@ def test_slow_retrieval_attack(TUF=False, mode=None):
try:
# Setup.
root_repo, url, server_proc, keyids = \
util_test_tools.init_repo(tuf=TUF, port=port)
util_test_tools.init_repo(using_tuf, port=port)
reg_repo = os.path.join(root_repo, 'reg_repo')
downloads = os.path.join(root_repo, 'downloads')
@ -107,8 +107,7 @@ def test_slow_retrieval_attack(TUF=False, mode=None):
url_to_file = url+'reg_repo/'+file_basename
downloaded_file = os.path.join(downloads, file_basename)
if TUF:
if using_tuf:
tuf_repo = os.path.join(root_repo, 'tuf_repo')
# Update TUF metadata before attacker modifies anything.
@ -124,7 +123,7 @@ def test_slow_retrieval_attack(TUF=False, mode=None):
# Client tries to download.
# NOTE: if TUF is enabled the metadata files will be downloaded first.
proc = Process(target=_download, args=(url_to_file, downloaded_file, TUF))
proc = Process(target=_download, args=(url_to_file, downloaded_file, using_tuf))
proc.start()
proc.join(WAIT_TIME)
@ -147,25 +146,25 @@ def test_slow_retrieval_attack(TUF=False, mode=None):
# mode_2: During the download process, the server blocks the download
# by sending just several characters every few seconds.
try:
test_slow_retrieval_attack(TUF=False, mode = "mode_1")
test_slow_retrieval_attack(using_tuf=False, mode = "mode_1")
except SlowRetrievalAttackAlert, error:
print(error)
print()
try:
test_slow_retrieval_attack(TUF=False, mode = "mode_2")
test_slow_retrieval_attack(using_tuf=False, mode = "mode_2")
except SlowRetrievalAttackAlert, error:
print(error)
print()
try:
test_slow_retrieval_attack(TUF=True, mode = "mode_1")
test_slow_retrieval_attack(using_tuf=True, mode = "mode_1")
except SlowRetrievalAttackAlert, error:
print(error)
print()
try:
test_slow_retrieval_attack(TUF=True, mode = "mode_2")
test_slow_retrieval_attack(using_tuf=True, mode = "mode_2")
except SlowRetrievalAttackAlert, error:
print(error)
print()

View file

@ -350,7 +350,6 @@ def tearDownModule():
tuf.repo.keystore.clear_keystore()
# Run the unit tests.
if __name__ == '__main__':
unittest.main()

View file

@ -898,6 +898,7 @@ def test_5_all_targets(self):
# returns each filepath listed in 'self.all_role_paths' in the listed
# order.
self._mock_download_url_to_tempfileobj(self.all_role_paths)
setup.build_server_repository(self.server_repo_dir, self.targets_dir)
# Update top-level metadata.
self.Repository.refresh()

View file

@ -34,7 +34,7 @@ def setUp(self):
tuf.repo.keystore.clear_keystore()
# Unpacking necessary parameters returned from init_repo()
essential_params = util_test_tools.init_repo(tuf=True)
essential_params = util_test_tools.init_repo(using_tuf=True)
self.root_repo = essential_params[0]
self.url = essential_params[1]
self.server_proc = essential_params[2]

View file

@ -21,6 +21,8 @@
"""
import urlparse
# Import 'tuf.formats' if a module tries to import the
# entire tuf package (i.e., from tuf import *).
__all__ = ['formats']
@ -78,7 +80,14 @@ class UnsupportedAlgorithmError(Error):
class BadHashError(Error):
"""Indicate an error while checking the value a hash object."""
pass
def __init__(self, expected_hash, observed_hash):
self.expected_hash = expected_hash
self.observed_hash = observed_hash
def __str__(self):
return 'Observed hash ('+str(self.observed_hash)+\
') != expected hash ('+str(self.expected_hash)+')'
@ -118,7 +127,12 @@ class ForbiddenTargetError(RepositoryError):
class ExpiredMetadataError(Error):
"""Indicate that a TUF Metadata file has expired."""
pass
def __init__(self, expiry_time):
self.expiry_time = expiry_time # UTC
def __str__(self):
return 'Metadata expired on '+str(self.expiry_time)+'.'
@ -134,9 +148,9 @@ def __init__(self, metadata_role, previous_version, current_version):
def __str__(self):
return str(self.metadata_role)+' is older than the version currently'+\
'installed.\nDownloaded version: '+repr(self.previous_version)+'\n'+\
'Current version: '+repr(self.current_version)
return 'Downloaded '+str(self.metadata_role)+' is older ('+\
str(self.previous_version)+') than the version currently '+\
'installed ('+repr(self.current_version)+').'
@ -152,7 +166,12 @@ class CryptoError(Error):
class BadSignatureError(CryptoError):
"""Indicate that some metadata file had a bad signature."""
pass
def __init__(self, metadata_role_name):
self.metadata_role_name = metadata_role_name
def __str__(self):
return str(self.metadata_role_name)+' metadata has bad signature!'
@ -278,9 +297,18 @@ def __init__(self, mirror_errors):
self.mirror_errors = mirror_errors
def __str__(self):
return str(self.mirror_errors)
all_errors = 'No working mirror was found:'
for mirror_url, mirror_error in self.mirror_errors.iteritems():
try:
# http://docs.python.org/2/library/urlparse.html#urlparse.urlparse
mirror_url_tokens = urlparse.urlparse(mirror_url)
except:
logging.exception('Failed to parse mirror URL: '+str(mirror_url))
mirror_netloc = mirror_url
else:
mirror_netloc = mirror_url_tokens.netloc
all_errors += '\n '+str(mirror_netloc)+': '+str(mirror_error)
return all_errors

View file

@ -627,8 +627,7 @@ def __check_hashes(self, file_object, trusted_hashes):
digest_object.update(file_object.read())
computed_hash = digest_object.hexdigest()
if trusted_hash != computed_hash:
raise tuf.BadHashError('Hashes do not match! Expected '+
trusted_hash+' got '+computed_hash)
raise tuf.BadHashError(trusted_hash, computed_hash)
else:
logger.info('The file\'s '+algorithm+' hash is correct: '+trusted_hash)
@ -835,7 +834,7 @@ def __verify_uncompressed_metadata_file(self, metadata_file_object,
# Verify the signature on the downloaded metadata object.
valid = tuf.sig.verify(metadata_signable, metadata_role)
if not valid:
raise tuf.BadSignatureError()
raise tuf.BadSignatureError(metadata_role)
@ -1021,7 +1020,7 @@ def __get_file(self, filepath, verify_uncompressed_file, file_type,
except Exception, exception:
# Remember the error from this mirror, and "reset" the target file.
logger.exception('Download failed from '+file_mirror+'.')
logger.exception('Update failed from '+file_mirror+'.')
file_mirror_errors[file_mirror] = exception
file_object = None
else:
@ -1030,8 +1029,8 @@ def __get_file(self, filepath, verify_uncompressed_file, file_type,
if file_object:
return file_object
else:
logger.exception('Failed to download {0}: {1}'.format(filepath,
file_mirror_errors))
logger.exception('Failed to update {0} from all mirrors: {1}'.format(
filepath, file_mirror_errors))
raise tuf.NoWorkingMirrorError(file_mirror_errors)
@ -1742,9 +1741,11 @@ def _ensure_not_expired(self, metadata_role):
# an exception. 'expires' is in YYYY-MM-DD HH:MM:SS format, so
# convert it to seconds since the epoch, which is the time format
# returned by time.time() (i.e., current time), before comparing.
if tuf.formats.parse_time(expires) < time.time():
message = 'Metadata '+repr(rolepath)+' expired on '+repr(expires)+'.'
raise tuf.ExpiredMetadataError(message)
current_time = time.time()
expiry_time = tuf.formats.parse_time(expires)
if expiry_time < current_time:
logger.error('Metadata '+repr(rolepath)+' expired on '+repr(expires)+'.')
raise tuf.ExpiredMetadataError(expires)

View file

@ -72,15 +72,16 @@
_FORMAT_STRING = '[%(asctime)s UTC] [%(name)s] [%(levelname)s]'+\
'[%(funcName)s:%(lineno)s@%(filename)s] %(message)s'
# Ask all Formatter instances to talk GMT.
# http://docs.python.org/2/library/logging.html#logging.Formatter.formatException
logging.Formatter.converter = time.gmtime
formatter = logging.Formatter(_FORMAT_STRING)
# Set the handlers for the logger. The console handler is unset by default. A
# Set the handlers for the logger. The console handler is unset by default. A
# module importing 'log.py' should explicitly set the console handler if
# outputting log messages to the screen is needed. Adding a console handler
# can be done with tuf.log.add_console_handler(). Logging messages to a file
# *is* set by default.
# outputting log messages to the screen is needed. Adding a console handler can
# be done with tuf.log.add_console_handler(). Logging messages to a file *is*
# set by default.
console_handler = None
# Set the built-in file handler. Messages will be logged to
@ -104,6 +105,54 @@
class ConsoleFilter(logging.Filter):
def filter(self, record):
"""
<Purpose>
Use Vinay Sajip's recommendation from Python issue #6435 to modify a
LogRecord object. This is meant to be used with our console handler.
http://stackoverflow.com/q/6177520
http://stackoverflow.com/q/5875225
http://bugs.python.org/issue6435
http://docs.python.org/2/howto/logging-cookbook.html#filters-contextual
http://docs.python.org/2/library/logging.html#logrecord-attributes
<Arguments>
record:
A logging.LogRecord object.
<Exceptions>
None.
<Side Effects>
Replaces the LogRecord exception text attribute.
<Returns>
True.
"""
# If this LogRecord object has an exception, then we will replace its text.
if record.exc_info:
# We place the record's cached exception text (which usually contains the
# exception traceback) with much simpler exception information. This is
# most useful for the console handler, which we do not wish to deluge
# with too much data. Assuming that this filter is not applied to the
# file logging handler, the user may always consult the file log for the
# original exception traceback. The exc_info is explained here:
# http://docs.python.org/2/library/sys.html#sys.exc_info
exc_type, exc_value, exc_traceback = record.exc_info
# Simply set the class name as the exception text.
record.exc_text = exc_type.__name__
# Always return True to signal that any given record must be formatted.
return True
def set_log_level(log_level=_DEFAULT_LOG_LEVEL):
"""
<Purpose>
@ -201,6 +250,7 @@ def set_console_log_level(log_level=_DEFAULT_CONSOLE_LOG_LEVEL):
def add_console_handler(log_level=_DEFAULT_CONSOLE_LOG_LEVEL):
"""
<Purpose>
@ -223,14 +273,41 @@ def add_console_handler(log_level=_DEFAULT_CONSOLE_LOG_LEVEL):
"""
# Assign to the global console_handler object.
global console_handler
# Does 'log_level' have the correct format?
# Raise 'tuf.FormatError' if there is a mismatch.
tuf.formats.LENGTH_SCHEMA.check_match(log_level)
# Set the console handler for the logger. The built-in console handler will
# log messages to 'sys.stderr' and capture 'log_level' messages.
if not console_handler:
# Set the console handler for the logger. The built-in console handler will
# log messages to 'sys.stderr' and capture 'log_level' messages.
# NOTE: This is not thread-safe.
console_handler = logging.StreamHandler()
# Get our filter for the console handler.
console_filter = ConsoleFilter()
console_handler.setLevel(log_level)
console_handler.setFormatter(formatter)
console_handler.addFilter(console_filter)
logger.addHandler(console_handler)
logger.debug('Added a console handler.')
else:
logger.warn('We already have a console handler.')
def remove_console_handler():
# Assign to the global console_handler object.
global console_handler
console_handler = logging.StreamHandler()
console_handler.setLevel(log_level)
console_handler.setFormatter(formatter)
logger.addHandler(console_handler)
if console_handler:
logger.removeHandler(console_handler)
# NOTE: This is not thread-safe.
console_handler = None
logger.debug('Removed a console handler.')
else:
logger.warn('We do not have a console handler.')

View file

@ -72,6 +72,7 @@
import tuf.util
import tuf.conf
# See 'log.py' to learn how logging is handled in TUF.
logger = logging.getLogger('tuf.keystore')
@ -116,7 +117,7 @@
# {keyid: key,
# keyid2: key2,
# ...}
_keystore = {}
keystore = {}
def add_rsakey(rsakey_dict, password, keyid=None):
@ -535,6 +536,7 @@ def _generate_derived_key(password, salt=None, iterations=None):
if iterations is None:
iterations = _PBKDF2_ITERATIONS
def pseudorandom_function(password, salt):
"""
PyCrypto's PBKDF2() expects a callable function for its optional

View file

@ -37,7 +37,8 @@
json = tuf.util.import_json()
# Recommended RSA key sizes: http://www.rsa.com/rsalabs/node.asp?id=2004
# Recommended RSA key sizes:
# http://www.emc.com/emc-plus/rsa-labs/historical/twirl-and-rsa-key-size.htm#table1
# According to the document above, revised May 6, 2003, RSA keys of
# size 3072 provide security through 2031 and beyond. 2048-bit keys
# are the recommended minimum and are good from the present through 2030.
@ -746,6 +747,9 @@ def generate_and_save_rsa_key(keystore_directory, password,
bits:
The key size, or key length, of the RSA key.
If 'bits' is unspecified, a 3072-bit RSA key is generated, which is the
key size recommended by TUF, although 2048-bit keys are accepted
(minimum key size).
<Exceptions>
tuf.FormatError, if 'bits' or 'password' does not have the

View file

@ -69,7 +69,8 @@
_KEY_ID_HASH_ALGORITHM = 'sha256'
# Recommended RSA key sizes: http://www.rsa.com/rsalabs/node.asp?id=2004
# Recommended RSA key sizes:
# http://www.emc.com/emc-plus/rsa-labs/historical/twirl-and-rsa-key-size.htm#table1
# According to the document above, revised May 6, 2003, RSA keys of
# size 3072 provide security through 2031 and beyond.
_DEFAULT_RSA_KEY_BITS = 3072
@ -87,16 +88,21 @@ def generate(bits=_DEFAULT_RSA_KEY_BITS):
'private': '-----BEGIN RSA PRIVATE KEY----- ...'}}
The public and private keys are in PEM format and stored as strings.
Although the crytography library called sets a 1024-bit minimum key size,
generate() enforces a minimum key size of 2048 bits. If 'bits' is
unspecified, a 3072-bit RSA key is generated, which is the key size
recommended by TUF.
<Arguments>
bits:
The key size, or key length, of the RSA key. 'bits' must be 1024, or
The key size, or key length, of the RSA key. 'bits' must be 2048, or
greater, and a multiple of 256.
<Exceptions>
ValueError, if an exception occurs after calling the RSA key generation
routine. 'bits' must be 1024, or greater, and a multiple of 256.
Raised by Cryptography library.
routine. 'bits' must be a multiple of 256. The 'ValueError' exception is
raised by the key generation function of the cryptography library called.
tuf.FormatError, if 'bits' does not contain the correct format.
@ -121,8 +127,9 @@ def generate(bits=_DEFAULT_RSA_KEY_BITS):
keytype = 'rsa'
# Generate the public and private RSA keys. The PyCrypto module performs
# the actual key generation. Raise 'ValueError' if 'bits' is less than 1024
# or not a multiple of 256.
# the actual key generation. Raise 'ValueError' if 'bits' is less than 1024
# or not a multiple of 256, although a 2048-bit minimum is enforced by
# tuf.formats.RSAKEYBITS_SCHEMA.check_match().
rsa_key_object = Crypto.PublicKey.RSA.generate(bits)
# Extract the public & private halves of the RSA key and generate their

0
tuf/tests/repository_setup.py Executable file → Normal file
View file

1
tuf/tests/unittest_toolbox.py Executable file → Normal file
View file

@ -30,7 +30,6 @@
import tuf.rsa_key as rsa_key
import tuf.repo.keystore as keystore
# Modify the number of iterations (from the higher default count) so the unit
# tests run faster.
keystore._PBKDF2_ITERATIONS = 1000

View file

@ -81,7 +81,7 @@
previous metadata files.
<Methods>
init_repo(tuf=True):
init_repo(using_tuf=True):
Initializes the repositories (depicted in the diagram above) and
starts the server process. init_repo takes one boolean argument
which when True sets-up tuf repository i.e. adds all of the
@ -154,11 +154,7 @@
tuf_configurations = None
def disable_console_logging():
tuf.log.logger.removeHandler(tuf.log.console_handler)
def init_repo(tuf=False, port=None):
def init_repo(using_tuf=False, port=None):
# Temp root directory for regular and tuf repositories.
# WARNING: tuf client stores files in '{root_repo}/downloads/' directory!
# Make sure regular download are NOT stored in the that directory when
@ -188,8 +184,9 @@ def init_repo(tuf=False, port=None):
time.sleep(.2)
keyids = None
if tuf:
disable_console_logging()
if using_tuf:
# We remove the console handler so that tests are silent by default.
tuf.log.remove_console_handler()
keyids = init_tuf(root_repo)
create_interposition_config(root_repo, url)