diff --git a/README.md b/README.md index cc5b37c0..d4bb96fa 100644 --- a/README.md +++ b/README.md @@ -81,6 +81,33 @@ TUF specification document is also available: * [The Update Framework Specification](docs/tuf-spec.txt?raw=true) +##Installation +```Bash +pip - installing and managing Python packages (recommended): + +# Installing from Python Package Index (https://pypi.python.org/pypi). +$ pip install tuf + +# Installing from local source archive. +$ pip install + +# Or from the root directory of the unpacked archive. +$ pip install . +``` + +### Installing optional requirements (i.e., after installing tuf). +```Bash +# The optional `tuf[tools]` can be installed by users that wish to generate +# TUF repository files, such as metadata, cryptographic keys, and signatures. +# Whereas the basic install can only verify ed25519 signatures and is intended +# for sofware updater clients, `tuf[tools]` provides repository maintainers +# secure ed25519 key and signature generation with PyNaCl / libsodium. + +# The TUF tools also enable general-purpose cryptography with PyCrypto. Software +# updaters that want to support verification of RSASSA-PSS signatures must require +# their clients to install `tuf[tools]`. +$ pip install tuf[tools] +``` ##Using TUF diff --git a/tests/test_formats.py b/tests/test_formats.py index 5b2234d4..af3900bd 100755 --- a/tests/test_formats.py +++ b/tests/test_formats.py @@ -437,8 +437,23 @@ def test_TargetsFile(self): self.assertTrue(TARGETS_SCHEMA.matches(make_metadata(version, expires, filedict, delegations))) + self.assertTrue(TARGETS_SCHEMA.matches(make_metadata(version, expires, filedict))) + metadata = make_metadata(version, expires, filedict, delegations) self.assertTrue(isinstance(from_metadata(metadata), tuf.formats.TargetsFile)) + + # Test conditions for different combination of required arguments (i.e., + # a filedict or delegations argument is required.) + metadata = make_metadata(version, expires, filedict) + self.assertTrue(isinstance(from_metadata(metadata), tuf.formats.TargetsFile)) + + metadata = make_metadata(version, expires, delegations=delegations) + self.assertTrue(isinstance(from_metadata(metadata), tuf.formats.TargetsFile)) + + # Directly instantiating a TargetsFile object. + tuf.formats.TargetsFile(version, expires) + tuf.formats.TargetsFile(version, expires, filedict) + tuf.formats.TargetsFile(version, expires, delegations=delegations) # Test conditions for invalid arguments. bad_version = 'eight' @@ -456,6 +471,21 @@ def test_TargetsFile(self): self.assertRaises(tuf.Error, make_metadata, version, expires) self.assertRaises(tuf.FormatError, from_metadata, 123) + + + + def test_MirrorsFile(self): + # Test normal case. + version = 8 + expires = '1985-10-21T13:20:00Z' + + mirrors_file = tuf.formats.MirrorsFile(version, expires) + + make_metadata = tuf.formats.MirrorsFile.make_metadata + from_metadata = tuf.formats.MirrorsFile.from_metadata + + self.assertRaises(NotImplementedError, make_metadata) + self.assertRaises(NotImplementedError, from_metadata, mirrors_file) @@ -505,9 +535,10 @@ def test_parse_base64(self): self.assertTrue(isinstance(tuf.formats.parse_base64(base64), six.binary_type)) # Test conditions for invalid arguments. - self.assertRaises(tuf.FormatError, tuf.formats.format_base64, 123) - self.assertRaises(tuf.FormatError, tuf.formats.format_base64, True) - self.assertRaises(tuf.FormatError, tuf.formats.format_base64, ['123']) + self.assertRaises(tuf.FormatError, tuf.formats.parse_base64, 123) + self.assertRaises(tuf.FormatError, tuf.formats.parse_base64, True) + self.assertRaises(tuf.FormatError, tuf.formats.parse_base64, ['123']) + self.assertRaises(tuf.FormatError, tuf.formats.parse_base64, '/') @@ -566,6 +597,7 @@ def test_make_role_metadata(self): keyids = ['123abc', 'abc123'] threshold = 2 paths = ['path1/', 'path2'] + path_hash_prefixes = ['000', '003'] name = '123' ROLE_SCHEMA = tuf.formats.ROLE_SCHEMA @@ -575,7 +607,9 @@ def test_make_role_metadata(self): self.assertTrue(ROLE_SCHEMA.matches(make_role(keyids, threshold, name=name))) self.assertTrue(ROLE_SCHEMA.matches(make_role(keyids, threshold, paths=paths))) self.assertTrue(ROLE_SCHEMA.matches(make_role(keyids, threshold, name=name, paths=paths))) - + self.assertTrue(ROLE_SCHEMA.matches(make_role(keyids, threshold, name=name, + path_hash_prefixes=path_hash_prefixes))) + # Test conditions for invalid arguments. bad_keyids = 'bad' bad_threshold = 'bad' @@ -597,8 +631,9 @@ def test_make_role_metadata(self): self.assertRaises(tuf.FormatError, make_role, keyids, bad_threshold, name=name, paths=paths) self.assertRaises(tuf.FormatError, make_role, keyids, threshold, name=bad_name, paths=paths) self.assertRaises(tuf.FormatError, make_role, keyids, threshold, name=name, paths=bad_paths) - - + + # 'paths' and 'path_hash_prefixes' cannot both be specified. + self.assertRaises(tuf.FormatError, make_role, keyids, threshold, name, paths, path_hash_prefixes) def test_get_role_class(self): # Test conditions for valid arguments. @@ -688,8 +723,10 @@ def test_encode_canonical(self): self.assertEqual('[]', encode([])) self.assertEqual('{"A":[99]}', encode({"A": [99]})) self.assertEqual('{"x":3,"y":2}', encode({"x": 3, "y": 2})) + + self.assertEqual('{"x":3,"y":null}', encode({"x": 3, "y": None})) - # Condition where 'encode()' sends the result the callable + # Condition where 'encode()' sends the result to the callable # 'output'. self.assertEqual(None, encode([1, 2, 3], output)) self.assertEqual('[1,2,3]', ''.join(result)) @@ -700,6 +737,7 @@ def test_encode_canonical(self): self.assertRaises(tuf.FormatError, encode, {"x": 8.0}) self.assertRaises(tuf.FormatError, encode, 8.0, output) + self.assertRaises(tuf.FormatError, encode, {"x": tuf.FormatError}) # Run unit test. diff --git a/tests/test_hash.py b/tests/test_hash.py index 67607991..33efd781 100755 --- a/tests/test_hash.py +++ b/tests/test_hash.py @@ -13,7 +13,7 @@ See LICENSE for licensing information. - Unit test for hash.py. + Unit test for 'hash.py'. """ # Help with Python 3 compatibility, where the print statement is a function, an diff --git a/tests/test_keydb.py b/tests/test_keydb.py index cb031dbb..1ad0b631 100755 --- a/tests/test_keydb.py +++ b/tests/test_keydb.py @@ -160,6 +160,9 @@ def test_remove_key(self): # Test for 'keyid' not in keydb. self.assertRaises(tuf.UnknownKeyError, tuf.keydb.remove_key, keyid) + + # Test condition for unknown key argument. + self.assertRaises(tuf.UnknownKeyError, tuf.keydb.remove_key, '1') # Test conditions for arguments with invalid formats. self.assertRaises(tuf.FormatError, tuf.keydb.remove_key, None) @@ -185,14 +188,17 @@ def test_create_keydb_from_root_metadata(self): 'Targets': {'keyids': [keyid2], 'threshold': 1}} version = 8 consistent_snapshot = False - expires = '1985-10-21T01:21:00Z' - + expires = '1985-10-21T01:21:00Z' + + tuf.keydb.add_key(rsakey) root_metadata = tuf.formats.RootFile.make_metadata(version, expires, keydict, roledict, consistent_snapshot) self.assertEqual(None, tuf.keydb.create_keydb_from_root_metadata(root_metadata)) + tuf.keydb.create_keydb_from_root_metadata(root_metadata) + # Ensure 'keyid' and 'keyid2' were added to the keydb database. self.assertEqual(rsakey, tuf.keydb.get_key(keyid)) self.assertEqual(rsakey2, tuf.keydb.get_key(keyid2)) diff --git a/tests/test_log.py b/tests/test_log.py new file mode 100755 index 00000000..d2854440 --- /dev/null +++ b/tests/test_log.py @@ -0,0 +1,76 @@ + +""" + + test_log.py + + + Vladimir Diaz + + + May 1, 2014. + + + See LICENSE for licensing information. + + + Unit test for 'log.py'. +""" + +import logging +import unittest + +import tuf +import tuf.log + +logger = logging.getLogger('tuf.test_log') + +log_levels = [logging.CRITICAL, logging.ERROR, logging.WARNING, + logging.INFO, logging.DEBUG] + + +class TestLog(unittest.TestCase): + + + + + def test_set_log_level(self): + # Test normal case. + global log_levels + + tuf.log.set_log_level() + self.assertTrue(logger.isEnabledFor(logging.DEBUG)) + + for level in log_levels: + tuf.log.set_log_level(level) + self.assertTrue(logger.isEnabledFor(level)) + + # Test for improperly formatted argument. + self.assertRaises(tuf.FormatError, tuf.log.set_log_level, '123') + + # Test for invalid argument. + self.assertRaises(tuf.FormatError, tuf.log.set_log_level, 51) + + + + def test_set_filehandler_log_level(self): + pass + + + def test_set_console_log_level(self): + pass + + + + def test_add_console_handler(self): + pass + + + + def test_remove_console_handler(self): + pass + + + +# Run unit test. +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_repository_tool.py b/tests/test_repository_tool.py index 991a09c1..799f4238 100755 --- a/tests/test_repository_tool.py +++ b/tests/test_repository_tool.py @@ -264,6 +264,7 @@ def test_write_and_write_partial(self): repository.write(consistent_snapshot=True) repo_tool.load_repository(repository_directory) + # Test # Test improperly formatted arguments. self.assertRaises(tuf.FormatError, repository.write, 3, False) @@ -274,14 +275,21 @@ def test_write_and_write_partial(self): def test_get_filepaths_in_directory(self): # Test normal case. # Use the pre-generated metadata directory for testing. - metadata_directory = os.path.join('repository_data', - 'repository', 'metadata') - - - # Test improperly formatted arguments. # Set 'repo' reference to improve readability. repo = repo_tool.Repository + metadata_directory = os.path.join('repository_data', + 'repository', 'metadata') + # Verify the expected filenames. get_filepaths_in_directory() returns + # a list of absolute paths. + metadata_files = repo.get_filepaths_in_directory(metadata_directory) + expected_files = ['root.json', 'targets.json', 'targets.json.gz', + 'snapshot.json', 'timestamp.json'] + basenames = [] + for filepath in metadata_files: + basenames.append(os.path.basename(filepath)) + self.assertEqual(sorted(expected_files), sorted(basenames)) + # Test improperly formatted arguments. self.assertRaises(tuf.FormatError, repo.get_filepaths_in_directory, 3, recursive_walk=False, followlinks=False) self.assertRaises(tuf.FormatError, repo.get_filepaths_in_directory, @@ -290,6 +298,9 @@ def test_get_filepaths_in_directory(self): metadata_directory, recursive_walk=False, followlinks=3) # Test invalid directory argument. + # A non-directory. + self.assertRaises(tuf.Error, repo.get_filepaths_in_directory, + os.path.join(metadata_directory, 'root.json')) temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) nonexistent_directory = os.path.join(temporary_directory, 'nonexistent/') self.assertRaises(tuf.Error, repo.get_filepaths_in_directory, @@ -375,6 +386,17 @@ def test_expiration(self): expiration = self.metadata.expiration self.assertTrue(isinstance(expiration, datetime.datetime)) + # test a setter with microseconds, we are forcing the microseconds value + expiration = datetime.datetime.today() + datetime.timedelta(weeks = 1) + # we force the microseconds value if we are unlucky enough to get a 0 + if expiration.microsecond == 0: + expiration = expiration.replace(microsecond = 1) + + new_expiration = self.metadata.expiration + self.assertTrue(isinstance(new_expiration, datetime.datetime)) + + # check that the expiration value is truncated + self.assertTrue(new_expiration.microsecond == 0) # Test improperly formatted datetime. try: @@ -763,6 +785,29 @@ def test_init(self): self.assertRaises(tuf.FormatError, repo_tool.Targets, 'targets_directory/', 3) self.assertRaises(tuf.FormatError, repo_tool.Targets, 'targets_directory/', 'targets', 3) + + + + def test_call(self): + # Test normal case. + # Perform a delegation so that a delegated role can be accessed and tested + # through __call__(). Example: {targets_object}('role1'). + keystore_directory = os.path.join('repository_data', 'keystore') + public_keypath = os.path.join(keystore_directory, 'root_key.pub') + public_key = repo_tool.import_rsa_publickey_from_file(public_keypath) + target1_filepath = os.path.join(self.targets_directory, 'file1.txt') + + # Create Targets() object to be tested. + targets_object = repo_tool.Targets(self.targets_directory) + targets_object.delegate('role1', [public_key], [target1_filepath]) + + self.assertTrue(isinstance(targets_object('role1'), repo_tool.Targets)) + + # Test invalid (i.e., non-delegated) rolename argument. + self.assertRaises(tuf.UnknownRoleError, targets_object, 'unknown_role') + + # Test improperly formatted argument. + self.assertRaises(tuf.FormatError, targets_object, 1) @@ -852,8 +897,16 @@ def test_add_target(self): # Test invalid filepath argument (i.e., non-existent or invalid file.) self.assertRaises(tuf.Error, self.targets_object.add_target, 'non-existent.txt') + + # Not under the repository's targets directory. self.assertRaises(tuf.Error, self.targets_object.add_target, self.temporary_directory) + + # Not a file (i.e., a valid path, but a directory.) + test_directory = os.path.join(self.targets_directory, 'test_directory') + os.mkdir(test_directory) + self.assertRaises(tuf.Error, self.targets_object.add_target, + test_directory) @@ -1047,6 +1100,64 @@ def test_delegate_hashed_bins(self): + def test_add_target_to_bin(self): + # Test normal case. + # Delegate the hashed bins so that add_target_to_bin() can be tested. + keystore_directory = os.path.join('repository_data', 'keystore') + public_keypath = os.path.join(keystore_directory, 'targets_key.pub') + public_key = repo_tool.import_rsa_publickey_from_file(public_keypath) + target1_filepath = os.path.join(self.targets_directory, 'file1.txt') + + # Set needed arguments by delegate_hashed_bins(). + public_keys = [public_key] + list_of_targets = [target1_filepath] + + # Delegate to hashed bins. The target filepath to be tested is expected + # to contain a hash prefix of 'e', so it should be added to the + # 'targets/e' role. + self.targets_object.delegate_hashed_bins(list_of_targets, public_keys, + number_of_bins=16) + + # Ensure each hashed bin initially contains zero targets. + for delegation in self.targets_object.delegations: + self.assertTrue(target1_filepath not in delegation.target_files) + + # Add 'target1_filepath' and verify that the relative path of + # 'target1_filepath' is added to the correct bin. + self.targets_object.add_target_to_bin(target1_filepath) + for delegation in self.targets_object.delegations: + if delegation.rolename == 'targets/e': + self.assertTrue('/file1.txt' in delegation.target_files) + + else: + self.assertFalse('/file1.txt' in delegation.target_files) + + # Test for non-existent delegations and hashed bins. + empty_targets_role = repo_tool.Targets(self.targets_directory, 'empty') + + self.assertRaises(tuf.Error, empty_targets_role.add_target_to_bin, + target1_filepath) + + # Non-bin delegation, although it has a correct hashed bin name. + empty_targets_role.delegate('e', [public_key], [target1_filepath]) + self.assertRaises(tuf.Error, empty_targets_role.add_target_to_bin, + target1_filepath) + + # Test for a required hashed bin that does not exist. + self.targets_object.revoke('e') + self.assertRaises(tuf.Error, self.targets_object.add_target_to_bin, + target1_filepath) + + # Test improperly formatted argument. + self.assertRaises(tuf.FormatError, + self.targets_object.add_target_to_bin, 3) + + # Invalid target file path argument. + self.assertRaises(tuf.Error, + self.targets_object.add_target_to_bin, '/non-existent') + + + def test_add_restricted_paths(self): # Test normal case. # Perform a delegation so that add_restricted_paths() has a child role @@ -1064,6 +1175,10 @@ def test_add_restricted_paths(self): threshold, restricted_paths=None, path_hash_prefixes=None) + # Delegate an extra role for test coverage (i.e., check that restricted + # paths are not added to a child role not requested.) + self.targets_object.delegate('junk_role', public_keys, []) + restricted_path = os.path.join(self.targets_directory, 'tuf_files') os.mkdir(restricted_path) restricted_paths = [restricted_path] @@ -1094,6 +1209,11 @@ def test_add_restricted_paths(self): # Non-existent 'restricted_paths'. self.assertRaises(tuf.Error, self.targets_object.add_restricted_paths, ['/non-existent'], 'tuf') + + # Directory not under the repository's targets directory. + repository_directory = os.path.join('repository_data', 'repository') + self.assertRaises(tuf.Error, self.targets_object.add_restricted_paths, + [repository_directory], 'tuf') diff --git a/tests/test_sig.py b/tests/test_sig.py index cb99bc4b..e618fa92 100755 --- a/tests/test_sig.py +++ b/tests/test_sig.py @@ -55,6 +55,15 @@ def tearDown(self): def test_get_signature_status_no_role(self): signable = {'signed' : 'test', 'signatures' : []} + + # A valid, but empty signature status + sig_status = tuf.sig.get_signature_status(signable) + self.assertTrue(tuf.formats.SIGNATURESTATUS_SCHEMA.matches(sig_status)) + + # A valid signable, but non-existent role argument. + self.assertRaises(tuf.UnknownRoleError, tuf.sig.get_signature_status, + signable, 'unknown_role') + # Should verify we are not adding a duplicate signature # when doing the following action. Here we know 'signable' # has only one signature so it's okay. @@ -66,6 +75,10 @@ def test_get_signature_status_no_role(self): # No specific role we're considering. sig_status = tuf.sig.get_signature_status(signable, None) + # Non-existent role. + self.assertRaises(tuf.UnknownRoleError, tuf.sig.get_signature_status, + signable, 'unknown_role') + self.assertEqual(0, sig_status['threshold']) self.assertEqual([KEYS[0]['keyid']], sig_status['good_sigs']) self.assertEqual([], sig_status['bad_sigs']) @@ -353,6 +366,9 @@ def test_generate_rsa_signature(self): self.assertEqual(1, len(signable['signatures'])) signature = signable['signatures'][0] self.assertEqual(KEYS[0]['keyid'], signature['keyid']) + + returned_signature = tuf.sig.generate_rsa_signature(signable['signed'], KEYS[0]) + self.assertTrue(tuf.formats.SIGNATURE_SCHEMA.matches(returned_signature)) signable['signatures'].append(tuf.keys.create_signature( KEYS[1], signable['signed'])) @@ -361,6 +377,7 @@ def test_generate_rsa_signature(self): signature = signable['signatures'][1] self.assertEqual(KEYS[1]['keyid'], signature['keyid']) + def test_may_need_new_keys(self): # One untrusted key in 'signable'. diff --git a/tox.ini b/tox.ini index 2875a56e..9a3935f8 100644 --- a/tox.ini +++ b/tox.ini @@ -12,7 +12,7 @@ changedir = tests commands = coverage run --source tuf aggregate_tests.py - coverage report -m + coverage report -m deps = coverage diff --git a/tuf/README.md b/tuf/README.md index 404438e5..df04afe3 100644 --- a/tuf/README.md +++ b/tuf/README.md @@ -43,10 +43,15 @@ Type "help", "copyright", "credits" or "license" for more information. >>> from tuf.repository_tool import * >>> repository = load_repository("path/to/repository") ``` -Note that *tuf.repository_tool.py* is not used in TUF integrations. The +Note that **tuf.repository_tool.py** is not used in TUF integrations. The **tuf.interposition** package and **tuf.client.updater** module assist in integrating TUF with a software updater. +The repository tool requires additional cryptographic libraries and may be +installed as follows: +```Bash +$ pip install tuf[tools] +``` ### Keys ### diff --git a/tuf/formats.py b/tuf/formats.py index 9aa9a97a..bec3477b 100755 --- a/tuf/formats.py +++ b/tuf/formats.py @@ -235,6 +235,11 @@ # An ED25519 raw signature, which must be 64 bytes. ED25519SIGNATURE_SCHEMA = SCHEMA.LengthBytes(64) +# Required installation libraries expected by the repository tools and other +# cryptography modules. +REQUIRED_LIBRARIES_SCHEMA = SCHEMA.ListOf(SCHEMA.OneOf( + [SCHEMA.String('general'), SCHEMA.String('ed25519'), SCHEMA.String('rsa')])) + # An ed25519 TUF key. ED25519KEY_SCHEMA = SCHEMA.Object( object_name = 'ED25519KEY_SCHEMA', @@ -665,6 +670,7 @@ def make_metadata(version, expiration_date, filedict=None, delegations=None): result = {'_type' : 'Targets'} result['version'] = version result['expires'] = expiration_date + result['targets'] = {} if filedict is not None: result['targets'] = filedict if delegations is not None: @@ -1297,7 +1303,7 @@ def encode_canonical(object, output_function=None): try: _encode_canonical(object, output_function) - except TypeError as e: + except (TypeError, tuf.FormatError) as e: message = 'Could not encode '+repr(object)+': '+str(e) raise tuf.FormatError(message) diff --git a/tuf/keydb.py b/tuf/keydb.py index d2e351ac..dbb34670 100755 --- a/tuf/keydb.py +++ b/tuf/keydb.py @@ -102,14 +102,14 @@ def create_keydb_from_root_metadata(root_metadata): try: add_key(key_dict, keyid) + except tuf.KeyAlreadyExistsError as e: + logger.warning(e) + continue + # 'tuf.Error' raised if keyid does not match the keyid for 'rsakey_dict'. except tuf.Error as e: logger.error(e) continue - - except tuf.KeyAlreadyExistsError as e: - logger.warning(e) - continue else: logger.warning('Root Metadata file contains a key with an invalid keytype.') diff --git a/tuf/keys.py b/tuf/keys.py index 6b33853c..cd70c494 100755 --- a/tuf/keys.py +++ b/tuf/keys.py @@ -205,9 +205,8 @@ def generate_rsa_key(bits=_DEFAULT_RSA_KEY_BITS): tuf.formats.RSAKEYBITS_SCHEMA.check_match(bits) # Raise 'tuf.UnsupportedLibraryError' if the following libraries, specified in - # 'tuf.conf', are unsupported or unavailable: - # 'tuf.conf.RSA_CRYPTO_LIBRARY' and 'tuf.conf.ED25519_CRYPTO_LIBRARY'. - _check_crypto_libraries() + # 'tuf.conf', are unsupported or unavailable: 'tuf.conf.RSA_CRYPTO_LIBRARY'. + check_crypto_libraries(['rsa']) # Begin building the RSA key dictionary. rsakey_dict = {} @@ -289,8 +288,8 @@ def generate_ed25519_key(): # Raise 'tuf.UnsupportedLibraryError' if the following libraries, specified # in 'tuf.conf', are unsupported or unavailable: - # 'tuf.conf.RSA_CRYPTO_LIBRARY' and 'tuf.conf.ED25519_CRYPTO_LIBRARY'. - _check_crypto_libraries() + # 'tuf.conf.ED25519_CRYPTO_LIBRARY'. + check_crypto_libraries(['ed25519']) # Begin building the ED25519 key dictionary. ed25519_key = {} @@ -511,46 +510,78 @@ def _get_keyid(keytype, key_value): -def _check_crypto_libraries(): - """ Ensure all the crypto libraries specified in tuf.conf are available. """ +def check_crypto_libraries(required_libraries): + """ + + Public function that ensures the cryptography libraries specified in + 'tuf.conf' are supported and available for each 'required_libraries'. + + + required_libraries: + A list of library strings to validate. One, or multiple, strings from + ['rsa', 'ed25519', 'general'] can be specified. + + + tuf.UnsupportedLibraryError, if the 'required_libraries' and the libraries + specified in 'tuf.conf' are not supported or unavailable. + + + Validates the libraries set in 'tuf.conf'. + + + None. + """ + + # Does 'required_libraries' have the correct format? + # This check will ensure 'required_libraries' has the appropriate number + # of objects and object types, and that all dict keys are properly named. + # Raise 'tuf.FormatError' if the check fails. + tuf.formats.REQUIRED_LIBRARIES_SCHEMA.check_match(required_libraries) - # The checks below all raise 'tuf.UnsupportedLibraryError' if the RSA and - # ED25519 crypto libraries specified in 'tuf.conf.py' are not supported or - # unavailable. The appropriate error message is added to the exception. - # The funcions of this module that depend on user-installed crypto libraries - # should call this private function to ensure the called routine does not fail - # with unpredictable exceptions in the event of a missing library. - # The supported and available lists checked are populated when 'tuf.keys.py' - # is imported. - if _RSA_CRYPTO_LIBRARY not in _SUPPORTED_RSA_CRYPTO_LIBRARIES: + # The checks below all raise 'tuf.UnsupportedLibraryError' if the general, + # RSA, and ED25519 crypto libraries specified in 'tuf.conf.py' are not + # supported or unavailable. The appropriate error message is added to the + # exception. The funcions of this module that depend on user-installed + # crypto libraries should call this private function to ensure the called + # routine does not fail with unpredictable exceptions in the event of a + # missing library. The supported and available lists checked are populated + # when 'tuf.keys.py' is imported. + + if 'rsa' in required_libraries and _RSA_CRYPTO_LIBRARY not in \ + _SUPPORTED_RSA_CRYPTO_LIBRARIES: message = 'The '+repr(_RSA_CRYPTO_LIBRARY)+' crypto library specified'+ \ ' in "tuf.conf.RSA_CRYPTO_LIBRARY" is not supported.\n'+ \ 'Supported crypto libraries: '+repr(_SUPPORTED_RSA_CRYPTO_LIBRARIES)+'.' raise tuf.UnsupportedLibraryError(message) - if _ED25519_CRYPTO_LIBRARY not in _SUPPORTED_ED25519_CRYPTO_LIBRARIES: + if 'ed25519' in required_libraries and _ED25519_CRYPTO_LIBRARY not in \ + _SUPPORTED_ED25519_CRYPTO_LIBRARIES: message = 'The '+repr(_ED25519_CRYPTO_LIBRARY)+' crypto library specified'+\ ' in "tuf.conf.ED25519_CRYPTO_LIBRARY" is not supported.\n'+ \ 'Supported crypto libraries: '+repr(_SUPPORTED_ED25519_CRYPTO_LIBRARIES)+'.' raise tuf.UnsupportedLibraryError(message) - if _GENERAL_CRYPTO_LIBRARY not in _SUPPORTED_GENERAL_CRYPTO_LIBRARIES: + if 'general' in required_libraries and _GENERAL_CRYPTO_LIBRARY not in \ + _SUPPORTED_GENERAL_CRYPTO_LIBRARIES: message = 'The '+repr(_GENERAL_CRYPTO_LIBRARY)+' crypto library specified'+\ ' in "tuf.conf.GENERAL_CRYPTO_LIBRARY" is not supported.\n'+ \ 'Supported crypto libraries: '+repr(_SUPPORTED_GENERAL_CRYPTO_LIBRARIES)+'.' raise tuf.UnsupportedLibraryError(message) - if _RSA_CRYPTO_LIBRARY not in _available_crypto_libraries: + if 'rsa' in required_libraries and _RSA_CRYPTO_LIBRARY not in \ + _available_crypto_libraries: message = 'The '+repr(_RSA_CRYPTO_LIBRARY)+' crypto library specified'+ \ ' in "tuf.conf.RSA_CRYPTO_LIBRARY" could not be imported.' raise tuf.UnsupportedLibraryError(message) - if _ED25519_CRYPTO_LIBRARY not in _available_crypto_libraries: + if 'ed25519' in required_libraries and _ED25519_CRYPTO_LIBRARY not in \ + _available_crypto_libraries: message = 'The '+repr(_ED25519_CRYPTO_LIBRARY)+' crypto library specified'+\ ' in "tuf.conf.ED25519_CRYPTO_LIBRARY" could not be imported.' raise tuf.UnsupportedLibraryError(message) - if _GENERAL_CRYPTO_LIBRARY not in _available_crypto_libraries: + if 'general' in required_libraries and _GENERAL_CRYPTO_LIBRARY not in \ + _available_crypto_libraries: message = 'The '+repr(_GENERAL_CRYPTO_LIBRARY)+' crypto library specified'+\ ' in "tuf.conf.GENERAL_CRYPTO_LIBRARY" could not be imported.' raise tuf.UnsupportedLibraryError(message) @@ -636,8 +667,8 @@ def create_signature(key_dict, data): # Raise 'tuf.UnsupportedLibraryError' if the following libraries, specified # in 'tuf.conf', are unsupported or unavailable: - # 'tuf.conf.RSA_CRYPTO_LIBRARY' and 'tuf.conf.ED25519_CRYPTO_LIBRARY'. - _check_crypto_libraries() + # 'tuf.conf.RSA_CRYPTO_LIBRARY' or 'tuf.conf.ED25519_CRYPTO_LIBRARY'. + check_crypto_libraries([key_dict['keytype']]) # Signing the 'data' object requires a private key. # The 'RSASSA-PSS' (i.e., PyCrypto module) and 'ed25519' (i.e., PyNaCl and the @@ -893,8 +924,8 @@ def import_rsakey_from_encrypted_pem(encrypted_pem, password): # Raise 'tuf.UnsupportedLibraryError' if the following libraries, specified in # 'tuf.conf', are unsupported or unavailable: - # 'tuf.conf.RSA_CRYPTO_LIBRARY' and 'tuf.conf.ED25519_CRYPTO_LIBRARY'. - _check_crypto_libraries() + # 'tuf.conf.RSA_CRYPTO_LIBRARY' and 'tuf.conf.GENERAL_CRYPTO_LIBRARY'. + check_crypto_libraries(['rsa', 'general']) # Begin building the RSA key dictionary. rsakey_dict = {} @@ -1076,7 +1107,7 @@ def encrypt_key(key_object, password): # Raise 'tuf.UnsupportedLibraryError' if the following libraries, specified in # 'tuf.conf', are unsupported or unavailable: # 'tuf.conf.GENERAL_CRYPTO_LIBRARY'. - _check_crypto_libraries() + check_crypto_libraries(['general']) # Encrypted string of 'key_object'. The encrypted string may be safely saved # to a file and stored offline. @@ -1172,7 +1203,7 @@ def decrypt_key(encrypted_key, passphrase): # Raise 'tuf.UnsupportedLibraryError' if the following libraries, specified in # 'tuf.conf', are unsupported or unavailable: # 'tuf.conf.GENERAL_CRYPTO_LIBRARY'. - _check_crypto_libraries() + check_crypto_libraries(['general']) # Store and return the decrypted key object. key_object = None @@ -1255,8 +1286,8 @@ def create_rsa_encrypted_pem(private_key, passphrase): # Raise 'tuf.UnsupportedLibraryError' if the following libraries, specified in # 'tuf.conf', are unsupported or unavailable: - # 'tuf.conf.RSA_CRYPTO_LIBRARY'. - _check_crypto_libraries() + # 'tuf.conf.RSA_CRYPTO_LIBRARY' and 'tuf.conf.GENERAL_CRYPTO_LIBRARY'. + check_crypto_libraries(['rsa', 'general']) encrypted_pem = None diff --git a/tuf/log.py b/tuf/log.py index eec18bce..92281f31 100755 --- a/tuf/log.py +++ b/tuf/log.py @@ -175,7 +175,8 @@ def filter(self, record): def set_log_level(log_level=_DEFAULT_LOG_LEVEL): """ - Allow the default log level to be overridden. + Allow the default log level to be overridden. If 'log_level' is not + provided, log level defaults to 'logging.DEBUG'. log_level: @@ -205,7 +206,8 @@ def set_log_level(log_level=_DEFAULT_LOG_LEVEL): def set_filehandler_log_level(log_level=_DEFAULT_FILE_LOG_LEVEL): """ - Allow the default file handler log level to be overridden. + Allow the default file handler log level to be overridden. If 'log_level' + is not provided, log level defaults to 'logging.DEBUG'. log_level: @@ -235,7 +237,8 @@ def set_filehandler_log_level(log_level=_DEFAULT_FILE_LOG_LEVEL): def set_console_log_level(log_level=_DEFAULT_CONSOLE_LOG_LEVEL): """ - Allow the default log level for console messages to be overridden. + Allow the default log level for console messages to be overridden. If + 'log_level' is not provided, log level defaults to 'logging.INFO'. log_level: @@ -262,6 +265,7 @@ def set_console_log_level(log_level=_DEFAULT_CONSOLE_LOG_LEVEL): if console_handler is not None: console_handler.setLevel(log_level) + else: message = 'The console handler has not been set with add_console_handler().' raise tuf.Error(message) @@ -302,14 +306,18 @@ def add_console_handler(log_level=_DEFAULT_CONSOLE_LOG_LEVEL): # Set the console handler for the logger. The built-in console handler will # log messages to 'sys.stderr' and capture 'log_level' messages. console_handler = logging.StreamHandler() + # Get our filter for the console handler. console_filter = ConsoleFilter() + console_format_string = '%(message)s' + formatter = logging.Formatter(console_format_string) console_handler.setLevel(log_level) console_handler.setFormatter(formatter) console_handler.addFilter(console_filter) logger.addHandler(console_handler) logger.debug('Added a console handler.') + else: logger.warning('We already have a console handler.') @@ -343,5 +351,6 @@ def remove_console_handler(): logger.removeHandler(console_handler) console_handler = None logger.debug('Removed a console handler.') + else: logger.warning('We do not have a console handler.') diff --git a/tuf/repository_tool.py b/tuf/repository_tool.py index 94499d80..ee14bd5e 100755 --- a/tuf/repository_tool.py +++ b/tuf/repository_tool.py @@ -119,6 +119,15 @@ TIMESTAMP_EXPIRES_WARN_SECONDS = 86400 +try: + tuf.keys.check_crypto_libraries(['rsa', 'ed25519', 'general']) + +except tuf.UnsupportedLibraryError as e: + message = 'Warning: The repository and developer tools require additional' + \ + ' libraries and can be installed as follows:\n $ pip install tuf[tools]' + logger.warn(message) + + class Repository(object): """ @@ -231,8 +240,8 @@ def write(self, write_partial=False, consistent_snapshot=False): tuf.formats.BOOLEAN_SCHEMA.check_match(consistent_snapshot) # At this point the tuf.keydb and tuf.roledb stores must be fully - # populated, otherwise write() throwns a 'tuf.Repository' exception if - # any of the top-level roles are missing signatures, keys, etc. + # populated, otherwise write() throwns a 'tuf.UnsignedMetadataError' + # exception if any of the top-level roles are missing signatures, keys, etc. # Write the metadata files of all the delegated roles. Ensure target paths # are allowed, metadata is valid and properly signed, and required files and @@ -432,23 +441,23 @@ def status(self): except tuf.UnsignedMetadataError as e: insufficient_signatures.append(delegated_role) - # Print the verification results of the delegated roles and return + # Log the verification results of the delegated roles and return # immediately after each invalid case. if len(insufficient_keys): message = \ 'Delegated roles with insufficient keys:\n'+repr(insufficient_keys) - print(message) + logger.info(message) return if len(insufficient_signatures): message = \ 'Delegated roles with insufficient signatures:\n'+\ repr(insufficient_signatures) - print(message) + logger.info(message) return - # Verify the top-level roles and print the results. - _print_status_of_top_level_roles(targets_directory, metadata_directory) + # Verify the top-level roles and log the results. + _log_status_of_top_level_roles(targets_directory, metadata_directory) finally: shutil.rmtree(temp_repository_directory, ignore_errors=True) @@ -1131,7 +1140,8 @@ def expiration(self, datetime_object): tuf.Error, if 'datetime_object' has already expired. - Modifies the expiration attribute of the Repository object. + Modifies the expiration attribute of the Repository object. + The datetime given will be truncated to microseconds = 0 None. @@ -1143,6 +1153,10 @@ def expiration(self, datetime_object): message = repr(datetime_object) + ' is not a datetime.datetime() object.' raise tuf.FormatError(message) + # truncate the microseconds value to produce a correct schema string + # of the form yyyy-mm-ddThh:mm:ssZ + datetime_object = datetime_object.replace(microsecond = 0) + # Ensure the expiration has not already passed. current_datetime_object = \ tuf.formats.unix_timestamp_to_datetime(int(time.time())) @@ -1553,11 +1567,14 @@ def __call__(self, rolename): tuf.FormatError, if the arguments are improperly formatted. + tuf.UnknownRoleError, if 'rolename' has not been delegated by this + Targets object. + Modifies the roleinfo of the targets role in 'tuf.roledb'. - None. + The Targets object of 'rolename'. """ # Do the arguments have the correct format? @@ -1568,6 +1585,7 @@ def __call__(self, rolename): if rolename in self._delegated_roles: return self._delegated_roles[rolename] + else: message = repr(rolename)+' has not been delegated by '+repr(self.rolename) raise tuf.UnknownRoleError(message) @@ -1812,6 +1830,7 @@ def add_targets(self, list_of_targets): if os.path.isfile(filepath): relative_list_of_targets.append(filepath[targets_directory_length:]) + else: message = repr(filepath)+' is not a valid file.' raise tuf.Error(message) @@ -2331,6 +2350,109 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins, + def add_target_to_bin(self, target_filepath): + """ + + Add the fileinfo of 'target_filepath' to the expected hashed bin if + the bin is available. The hashed bin should have been created by + {targets_role}.delegate_hashed_bins(). Assuming the target filepath + falls under the repository's targets directory, determine the filepath's + hash prefix, locate the expected bin (if any), and then add the fileinfo + to the expected bin. Example: 'targets/foo.tar.gz' may be added to + the 'targets/unclaimed/58-5f.json' role's list of targets by calling this + method. + + >>> + >>> + >>> + + + target_filepath: + The filepath of the target to be added to a hashed bin. The filepath + must fall under repository's targets directory. + + + tuf.FormatError, if 'target_filepath' is improperly formatted. + + tuf.Error, if 'target_filepath' cannot be added to a hashed bin + (e.g., an invalid target filepath, or the expected hashed bin does not + exist.) + + + The fileinfo of 'target_filepath' is added to a hashed bin of this Targets + object. + + + None. + """ + + # Do the arguments have the correct format? + # Ensure the arguments have the appropriate number of objects and object + # types, and that all dict keys are properly named. + # Raise 'tuf.FormatError' if there is a mismatch. + tuf.formats.PATH_SCHEMA.check_match(target_filepath) + + # Determine the prefix length of any one of the hashed bins. The prefix + # length is not stored in the roledb, so it must be determined here by + # inspecting one of path hash prefixes listed. + roleinfo = tuf.roledb.get_roleinfo(self.rolename) + prefix_length = 0 + delegation = None + + # Set 'delegation' if this Targets role has performed any delegations. + if len(roleinfo['delegations']['roles']): + delegation = roleinfo['delegations']['roles'][0] + + else: + raise tuf.Error(self.rolename + ' has not delegated to any roles.') + + # Set 'prefix_length' if this Targets object has delegated to hashed bins, + # otherwise raise an exception. + if 'path_hash_prefixes' in delegation and len(delegation['path_hash_prefixes']): + prefix_length = len(delegation['path_hash_prefixes'][0]) + + else: + raise tuf.Error(self.rolename + ' has not delegated to hashed bins.') + + # Ensure the filepath falls under the repository's targets directory. + filepath = os.path.abspath(target_filepath) + if not filepath.startswith(self._targets_directory + os.sep): + message = repr(filepath)+' is not under the Repository\'s targets '+\ + 'directory: '+repr(self._targets_directory) + raise tuf.Error(message) + + # Determine the hash prefix of 'target_path' by computing the digest of + # its path relative to the targets directory. Example: + # '{repository_root}/targets/file1.txt' -> '/file1.txt'. + relative_path = filepath[len(self._targets_directory):] + digest_object = tuf.hash.digest(algorithm=HASH_FUNCTION) + digest_object.update(relative_path) + path_hash = digest_object.hexdigest() + path_hash_prefix = path_hash[:prefix_length] + + # Search for 'path_hash_prefix', and if found, extract the hashed bin's + # rolename. The hashed bin name is needed so that 'target_filepath' can be + # added to the Targets object of the hashed bin. + hashed_bin_name = None + for delegation in roleinfo['delegations']['roles']: + if path_hash_prefix in delegation['path_hash_prefixes']: + hashed_bin_name = delegation['name'] + break + + else: + continue + + # 'self._delegated_roles' is keyed by relative rolenames, so update + # 'hashed_bin_name'. + if hashed_bin_name is not None: + hashed_bin_name = hashed_bin_name[len(self.rolename)+1:] + self._delegated_roles[hashed_bin_name].add_target(target_filepath) + + else: + raise tuf.Error(target_filepath + ' cannot be added to any bins.') + + + @property def delegations(self): """ @@ -2405,10 +2527,10 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial, root_filename = filenames['root'] targets_filename = filenames['targets'] metadata = generate_snapshot_metadata(metadata_directory, - roleinfo['version'], - roleinfo['expires'], root_filename, - targets_filename, - consistent_snapshot) + roleinfo['version'], + roleinfo['expires'], root_filename, + targets_filename, + consistent_snapshot) _log_warning_if_expires_soon(SNAPSHOT_FILENAME, roleinfo['expires'], SNAPSHOT_EXPIRES_WARN_SECONDS) @@ -2473,13 +2595,13 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial, -def _print_status_of_top_level_roles(targets_directory, metadata_directory): +def _log_status_of_top_level_roles(targets_directory, metadata_directory): """ - Non-public function that prints whether any of the top-level roles contain an + Non-public function that logs whether any of the top-level roles contain an invalid number of public and private keys, or an insufficient threshold of signatures. Considering that the top-level metadata have to be verified in the expected root -> targets -> snapshot -> timestamp order, this function - prints the error message and returns as soon as a required metadata file is + logs the error message and returns as soon as a required metadata file is found to be invalid. It is assumed here that the delegated roles have been written and verified. Example output: @@ -2507,8 +2629,8 @@ def _print_status_of_top_level_roles(targets_directory, metadata_directory): try: _check_role_keys(rolename) - except tuf.InsufficientKeysError as e: - print(str(e)) + except tuf.InsufficientKeysError, e: + logger.info(str(e)) return # Do the top-level roles contain a valid threshold of signatures? Top-level @@ -2518,13 +2640,13 @@ def _print_status_of_top_level_roles(targets_directory, metadata_directory): signable, root_filename = \ _generate_and_write_metadata('root', root_filename, False, targets_directory, metadata_directory) - _print_status('root', signable) + _log_status('root', signable) # 'tuf.UnsignedMetadataError' raised if metadata contains an invalid threshold - # of signatures. Print the valid/threshold message, where valid < threshold. + # of signatures. log the valid/threshold message, where valid < threshold. except tuf.UnsignedMetadataError as e: signable = e[1] - _print_status('root', signable) + _log_status('root', signable) return # Verify the metadata of the Targets role. @@ -2532,11 +2654,11 @@ def _print_status_of_top_level_roles(targets_directory, metadata_directory): signable, targets_filename = \ _generate_and_write_metadata('targets', targets_filename, False, targets_directory, metadata_directory) - _print_status('targets', signable) + _log_status('targets', signable) except tuf.UnsignedMetadataError as e: signable = e[1] - _print_status('targets', signable) + _log_status('targets', signable) return # Verify the metadata of the snapshot role. @@ -2546,11 +2668,11 @@ def _print_status_of_top_level_roles(targets_directory, metadata_directory): _generate_and_write_metadata('snapshot', snapshot_filename, False, targets_directory, metadata_directory, False, filenames) - _print_status('snapshot', signable) + _log_status('snapshot', signable) except tuf.UnsignedMetadataError as e: signable = e[1] - _print_status('snapshot', signable) + _log_status('snapshot', signable) return # Verify the metadata of the Timestamp role. @@ -2560,19 +2682,19 @@ def _print_status_of_top_level_roles(targets_directory, metadata_directory): _generate_and_write_metadata('timestamp', snapshot_filename, False, targets_directory, metadata_directory, False, filenames) - _print_status('timestamp', signable) + _log_status('timestamp', signable) except tuf.UnsignedMetadataError as e: signable = e[1] - _print_status('timestamp', signable) + _log_status('timestamp', signable) return -def _print_status(rolename, signable): +def _log_status(rolename, signable): """ - Non-public function prints the number of (good/threshold) signatures of + Non-public function logs the number of (good/threshold) signatures of 'rolename'. """ @@ -2580,7 +2702,7 @@ def _print_status(rolename, signable): message = repr(rolename)+' role contains '+ repr(len(status['good_sigs']))+\ ' / '+repr(status['threshold'])+' signatures.' - print(message) + logger.info(message) @@ -2588,7 +2710,7 @@ def _print_status(rolename, signable): def _prompt(message, result_type=str): """ - Non-public function that prompts the user for input by printing 'message', + Non-public function that prompts the user for input by loging 'message', converting the input to 'result_type', and returning the value to the caller. """ diff --git a/tuf/sig.py b/tuf/sig.py index e8738487..0d7b5992 100755 --- a/tuf/sig.py +++ b/tuf/sig.py @@ -122,6 +122,7 @@ def get_signature_status(signable, role=None): # Identify unrecognized key. try: key = tuf.keydb.get_key(keyid) + except tuf.UnknownKeyError: unknown_sigs.append(keyid) continue @@ -129,6 +130,7 @@ def get_signature_status(signable, role=None): # Identify key using an unknown key signing method. try: valid_sig = tuf.keys.verify_signature(key, signature, signed) + except tuf.UnknownMethodError: unknown_method_sigs.append(keyid) continue @@ -157,8 +159,10 @@ def get_signature_status(signable, role=None): if role is not None: try: threshold = tuf.roledb.get_role_threshold(role) + except tuf.UnknownRoleError: raise + else: threshold = 0