mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Fix variable shadow bug, use absolute import in two integration tests.
This commit is contained in:
parent
4c81247bc9
commit
b5263e4322
2 changed files with 80 additions and 83 deletions
|
|
@ -37,7 +37,7 @@
|
|||
import urllib
|
||||
|
||||
import tuf
|
||||
import tuf.interposition.urllib_tuf as urllib_tuf
|
||||
import tuf.interposition
|
||||
import tuf.tests.util_test_tools as util_test_tools
|
||||
|
||||
|
||||
|
|
@ -49,13 +49,13 @@ class EndlessDataAttack(Exception):
|
|||
|
||||
def _download(url, filename, TUF=False):
|
||||
if TUF:
|
||||
urllib_tuf.urlretrieve(url, filename)
|
||||
tuf.interposition.urllib_tuf.urlretrieve(url, filename)
|
||||
else:
|
||||
urllib.urlretrieve(url, filename)
|
||||
|
||||
|
||||
|
||||
def test_arbitrary_package_attack(TUF=False, TIMESTAMP=False):
|
||||
def test_endless_data_attack(TUF=False, TIMESTAMP=False):
|
||||
"""
|
||||
<Arguments>
|
||||
TUF:
|
||||
|
|
@ -69,105 +69,102 @@ def test_arbitrary_package_attack(TUF=False, TIMESTAMP=False):
|
|||
|
||||
ERROR_MSG = 'Endless Data Attack was Successful!\n'
|
||||
|
||||
# Setup.
|
||||
root_repo, url, server_proc, keyids = util_test_tools.init_repo(using_tuf=TUF)
|
||||
reg_repo = os.path.join(root_repo, 'reg_repo')
|
||||
tuf_repo = os.path.join(root_repo, 'tuf_repo')
|
||||
downloads = os.path.join(root_repo, 'downloads')
|
||||
tuf_targets = os.path.join(tuf_repo, 'targets')
|
||||
|
||||
try:
|
||||
# Setup.
|
||||
root_repo, url, server_proc, keyids = util_test_tools.init_repo(tuf=TUF)
|
||||
reg_repo = os.path.join(root_repo, 'reg_repo')
|
||||
tuf_repo = os.path.join(root_repo, 'tuf_repo')
|
||||
downloads = os.path.join(root_repo, 'downloads')
|
||||
tuf_targets = os.path.join(tuf_repo, 'targets')
|
||||
# Original data.
|
||||
INTENDED_DATA = 'Test A'
|
||||
|
||||
# Original data.
|
||||
INTENDED_DATA = 'Test A'
|
||||
|
||||
# Add a file to 'repo' directory: {root_repo}
|
||||
filepath = util_test_tools.add_file_to_repository(reg_repo, INTENDED_DATA)
|
||||
file_basename = os.path.basename(filepath)
|
||||
url_to_repo = url+'reg_repo/'+file_basename
|
||||
downloaded_file = os.path.join(downloads, file_basename)
|
||||
# We do not deliver truly endless data, but we will extend the original
|
||||
# file by many bytes.
|
||||
noisy_data = 'X'*100000
|
||||
# Add a file to 'repo' directory: {root_repo}
|
||||
filepath = util_test_tools.add_file_to_repository(reg_repo, INTENDED_DATA)
|
||||
file_basename = os.path.basename(filepath)
|
||||
url_to_repo = url+'reg_repo/'+file_basename
|
||||
downloaded_file = os.path.join(downloads, file_basename)
|
||||
# We do not deliver truly endless data, but we will extend the original
|
||||
# file by many bytes.
|
||||
noisy_data = 'X'*100000
|
||||
|
||||
|
||||
if TUF:
|
||||
# Update TUF metadata before attacker modifies anything.
|
||||
util_test_tools.tuf_refresh_repo(root_repo, keyids)
|
||||
# Modify the url. Remember that the interposition will intercept
|
||||
# urls that have 'localhost:9999' hostname, which was specified in
|
||||
# the json interposition configuration file. Look for 'hostname'
|
||||
# in 'util_test_tools.py'. Further, the 'file_basename' is the target
|
||||
# path relative to 'targets_dir'.
|
||||
url_to_repo = 'http://localhost:9999/'+file_basename
|
||||
if TUF:
|
||||
# Update TUF metadata before attacker modifies anything.
|
||||
util_test_tools.tuf_refresh_repo(root_repo, keyids)
|
||||
# Modify the url. Remember that the interposition will intercept
|
||||
# urls that have 'localhost:9999' hostname, which was specified in
|
||||
# the json interposition configuration file. Look for 'hostname'
|
||||
# in 'util_test_tools.py'. Further, the 'file_basename' is the target
|
||||
# path relative to 'targets_dir'.
|
||||
url_to_repo = 'http://localhost:9999/'+file_basename
|
||||
|
||||
# Attacker modifies the file at the targets repository.
|
||||
target = os.path.join(tuf_targets, file_basename)
|
||||
original_data = util_test_tools.read_file_content(target)
|
||||
larger_original_data = original_data + noisy_data
|
||||
util_test_tools.modify_file_at_repository(target, larger_original_data)
|
||||
|
||||
# Attacker modifies the timestamp.txt metadata.
|
||||
if TIMESTAMP:
|
||||
metadata = os.path.join(tuf_repo, 'metadata')
|
||||
timestamp = os.path.join(metadata, 'timestamp.txt')
|
||||
original_data = util_test_tools.read_file_content(timestamp)
|
||||
larger_original_data = original_data + noisy_data
|
||||
util_test_tools.modify_file_at_repository(timestamp,
|
||||
larger_original_data)
|
||||
|
||||
# Attacker modifies the file at the regular repository.
|
||||
original_data = util_test_tools.read_file_content(filepath)
|
||||
# Attacker modifies the file at the targets repository.
|
||||
target = os.path.join(tuf_targets, file_basename)
|
||||
original_data = util_test_tools.read_file_content(target)
|
||||
larger_original_data = original_data + noisy_data
|
||||
util_test_tools.modify_file_at_repository(filepath, larger_original_data)
|
||||
util_test_tools.modify_file_at_repository(target, larger_original_data)
|
||||
|
||||
# End Setup.
|
||||
# Attacker modifies the timestamp.txt metadata.
|
||||
if TIMESTAMP:
|
||||
metadata = os.path.join(tuf_repo, 'metadata')
|
||||
timestamp = os.path.join(metadata, 'timestamp.txt')
|
||||
original_data = util_test_tools.read_file_content(timestamp)
|
||||
larger_original_data = original_data + noisy_data
|
||||
util_test_tools.modify_file_at_repository(timestamp,
|
||||
larger_original_data)
|
||||
|
||||
# Attacker modifies the file at the regular repository.
|
||||
original_data = util_test_tools.read_file_content(filepath)
|
||||
larger_original_data = original_data + noisy_data
|
||||
util_test_tools.modify_file_at_repository(filepath, larger_original_data)
|
||||
|
||||
# End Setup.
|
||||
|
||||
|
||||
# Client downloads (tries to download) the file.
|
||||
try:
|
||||
_download(url=url_to_repo, filename=downloaded_file, TUF=TUF)
|
||||
except Exception, exception:
|
||||
# Because we are extending the true timestamp TUF metadata with invalid
|
||||
# JSON, we except to catch an error about invalid metadata JSON.
|
||||
if TUF and TIMESTAMP:
|
||||
endless_data_attack = False
|
||||
# Client downloads (tries to download) the file.
|
||||
try:
|
||||
_download(url=url_to_repo, filename=downloaded_file, TUF=TUF)
|
||||
except Exception, exception:
|
||||
# Because we are extending the true timestamp TUF metadata with invalid
|
||||
# JSON, we except to catch an error about invalid metadata JSON.
|
||||
if TUF and TIMESTAMP:
|
||||
endless_data_attack = False
|
||||
|
||||
for mirror_url, mirror_error in exception.mirror_errors.iteritems():
|
||||
if isinstance(mirror_error, tuf.InvalidMetadataJSONError):
|
||||
endless_data_attack = True
|
||||
break
|
||||
for mirror_url, mirror_error in exception.mirror_errors.iteritems():
|
||||
if isinstance(mirror_error, tuf.InvalidMetadataJSONError):
|
||||
endless_data_attack = True
|
||||
break
|
||||
|
||||
# In case we did not detect what was likely an endless data attack, we
|
||||
# reraise the exception to indicate that endless data attack detection
|
||||
# failed.
|
||||
if not endless_data_attack: raise
|
||||
else: raise
|
||||
# In case we did not detect what was likely an endless data attack, we
|
||||
# reraise the exception to indicate that endless data attack detection
|
||||
# failed.
|
||||
if not endless_data_attack: raise
|
||||
else: raise
|
||||
|
||||
# When we test downloading "endless" timestamp with TUF, we want to skip
|
||||
# the following test because downloading the timestamp should have failed.
|
||||
if not (TUF and TIMESTAMP):
|
||||
# Check whether the attack succeeded by inspecting the content of the
|
||||
# update. The update should contain 'Test A'. Technically it suffices
|
||||
# to check whether the file was downloaded or not.
|
||||
downloaded_content = util_test_tools.read_file_content(downloaded_file)
|
||||
if downloaded_content != INTENDED_DATA:
|
||||
raise EndlessDataAttack(ERROR_MSG)
|
||||
# When we test downloading "endless" timestamp with TUF, we want to skip
|
||||
# the following test because downloading the timestamp should have failed.
|
||||
if not (TUF and TIMESTAMP):
|
||||
# Check whether the attack succeeded by inspecting the content of the
|
||||
# update. The update should contain 'Test A'. Technically it suffices
|
||||
# to check whether the file was downloaded or not.
|
||||
downloaded_content = util_test_tools.read_file_content(downloaded_file)
|
||||
if downloaded_content != INTENDED_DATA:
|
||||
raise EndlessDataAttack(ERROR_MSG)
|
||||
|
||||
finally:
|
||||
util_test_tools.cleanup(root_repo, server_proc)
|
||||
util_test_tools.cleanup(root_repo, server_proc)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
try:
|
||||
test_arbitrary_package_attack(TUF=False, TIMESTAMP=False)
|
||||
test_endless_data_attack(TUF=False, TIMESTAMP=False)
|
||||
except EndlessDataAttack, error:
|
||||
print('Endless data attack worked on download without TUF!')
|
||||
|
||||
try:
|
||||
test_arbitrary_package_attack(TUF=True, TIMESTAMP=False)
|
||||
test_endless_data_attack(TUF=True, TIMESTAMP=False)
|
||||
except EndlessDataAttack, error:
|
||||
print('Endless data attack worked on download without TUF!')
|
||||
print(str(error))
|
||||
|
|
@ -177,7 +174,7 @@ def test_arbitrary_package_attack(TUF=False, TIMESTAMP=False):
|
|||
try:
|
||||
# This test fails because the timestamp metadata has been extended with
|
||||
# random data from its true length, thereby resulting in invalid JSON.
|
||||
test_arbitrary_package_attack(TUF=True, TIMESTAMP=True)
|
||||
test_endless_data_attack(TUF=True, TIMESTAMP=True)
|
||||
except EndlessDataAttack, error:
|
||||
print('Endless data attack worked on download without TUF!')
|
||||
print(str(error))
|
||||
|
|
|
|||
|
|
@ -49,7 +49,7 @@
|
|||
import urllib
|
||||
|
||||
|
||||
import tuf.interposition.urllib_tuf as urllib_tuf
|
||||
import tuf.interposition
|
||||
import tuf.tests.util_test_tools as util_test_tools
|
||||
|
||||
|
||||
|
|
@ -60,7 +60,7 @@ class SlowRetrievalAttackAlert(Exception):
|
|||
def _download(url, filename, TUF=False):
|
||||
if TUF:
|
||||
try:
|
||||
urllib_tuf.urlretrieve(url, filename)
|
||||
tuf.interposition.urllib_tuf.urlretrieve(url, filename)
|
||||
except tuf.NoWorkingMirrorError, exception:
|
||||
slow_retrieval = False
|
||||
for mirror_url, mirror_error in exception.mirror_errors.iteritems():
|
||||
|
|
@ -97,7 +97,7 @@ def test_slow_retrieval_attack(TUF=False, mode=None):
|
|||
try:
|
||||
# Setup.
|
||||
root_repo, url, server_proc, keyids = \
|
||||
util_test_tools.init_repo(tuf=TUF, port=port)
|
||||
util_test_tools.init_repo(using_tuf=TUF, port=port)
|
||||
reg_repo = os.path.join(root_repo, 'reg_repo')
|
||||
downloads = os.path.join(root_repo, 'downloads')
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue