Merge branch 'develop' of https://github.com/theupdateframework/tuf into develop

This commit is contained in:
Vladimir Diaz 2015-05-03 14:37:22 -04:00
commit c94c3b6464
69 changed files with 2198 additions and 429 deletions

View file

@ -6,7 +6,9 @@ Konstantin Andrianov
Martin Peck
Monzur Muhammad
Nick Mathewson
Pankhuri Goyal
Roger Dingledine
Ruben Pollan
Santiago Torres
Sebastian Hahn
Tian Tian

View file

@ -139,6 +139,8 @@ Installation
pip - installing and managing Python packages (recommended)
Installing from Python Package Index (https://pypi.python.org/pypi).
Note: Please use "pip install --no-use-wheel tuf" if your version
of pip <= 1.5.6
$ pip install tuf
Installing from local source archive.

View file

@ -18,13 +18,13 @@ The following are some of the known attacks on software update systems, includin
* **Extraneous dependencies attacks**. An attacker indicates to clients that in order to install the software they wanted, they also need to install unrelated software. This unrelated software can be from a trusted source but may have known vulnerabilities that are exploitable by the attacker.
* **Mix-and-match attacks**. An attacker presents clients with a view of a repository that includes files that never existed together on the repository at the same time. This can result in, for example, outdated versions of dependencies being installed.
* **Mix-and-match attacks**. An attacker presents clients with a view of a repository that includes files that did not exist together on the repository at the same time. This can result in, for example, outdated versions of dependencies being installed.
* **Wrong software installation**. An attacker provides a client with a trusted file that is not the one the client wanted.
* **Malicious mirrors preventing updates**. An attacker in control of one repository mirror is able to prevent users from obtaining updates from other, good mirrors.
* **Vulnerability to key compromises**. At attacker who is able to compromise a single key or less than a given threshold of keys can compromise clients. This includes relying on a single online key (such as only being protected by SSL) or a single offline key (such as most software update systems use to sign files).
* **Vulnerability to key compromises**. An attacker who is able to compromise a single key or less than a given threshold of keys can compromise clients. This includes relying on a single online key (such as only being protected by SSL) or a single offline key (such as most software update systems use to sign files).
##Design Concepts
@ -55,7 +55,7 @@ File integrity is important both with respect to single files as well as collect
## Freshness
As software updates often fix security bugs, it is important that software update systems be able to obtain the latest versions of files that are available. An attacker may want to trick a client into installing outdated versions of software or even just convince a client that no updates are available.
As software updates often fix security bugs, it is important for software update systems to be able to obtain the latest versions of files that are available. An attacker may want to trick a client into installing outdated versions of software or even just convince a client that no updates are available.
Ensuring freshness means to:

View file

@ -551,6 +551,10 @@
METAPATH is the the metadata file's path on the repository relative to the
metadata base URL.
The HASHES and LENGTH are the hashes and length of the file. LENGTH is an
integer. HASHES is a dictionary that specifies one or more hashes, including
the cryptographic hash function. For example: { "sha256": HASH, ... }
4.5. File formats: targets.json and delegated target roles
@ -578,8 +582,7 @@
It is allowed to have a TARGETS object with no TARGETPATH elements. This
can be used to indicate that no target files are available.
The HASH and LENGTH are the hash and length of the target file. If
defined, the elements and values of "custom" will be made available to the
If defined, the elements and values of "custom" will be made available to the
client application. The information in "custom" is opaque to the framework
and can include version numbers, dependencies, requirements, and any other
data that the application wants to include to describe the file at
@ -592,13 +595,16 @@
KEYID : KEY,
... },
"roles" : [{
"name": ROLE,
"name": ROLENAME,
"keyids" : [ KEYID, ... ] ,
"threshold" : THRESHOLD,
("path_hash_prefixes" : [ HEX_DIGEST, ... ] |
"paths" : [ PATHPATTERN, ... ])
}, ... ]
}
ROLENAME is the full name of the delegated role. For example,
"targets/projects"
In order to discuss target paths, a role MUST specify only one of the
"path_hash_prefixes" or "paths" attributes, each of which we discuss next.
@ -613,10 +619,6 @@
split a large number of targets into separate bins identified by consistent
hashing.
TODO: Should the TUF spec restrict the repository to one particular
algorithm? Should we allow the repository to specify in the role dictionary
the algorithm used for these generated hashed paths?
The "paths" list describes paths that the role is trusted to provide.
Clients MUST check that a target is in one of the trusted paths of all roles
in a delegation chain, not just in a trusted path of the role that describes

View file

@ -62,6 +62,8 @@
# modules.
random.shuffle(tests_without_extension)
suite = unittest.TestLoader().loadTestsFromNames(tests_without_extension)
unittest.TextTestRunner(verbosity=2).run(suite)
if __name__ == '__main__':
suite = unittest.TestLoader().loadTestsFromNames(tests_without_extension)
all_tests_passed = unittest.TextTestRunner(verbosity=2).run(suite).wasSuccessful()
if not all_tests_passed:
sys.exit(1)

View file

@ -135,9 +135,12 @@
repository.timestamp.expiration = datetime.datetime(2030, 1, 1, 0, 0)
repository.targets('role1').expiration = datetime.datetime(2030, 1, 1, 0, 0)
# Compress the 'targets.json' role so that the unit tests have a pre-generated
# example of compressed metadata.
# Compress the top-level role metadata so that the unit tests have a
# pre-generated example of compressed metadata.
repository.root.compressions = ['gz']
repository.targets.compressions = ['gz']
repository.snapshot.compressions = ['gz']
repository.timestamp.compressions = ['gz']
# Create the actual metadata files, which are saved to 'metadata.staged'.
if not options.dry_run:

Binary file not shown.

View file

@ -237,6 +237,12 @@ def test_https_connection(self):
def test__get_content_length(self):
content_length = \
tuf.download._get_content_length({'bad_connection_object': 8})
self.assertEqual(content_length, None)
# Run unit test.
if __name__ == '__main__':
unittest.main()

52
tests/test_init.py Executable file
View file

@ -0,0 +1,52 @@
#!/usr/bin/env python
"""
<Program Name>
test_init.py
<Author>
Vladimir Diaz
<Started>
March 30, 2015.
<Copyright>
See LICENSE for licensing information.
<Purpose>
Test cases for __init__.py (mainly the exceptions defined there).
"""
# Help with Python 3 compatibility, where the print statement is a function, an
# implicit relative import is invalid, and the '/' operator performs true
# division. Example: print 'hello world' raises a 'SyntaxError' exception.
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import unittest
import logging
import tuf
logger = logging.getLogger('tuf.test_keys')
class TestInit(unittest.TestCase):
def setUp(self):
pass
def test_bad_signature_error(self):
bad_signature_error = tuf.BadSignatureError('bad_role')
logger.error(bad_signature_error)
def test_slow_retrieval_error(self):
slow_signature_error = tuf.SlowRetrievalError('bad_role')
logger.error(slow_signature_error)
# Run the unit tests.
if __name__ == '__main__':
unittest.main()

472
tests/test_interpose_updater.py Executable file
View file

@ -0,0 +1,472 @@
#!/usr/bin/env python
"""
<Program Name>
test_interpose_updater.py
<Author>
Pankhuri Goyal <pankhurigoyal02@gmail.com>
<Started>
August 2014.
<Copyright>
See LICENSE for licensing information.
<Purpose>
Unit test for 'tuf.interposition.updater.py'.
"""
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import os
import sys
import tempfile
import subprocess
import random
import shutil
import logging
import time
import copy
import json
import tuf
import tuf.util
import tuf.conf
import tuf.log
import tuf.interposition.updater as updater
import tuf.interposition.configuration as configuration
import tuf.unittest_toolbox as unittest_toolbox
if sys.version_info >= (2, 7):
import unittest
else:
import unittest2 as unittest
logger = logging.getLogger('tuf.test_interpose_updater')
class TestUpdaterController(unittest_toolbox.Modified_TestCase):
@classmethod
def setUpClass(cls):
# This method is called before tests in individual class are executed.
# Create a temporary directory to store the repository, metadata, and
# target files. 'temporary_directory' must be deleted in TearDownModule()
# so that temporary files are always removed, even when exceptions occur.
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
# Launch a SimpleHTTPServer (serves files in the current directory).
# Test cases will request metadata and target files that have been
# pre-generated in 'tuf/tests/repository_data', which will be served
# by the SimpleHTTPServer launched here. The test cases of
# 'test_updater.py' assume the pre-generated metadata files have a specific
# structure, such as a delegated role 'targets/role1', three target
# files, five key files, etc.
cls.SERVER_PORT = random.randint(30000, 45000)
command = ['python', 'simple_server.py', str(cls.SERVER_PORT)]
cls.server_process = subprocess.Popen(command, stderr=subprocess.PIPE)
logger.info('\n\tServer process started.')
logger.info('\tServer process id: '+str(cls.server_process.pid))
logger.info('\tServing on port: '+str(cls.SERVER_PORT))
cls.url = 'http://localhost:'+str(cls.SERVER_PORT) + os.path.sep
time.sleep(1)
@classmethod
def tearDownClass(cls):
# Remove the temporary directory after all the tests are done.
shutil.rmtree(cls.temporary_directory)
# Kill the SimpleHTTPServer Process
if cls.server_process is None:
message = '\tServer process ' + str(cls.server_process.pid) + \
' terminated.'
logger.info(message)
cls.server_process.kill()
def setUp(self):
# We are inheriting from custom class.
unittest_toolbox.Modified_TestCase.setUp(self)
# Copy the original repository files provided in the test folder so that
# any modifications made to repository files are restricted to the copies.
# The 'repository_data' directory is expected to exist in 'tuf.tests/'.
original_repository_files = os.path.join(os.getcwd(), 'repository_data')
temporary_repository_root = \
self.make_temp_directory(directory=self.temporary_directory)
# The original repository, keystore, and client directories will be copied
# for each test case.
original_repository = os.path.join(original_repository_files, 'repository')
original_keystore = os.path.join(original_repository_files, 'keystore')
original_client = os.path.join(original_repository_files, 'client')
# Save references to the often-needed client repository directories.
# Test cases need these references to access metadata and target files.
self.repository_directory = \
os.path.join(temporary_repository_root, 'repository')
self.keystore_directory = \
os.path.join(temporary_repository_root, 'keystore')
self.client_directory = os.path.join(temporary_repository_root, 'client')
self.client_metadata = os.path.join(self.client_directory, 'metadata')
self.client_metadata_current = os.path.join(self.client_metadata, 'current')
self.client_metadata_previous = \
os.path.join(self.client_metadata, 'previous')
# Copy the original 'repository', 'client', and 'keystore' directories
# to the temporary repository the test cases can use.
shutil.copytree(original_repository, self.repository_directory)
shutil.copytree(original_client, self.client_directory)
shutil.copytree(original_keystore, self.keystore_directory)
# 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'.
repository_basepath = self.repository_directory[len(os.getcwd()):]
# Test Set 1 -
port = self.SERVER_PORT
url_prefix = 'http://localhost:' + str(port) + repository_basepath
# Setting 'tuf.conf.repository_directory' with the temporary client
# directory copied from the original repository files.
tuf.conf.repository_directory = self.client_directory
self.repository_mirrors = {'mirror': {'url_prefix': url_prefix,
'metadata_path': 'metadata',
'targets_path': 'targets',
'confined_target_dirs': ['']}
}
self.target_filepath = [{".*/targets":"/file1.txt"}]
self.good_configuration = configuration.Configuration('localhost', 8001,
self.client_directory,
self.repository_mirrors,
self.target_filepath, None)
self.test1_configuration = configuration.Configuration('localhost', port,
self.client_directory,
self.repository_mirrors,
'targets', None)
self.test2_configuration = configuration.Configuration('localhost', 8002,
self.client_directory,
self.repository_mirrors,
'targets', None)
test_server_port=random.randint(30000, 45000)
self.test3_configuration = configuration.Configuration('localhost',
test_server_port,
self.client_directory,
self.repository_mirrors,
'targets', None)
url_prefix_test = \
'http://localhost:' + str(test_server_port) + repository_basepath
self.repository_mirrors = {'mirror': {'url_prefix': url_prefix_test,
'metadata_path': 'metadata',
'targets_path': 'targets',
'confined_target_dirs': ['']}
}
self.test4_configuration = configuration.Configuration('localhost', 8004,
self.client_directory,
self.repository_mirrors,
'targets', None)
def tearDown(self):
# We are inheriting from custom class.
unittest_toolbox.Modified_TestCase.tearDown(self)
# Unit Tests
def test_add(self):
updater_controller = updater.UpdaterController()
# Given good configuration, the UpdaterController.add() should work.
updater_controller.add(self.good_configuration)
# Instead of configuration, if some number is given.
self.assertRaises(tuf.InvalidConfigurationError, updater_controller.add, 8)
# Hostname already exists, should raise exception.
self.assertRaises(tuf.FormatError, updater_controller.add,
self.good_configuration)
# Hostname already exists as a mirror, should raise an exception.
self.assertRaises(tuf.FormatError, updater_controller.add,
self.test1_configuration)
# Repository mirror already exists as another mirror.
self.assertRaises(tuf.FormatError, updater_controller.add,
self.test2_configuration)
# Remove the old updater.
updater_controller.remove(self.good_configuration)
# Add a new updater for this test.
updater_controller.add(self.test3_configuration)
# Repository mirror already exists as an updater.
self.assertRaises(tuf.FormatError, updater_controller.add,
self.test4_configuration)
# Remove the updater once the testing is completed.
updater_controller.remove(self.test3_configuration)
def test_refresh(self):
updater_controller = updater.UpdaterController()
# To check refresh() method, add a configuration for test.
updater_controller.add(self.good_configuration)
updater_controller.refresh(self.good_configuration)
# Check for invalid configuration error.
self.assertRaises(tuf.InvalidConfigurationError,
updater_controller.refresh, 8)
# Check if the updater not added in the updater list is refreshed, gives an
# error or not.
self.assertRaises(tuf.NotFoundError, updater_controller.refresh,
self.test1_configuration)
# Giving the same port number and network location as good_configuration.
self.test4_configuration.port = 8001
self.test4_configuration.network_location = 'localhost:8001'
# Check if the mirror not added is refreshed, gives an error or not.
self.assertRaises(tuf.NotFoundError, updater_controller.refresh,
self.test4_configuration)
# Make an object of tuf.interposition.updater.Updater of good configuration
# for testing.
good_updater = updater.Updater(self.good_configuration)
good_updater.refresh()
self.good_configuration.repository_mirrors['mirror']['url_prefix'] = \
'http://localhost:99999999'
# To check if a bad url_prefix of a mirror raises an exception or not.
self.assertRaises(tuf.NoWorkingMirrorError, good_updater.refresh)
def test_get(self):
updater_controller = updater.UpdaterController()
updater_controller.add(self.good_configuration)
url = 'http://localhost:8001'
updater_controller.get(url)
wrong_url = 'http://localhost:9999'
updater_controller.get(wrong_url)
good_updater = updater.Updater(self.good_configuration)
self.assertRaises(tuf.URLMatchesNoPatternError,
good_updater.get_target_filepath, url)
def test_remove(self):
updater_controller = updater.UpdaterController()
# To check remove() method, add a configuration for test.
updater_controller.add(self.good_configuration)
# Check for invalid configuration error.
self.assertRaises(tuf.InvalidConfigurationError,
updater_controller.remove, 8)
self.assertRaises(tuf.NotFoundError, updater_controller.remove,
self.test1_configuration)
# Giving the same port number and network location as good_configuration.
self.test4_configuration.port = 8001
self.test4_configuration.network_location = 'localhost:8001'
self.assertRaises(tuf.NotFoundError, updater_controller.remove,
self.test4_configuration)
class TestUpdater(unittest_toolbox.Modified_TestCase):
@classmethod
def setUpClass(cls):
# This method is called before tests in individual class are executed.
# Create a temporary directory to store the repository, metadata, and
# target files. 'temporary_directory' must be deleted in TearDownModule()
# so that temporary files are always removed, even when exceptions occur.
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
# Launch a SimpleHTTPServer (serves files in the current directory).
# Test cases will request metadata and target files that have been
# pre-generated in 'tuf/tests/repository_data', which will be served
# by the SimpleHTTPServer launched here. The test cases of
# 'test_updater.py' assume the pre-generated metadata files have a specific
# structure, such as a delegated role 'targets/role1', three target
# files, five key files, etc.
cls.SERVER_PORT = random.randint(30000, 45000)
command = ['python', 'simple_server.py', str(cls.SERVER_PORT)]
cls.server_process = subprocess.Popen(command, stderr=subprocess.PIPE)
logger.info('\n\tServer process started.')
logger.info('\tServer process id: '+str(cls.server_process.pid))
logger.info('\tServing on port: '+str(cls.SERVER_PORT))
cls.url = 'http://localhost:'+str(cls.SERVER_PORT) + os.path.sep
time.sleep(1)
@classmethod
def tearDownClass(cls):
# Remove the temporary directory after all the tests are done.
shutil.rmtree(cls.temporary_directory)
# Kill the SimpleHTTPServer Process
if cls.server_process is None:
message = '\tServer process ' + str(cls.server_process.pid) + \
' terminated.'
logger.info(message)
cls.server_process.kill()
def setUp(self):
# We are inheriting from custom class.
unittest_toolbox.Modified_TestCase.setUp(self)
# Copy the original repository files provided in the test folder so that
# any modifications made to repository files are restricted to the copies.
# The 'repository_data' directory is expected to exist in 'tuf.tests/'.
original_repository_files = os.path.join(os.getcwd(), 'repository_data')
temporary_repository_root = \
self.make_temp_directory(directory=self.temporary_directory)
# The original repository, keystore, and client directories will be copied
# for each test case.
original_repository = os.path.join(original_repository_files, 'repository')
original_keystore = os.path.join(original_repository_files, 'keystore')
original_client = os.path.join(original_repository_files, 'client')
# Save references to the often-needed client repository directories.
# Test cases need these references to access metadata and target files.
self.repository_directory = \
os.path.join(temporary_repository_root, 'repository')
self.keystore_directory = \
os.path.join(temporary_repository_root, 'keystore')
self.client_directory = os.path.join(temporary_repository_root, 'client')
self.client_metadata = os.path.join(self.client_directory, 'metadata')
self.client_metadata_current = os.path.join(self.client_metadata, 'current')
self.client_metadata_previous = \
os.path.join(self.client_metadata, 'previous')
# Copy the original 'repository', 'client', and 'keystore' directories
# to the temporary repository the test cases can use.
shutil.copytree(original_repository, self.repository_directory)
shutil.copytree(original_client, self.client_directory)
shutil.copytree(original_keystore, self.keystore_directory)
# 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'.
repository_basepath = self.repository_directory[len(os.getcwd()):]
# Test Set 1 -
port = self.SERVER_PORT
url_prefix = 'http://localhost:' + str(port) + repository_basepath
# Setting 'tuf.conf.repository_directory' with the temporary client
# directory copied from the original repository files.
tuf.conf.repository_directory = self.client_directory
self.repository_mirrors = {'mirror': {'url_prefix': url_prefix,
'metadata_path': 'metadata',
'targets_path': 'targets',
'confined_target_dirs': ['']}
}
self.target_paths = [{".*/targets":"/file1.txt"}]
self.good_configuration = configuration.Configuration('localhost', 8001,
self.client_directory,
self.repository_mirrors,
self.target_paths, None)
def tearDown(self):
# We are inheriting from custom class.
unittest_toolbox.Modified_TestCase.tearDown(self)
# Unit Tests
def test_download_target(self):
myUpdater = updater.Updater(self.good_configuration)
target_filepath = 'file.txt'
self.assertRaises(tuf.UnknownTargetError, myUpdater.download_target,
target_filepath)
self.assertRaises(tuf.FormatError, myUpdater.download_target, 8)
target_filepath = 'file1.txt'
myUpdater.download_target(target_filepath)
def test_get_target_filepath(self):
myUpdater = updater.Updater(self.good_configuration)
self.assertRaises(AttributeError, myUpdater.get_target_filepath, 8)
test_source_url = 'http://localhost:9999'
self.assertRaises(tuf.URLMatchesNoPatternError,
myUpdater.get_target_filepath, test_source_url)
test_source_url = 'http://localhost:8001/targets/file.txt'
myUpdater.get_target_filepath(test_source_url)
def test_open(self):
myUpdater = updater.Updater(self.good_configuration)
self.assertRaises(AttributeError, myUpdater.open, 8)
url = 'http://localhost:8001/targets/file1.txt'
myUpdater.open(url, 'interposition.json')
def test_retrieve(self):
myUpdater = updater.Updater(self.good_configuration)
self.assertRaises(AttributeError, myUpdater.retrieve, 8)
test_source_url = 'http://localhost:8001/targets/file1.txt'
myUpdater.retrieve(test_source_url, 'interposition.json')
#self.assertRaises(tuf.NoWorkingMirrorError, myUpdater.retrieve, test_source_url)
test_source_url = 'http://6767:localhost'
self.assertRaises(tuf.URLMatchesNoPatternError, myUpdater.retrieve,
test_source_url)
test_source_url = 'http://localhost:8001/targets/file1.txt'
myUpdater.retrieve(test_source_url)
if __name__ == '__main__':
unittest.main()

View file

@ -182,17 +182,15 @@ def test_create_keydb_from_root_metadata(self):
keyid = KEYS[0]['keyid']
rsakey2 = KEYS[1]
keyid2 = KEYS[1]['keyid']
keydict = {keyid: rsakey, keyid2: rsakey2, keyid: rsakey}
keydict = {keyid: rsakey, keyid2: rsakey2}
# Add a duplicate 'keyid' to log/trigger a 'tuf.KeyAlreadyExistsError'
# block (loading continues).
roledict = {'Root': {'keyids': [keyid], 'threshold': 1},
'Targets': {'keyids': [keyid2], 'threshold': 1}}
'Targets': {'keyids': [keyid2, keyid], 'threshold': 1}}
version = 8
consistent_snapshot = False
expires = '1985-10-21T01:21:00Z'
tuf.keydb.add_key(rsakey)
root_metadata = tuf.formats.RootFile.make_metadata(version,
expires,
keydict, roledict,
@ -231,7 +229,7 @@ def test_create_keydb_from_root_metadata(self):
rsakey3['keytype'] = 'bad_keytype'
keydict[keyid3] = rsakey3
version = 8
expires = '1985-10-21T01:21:00Z'
expires = '1985-10-21T01:21:00Z'
root_metadata = tuf.formats.RootFile.make_metadata(version,
expires,

View file

@ -235,8 +235,19 @@ def test_verify_signature(self):
# Passing incorrect number of arguments.
self.assertRaises(TypeError, KEYS.verify_signature)
# Verify that the pure python 'ed25519' base case (triggered if 'pynacl' is
# unavailable) is executed in tuf.keys.verify_signature().
KEYS._ED25519_CRYPTO_LIBRARY = 'invalid'
KEYS._available_crypto_libraries = ['invalid']
verified = KEYS.verify_signature(self.ed25519key_dict, ed25519_signature, DATA)
self.assertTrue(verified, "Incorrect signature.")
# Reset to the expected available crypto libraries.
KEYS._ED25519_CRYPTO_LIBRARY = 'pynacl'
KEYS._available_crypto_libraries = ['ed25519', 'pycrypto', 'pynacl']
def test_create_rsa_encrypted_pem(self):
# Test valid arguments.

View file

@ -116,7 +116,7 @@ def test_add_console_handler(self):
raise TypeError('Test exception output in the console.')
except TypeError as e:
logger.error(e)
logger.exception(e)
def test_remove_console_handler(self):

View file

@ -66,6 +66,7 @@ def test_generate_rsa_public_and_private(self):
def test_create_rsa_signature(self):
global private_rsa
global public_rsa
data = 'The quick brown fox jumps over the lazy dog'.encode('utf-8')
signature, method = pycrypto.create_rsa_signature(private_rsa, data)
@ -83,9 +84,15 @@ def test_create_rsa_signature(self):
pycrypto.create_rsa_signature, '', data)
# Check for invalid 'data'.
pycrypto.create_rsa_signature(private_rsa, '')
self.assertRaises(tuf.CryptoError,
pycrypto.create_rsa_signature, private_rsa, 123)
# Check for missing private key.
self.assertRaises(tuf.CryptoError,
pycrypto.create_rsa_signature, public_rsa, data)
def test_verify_rsa_signature(self):
global public_rsa
@ -209,6 +216,7 @@ def test_create_rsa_public_and_private_from_encrypted_pem(self):
self.assertRaises(tuf.FormatError,
pycrypto.create_rsa_public_and_private_from_encrypted_pem,
pem_rsakey, ['pw'])
self.assertRaises(tuf.CryptoError,
pycrypto.create_rsa_public_and_private_from_encrypted_pem,
'invalid_pem', passphrase)

View file

@ -493,7 +493,14 @@ def test_generate_targets_metadata(self):
version, expiration_date, delegations,
False)
self.assertTrue(tuf.formats.TARGETS_SCHEMA.matches(targets_metadata))
# Valid arguments with 'delegations' set to None.
targets_metadata = \
repo_lib.generate_targets_metadata(targets_directory, target_files,
version, expiration_date, None,
False)
self.assertTrue(tuf.formats.TARGETS_SCHEMA.matches(targets_metadata))
# Verify that 'digest.filename' file is saved to 'targets_directory' if
# the 'write_consistent_targets' argument is True.
list_targets_directory = os.listdir(targets_directory)
@ -529,11 +536,13 @@ def test_generate_targets_metadata(self):
targets_directory, target_files, version, expiration_date,
delegations, 3)
# Test non-existent target file.
bad_target_file = \
{'non-existent.txt': {'file_permission': file_permissions}}
# Test invalid 'target_files' argument.
self.assertRaises(tuf.Error, repo_lib.generate_targets_metadata,
targets_directory, ['nonexistent_file.txt'], version,
expiration_date)
targets_directory, bad_target_file, version,
expiration_date)
@ -638,16 +647,27 @@ def test_sign_metadata(self):
root_private_keypath = os.path.join(keystore_path, 'root_key')
root_private_key = \
repo_lib.import_rsa_privatekey_from_file(root_private_keypath,
'password')
repo_lib.import_rsa_privatekey_from_file(root_private_keypath, 'password')
# Sign with a valid, but not a threshold, key.
targets_private_keypath = os.path.join(keystore_path, 'targets_key')
targets_private_key = \
repo_lib.import_rsa_privatekey_from_file(targets_private_keypath,
'password')
# sign_metadata() expects the private key 'root_metadata' to be in
# 'tuf.keydb'. Remove any public keys that may be loaded before
# adding private key, otherwise a 'tuf.KeyAlreadyExists' exception is
# raised.
tuf.keydb.remove_key(root_private_key['keyid'])
tuf.keydb.add_key(root_private_key)
tuf.keydb.remove_key(targets_private_key['keyid'])
tuf.keydb.add_key(targets_private_key)
root_keyids.extend(tuf.roledb.get_role_keyids('targets'))
# Add the snapshot's public key (to test whether non-private keys are
# ignored by sign_metadata()).
root_keyids.extend(tuf.roledb.get_role_keyids('snapshot'))
root_signable = repo_lib.sign_metadata(root_metadata, root_keyids,
root_filename)
self.assertTrue(tuf.formats.SIGNABLE_SCHEMA.matches(root_signable))
@ -748,6 +768,85 @@ def test__check_directory(self):
def test__generate_and_write_metadata(self):
# Test for invalid, or unsupported, rolename.
# Load the root metadata provided in 'tuf/tests/repository_data/'.
root_filepath = os.path.join('repository_data', 'repository',
'metadata', 'root.json')
root_signable = tuf.util.load_json_file(root_filepath)
# _generate_and_write_metadata() expects the top-level roles
# (specifically 'snapshot') and keys to be available in 'tuf.roledb'.
tuf.roledb.create_roledb_from_root_metadata(root_signable['signed'])
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
targets_directory = os.path.join(temporary_directory, 'targets')
os.mkdir(targets_directory)
repository_directory = os.path.join(temporary_directory, 'repository')
metadata_directory = os.path.join(repository_directory,
repo_lib.METADATA_STAGED_DIRECTORY_NAME)
targets_metadata = os.path.join('repository_data', 'repository', 'metadata',
'targets.json')
obsolete_metadata = os.path.join(metadata_directory, 'targets',
'obsolete_role.json')
tuf.util.ensure_parent_dir(obsolete_metadata)
shutil.copyfile(targets_metadata, obsolete_metadata)
# Test for an invalid, or unsupported, rolename.
roleinfo = {'keyids': ['123'], 'threshold': 1}
tuf.roledb.add_role('bad_rolename', roleinfo)
self.assertRaises(tuf.Error,
tuf.repository_lib._generate_and_write_metadata,
'bad_rolename', 'bad_rolename.json', False,
targets_directory, metadata_directory)
# Verify that obsolete metadata (a metadata file exists on disk, but the
# role is unavailable in 'tuf.roledb'). First add the obsolete
# role to 'tuf.roledb' so that its metadata file can be written to disk.
targets_roleinfo = tuf.roledb.get_roleinfo('targets')
targets_roleinfo['version'] = 1
expiration = \
tuf.formats.unix_timestamp_to_datetime(int(time.time() + 86400))
expiration = expiration.isoformat() + 'Z'
targets_roleinfo['expires'] = expiration
tuf.roledb.add_role('targets/obsolete_role', targets_roleinfo)
snapshot_filepath = os.path.join('repository_data', 'repository',
'metadata', 'snapshot.json')
snapshot_signable = tuf.util.load_json_file(snapshot_filepath)
tuf.roledb.remove_role('targets/obsolete_role')
self.assertTrue(os.path.exists(os.path.join(metadata_directory,
'targets/obsolete_role.json')))
tuf.repository_lib._delete_obsolete_metadata(metadata_directory,
snapshot_signable['signed'],
False)
self.assertFalse(os.path.exists(metadata_directory + 'targets/obsolete_role.json'))
def test__remove_invalid_and_duplicate_signatures(self):
# Remove duplicate PSS signatures (same key generates valid, but different
# signatures). First load a valid signable (in this case, the root role).
root_filepath = os.path.join('repository_data', 'repository',
'metadata', 'root.json')
root_signable = tuf.util.load_json_file(root_filepath)
key_filepath = os.path.join('repository_data', 'keystore', 'root_key')
root_rsa_key = repo_lib.import_rsa_privatekey_from_file(key_filepath,
'password')
# Append the new valid, but duplicate PSS signature, and test that
# duplicates are removed.
new_pss_signature = tuf.keys.create_signature(root_rsa_key,
root_signable['signed'])
root_signable['signatures'].append(new_pss_signature)
expected_number_of_signatures = len(root_signable['signatures'])
tuf.repository_lib._remove_invalid_and_duplicate_signatures(root_signable)
self.assertEqual(len(root_signable), expected_number_of_signatures)
# Test that invalid keyid are ignored.
root_signable['signatures'][0]['keyid'] = '404'
tuf.repository_lib._remove_invalid_and_duplicate_signatures(root_signable)
# Run the test cases.
if __name__ == '__main__':
unittest.main()

View file

@ -114,7 +114,7 @@ def test_init(self):
def test_write_and_write_partial(self):
# Test creation of a TUF repository.
#
# 1. Load public and private keys.
# 1. Import public and private keys.
# 2. Add verification keys.
# 3. Load signing keys.
# 4. Add target files.
@ -245,26 +245,37 @@ def test_write_and_write_partial(self):
repository.status()
# Verify status() does not raise 'tuf.InsufficientKeysError' if a top-level
# role does not contain a threshold of keys.
# role does and 'targets/role1' do not contain a threshold of keys.
root_roleinfo = tuf.roledb.get_roleinfo('root')
old_threshold = root_roleinfo['threshold']
old_threshold = root_roleinfo['threshold']
root_roleinfo['threshold'] = 10
role1_roleinfo = tuf.roledb.get_roleinfo('targets/role1')
old_role1_threshold = role1_roleinfo['threshold']
role1_roleinfo['threshold'] = 10
tuf.roledb.update_roleinfo('root', root_roleinfo)
tuf.roledb.update_roleinfo('targets/role1', role1_roleinfo)
repository.status()
# Restore the original threshold value.
# Restore the original threshold values.
root_roleinfo['threshold'] = old_threshold
tuf.roledb.update_roleinfo('root', root_roleinfo)
role1_roleinfo['threshold'] = old_role1_threshold
tuf.roledb.update_roleinfo('targets/role1', role1_roleinfo)
# Verify status() does not raise 'tuf.UnsignedMetadataError' if any of the
# the top-level roles are improperly signed.
# the top-level roles and 'targets/role1' are improperly signed.
repository.root.unload_signing_key(root_privkey)
repository.root.load_signing_key(targets_privkey)
repository.targets('role1').unload_signing_key(role1_privkey)
repository.targets('role1').load_signing_key(targets_privkey)
repository.status()
# Reset Root and verify Targets.
# Reset Root and 'targets/role1', and verify Targets.
repository.root.unload_signing_key(targets_privkey)
repository.root.load_signing_key(root_privkey)
repository.targets('role1').unload_signing_key(targets_privkey)
repository.targets('role1').load_signing_key(role1_privkey)
repository.targets.unload_signing_key(targets_privkey)
repository.targets.load_signing_key(snapshot_privkey)
repository.status()
@ -333,8 +344,9 @@ def test_get_filepaths_in_directory(self):
# Verify the expected filenames. get_filepaths_in_directory() returns
# a list of absolute paths.
metadata_files = repo.get_filepaths_in_directory(metadata_directory)
expected_files = ['root.json', 'targets.json', 'targets.json.gz',
'snapshot.json', 'timestamp.json']
expected_files = ['root.json', 'root.json.gz', 'targets.json',
'targets.json.gz', 'snapshot.json', 'snapshot.json.gz',
'timestamp.json', 'timestamp.json.gz']
basenames = []
for filepath in metadata_files:
basenames.append(os.path.basename(filepath))
@ -986,8 +998,11 @@ def test_add_targets(self):
target1_filepath = os.path.join(self.targets_directory, 'file1.txt')
target2_filepath = os.path.join(self.targets_directory, 'file2.txt')
target3_filepath = os.path.join(self.targets_directory, 'file3.txt')
target_files = [target1_filepath, target2_filepath, target3_filepath]
# Add a 'target1_filepath' duplicate for testing purposes
# ('target1_filepath' should not be added twice.)
target_files = \
[target1_filepath, target2_filepath, target3_filepath, target1_filepath]
self.targets_object.add_targets(target_files)
self.assertEqual(len(self.targets_object.target_files), 3)
@ -996,17 +1011,19 @@ def test_add_targets(self):
# Test improperly formatted arguments.
self.assertRaises(tuf.FormatError, self.targets_object.add_targets,
3)
self.assertRaises(tuf.FormatError, self.targets_object.add_targets, 3)
# Test invalid filepath argument (i.e., non-existent or invalid file.)
self.assertRaises(tuf.Error, self.targets_object.add_target,
self.assertRaises(tuf.Error, self.targets_object.add_targets,
['non-existent.txt'])
self.assertRaises(tuf.Error, self.targets_object.add_target,
self.assertRaises(tuf.Error, self.targets_object.add_targets,
[target1_filepath, target2_filepath, 'non-existent.txt'])
self.assertRaises(tuf.Error, self.targets_object.add_target,
self.temporary_directory)
self.assertRaises(tuf.Error, self.targets_object.add_targets,
[self.temporary_directory])
temp_directory = os.path.join(self.targets_directory, 'temp')
os.mkdir(temp_directory)
self.assertRaises(tuf.Error, self.targets_object.add_targets,
[temp_directory])
@ -1031,6 +1048,10 @@ def test_remove_target(self):
# Test invalid filepath argument (i.e., non-existent or invalid file.)
self.assertRaises(tuf.Error, self.targets_object.remove_target,
'/non-existent.txt')
# Test for filepath that hasn't been added yet.
target5_filepath = os.path.join(self.targets_directory, 'file5.txt')
self.assertRaises(tuf.Error, self.targets_object.remove_target,
target5_filepath)
@ -1075,6 +1096,25 @@ def test_delegate(self):
self.assertEqual(self.targets_object.get_delegated_rolenames(),
['targets/tuf'])
# Try to delegate to a role that has already been delegated.
self.assertRaises(tuf.Error, self.targets_object.delegate, rolename,
public_keys, list_of_targets, threshold, backtrack=True,
restricted_paths=restricted_paths,
path_hash_prefixes=path_hash_prefixes)
# Test for targets that do not exist under the targets directory.
self.targets_object.revoke(rolename)
self.assertRaises(tuf.Error, self.targets_object.delegate, rolename,
public_keys, ['non-existent.txt'], threshold,
backtrack=True, restricted_paths=restricted_paths,
path_hash_prefixes=path_hash_prefixes)
# Test for targets that do not exist under the targets directory.
self.assertRaises(tuf.Error, self.targets_object.delegate, rolename,
public_keys, list_of_targets, threshold,
backtrack=True, restricted_paths=['non-existent.txt'],
path_hash_prefixes=path_hash_prefixes)
# Test improperly formatted arguments.

View file

@ -257,16 +257,18 @@ def test_with_tuf_mode_2(self):
# by sending just several characters every few seconds.
server_process = self._start_slow_server('mode_2')
client_filepath = os.path.join(self.client_directory, 'file1.txt')
original_average_download_speed = tuf.conf.MIN_AVERAGE_DOWNLOAD_SPEED
tuf.conf.MIN_AVERAGE_DOWNLOAD_SPEED = 1
try:
file1_target = self.repository_updater.target('file1.txt')
self.repository_updater.download_target(file1_target, self.client_directory)
# Verify that the specific 'tuf.SlowRetrievalError' exception is raised by
# each mirror. 'file1.txt' should be large enough to trigger a slow
# retrieval attack, otherwise the expected exception may not be consistently
# raised.
# retrieval attack, otherwise the expected exception may not be
# consistently raised.
except tuf.NoWorkingMirrorError as exception:
for mirror_url, mirror_error in six.iteritems(exception.mirror_errors):
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
@ -283,6 +285,7 @@ def test_with_tuf_mode_2(self):
finally:
self._stop_slow_server(server_process)
tuf.conf.MIN_AVERAGE_DOWNLOAD_SPEED = original_average_download_speed
if __name__ == '__main__':

View file

@ -443,8 +443,6 @@ def test_2__import_delegations(self):
self.repository_updater.metadata['current']['targets']\
['delegations']['keys'][existing_keyid]['keyid'] = '123'
print(repr(self.repository_updater.metadata['current']['targets']\
['delegations']['keys'][existing_keyid]['keyid']))
self.repository_updater._import_delegations('targets')
#self.assertRaises(tuf.Error, self.repository_updater._import_delegations,
# 'targets')
@ -457,12 +455,9 @@ def test_2__import_delegations(self):
# one of the roles loaded from parent role's 'delegations'.
def test_2__fileinfo_has_changed(self):
# Verify that the method returns 'False' if file info was not changed.
root_filepath = os.path.join(self.client_metadata_current, 'root.json')
@ -691,6 +686,10 @@ def test_3__update_metadata_if_changed(self):
self.assertTrue(self.repository_updater.metadata['current']['targets'])
self.assertEqual(self.repository_updater.metadata['current']['targets']['version'], 2)
# Test for an invalid 'referenced_metadata' argument.
self.assertRaises(tuf.RepositoryError,
self.repository_updater._update_metadata_if_changed,
'snapshot', 'bad_role')
@ -722,7 +721,15 @@ def test_4_refresh(self):
# First verify that an expired root metadata is updated.
expired_date = '1960-01-01T12:00:00Z'
self.repository_updater.metadata['current']['root']['expires'] = expired_date
self.repository_updater.refresh()
self.repository_updater.refresh()
# Second, verify that expired root metadata is not updated if
# 'unsafely_update_root_if_necessary' is explictly set to 'False'.
expired_date = '1960-01-01T12:00:00Z'
self.repository_updater.metadata['current']['root']['expires'] = expired_date
self.assertRaises(tuf.ExpiredMetadataError,
self.repository_updater.refresh,
unsafely_update_root_if_necessary=False)
repository = repo_tool.load_repository(self.repository_directory)
target3 = os.path.join(self.repository_directory, 'targets', 'file3.txt')
@ -786,8 +793,17 @@ def test_4__refresh_targets_metadata(self):
# Verify that client's metadata files were refreshed successfully.
self.assertEqual(len(self.repository_updater.metadata['current']), 5)
# Test for compressed metadata roles.
self.repository_updater.metadata['current']['snapshot']['meta']['targets.json.gz'] = \
self.repository_updater.metadata['current']['snapshot']['meta']['targets.json']
self.repository_updater._refresh_targets_metadata(include_delegations=True)
# Test for repository error if the 'targets' role is not specified
# in 'snapshot'.
del self.repository_updater.metadata['current']['snapshot']['meta']['targets.json']
self.assertRaises(tuf.RepositoryError,
self.repository_updater._refresh_targets_metadata,
'targets', True)
def test_5_all_targets(self):
@ -867,6 +883,39 @@ def test_5_targets_of_role(self):
def test_6_refresh_tagets_metadata_chain(self):
# NOTE: This function does not refresh the role specified in the argument,
# only its parent roles.
# Remove the metadata of the delegated roles.
os.remove(os.path.join(self.client_metadata_current, 'targets.json'))
os.remove(os.path.join(self.client_metadata_current, 'targets', 'role1.json'))
# Test: normal case.
metadata_list = \
self.repository_updater.refresh_targets_metadata_chain('targets')
"""
print(repr(metadata_list))
self.assertEqual(len(metadata_list), 0)
self.assertTrue('targets' in metadata_list)
# Verify that the expected role files were downloaded and installed.
os.path.exists(os.path.join(self.client_metadata_current, 'targets.json'))
self.assertTrue('targets' in self.repository_updater.metadata['current'])
self.assertFalse('targets/role1' in self.repository_updater.metadata['current'])
"""
# Test: Invalid arguments.
# refresh_targets_metadata_chain() expects a string rolename.
self.assertRaises(tuf.FormatError,
self.repository_updater.refresh_targets_metadata_chain,
8)
self.assertRaises(tuf.RepositoryError,
self.repository_updater.refresh_targets_metadata_chain,
'unknown_rolename')
def test_6_target(self):
# Setup
@ -975,6 +1024,7 @@ def test_6_download_target(self):
target_fileinfo = self.repository_updater.target(target_filepath)
self.repository_updater.download_target(target_fileinfo,
destination_directory)
download_filepath = \
os.path.join(destination_directory, target_filepath.lstrip('/'))
self.assertTrue(os.path.exists(download_filepath))
@ -986,6 +1036,15 @@ def test_6_download_target(self):
if 'custom' in target_fileinfo['fileinfo']:
download_targetfileinfo['custom'] = target_fileinfo['fileinfo']['custom']
self.assertEqual(target_fileinfo['fileinfo'], download_targetfileinfo)
# Test when consistent snapshots is set. TODO: create a valid repository
# with consistent snapshots set. The updater expects the existence
# of <hash>.filename files if root.json sets 'consistent_snapshot = True'.
"""
self.repository_updater.consistent_snapshot = True
self.repository_updater.download_target(target_fileinfo,
destination_directory)
"""
# Test: Invalid arguments.
self.assertRaises(tuf.FormatError, self.repository_updater.download_target,
@ -995,7 +1054,14 @@ def test_6_download_target(self):
target_fileinfo = self.repository_updater.target(random_target_filepath)
self.assertRaises(tuf.FormatError, self.repository_updater.download_target,
target_fileinfo, 8)
# Non-existent destination.
# TODO: test for non-existent directories.
"""
self.assertRaises(tuf.Error, self.repository_updater.download_target,
target_fileinfo, 'non-existent/bad_path')
"""
# Test:
# Attempt a file download of a valid target, however, a download exception
# occurs because the target is not within the mirror's confined target
@ -1031,12 +1097,21 @@ def test_7_updated_targets(self):
# Get the list of target files. It will be used as an argument to
# 'updated_targets' function.
all_targets = self.repository_updater.all_targets()
# Test for duplicates and targets in the root directory of the repository.
additional_target = all_targets[0].copy()
all_targets.append(additional_target)
additional_target_in_root_directory = additional_target.copy()
additional_target_in_root_directory['filepath'] = 'file1.txt'
all_targets.append(additional_target_in_root_directory)
# At this point client needs to update and download all targets.
# Test: normal cases.
updated_targets = \
self.repository_updater.updated_targets(all_targets, destination_directory)
all_targets = self.repository_updater.all_targets()
# Assumed the pre-generated repository specifies two target files in
# 'targets.json' and one delegated target file in 'targets/role1.json'.
self.assertEqual(len(updated_targets), 3)
@ -1054,7 +1129,7 @@ def test_7_updated_targets(self):
# Test: download all the targets.
for download_target in all_targets:
self.repository_updater.download_target(download_target,
destination_directory)
destination_directory)
updated_targets = \
self.repository_updater.updated_targets(all_targets, destination_directory)
@ -1175,11 +1250,70 @@ def test_9__get_target_hash(self):
self.assertEqual(self.repository_updater._get_target_hash(filepath), target_hash)
# Test for improperly formatted argument.
self.assertRaises(tuf.FormatError, tuf.util.get_target_hash, 8)
#self.assertRaises(tuf.FormatError, self.repository_updater._get_target_hash, 8)
def test_10__hard_check_file_length(self):
# Test for exception if file object is not equal to trusted file length.
temp_file_object = tuf.util.TempFile()
temp_file_object.write(b'X')
temp_file_object.seek(0)
self.assertRaises(tuf.DownloadLengthMismatchError,
self.repository_updater._hard_check_file_length,
temp_file_object, 10)
def test_10__soft_check_file_length(self):
# Test for exception if file object is not equal to trusted file length.
temp_file_object = tuf.util.TempFile()
temp_file_object.write(b'XXX')
temp_file_object.seek(0)
self.assertRaises(tuf.DownloadLengthMismatchError,
self.repository_updater._soft_check_file_length,
temp_file_object, 1)
def test_10__targets_of_role(self):
# Test for non-existent role.
self.assertRaises(tuf.UnknownRoleError,
self.repository_updater._targets_of_role,
'non-existent-role')
# Test for role that hasn't been loaded yet.
del self.repository_updater.metadata['current']['targets']
self.assertEqual(len(self.repository_updater._targets_of_role('targets')),
0)
def test_10__visit_child_role(self):
# Call _visit_child_role and test the dict keys: 'paths',
# 'path_hash_prefixes', and if both are missing.
targets_role = self.repository_updater.metadata['current']['targets']
child_role = targets_role['delegations']['roles'][0]
self.assertEqual(self.repository_updater._visit_child_role(child_role,
'/file3.txt'), child_role['name'])
# Test path hash prefixes.
child_role['path_hash_prefixes'] = ['8baf', '0000']
self.assertEqual(self.repository_updater._visit_child_role(child_role,
'/file3.txt'), child_role['name'])
# Test if both 'path' and 'path_hash_prefixes' is missing.
del child_role['paths']
del child_role['path_hash_prefixes']
self.assertRaises(tuf.FormatError, self.repository_updater._visit_child_role,
child_role, child_role['name'])
def _load_role_keys(keystore_directory):

View file

@ -219,26 +219,32 @@ def test_A6_tempfile_decompress_temp_file_object(self):
for arg in bogus_args:
self.assertRaises(tuf.Error,
self.temp_fileobj.decompress_temp_file_object, arg)
# Test for a valid util.decompress_temp_file_object() call.
self.temp_fileobj.decompress_temp_file_object('gzip')
self.assertEqual(self.temp_fileobj.read(), fileobj.read())
# Checking the content of the TempFile's '_orig_file' instance.
check_compressed_original = self.make_temp_file()
with open(check_compressed_original, 'wb') as file_object:
file_object.write(self.temp_fileobj._orig_file.read())
self.temp_fileobj._orig_file.seek(0)
original_content = self.temp_fileobj._orig_file.read()
file_object.write(original_content)
data_in_orig_file = self._decompress_file(check_compressed_original)
fileobj.seek(0)
self.assertEqual(data_in_orig_file, fileobj.read())
# Try decompressing once more.
self.assertRaises(tuf.Error,
self.temp_fileobj.decompress_temp_file_object, 'gzip')
# Test decompression of invalid gzip file.
temp_file = tuf.util.TempFile()
fileobj.seek(0)
temp_file.write(fileobj.read())
temp_file.decompress_temp_file_object('gzip')
temp_file.write(b'bad zip')
contents = temp_file.read()
self.assertRaises(tuf.DecompressionError,
temp_file.decompress_temp_file_object, 'gzip')
@ -314,6 +320,12 @@ def test_B3_file_in_confined_directories(self):
def test_B4_import_json(self):
self.assertTrue('json' in sys.modules)
json_module = tuf.util.import_json()
self.assertTrue(json_module is not None)
# Test import_json() when 'util._json_moduel' is non-None.
tuf.util._json_module = 'junk_module'
self.assertEqual(tuf.util.import_json(), 'junk_module')
@ -410,7 +422,7 @@ def test_C2_find_delegated_role(self):
'targets/tuf')
# Test missing 'name' attribute (optional, but required by
# 'find_delegated_role()'.
# 'find_delegated_role()').
# Delete the duplicate role, and the remaining role's 'name' attribute.
del role_list[2]
del role_list[0]['name']
@ -424,7 +436,7 @@ def test_C3_paths_are_consistent_with_hash_prefixes(self):
path_hash_prefixes = ['e3a3', '8fae', 'd543']
list_of_targets = ['/file1.txt', '/README.txt', '/warehouse/file2.txt']
# Ensure the paths of 'list_of_targets' each have the epected path hash
# Ensure the paths of 'list_of_targets' each have the expected path hash
# prefix listed in 'path_hash_prefixes'.
for filepath in list_of_targets:
self.assertTrue(tuf.util.get_target_hash(filepath)[0:4] in path_hash_prefixes)

View file

@ -4,6 +4,7 @@
# and then run "tox" from this directory.
[tox]
#envlist = py27
envlist = py26, py27, py32, py33, py34
@ -12,7 +13,8 @@ changedir = tests
commands =
coverage run --source tuf aggregate_tests.py
coverage report -m
coverage report -m --fail-under 96
coverage html
deps =
coverage

View file

@ -293,7 +293,7 @@ class UnknownTargetError(Error):
class InvalidNameError(Error):
"""Indicate an error while trying to validate any type of named object"""
"""Indicate an error while trying to validate any type of named object."""
pass
@ -345,3 +345,18 @@ def __str__(self):
all_errors += '\n ' + repr(mirror_netloc) + ': ' + repr(mirror_error)
return all_errors
class NotFoundError(Error):
"""If a required configuration or resource is not found."""
pass
class URLMatchesNoPatternError(Error):
"""If a URL does not match a user-specified regular expression."""
pass
class InvalidConfigurationError(Error):
"""If a configuration object does not match the expected format."""
pass

View file

@ -1914,6 +1914,9 @@ def _refresh_targets_metadata(self, rolename='targets', include_delegations=Fals
# extensions.
if metadata_path.endswith('.json'):
roles_to_update.append(metadata_path[:-len('.json')])
else:
continue
# Remove the 'targets' role because it gets updated when the targets.json
# file is updated in _update_metadata_if_changed('targets').
@ -2440,6 +2443,9 @@ def _visit_child_role(self, child_role, target_filepath):
for child_role_path_hash_prefix in child_role_path_hash_prefixes:
if target_filepath_hash.startswith(child_role_path_hash_prefix):
child_role_is_relevant = True
else:
continue
elif child_role_paths is not None:
for child_role_path in child_role_paths:
@ -2726,6 +2732,8 @@ def download_target(self, target, destination_directory):
raise
else:
logger.warning(repr(target_dirpath) + ' does not exist.')
message = repr(target_dirpath) + ' does not exist.'
logger.warning(message)
raise tuf.Error(message)
target_file_object.move(destination)

View file

@ -44,10 +44,10 @@
# 'ssl.match_hostname' was added in Python 3.2. The vendored version is needed
# for Python 2.6 and 2.7.
try:
from ssl import match_hostname, CertificateError
from ssl import match_hostname, CertificateError
except ImportError:
from tuf._vendor.ssl_match_hostname import match_hostname, CertificateError
except ImportError: # pragma: no cover
from tuf._vendor.ssl_match_hostname import match_hostname, CertificateError
# See 'log.py' to learn how logging is handled in TUF.
logger = logging.getLogger('tuf.download')
@ -505,20 +505,16 @@ def _check_content_length(reported_length, required_length, strict_length=True):
logger.debug('The server reported a length of '+repr(reported_length)+' bytes.')
comparison_result = None
if reported_length < required_length:
comparison_result = 'less than'
try:
if reported_length < required_length:
comparison_result = 'less than'
elif reported_length > required_length:
comparison_result = 'greater than'
else:
comparison_result = 'equal to'
except:
logger.exception('Could not check reported and required lengths.')
elif reported_length > required_length:
comparison_result = 'greater than'
else:
comparison_result = 'equal to'
if strict_length:
message = 'The reported length is '+comparison_result+' the required '+\
'length of '+repr(required_length)+' bytes.'

View file

@ -1,28 +1,56 @@
## Examples
## Interposition
```python
import tuf.interposition
# Configurations are simply a JSON object which allows you to answer these questions:
# - Which network locations get intercepted?
# - Given a network location, which TUF mirrors should we forward requests to?
# - Given a network location, which paths should be intercepted?
# - Given a TUF mirror, how do we verify its SSL certificate?
tuf.interposition.configure()
```
The interposition package (tuf/interposition/) can be used to integrate TUF
into a software updater. It is an integration method that requires the least
amount of effort from developers who are performing the integration. The
integration method used by interposition is considered high-level because the
integrator does not explicitly call TUF methods to refresh metadata and
download target files. For example, performing a low-level integration with
*tuf/client/updater.py* requires the integrator to instantiate an updater
object, call updater.refresh() to refresh TUF metadata, and
updater.download_target() to download target files referenced in TUF metadata.
In contrast, an integrator may utilize interposition to load some configuration
settings to indicate which URLs requested by Python urllib calls should be
interposed by TUF. This means that all the update calls for metadata and
target requests are made transparently by the low level *tuf/client/updater.py*
module.
### Option one
### Interposition Examples
To use interposition, integrators must:
1. Create an interposition configuration file.
2. Import interposition, and load the configuration file with configure().
3. Perform updater urllib calls that may be interposed.
4. Deconfigure interposition.
## Option 1
```python
from tuf.interposition import urllib_tuf as urllib
from tuf.interposition import urllib2_tuf as urllib2
# configure() loads the interposition configuration file that indicates which
# URLs should be interposed by TUF. Any urllib calls that occur after
# configure() are subject to interposition.
configuration = tuf.interposition.configure()
url = 'http://example.com/path/to/document'
urllib.urlopen(url)
urllib.urlretrieve(url)
urllib.urlretrieve(url, 'mytarget')
urllib2.urlopen(url)
# deconfigure() is used to stop interposition. Any urllib calls that occur
# after deconfigure() are not interposed.
tuf.interposition.deconfigure(configuration)
```
### Option two
## Option 2
```python
@tuf.interposition.open_url
@ -30,6 +58,12 @@ def instancemethod(self, url, ...):
...
```
Note: tuf.interposition.refresh(configuration) may be called to force a
refresh of the TUF metadata. Interposition normally performs a refresh of TUF
metadata when configure() is called.
## Configuration
A *configuration* is simply a JSON object which tells `tuf.interposition` which

154
tuf/interposition/__init__.py Normal file → Executable file
View file

@ -1,16 +1,45 @@
"""
<Program Name>
__init__.py
<Author>
Trishank Kuppusamy.
Pankhuri Goyal <pankhurigoyal02@gmail.com>
<Started>
<Copyright>
See LICENSE for licensing information.
<Purpose>
TODO: Add pros / cons of using interposition. Also, should we move code
here to its own module (instead of __init__.py)?
"""
# Help with Python 3 compatibility, where the print statement is a function, an
# implicit relative import is invalid, and the '/' operator performs true
# division. Example: print 'hello world' raises a 'SyntaxError' exception.
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import functools
import imp
import json
import socket
import urllib
import urllib2
import logging
import tuf.log
import tuf._vendor.six as six
# We import them directly into our namespace so that there is no name conflict.
from configuration import ConfigurationParser, InvalidConfiguration
from utility import Logger
from updater import UpdaterController
# We import the following directly into our namespace so that there is no name
# conflict.
from tuf.interposition.configuration import ConfigurationParser
from tuf.interposition.updater import UpdaterController
logger = logging.getLogger('tuf.interposition.__init__')
# Export nothing when: from tuf.interposition import *
__all__ = []
@ -19,10 +48,12 @@
# TODO:
# - Document design decisions.
# - Interposition: Honour urllib/urllib2 contract.
# - Review security issues resulting from regular expressions (e.g. complexity attacks).
# - Review security issues resulting from regular expressions
# (e.g. complexity attacks).
# - Warn user when TUF is used without any configuration.
# - Override other default (e.g. HTTPS) urllib2 handlers.
# - Failsafe: If TUF fails, offer option to unsafely resort back to urllib/urllib2?
# - Failsafe: If TUF fails, offer option to unsafely resort back to
# urllib/urllib2?
@ -59,39 +90,53 @@
def __monkey_patch():
"""Build and monkey patch public copies of the urllib and urllib2 modules.
"""
Build and monkey patch public copies of the urllib and urllib2 modules.
We prefer simplicity, which leads to easier proof of security, even if it may
come at the cost of not honouring some provisions of the urllib and urllib2
module contracts unrelated to security.
We prefer simplicity, which leads to easier proof of security, even if it may
come at the cost of not honouring some provisions of the urllib and urllib2
module contracts unrelated to security.
References:
http://stackoverflow.com/a/11285504
http://docs.python.org/2/library/imp.html"""
References:
http://stackoverflow.com/a/11285504
http://docs.python.org/2/library/imp.html
"""
global urllib_tuf
global urllib2_tuf
if urllib_tuf is None:
urllib_module_name = 'urllib'
if six.PY3:
urllib_module_name = 'urllib/request'
try:
module_file, pathname, description = imp.find_module("urllib")
module_file, pathname, description = imp.find_module(urllib_module_name)
urllib_tuf = \
imp.load_module( "urllib_tuf", module_file, pathname, description)
imp.load_module('urllib_tuf', module_file, pathname, description)
module_file.close()
except:
raise
else:
urllib_tuf.urlopen = __urllib_urlopen
urllib_tuf.urlretrieve = __urllib_urlretrieve
if urllib2_tuf is None:
urllib2_module_name = 'urllib2'
if six.PY3:
urllib2_module_name = 'urllib/request'
try:
module_file, pathname, description = imp.find_module("urllib2")
module_file, pathname, description = imp.find_module(urllib2_module_name)
urllib2_tuf = \
imp.load_module( "urllib2_tuf", module_file, pathname, description)
imp.load_module('urllib2_tuf', module_file, pathname, description)
module_file.close()
except:
raise
else:
urllib2_tuf.urlopen = __urllib2_urlopen
@ -105,7 +150,8 @@ def __urllib_urlopen(url, data=None, proxies=None):
updater = __updater_controller.get(url)
if updater is None:
return urllib.urlopen(url, data=data, proxies=proxies)
return six.moves.urllib.request.urlopen(url, data=data, proxies=proxies)
else:
return updater.open(url, data=data)
@ -119,7 +165,8 @@ def __urllib_urlretrieve(url, filename=None, reporthook=None, data=None):
updater = __updater_controller.get(url)
if updater is None:
return urllib.urlretrieve(url, filename=filename, reporthook=reporthook, data=data)
return six.moves.urllib.request.urlretrieve(url, filename=filename, reporthook=reporthook, data=data)
else:
return updater.retrieve(url, filename=filename, reporthook=reporthook, data=data)
@ -136,27 +183,32 @@ def __urllib2_urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
updater = None
# If this is a urllib2.Request...
if isinstance(url, urllib2.Request):
if isinstance(url, six.moves.urllib.request.Request):
# If this is a GET HTTP method...
if url.get_method() == "GET":
# ...then you should check with TUF.
updater = __updater_controller.get(url.get_full_url())
else:
# ...otherwise, revert to default behaviour.
Logger.warn(NON_GET_HTTP_METHOD_MESSAGE.format(method=url.get_method(),
logger.warn(NON_GET_HTTP_METHOD_MESSAGE.format(method=url.get_method(),
url=url.get_full_url()))
return urllib2.urlopen(url, data=data, timeout=timeout)
return six.moves.urllib.request.urlopen(url, data=data, timeout=timeout)
else:
# ...otherwise, we assume this is a string.
updater = __updater_controller.get(url)
if updater is None:
return urllib2.urlopen(url, data=data, timeout=timeout)
return six.moves.urllib.request.urlopen(url, data=data, timeout=timeout)
else:
response = updater.open(url, data=data)
# See urllib2.AbstractHTTPHandler.do_open
# TODO: let Updater handle this
response.msg = ""
return response
@ -173,12 +225,13 @@ def __read_configuration(configuration_handler,
parent_ssl_certificates_directory=None):
"""
A generic function to read TUF interposition configurations off a file, and
then handle those configurations with a given function. configuration_handler
must be a function which accepts a tuf.interposition.Configuration
instance.
then handle those configurations with a given function.
configuration_handler must be a function which accepts a
tuf.interposition.Configuration instance.
Returns the parsed configurations as a dictionary of configurations indexed
by hostnames."""
by hostnames.
"""
INVALID_TUF_CONFIGURATION = "Invalid configuration for {network_location}!"
INVALID_TUF_INTERPOSITION_JSON = "Invalid configuration in {filename}!"
@ -193,34 +246,34 @@ def __read_configuration(configuration_handler,
configurations = tuf_interpositions.get("configurations", {})
if len(configurations) == 0:
raise InvalidConfiguration(NO_CONFIGURATIONS.format(filename=filename))
raise tuf.InvalidConfigurationError(NO_CONFIGURATIONS.format(filename=filename))
else:
for network_location, configuration in configurations.iteritems():
for network_location, configuration in six.iteritems(configurations):
try:
configuration_parser = ConfigurationParser(network_location,
configuration, parent_repository_directory=parent_repository_directory,
parent_ssl_certificates_directory=parent_ssl_certificates_directory)
# configuration_parser.parse() returns a
# 'tuf.interposition.Configuration' object, which interposition
# uses to determine which URLs should be interposed.
configuration = configuration_parser.parse()
configuration_handler(configuration)
parsed_configurations[configuration.hostname] = configuration
except:
Logger.exception(INVALID_TUF_CONFIGURATION.format(network_location=network_location))
logger.exception(INVALID_TUF_CONFIGURATION.format(network_location=network_location))
raise
except:
Logger.exception(INVALID_TUF_INTERPOSITION_JSON.format(filename=filename))
logger.exception(INVALID_TUF_INTERPOSITION_JSON.format(filename=filename))
raise
else:
return parsed_configurations
# TODO: Is parent_repository_directory a security risk? For example, would it
# allow the user to overwrite another TUF repository metadata on the filesystem?
# On the other hand, it is beyond TUF's scope to handle filesystem permissions.
@ -230,7 +283,8 @@ def configure(filename="tuf.interposition.json",
parent_repository_directory=None,
parent_ssl_certificates_directory=None):
"""The optional parent_repository_directory parameter is used to specify the
"""
The optional parent_repository_directory parameter is used to specify the
containing parent directory of the "repository_directory" specified in a
configuration for *all* network locations, because sometimes the absolute
location of the "repository_directory" is only known at runtime. If you
@ -272,7 +326,8 @@ def configure(filename="tuf.interposition.json",
optional; it must specify certificates bundled as PEM (RFC 1422).
Returns the parsed configurations as a dictionary of configurations indexed
by hostnames."""
by hostnames.
"""
configurations = \
__read_configuration(__updater_controller.add, filename=filename,
@ -296,7 +351,7 @@ def refresh(configurations):
# Although interposition was designed to remain transparent, for software
# updaters that require an explicit refresh of top-level metadata, this
# method is provided.
for configuration in configurations.itervalues():
for configuration in six.itervalues(configurations):
__updater_controller.refresh(configuration)
@ -306,7 +361,7 @@ def refresh(configurations):
def deconfigure(configurations):
"""Remove TUF interposition for previously read configurations."""
for configuration in configurations.itervalues():
for configuration in six.itervalues(configurations):
__updater_controller.remove(configuration)
@ -314,8 +369,10 @@ def deconfigure(configurations):
def open_url(instancemethod):
"""Decorate an instance method of the form
instancemethod(self, url, ...) with me in order to pass it to TUF."""
"""
Decorate an instance method of the form
instancemethod(self, url, ...) with me in order to pass it to TUF.
"""
@functools.wraps(instancemethod)
def wrapper(self, *args, **kwargs):
@ -325,16 +382,18 @@ def wrapper(self, *args, **kwargs):
data = kwargs.get("data")
# If this is a urllib2.Request...
if isinstance(url_object, urllib2.Request):
if isinstance(url_object, six.moves.request.Request):
# If this is a GET HTTP method...
if url_object.get_method() == "GET":
# ...then you should check with TUF.
url = url_object.get_full_url()
else:
# ...otherwise, revert to default behaviour.
Logger.warn(NON_GET_HTTP_METHOD_MESSAGE.format(method=url_object.get_method(),
logger.warn(NON_GET_HTTP_METHOD_MESSAGE.format(method=url_object.get_method(),
url=url_object.get_full_url()))
return instancemethod(self, *args, **kwargs)
# ...otherwise, we assume this is a string.
else:
url = url_object
@ -345,6 +404,7 @@ def wrapper(self, *args, **kwargs):
if updater is None:
# ...then revert to default behaviour.
return instancemethod(self, *args, **kwargs)
else:
# ...otherwise, use TUF to get this document.
return updater.open(url, data=data)
@ -364,7 +424,3 @@ def wrapper(self, *args, **kwargs):
# Build and monkey patch public copies of the urllib and urllib2 modules.
__monkey_patch()

256
tuf/interposition/configuration.py Normal file → Executable file
View file

@ -1,38 +1,71 @@
"""
<Program Name>
configuration.py
<Author>
Trishank Kuppusamy
Pankhuri Goyal <pankhurigoyal02@gmail.com>
Vladimir Diaz <vladimir.v.diaz@gmail.com>
<Started>
<Copyright>
See LICENSE for licensing information.
<Purpose>
"""
# Help with Python 3 compatibility where the print statement is a function, an
# implicit relative import is invalid, and the '/' operator performs true
# division. Example: print 'hello world' raises a 'SyntaxError' exception.
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import os.path
import types
import urlparse
# We import them directly into our namespace so that there is no name conflict.
from utility import Logger, InterpositionException
################################ GLOBAL CLASSES ################################
class InvalidConfiguration(InterpositionException):
"""User configuration is invalid."""
pass
import logging
import tuf.log
import tuf._vendor.six as six
logger = logging.getLogger('tuf.interposition.configuration')
class Configuration(object):
"""Holds TUF interposition configuration information about a network
location which is important to an updater for that network location."""
"""
<Purpose>
Holds TUF interposition configuration information about a network
location which is important to an updater for that network location.
"""
def __init__(self, hostname, port, repository_directory, repository_mirrors,
target_paths, ssl_certificates):
"""Constructor assumes that its parameters are valid."""
"""
<Purpose>
Constructor assumes that its parameters are valid.
<Arguments>
hostname:
port:
repository_directory:
repository_mirrors:
target_paths:
ssl_certificates:
<Exceptions>
<Side Effects>
<Returns>
"""
self.hostname = hostname
self.port = port
@ -50,8 +83,19 @@ def __repr__(self):
def get_repository_mirror_hostnames(self):
"""Get a set of hostnames of every repository mirror of this
configuration."""
"""
<Purpose>
Get a set of hostnames of every repository mirror of this configuration.
<Arguments>
None.
<Exceptions>
<Side Effects>
<Returns>
"""
# Parse TUF server repository mirrors.
repository_mirrors = self.repository_mirrors
@ -59,10 +103,14 @@ def get_repository_mirror_hostnames(self):
for repository_mirror in repository_mirrors:
mirror_configuration = repository_mirrors[repository_mirror]
url_prefix = mirror_configuration["url_prefix"]
parsed_url = urlparse.urlparse(url_prefix)
parsed_url = six.moves.urllib.parse.urlparse(url_prefix)
mirror_hostname = parsed_url.hostname
repository_mirror_hostnames.add(mirror_hostname)
mirror_port = parsed_url.port
mirror_network_location = \
"{hostname}:{port}".format(hostname=mirror_hostname, port = mirror_port)
repository_mirror_hostnames.add(mirror_network_location)
return repository_mirror_hostnames
@ -71,14 +119,36 @@ def get_repository_mirror_hostnames(self):
class ConfigurationParser(object):
"""Parses TUF interposition configuration information about a network
location, stored as a JSON object, and returns it as a Configuration."""
"""
<Purpose>
Parses TUF interposition configuration information about a network
location, stored as a JSON object, and returns it as a Configuration.
"""
def __init__(self, network_location, configuration,
parent_repository_directory=None,
parent_ssl_certificates_directory=None):
"""
<Purpose>
<Arguments>
network_location:
configuration:
parent_repository_directory:
parent_ssl_certificates_directory:
<Exceptions>
<Side Effects>
<Returns>
None.
"""
self.network_location = network_location
self.configuration = configuration
self.parent_repository_directory = parent_repository_directory
@ -86,7 +156,20 @@ def __init__(self, network_location, configuration,
def get_network_location(self):
"""Check network location."""
"""
<Purpose>
Check network location.
<Arguments>
None.
<Exceptions>
<Side Effects>
<Returns>
"""
INVALID_NETWORK_LOCATION = "Invalid network location {network_location}!"
@ -97,14 +180,27 @@ def get_network_location(self):
if len(network_location_tokens) > 1:
port = int(network_location_tokens[1], 10)
if port <= 0 or port >= 2**16:
raise InvalidConfiguration(INVALID_NETWORK_LOCATION.format(
raise tuf.InvalidConfigurationError(INVALID_NETWORK_LOCATION.format(
network_location=self.network_location))
return hostname, port
def get_repository_directory(self):
"""Locate TUF client metadata repository."""
"""
<Purpose>
Locate TUF client metadata repository.
<Arguments>
None.
<Exceptions>
<Side Effects>
<Returns>
"""
INVALID_PARENT_REPOSITORY_DIRECTORY = \
"Invalid parent_repository_directory for {network_location}!"
@ -121,14 +217,27 @@ def get_repository_directory(self):
# TODO: assert os.path.isdir(repository_directory)
else:
raise InvalidConfiguration(INVALID_PARENT_REPOSITORY_DIRECTORY.format(
raise tuf.InvalidConfigurationError(INVALID_PARENT_REPOSITORY_DIRECTORY.format(
network_location=self.network_location))
return repository_directory
def get_ssl_certificates(self):
"""Get any PEM certificate bundle."""
"""
<Purpose>
Get any PEM certificate bundle.
<Arguments>
None.
<Exceptions>
<Side Effects>
<Returns>
"""
INVALID_SSL_CERTIFICATES = \
"Invalid ssl_certificates for {network_location}!"
@ -147,11 +256,11 @@ def get_ssl_certificates(self):
ssl_certificates)
if not os.path.isfile(ssl_certificates):
raise InvalidConfiguration(INVALID_SSL_CERTIFICATES.format(
raise tuf.InvalidConfigurationError(INVALID_SSL_CERTIFICATES.format(
network_location=self.network_location))
else:
raise InvalidConfiguration(
raise tuf.InvalidConfigurationError(
INVALID_PARENT_SSL_CERTIFICATES_DIRECTORY.format(
network_location=self.network_location))
@ -159,7 +268,24 @@ def get_ssl_certificates(self):
def get_repository_mirrors(self, hostname, port, ssl_certificates):
"""Parse TUF server repository mirrors."""
"""
<Purpose>
Parse TUF server repository mirrors.
<Arguments>
hostname:
port:
ssl_certificates:
<Exceptions>
<Side Effects>
<Returns>
"""
INVALID_REPOSITORY_MIRROR = "Invalid repository mirror {repository_mirror}!"
@ -171,7 +297,7 @@ def get_repository_mirrors(self, hostname, port, ssl_certificates):
try:
url_prefix = mirror_configuration["url_prefix"]
parsed_url = urlparse.urlparse(url_prefix)
parsed_url = six.moves.urllib.parse.urlparse(url_prefix)
mirror_hostname = parsed_url.hostname
mirror_port = parsed_url.port or 80
mirror_scheme = parsed_url.scheme
@ -199,18 +325,29 @@ def get_repository_mirrors(self, hostname, port, ssl_certificates):
except:
error_message = \
INVALID_REPOSITORY_MIRROR.format(repository_mirror=repository_mirror)
Logger.exception(error_message)
raise InvalidConfiguration(error_message)
logger.exception(error_message)
raise tuf.InvalidConfigurationError(error_message)
return repository_mirrors
def get_target_paths(self):
"""
Within a network_location, we match URLs with this list of regular
expressions, which tell us to map from a source URL to a target URL.
If there are multiple regular expressions which match a source URL,
the order of appearance will be used to resolve ambiguity.
<Purpose>
Within a network_location, we match URLs with this list of regular
expressions, which tell us to map from a source URL to a target URL.
If there are multiple regular expressions which match a source URL,
the order of appearance will be used to resolve ambiguity.
<Arguments>
None.
<Exceptions>
<Side Effects>
<Returns>
"""
INVALID_TARGET_PATH = "Invalid target path in {network_location}!"
@ -221,27 +358,40 @@ def get_target_paths(self):
target_paths = self.configuration.get("target_paths", [WILD_TARGET_PATH])
# target_paths: [ target_path, ... ]
assert isinstance(target_paths, types.ListType)
assert isinstance(target_paths, list)
for target_path in target_paths:
try:
# target_path: { "regex_with_groups", "target_with_group_captures" }
# e.g. { ".*(/some/directory)/$", "{0}/index.html" }
assert isinstance(target_path, types.DictType)
assert isinstance(target_path, dict)
assert len(target_path) == 1
except:
error_message = \
INVALID_TARGET_PATH.format(network_location=self.network_location)
Logger.exception(error_message)
raise InvalidConfiguration(error_message)
logger.exception(error_message)
raise tuf.InvalidConfigurationError(error_message)
return target_paths
# TODO: more input sanity checks?
def parse(self):
"""Parse, check and get the required configuration parameters."""
"""
<Purpose>
Parse, check, and get the required configuration parameters.
<Arguments>
None.
<Exceptions>
<Side Effects>
<Returns>
"""
hostname, port = self.get_network_location()
ssl_certificates = self.get_ssl_certificates()
@ -252,5 +402,5 @@ def parse(self):
self.get_repository_mirrors(hostname, port, ssl_certificates)
# If everything passes, we return a Configuration.
return Configuration(hostname, port, repository_directory, repository_mirrors,
target_paths, ssl_certificates)
return Configuration(hostname, port, repository_directory,
repository_mirrors, target_paths, ssl_certificates)

978
tuf/interposition/updater.py Normal file → Executable file

File diff suppressed because it is too large Load diff

View file

@ -1,43 +0,0 @@
import logging
# Import our standard logger for its side effects.
import tuf.log
class InterpositionException(Exception):
"""Base exception class."""
pass
class Logger(object):
"""A static logging object for tuf.interposition."""
__logger = logging.getLogger("tuf.interposition")
@staticmethod
def debug(message):
Logger.__logger.debug(message)
@staticmethod
def exception(message):
Logger.__logger.exception(message)
@staticmethod
def info(message):
Logger.__logger.info(message)
@staticmethod
def warn(message):
Logger.__logger.warn(message)

View file

@ -90,9 +90,9 @@ def create_keydb_from_root_metadata(root_metadata):
# Clear the key database.
_keydb_dict.clear()
# Iterate through the keys found in 'root_metadata' by converting
# them to 'RSAKEY_SCHEMA' if their type is 'rsa', and then
# adding them the database. Duplicates are avoided.
# Iterate the keys found in 'root_metadata' by converting them to
# 'RSAKEY_SCHEMA' if their type is 'rsa', and then adding them to the
# database.
for keyid, key_metadata in six.iteritems(root_metadata['keys']):
if key_metadata['keytype'] in _SUPPORTED_KEY_TYPES:
# 'key_metadata' is stored in 'KEY_SCHEMA' format. Call
@ -102,7 +102,9 @@ def create_keydb_from_root_metadata(root_metadata):
try:
add_key(key_dict, keyid)
except tuf.KeyAlreadyExistsError as e:
# Although keyid duplicates should *not* occur (unique dict keys), log a
# warning and continue.
except tuf.KeyAlreadyExistsError as e: # pragma: no cover
logger.warning(e)
continue

View file

@ -844,6 +844,7 @@ def verify_signature(key_dict, signature, data):
valid_signature = tuf.ed25519_keys.verify_signature(public,
method, sig, data,
use_pynacl=True)
# Fall back to the optimized pure python implementation of ed25519.
else: # pragma: no cover
valid_signature = tuf.ed25519_keys.verify_signature(public,

View file

@ -304,13 +304,13 @@ def create_rsa_signature(private_key, data):
pkcs1_pss_signer = Crypto.Signature.PKCS1_PSS.new(rsa_key_object)
signature = pkcs1_pss_signer.sign(sha256_object)
except ValueError:
except ValueError: #pragma: no cover
raise tuf.CryptoError('The RSA key too small for given hash algorithm.')
except TypeError:
raise tuf.CryptoError('Missing required RSA private key.')
except IndexError:
except IndexError: # pragma: no cover
message = 'An RSA signature cannot be generated: ' + str(e)
raise tuf.CryptoError(message)
@ -585,8 +585,9 @@ def create_rsa_public_and_private_from_encrypted_pem(encrypted_pem, passphrase):
public = rsa_pubkey.exportKey(format='PEM')
# PyCrypto raises 'ValueError' if the public or private keys cannot be
# exported. See 'Crypto.PublicKey.RSA'.
except (ValueError):
# exported. See 'Crypto.PublicKey.RSA'. 'ValueError' should not be raised
# if the 'Crypto.PublicKey.RSA.importKey() call above passed.
except (ValueError): #pragma: no cover
message = 'The public and private keys cannot be exported in PEM format.'
raise tuf.CryptoError(message)

View file

@ -162,6 +162,8 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial,
_log_warning_if_expires_soon(TIMESTAMP_FILENAME, roleinfo['expires'],
TIMESTAMP_EXPIRES_WARN_SECONDS)
else:
raise tuf.Error('Invalid rolename')
signable = sign_metadata(metadata, roleinfo['signing_keyids'],
metadata_filename)
@ -387,7 +389,7 @@ def _remove_invalid_and_duplicate_signatures(signable):
# Although valid, it may still need removal if it is a duplicate. Check
# the keyid, rather than the signature, to remove duplicate PSS signatures.
# PSS may generate multiple different signatures for the same keyid.
# PSS may generate multiple different signatures for the same keyid.
else:
if keyid in signature_keyids:
signable['signatures'].remove(signature)
@ -1793,11 +1795,14 @@ def sign_metadata(metadata_object, keyids, filename):
for signature in signable['signatures']:
if not keyid == signature['keyid']:
signatures.append(signature)
else:
continue
signable['signatures'] = signatures
# Generate the signature using the appropriate signing method.
if key['keytype'] in SUPPORTED_KEY_TYPES:
if len(key['keyval']['private']):
if 'private' in key['keyval']:
signed = signable['signed']
signature = tuf.keys.create_signature(key, signed)
signable['signatures'].append(signature)

View file

@ -102,7 +102,7 @@
try:
tuf.keys.check_crypto_libraries(['rsa', 'ed25519', 'general'])
except tuf.UnsupportedLibraryError as e:
except tuf.UnsupportedLibraryError as e: #pragma: no cover
message = 'Warning: The repository and developer tools require additional' + \
' libraries and can be installed as follows:\n $ pip install tuf[tools]'
logger.warn(message)
@ -1795,7 +1795,7 @@ def add_targets(self, list_of_targets):
# times these checks are performed.
for target in list_of_targets:
filepath = os.path.abspath(target)
if not filepath.startswith(self._targets_directory+os.sep):
message = repr(filepath) + ' is not under the Repository\'s targets ' +\
'directory: ' + repr(self._targets_directory)
@ -1813,6 +1813,9 @@ def add_targets(self, list_of_targets):
for relative_target in relative_list_of_targets:
if relative_target not in roleinfo['paths']:
roleinfo['paths'].update({relative_target: {}})
else:
continue
tuf.roledb.update_roleinfo(self.rolename, roleinfo)

View file

@ -317,8 +317,12 @@ def decompress_temp_file_object(self, compression):
self._orig_file = self.temporary_file
try:
self.temporary_file = gzip.GzipFile(fileobj=self.temporary_file,
mode='rb')
gzip_file_object = gzip.GzipFile(fileobj=self.temporary_file, mode='rb')
uncompressed_content = gzip_file_object.read()
self.temporary_file = tempfile.NamedTemporaryFile()
self.temporary_file.write(uncompressed_content)
self.flush()
except Exception as exception:
raise tuf.DecompressionError(exception)
@ -661,12 +665,14 @@ def ensure_all_targets_allowed(rolename, list_of_targets, parent_delegations):
if allowed_child_path_hash_prefixes is not None:
consistent = paths_are_consistent_with_hash_prefixes
if len(actual_child_targets) > 0:
if not consistent(actual_child_targets,
allowed_child_path_hash_prefixes):
message = repr(rolename) + ' specifies a target that does not' + \
' have a path hash prefix listed in its parent role.'
raise tuf.ForbiddenTargetError(message)
# 'actual_child_tarets' (i.e., 'list_of_targets') should have lenth
# greater than zero due to the tuf.format check above.
if not consistent(actual_child_targets,
allowed_child_path_hash_prefixes):
message = repr(rolename) + ' specifies a target that does not' + \
' have a path hash prefix listed in its parent role.'
raise tuf.ForbiddenTargetError(message)
elif allowed_child_paths is not None:
# Check that each delegated target is either explicitly listed or a parent
@ -710,7 +716,7 @@ def ensure_all_targets_allowed(rolename, list_of_targets, parent_delegations):
def paths_are_consistent_with_hash_prefixes(paths, path_hash_prefixes):
"""
<Purpose>
Determine whether a list of paths are consistent with theirs alleged
Determine whether a list of paths are consistent with their alleged
path hash prefixes. By default, the SHA256 hash function is used.
<Arguments>
@ -743,21 +749,23 @@ def paths_are_consistent_with_hash_prefixes(paths, path_hash_prefixes):
# proven otherwise.
consistent = False
if len(paths) > 0 and len(path_hash_prefixes) > 0:
for path in paths:
path_hash = get_target_hash(path)
# Assume that every path is inconsistent until proven otherwise.
consistent = False
# The format checks above ensure the 'paths' and 'path_hash_prefix' lists
# have lengths greater than zero.
for path in paths:
path_hash = get_target_hash(path)
# Assume that every path is inconsistent until proven otherwise.
consistent = False
for path_hash_prefix in path_hash_prefixes:
if path_hash.startswith(path_hash_prefix):
consistent = True
break
# This path has no matching path_hash_prefix. Stop looking further.
if not consistent:
for path_hash_prefix in path_hash_prefixes:
if path_hash.startswith(path_hash_prefix):
consistent = True
break
# This path has no matching path_hash_prefix. Stop looking further.
if not consistent:
break
return consistent
@ -836,11 +844,16 @@ def import_json():
if _json_module is not None:
return _json_module
else:
try:
module = __import__('json')
except ImportError:
# The 'json' module is available in Python > 2.6, and thus this exception
# should not occur in all supported Python installations (> 2.6) of TUF.
except ImportError: #pragma: no cover
raise ImportError('Could not import the json module')
else:
_json_module = module
return module