From 3eb8f10710ccf8bf632ca37c6210d462c91ecdc1 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Mon, 2 Mar 2015 15:10:27 -0500 Subject: [PATCH 01/11] More coverage improvements --- tests/test_keydb.py | 10 ++++------ tests/test_keys.py | 15 +++++++++++++-- tests/test_repository_lib.py | 5 +++-- tests/test_slow_retrieval_attack.py | 4 ++-- tuf/keydb.py | 10 ++++++---- tuf/keys.py | 1 + 6 files changed, 29 insertions(+), 16 deletions(-) diff --git a/tests/test_keydb.py b/tests/test_keydb.py index f7171233..394e66a6 100755 --- a/tests/test_keydb.py +++ b/tests/test_keydb.py @@ -182,17 +182,15 @@ def test_create_keydb_from_root_metadata(self): keyid = KEYS[0]['keyid'] rsakey2 = KEYS[1] keyid2 = KEYS[1]['keyid'] - keydict = {keyid: rsakey, keyid2: rsakey2, keyid: rsakey} + + keydict = {keyid: rsakey, keyid2: rsakey2} - # Add a duplicate 'keyid' to log/trigger a 'tuf.KeyAlreadyExistsError' - # block (loading continues). roledict = {'Root': {'keyids': [keyid], 'threshold': 1}, - 'Targets': {'keyids': [keyid2], 'threshold': 1}} + 'Targets': {'keyids': [keyid2, keyid], 'threshold': 1}} version = 8 consistent_snapshot = False expires = '1985-10-21T01:21:00Z' - tuf.keydb.add_key(rsakey) root_metadata = tuf.formats.RootFile.make_metadata(version, expires, keydict, roledict, @@ -231,7 +229,7 @@ def test_create_keydb_from_root_metadata(self): rsakey3['keytype'] = 'bad_keytype' keydict[keyid3] = rsakey3 version = 8 - expires = '1985-10-21T01:21:00Z' + expires = '1985-10-21T01:21:00Z' root_metadata = tuf.formats.RootFile.make_metadata(version, expires, diff --git a/tests/test_keys.py b/tests/test_keys.py index 658a6db2..7f19066c 100755 --- a/tests/test_keys.py +++ b/tests/test_keys.py @@ -235,8 +235,19 @@ def test_verify_signature(self): # Passing incorrect number of arguments. self.assertRaises(TypeError, KEYS.verify_signature) - - + + # Verify that the pure python 'ed25519' base case (triggered if 'pynacl' is + # unavailable) is executed in tuf.keys.verify_signature(). + KEYS._ED25519_CRYPTO_LIBRARY = 'invalid' + KEYS._available_crypto_libraries = ['invalid'] + verified = KEYS.verify_signature(self.ed25519key_dict, ed25519_signature, DATA) + self.assertTrue(verified, "Incorrect signature.") + + # Reset to the expected available crypto libraries. + KEYS._ED25519_CRYPTO_LIBRARY = 'pynacl' + KEYS._available_crypto_libraries = ['ed25519', 'pycrypto', 'pynacl'] + + def test_create_rsa_encrypted_pem(self): # Test valid arguments. diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py index 557319b3..f73060c8 100755 --- a/tests/test_repository_lib.py +++ b/tests/test_repository_lib.py @@ -647,8 +647,7 @@ def test_sign_metadata(self): root_private_keypath = os.path.join(keystore_path, 'root_key') root_private_key = \ - repo_lib.import_rsa_privatekey_from_file(root_private_keypath, - 'password') + repo_lib.import_rsa_privatekey_from_file(root_private_keypath, 'password') # sign_metadata() expects the private key 'root_metadata' to be in # 'tuf.keydb'. Remove any public keys that may be loaded before @@ -670,6 +669,8 @@ def test_sign_metadata(self): self.assertRaises(tuf.FormatError, repo_lib.sign_metadata, root_metadata, root_keyids, 3) + # Test + def test_write_metadata_file(self): diff --git a/tests/test_slow_retrieval_attack.py b/tests/test_slow_retrieval_attack.py index c6758a9e..1c48f698 100755 --- a/tests/test_slow_retrieval_attack.py +++ b/tests/test_slow_retrieval_attack.py @@ -265,8 +265,8 @@ def test_with_tuf_mode_2(self): # Verify that the specific 'tuf.SlowRetrievalError' exception is raised by # each mirror. 'file1.txt' should be large enough to trigger a slow - # retrieval attack, otherwise the expected exception may not be consistently - # raised. + # retrieval attack, otherwise the expected exception may not be + # consistently raised. except tuf.NoWorkingMirrorError as exception: for mirror_url, mirror_error in six.iteritems(exception.mirror_errors): url_prefix = self.repository_mirrors['mirror1']['url_prefix'] diff --git a/tuf/keydb.py b/tuf/keydb.py index 60ac62fb..87da76a1 100755 --- a/tuf/keydb.py +++ b/tuf/keydb.py @@ -90,9 +90,9 @@ def create_keydb_from_root_metadata(root_metadata): # Clear the key database. _keydb_dict.clear() - # Iterate through the keys found in 'root_metadata' by converting - # them to 'RSAKEY_SCHEMA' if their type is 'rsa', and then - # adding them the database. Duplicates are avoided. + # Iterate the keys found in 'root_metadata' by converting them to + # 'RSAKEY_SCHEMA' if their type is 'rsa', and then adding them to the + # database. for keyid, key_metadata in six.iteritems(root_metadata['keys']): if key_metadata['keytype'] in _SUPPORTED_KEY_TYPES: # 'key_metadata' is stored in 'KEY_SCHEMA' format. Call @@ -102,7 +102,9 @@ def create_keydb_from_root_metadata(root_metadata): try: add_key(key_dict, keyid) - except tuf.KeyAlreadyExistsError as e: + # Although keyid duplicates should *not* occur (unique dict keys), log a + # warning and continue. + except tuf.KeyAlreadyExistsError as e: # pragma: no cover logger.warning(e) continue diff --git a/tuf/keys.py b/tuf/keys.py index acc44247..ed727eb6 100755 --- a/tuf/keys.py +++ b/tuf/keys.py @@ -844,6 +844,7 @@ def verify_signature(key_dict, signature, data): valid_signature = tuf.ed25519_keys.verify_signature(public, method, sig, data, use_pynacl=True) + # Fall back to the optimized pure python implementation of ed25519. else: # pragma: no cover valid_signature = tuf.ed25519_keys.verify_signature(public, From 7e8cd6a56d282949861db9f8f306316789e4e5d0 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Thu, 2 Apr 2015 16:33:06 -0400 Subject: [PATCH 02/11] Add test module for tuf exceptions --- tests/test_init.py | 52 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 52 insertions(+) create mode 100755 tests/test_init.py diff --git a/tests/test_init.py b/tests/test_init.py new file mode 100755 index 00000000..07d45f9e --- /dev/null +++ b/tests/test_init.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python + +""" + + test_init.py + + + Vladimir Diaz + + + March 30, 2015. + + + See LICENSE for licensing information. + + + Test cases for __init__.py (mainly the exceptions defined there). +""" + +# Help with Python 3 compatibility, where the print statement is a function, an +# implicit relative import is invalid, and the '/' operator performs true +# division. Example: print 'hello world' raises a 'SyntaxError' exception. +from __future__ import print_function +from __future__ import absolute_import +from __future__ import division +from __future__ import unicode_literals + +import unittest +import logging + +import tuf + +logger = logging.getLogger('tuf.test_keys') + +class TestInit(unittest.TestCase): + def setUp(self): + pass + + + def test_bad_signature_error(self): + bad_signature_error = tuf.BadSignatureError('bad_role') + logger.error(bad_signature_error) + + + def test_slow_retrieval_error(self): + slow_signature_error = tuf.SlowRetrievalError('bad_role') + logger.error(slow_signature_error) + + +# Run the unit tests. +if __name__ == '__main__': + unittest.main() From afd8bfd390df44a6f305e41ca93937d62f502959 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Thu, 2 Apr 2015 16:33:39 -0400 Subject: [PATCH 03/11] Improve code coverage for log.py --- tests/test_log.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tests/test_log.py b/tests/test_log.py index 006ba7af..aaf70155 100755 --- a/tests/test_log.py +++ b/tests/test_log.py @@ -116,7 +116,7 @@ def test_add_console_handler(self): raise TypeError('Test exception output in the console.') except TypeError as e: - logger.error(e) + logger.exception(e) def test_remove_console_handler(self): From f6b592723846b6261e7bd8e605496ddb49c15a7a Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Thu, 2 Apr 2015 16:34:52 -0400 Subject: [PATCH 04/11] Improve code coverage for util.py --- tests/test_util.py | 24 ++++++++++++++----- tuf/util.py | 57 ++++++++++++++++++++++++++++------------------ 2 files changed, 53 insertions(+), 28 deletions(-) diff --git a/tests/test_util.py b/tests/test_util.py index cd3a9b11..5c897a49 100755 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -219,26 +219,32 @@ def test_A6_tempfile_decompress_temp_file_object(self): for arg in bogus_args: self.assertRaises(tuf.Error, self.temp_fileobj.decompress_temp_file_object, arg) + + # Test for a valid util.decompress_temp_file_object() call. self.temp_fileobj.decompress_temp_file_object('gzip') self.assertEqual(self.temp_fileobj.read(), fileobj.read()) # Checking the content of the TempFile's '_orig_file' instance. check_compressed_original = self.make_temp_file() with open(check_compressed_original, 'wb') as file_object: - file_object.write(self.temp_fileobj._orig_file.read()) + self.temp_fileobj._orig_file.seek(0) + original_content = self.temp_fileobj._orig_file.read() + file_object.write(original_content) + data_in_orig_file = self._decompress_file(check_compressed_original) fileobj.seek(0) self.assertEqual(data_in_orig_file, fileobj.read()) - + # Try decompressing once more. self.assertRaises(tuf.Error, self.temp_fileobj.decompress_temp_file_object, 'gzip') # Test decompression of invalid gzip file. temp_file = tuf.util.TempFile() - fileobj.seek(0) - temp_file.write(fileobj.read()) - temp_file.decompress_temp_file_object('gzip') + temp_file.write('bad zip') + contents = temp_file.read() + self.assertRaises(tuf.DecompressionError, + temp_file.decompress_temp_file_object, 'gzip') @@ -314,6 +320,12 @@ def test_B3_file_in_confined_directories(self): def test_B4_import_json(self): self.assertTrue('json' in sys.modules) + json_module = tuf.util.import_json() + self.assertTrue(json_module is not None) + + # Test import_json() when 'util._json_moduel' is non-None. + tuf.util._json_module = 'junk_module' + self.assertEqual(tuf.util.import_json(), 'junk_module') @@ -424,7 +436,7 @@ def test_C3_paths_are_consistent_with_hash_prefixes(self): path_hash_prefixes = ['e3a3', '8fae', 'd543'] list_of_targets = ['/file1.txt', '/README.txt', '/warehouse/file2.txt'] - # Ensure the paths of 'list_of_targets' each have the epected path hash + # Ensure the paths of 'list_of_targets' each have the expected path hash # prefix listed in 'path_hash_prefixes'. for filepath in list_of_targets: self.assertTrue(tuf.util.get_target_hash(filepath)[0:4] in path_hash_prefixes) diff --git a/tuf/util.py b/tuf/util.py index ba401431..fa8219a6 100755 --- a/tuf/util.py +++ b/tuf/util.py @@ -317,8 +317,12 @@ def decompress_temp_file_object(self, compression): self._orig_file = self.temporary_file try: - self.temporary_file = gzip.GzipFile(fileobj=self.temporary_file, - mode='rb') + gzip_file_object = gzip.GzipFile(fileobj=self.temporary_file, mode='rb') + uncompressed_content = gzip_file_object.read() + self.temporary_file = tempfile.NamedTemporaryFile() + self.temporary_file.write(uncompressed_content) + self.flush() + except Exception as exception: raise tuf.DecompressionError(exception) @@ -661,12 +665,14 @@ def ensure_all_targets_allowed(rolename, list_of_targets, parent_delegations): if allowed_child_path_hash_prefixes is not None: consistent = paths_are_consistent_with_hash_prefixes - if len(actual_child_targets) > 0: - if not consistent(actual_child_targets, - allowed_child_path_hash_prefixes): - message = repr(rolename) + ' specifies a target that does not' + \ - ' have a path hash prefix listed in its parent role.' - raise tuf.ForbiddenTargetError(message) + + # 'actual_child_tarets' (i.e., 'list_of_targets') should have lenth + # greater than zero due to the tuf.format check above. + if not consistent(actual_child_targets, + allowed_child_path_hash_prefixes): + message = repr(rolename) + ' specifies a target that does not' + \ + ' have a path hash prefix listed in its parent role.' + raise tuf.ForbiddenTargetError(message) elif allowed_child_paths is not None: # Check that each delegated target is either explicitly listed or a parent @@ -710,7 +716,7 @@ def ensure_all_targets_allowed(rolename, list_of_targets, parent_delegations): def paths_are_consistent_with_hash_prefixes(paths, path_hash_prefixes): """ - Determine whether a list of paths are consistent with theirs alleged + Determine whether a list of paths are consistent with their alleged path hash prefixes. By default, the SHA256 hash function is used. @@ -743,21 +749,23 @@ def paths_are_consistent_with_hash_prefixes(paths, path_hash_prefixes): # proven otherwise. consistent = False - if len(paths) > 0 and len(path_hash_prefixes) > 0: - for path in paths: - path_hash = get_target_hash(path) - # Assume that every path is inconsistent until proven otherwise. - consistent = False + # The format checks above ensure the 'paths' and 'path_hash_prefix' lists + # have lengths greater than zero. + for path in paths: + path_hash = get_target_hash(path) + + # Assume that every path is inconsistent until proven otherwise. + consistent = False - for path_hash_prefix in path_hash_prefixes: - if path_hash.startswith(path_hash_prefix): - consistent = True - break - - # This path has no matching path_hash_prefix. Stop looking further. - if not consistent: + for path_hash_prefix in path_hash_prefixes: + if path_hash.startswith(path_hash_prefix): + consistent = True break + # This path has no matching path_hash_prefix. Stop looking further. + if not consistent: + break + return consistent @@ -836,11 +844,16 @@ def import_json(): if _json_module is not None: return _json_module + else: try: module = __import__('json') - except ImportError: + + # The 'json' module is available in Python > 2.6, and thus this exception + # should not occur in all supported Python installations (> 2.6) of TUF. + except ImportError: #pragma: no cover raise ImportError('Could not import the json module') + else: _json_module = module return module From 55e9e952b60d5e3b0e30b52c4c01e4e95eae3302 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Thu, 2 Apr 2015 16:35:45 -0400 Subject: [PATCH 05/11] Improve code coverage for pycrypto_keys.py --- tests/test_pycrypto_keys.py | 9 +++++++++ tuf/pycrypto_keys.py | 5 +++-- 2 files changed, 12 insertions(+), 2 deletions(-) diff --git a/tests/test_pycrypto_keys.py b/tests/test_pycrypto_keys.py index 4ef568ab..667e93fa 100755 --- a/tests/test_pycrypto_keys.py +++ b/tests/test_pycrypto_keys.py @@ -66,6 +66,7 @@ def test_generate_rsa_public_and_private(self): def test_create_rsa_signature(self): global private_rsa + global public_rsa data = 'The quick brown fox jumps over the lazy dog'.encode('utf-8') signature, method = pycrypto.create_rsa_signature(private_rsa, data) @@ -83,9 +84,16 @@ def test_create_rsa_signature(self): pycrypto.create_rsa_signature, '', data) # Check for invalid 'data'. + print(repr(private_rsa)) + pycrypto.create_rsa_signature(private_rsa, '') + self.assertRaises(tuf.CryptoError, pycrypto.create_rsa_signature, private_rsa, 123) + # Check for missing private key. + self.assertRaises(tuf.CryptoError, + pycrypto.create_rsa_signature, public_rsa, data) + def test_verify_rsa_signature(self): global public_rsa @@ -209,6 +217,7 @@ def test_create_rsa_public_and_private_from_encrypted_pem(self): self.assertRaises(tuf.FormatError, pycrypto.create_rsa_public_and_private_from_encrypted_pem, pem_rsakey, ['pw']) + self.assertRaises(tuf.CryptoError, pycrypto.create_rsa_public_and_private_from_encrypted_pem, 'invalid_pem', passphrase) diff --git a/tuf/pycrypto_keys.py b/tuf/pycrypto_keys.py index f3ac2382..b61fd9d6 100755 --- a/tuf/pycrypto_keys.py +++ b/tuf/pycrypto_keys.py @@ -585,8 +585,9 @@ def create_rsa_public_and_private_from_encrypted_pem(encrypted_pem, passphrase): public = rsa_pubkey.exportKey(format='PEM') # PyCrypto raises 'ValueError' if the public or private keys cannot be - # exported. See 'Crypto.PublicKey.RSA'. - except (ValueError): + # exported. See 'Crypto.PublicKey.RSA'. 'ValueError' should not be raised + # if the 'Crypto.PublicKey.RSA.importKey() call above passed. + except (ValueError): #pragma: no cover message = 'The public and private keys cannot be exported in PEM format.' raise tuf.CryptoError(message) From 23083fae12e7895412c22bb01b3030f8fefed193 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Thu, 2 Apr 2015 16:37:11 -0400 Subject: [PATCH 06/11] Improve code coverage for repository_lib.py --- tests/test_repository_lib.py | 79 ++++++++++++++++++++++++++++++++++++ tuf/repository_lib.py | 4 +- 2 files changed, 82 insertions(+), 1 deletion(-) diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py index f73060c8..aecefb11 100755 --- a/tests/test_repository_lib.py +++ b/tests/test_repository_lib.py @@ -758,6 +758,85 @@ def test__check_directory(self): + def test__generate_and_write_metadata(self): + # Test for invalid, or unsupported, rolename. + # Load the root metadata provided in 'tuf/tests/repository_data/'. + root_filepath = os.path.join('repository_data', 'repository', + 'metadata', 'root.json') + root_signable = tuf.util.load_json_file(root_filepath) + + # _generate_and_write_metadata() expects the top-level roles + # (specifically 'snapshot') and keys to be available in 'tuf.roledb'. + tuf.roledb.create_roledb_from_root_metadata(root_signable['signed']) + temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) + targets_directory = os.path.join(temporary_directory, 'targets') + os.mkdir(targets_directory) + repository_directory = os.path.join(temporary_directory, 'repository') + metadata_directory = os.path.join(repository_directory, + repo_lib.METADATA_STAGED_DIRECTORY_NAME) + targets_metadata = os.path.join('repository_data', 'repository', 'metadata', + 'targets.json') + obsolete_metadata = os.path.join(metadata_directory, 'targets', + 'obsolete_role.json') + tuf.util.ensure_parent_dir(obsolete_metadata) + shutil.copyfile(targets_metadata, obsolete_metadata) + + # Test for an invalid, or unsupported, rolename. + roleinfo = {'keyids': ['123'], 'threshold': 1} + tuf.roledb.add_role('bad_rolename', roleinfo) + self.assertRaises(tuf.Error, + tuf.repository_lib._generate_and_write_metadata, + 'bad_rolename', 'bad_rolename.json', False, + targets_directory, metadata_directory) + + # Verify that obsolete metadata (a metadata file exists on disk, but the + # role is unavailable in 'tuf.roledb'). First add the obsolete + # role to 'tuf.roledb' so that its metadata file can be written to disk. + targets_roleinfo = tuf.roledb.get_roleinfo('targets') + targets_roleinfo['version'] = 1 + expiration = \ + tuf.formats.unix_timestamp_to_datetime(int(time.time() + 86400)) + expiration = expiration.isoformat() + 'Z' + targets_roleinfo['expires'] = expiration + tuf.roledb.add_role('targets/obsolete_role', targets_roleinfo) + + snapshot_filepath = os.path.join('repository_data', 'repository', + 'metadata', 'snapshot.json') + snapshot_signable = tuf.util.load_json_file(snapshot_filepath) + tuf.roledb.remove_role('targets/obsolete_role') + self.assertTrue(os.path.exists(os.path.join(metadata_directory, + 'targets/obsolete_role.json'))) + tuf.repository_lib._delete_obsolete_metadata(metadata_directory, + snapshot_signable['signed'], + False) + self.assertFalse(os.path.exists(metadata_directory + 'targets/obsolete_role.json')) + + + + def test__remove_invalid_and_duplicate_signatures(self): + # Remove duplicate PSS signatures (same key generates valid, but different + # signatures). First load a valid signable (in this case, the root role). + root_filepath = os.path.join('repository_data', 'repository', + 'metadata', 'root.json') + root_signable = tuf.util.load_json_file(root_filepath) + key_filepath = os.path.join('repository_data', 'keystore', 'root_key') + root_rsa_key = repo_lib.import_rsa_privatekey_from_file(key_filepath, + 'password') + + # Append the new valid, but duplicate PSS signature, and test that + # duplicates are removed. + new_pss_signature = tuf.keys.create_signature(root_rsa_key, + root_signable['signed']) + root_signable['signatures'].append(new_pss_signature) + expected_number_of_signatures = len(root_signable['signatures']) + tuf.repository_lib._remove_invalid_and_duplicate_signatures(root_signable) + self.assertEqual(len(root_signable), expected_number_of_signatures) + + # Test that invalid keyid are ignored. + root_signable['signatures'][0]['keyid'] = '404' + tuf.repository_lib._remove_invalid_and_duplicate_signatures(root_signable) + + # Run the test cases. if __name__ == '__main__': unittest.main() diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index 8c107bbd..ccf17122 100755 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -162,6 +162,8 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial, _log_warning_if_expires_soon(TIMESTAMP_FILENAME, roleinfo['expires'], TIMESTAMP_EXPIRES_WARN_SECONDS) + else: + raise tuf.Error('Invalid rolename') signable = sign_metadata(metadata, roleinfo['signing_keyids'], metadata_filename) @@ -387,7 +389,7 @@ def _remove_invalid_and_duplicate_signatures(signable): # Although valid, it may still need removal if it is a duplicate. Check # the keyid, rather than the signature, to remove duplicate PSS signatures. - # PSS may generate multiple different signatures for the same keyid. + # PSS may generate multiple different signatures for the same keyid. else: if keyid in signature_keyids: signable['signatures'].remove(signature) From 003847e73f9a1dd1f559c2847c124045a397f330 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Thu, 2 Apr 2015 16:37:47 -0400 Subject: [PATCH 07/11] Improve code coverage for download.py --- tests/test_slow_retrieval_attack.py | 5 ++++- tuf/download.py | 20 ++++++++------------ 2 files changed, 12 insertions(+), 13 deletions(-) diff --git a/tests/test_slow_retrieval_attack.py b/tests/test_slow_retrieval_attack.py index 1c48f698..8963f945 100755 --- a/tests/test_slow_retrieval_attack.py +++ b/tests/test_slow_retrieval_attack.py @@ -257,8 +257,10 @@ def test_with_tuf_mode_2(self): # by sending just several characters every few seconds. server_process = self._start_slow_server('mode_2') - client_filepath = os.path.join(self.client_directory, 'file1.txt') + original_average_download_speed = tuf.conf.MIN_AVERAGE_DOWNLOAD_SPEED + tuf.conf.MIN_AVERAGE_DOWNLOAD_SPEED = 1 + try: file1_target = self.repository_updater.target('file1.txt') self.repository_updater.download_target(file1_target, self.client_directory) @@ -283,6 +285,7 @@ def test_with_tuf_mode_2(self): finally: self._stop_slow_server(server_process) + tuf.conf.MIN_AVERAGE_DOWNLOAD_SPEED = original_average_download_speed if __name__ == '__main__': diff --git a/tuf/download.py b/tuf/download.py index d3b5df99..2f9816c2 100755 --- a/tuf/download.py +++ b/tuf/download.py @@ -505,20 +505,16 @@ def _check_content_length(reported_length, required_length, strict_length=True): logger.debug('The server reported a length of '+repr(reported_length)+' bytes.') comparison_result = None + + if reported_length < required_length: + comparison_result = 'less than' - try: - if reported_length < required_length: - comparison_result = 'less than' - - elif reported_length > required_length: - comparison_result = 'greater than' - - else: - comparison_result = 'equal to' - - except: - logger.exception('Could not check reported and required lengths.') + elif reported_length > required_length: + comparison_result = 'greater than' + else: + comparison_result = 'equal to' + if strict_length: message = 'The reported length is '+comparison_result+' the required '+\ 'length of '+repr(required_length)+' bytes.' From 3293745d05c36ac8afa00333fb6b032b73507fb1 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Thu, 2 Apr 2015 16:38:18 -0400 Subject: [PATCH 08/11] Improve code coverage for repository_tool.py --- tests/test_repository_tool.py | 71 +++++++++++++++++++++++++++-------- tuf/repository_tool.py | 7 +++- 2 files changed, 60 insertions(+), 18 deletions(-) diff --git a/tests/test_repository_tool.py b/tests/test_repository_tool.py index 3bd70ec3..704daf0d 100755 --- a/tests/test_repository_tool.py +++ b/tests/test_repository_tool.py @@ -114,7 +114,7 @@ def test_init(self): def test_write_and_write_partial(self): # Test creation of a TUF repository. # - # 1. Load public and private keys. + # 1. Import public and private keys. # 2. Add verification keys. # 3. Load signing keys. # 4. Add target files. @@ -245,26 +245,37 @@ def test_write_and_write_partial(self): repository.status() # Verify status() does not raise 'tuf.InsufficientKeysError' if a top-level - # role does not contain a threshold of keys. + # role does and 'targets/role1' do not contain a threshold of keys. root_roleinfo = tuf.roledb.get_roleinfo('root') - old_threshold = root_roleinfo['threshold'] + old_threshold = root_roleinfo['threshold'] root_roleinfo['threshold'] = 10 + role1_roleinfo = tuf.roledb.get_roleinfo('targets/role1') + old_role1_threshold = role1_roleinfo['threshold'] + role1_roleinfo['threshold'] = 10 tuf.roledb.update_roleinfo('root', root_roleinfo) + tuf.roledb.update_roleinfo('targets/role1', role1_roleinfo) repository.status() - # Restore the original threshold value. + # Restore the original threshold values. root_roleinfo['threshold'] = old_threshold tuf.roledb.update_roleinfo('root', root_roleinfo) - + role1_roleinfo['threshold'] = old_role1_threshold + tuf.roledb.update_roleinfo('targets/role1', role1_roleinfo) + + # Verify status() does not raise 'tuf.UnsignedMetadataError' if any of the - # the top-level roles are improperly signed. + # the top-level roles and 'targets/role1' are improperly signed. repository.root.unload_signing_key(root_privkey) repository.root.load_signing_key(targets_privkey) + repository.targets('role1').unload_signing_key(role1_privkey) + repository.targets('role1').load_signing_key(targets_privkey) repository.status() - # Reset Root and verify Targets. + # Reset Root and 'targets/role1', and verify Targets. repository.root.unload_signing_key(targets_privkey) repository.root.load_signing_key(root_privkey) + repository.targets('role1').unload_signing_key(targets_privkey) + repository.targets('role1').load_signing_key(role1_privkey) repository.targets.unload_signing_key(targets_privkey) repository.targets.load_signing_key(snapshot_privkey) repository.status() @@ -987,8 +998,11 @@ def test_add_targets(self): target1_filepath = os.path.join(self.targets_directory, 'file1.txt') target2_filepath = os.path.join(self.targets_directory, 'file2.txt') target3_filepath = os.path.join(self.targets_directory, 'file3.txt') - - target_files = [target1_filepath, target2_filepath, target3_filepath] + + # Add a 'target1_filepath' duplicate for testing purposes + # ('target1_filepath' should not be added twice.) + target_files = \ + [target1_filepath, target2_filepath, target3_filepath, target1_filepath] self.targets_object.add_targets(target_files) self.assertEqual(len(self.targets_object.target_files), 3) @@ -997,17 +1011,19 @@ def test_add_targets(self): # Test improperly formatted arguments. - self.assertRaises(tuf.FormatError, self.targets_object.add_targets, - 3) - + self.assertRaises(tuf.FormatError, self.targets_object.add_targets, 3) # Test invalid filepath argument (i.e., non-existent or invalid file.) - self.assertRaises(tuf.Error, self.targets_object.add_target, + self.assertRaises(tuf.Error, self.targets_object.add_targets, ['non-existent.txt']) - self.assertRaises(tuf.Error, self.targets_object.add_target, + self.assertRaises(tuf.Error, self.targets_object.add_targets, [target1_filepath, target2_filepath, 'non-existent.txt']) - self.assertRaises(tuf.Error, self.targets_object.add_target, - self.temporary_directory) + self.assertRaises(tuf.Error, self.targets_object.add_targets, + [self.temporary_directory]) + temp_directory = os.path.join(self.targets_directory, 'temp') + os.mkdir(temp_directory) + self.assertRaises(tuf.Error, self.targets_object.add_targets, + [temp_directory]) @@ -1032,6 +1048,10 @@ def test_remove_target(self): # Test invalid filepath argument (i.e., non-existent or invalid file.) self.assertRaises(tuf.Error, self.targets_object.remove_target, '/non-existent.txt') + # Test for filepath that hasn't been added yet. + target5_filepath = os.path.join(self.targets_directory, 'file5.txt') + self.assertRaises(tuf.Error, self.targets_object.remove_target, + target5_filepath) @@ -1076,6 +1096,25 @@ def test_delegate(self): self.assertEqual(self.targets_object.get_delegated_rolenames(), ['targets/tuf']) + + # Try to delegate to a role that has already been delegated. + self.assertRaises(tuf.Error, self.targets_object.delegate, rolename, + public_keys, list_of_targets, threshold, backtrack=True, + restricted_paths=restricted_paths, + path_hash_prefixes=path_hash_prefixes) + + # Test for targets that do not exist under the targets directory. + self.targets_object.revoke(rolename) + self.assertRaises(tuf.Error, self.targets_object.delegate, rolename, + public_keys, ['non-existent.txt'], threshold, + backtrack=True, restricted_paths=restricted_paths, + path_hash_prefixes=path_hash_prefixes) + + # Test for targets that do not exist under the targets directory. + self.assertRaises(tuf.Error, self.targets_object.delegate, rolename, + public_keys, list_of_targets, threshold, + backtrack=True, restricted_paths=['non-existent.txt'], + path_hash_prefixes=path_hash_prefixes) # Test improperly formatted arguments. diff --git a/tuf/repository_tool.py b/tuf/repository_tool.py index 143f14c7..bc607c79 100755 --- a/tuf/repository_tool.py +++ b/tuf/repository_tool.py @@ -102,7 +102,7 @@ try: tuf.keys.check_crypto_libraries(['rsa', 'ed25519', 'general']) -except tuf.UnsupportedLibraryError as e: +except tuf.UnsupportedLibraryError as e: #pragma: no cover message = 'Warning: The repository and developer tools require additional' + \ ' libraries and can be installed as follows:\n $ pip install tuf[tools]' logger.warn(message) @@ -1795,7 +1795,7 @@ def add_targets(self, list_of_targets): # times these checks are performed. for target in list_of_targets: filepath = os.path.abspath(target) - + if not filepath.startswith(self._targets_directory+os.sep): message = repr(filepath) + ' is not under the Repository\'s targets ' +\ 'directory: ' + repr(self._targets_directory) @@ -1813,6 +1813,9 @@ def add_targets(self, list_of_targets): for relative_target in relative_list_of_targets: if relative_target not in roleinfo['paths']: roleinfo['paths'].update({relative_target: {}}) + + else: + continue tuf.roledb.update_roleinfo(self.rolename, roleinfo) From 75eb79534c41f0bae64aa0cf9f3bb1bcf5115d59 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Fri, 3 Apr 2015 13:46:43 -0400 Subject: [PATCH 09/11] Improve code coverage for repository_lib.py --- tests/test_repository_lib.py | 16 ++++++++++++++-- tuf/repository_lib.py | 5 ++++- 2 files changed, 18 insertions(+), 3 deletions(-) diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py index aecefb11..e754d70e 100755 --- a/tests/test_repository_lib.py +++ b/tests/test_repository_lib.py @@ -649,13 +649,25 @@ def test_sign_metadata(self): root_private_key = \ repo_lib.import_rsa_privatekey_from_file(root_private_keypath, 'password') + # Sign with a valid, but not a threshold, key. + targets_private_keypath = os.path.join(keystore_path, 'targets_key') + targets_private_key = \ + repo_lib.import_rsa_privatekey_from_file(targets_private_keypath, + 'password') + # sign_metadata() expects the private key 'root_metadata' to be in # 'tuf.keydb'. Remove any public keys that may be loaded before # adding private key, otherwise a 'tuf.KeyAlreadyExists' exception is # raised. tuf.keydb.remove_key(root_private_key['keyid']) tuf.keydb.add_key(root_private_key) - + tuf.keydb.remove_key(targets_private_key['keyid']) + tuf.keydb.add_key(targets_private_key) + + root_keyids.extend(tuf.roledb.get_role_keyids('targets')) + # Add the snapshot's public key (to test whether non-private keys are + # ignored by sign_metadata()). + root_keyids.extend(tuf.roledb.get_role_keyids('snapshot')) root_signable = repo_lib.sign_metadata(root_metadata, root_keyids, root_filename) self.assertTrue(tuf.formats.SIGNABLE_SCHEMA.matches(root_signable)) @@ -669,7 +681,7 @@ def test_sign_metadata(self): self.assertRaises(tuf.FormatError, repo_lib.sign_metadata, root_metadata, root_keyids, 3) - # Test + # Test for a key that does not contain the private portion. diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index ccf17122..5ee92038 100755 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -1795,11 +1795,14 @@ def sign_metadata(metadata_object, keyids, filename): for signature in signable['signatures']: if not keyid == signature['keyid']: signatures.append(signature) + + else: + continue signable['signatures'] = signatures # Generate the signature using the appropriate signing method. if key['keytype'] in SUPPORTED_KEY_TYPES: - if len(key['keyval']['private']): + if 'private' in key['keyval']: signed = signable['signed'] signature = tuf.keys.create_signature(key, signed) signable['signatures'].append(signature) From 110c8a196b61db086fbe69fdfedc982de4e3fc50 Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Mon, 6 Apr 2015 20:05:41 -0400 Subject: [PATCH 10/11] Improve code coverage for updater.py --- tests/test_repository_lib.py | 2 - tests/test_updater.py | 83 ++++++++++++++++++++++++++++++++++-- tuf/pycrypto_keys.py | 4 +- 3 files changed, 81 insertions(+), 8 deletions(-) diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py index e754d70e..6e0e0ce6 100755 --- a/tests/test_repository_lib.py +++ b/tests/test_repository_lib.py @@ -681,8 +681,6 @@ def test_sign_metadata(self): self.assertRaises(tuf.FormatError, repo_lib.sign_metadata, root_metadata, root_keyids, 3) - # Test for a key that does not contain the private portion. - def test_write_metadata_file(self): diff --git a/tests/test_updater.py b/tests/test_updater.py index 1e2de0ef..682ee6a7 100755 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -457,12 +457,9 @@ def test_2__import_delegations(self): # one of the roles loaded from parent role's 'delegations'. - - - def test_2__fileinfo_has_changed(self): # Verify that the method returns 'False' if file info was not changed. root_filepath = os.path.join(self.client_metadata_current, 'root.json') @@ -875,6 +872,39 @@ def test_5_targets_of_role(self): + def test_6_refresh_tagets_metadata_chain(self): + # NOTE: This function does not refresh the role specified in the argument, + # only its parent roles. + + # Remove the metadata of the delegated roles. + os.remove(os.path.join(self.client_metadata_current, 'targets.json')) + os.remove(os.path.join(self.client_metadata_current, 'targets', 'role1.json')) + + # Test: normal case. + metadata_list = \ + self.repository_updater.refresh_targets_metadata_chain('targets') + + """ + print(repr(metadata_list)) + self.assertEqual(len(metadata_list), 0) + self.assertTrue('targets' in metadata_list) + + # Verify that the expected role files were downloaded and installed. + os.path.exists(os.path.join(self.client_metadata_current, 'targets.json')) + + self.assertTrue('targets' in self.repository_updater.metadata['current']) + self.assertFalse('targets/role1' in self.repository_updater.metadata['current']) + """ + # Test: Invalid arguments. + # refresh_targets_metadata_chain() expects a string rolename. + self.assertRaises(tuf.FormatError, + self.repository_updater.refresh_targets_metadata_chain, + 8) + self.assertRaises(tuf.RepositoryError, + self.repository_updater.refresh_targets_metadata_chain, + 'unknown_rolename') + + def test_6_target(self): # Setup @@ -983,6 +1013,7 @@ def test_6_download_target(self): target_fileinfo = self.repository_updater.target(target_filepath) self.repository_updater.download_target(target_fileinfo, destination_directory) + download_filepath = \ os.path.join(destination_directory, target_filepath.lstrip('/')) self.assertTrue(os.path.exists(download_filepath)) @@ -994,6 +1025,11 @@ def test_6_download_target(self): if 'custom' in target_fileinfo['fileinfo']: download_targetfileinfo['custom'] = target_fileinfo['fileinfo']['custom'] self.assertEqual(target_fileinfo['fileinfo'], download_targetfileinfo) + + # Test when consistent snapshots is set. + self.repository_updater.consistent_snapshots = True + self.repository_updater.download_target(target_fileinfo, + destination_directory) # Test: Invalid arguments. self.assertRaises(tuf.FormatError, self.repository_updater.download_target, @@ -1183,11 +1219,50 @@ def test_9__get_target_hash(self): self.assertEqual(self.repository_updater._get_target_hash(filepath), target_hash) # Test for improperly formatted argument. - self.assertRaises(tuf.FormatError, tuf.util.get_target_hash, 8) + #self.assertRaises(tuf.FormatError, self.repository_updater._get_target_hash, 8) + def test_10__hard_check_file_length(self): + # Test for exception if file object is not equal to trusted file length. + temp_file_object = tuf.util.TempFile() + temp_file_object.write('X') + temp_file_object.seek(0) + self.assertRaises(tuf.DownloadLengthMismatchError, + self.repository_updater._hard_check_file_length, + temp_file_object, 10) + + + + + + def test_10__soft_check_file_length(self): + # Test for exception if file object is not equal to trusted file length. + temp_file_object = tuf.util.TempFile() + temp_file_object.write('XXX') + temp_file_object.seek(0) + self.assertRaises(tuf.DownloadLengthMismatchError, + self.repository_updater._soft_check_file_length, + temp_file_object, 1) + + + + def test_10__targets_of_role(self): + # Test for non-existent role. + self.assertRaises(tuf.UnknownRoleError, + self.repository_updater._targets_of_role, + 'non-existent-role') + + # Test for role that hasn't been loaded yet. + del self.repository_updater.metadata['current']['targets'] + self.assertEqual(len(self.repository_updater._targets_of_role('targets')), + 0) + + + + + def _load_role_keys(keystore_directory): diff --git a/tuf/pycrypto_keys.py b/tuf/pycrypto_keys.py index b61fd9d6..85521376 100755 --- a/tuf/pycrypto_keys.py +++ b/tuf/pycrypto_keys.py @@ -304,13 +304,13 @@ def create_rsa_signature(private_key, data): pkcs1_pss_signer = Crypto.Signature.PKCS1_PSS.new(rsa_key_object) signature = pkcs1_pss_signer.sign(sha256_object) - except ValueError: + except ValueError: #pragma: no cover raise tuf.CryptoError('The RSA key too small for given hash algorithm.') except TypeError: raise tuf.CryptoError('Missing required RSA private key.') - except IndexError: + except IndexError: # pragma: no cover message = 'An RSA signature cannot be generated: ' + str(e) raise tuf.CryptoError(message) From 98df2014b62e5fa7ca9b9dba453909640e1f0a8a Mon Sep 17 00:00:00 2001 From: Vladimir Diaz Date: Mon, 6 Apr 2015 21:29:21 -0400 Subject: [PATCH 11/11] Fix minors issues with strings and remove print statements --- tests/test_pycrypto_keys.py | 1 - tests/test_updater.py | 6 ++---- tests/test_util.py | 2 +- 3 files changed, 3 insertions(+), 6 deletions(-) diff --git a/tests/test_pycrypto_keys.py b/tests/test_pycrypto_keys.py index 667e93fa..241ef74a 100755 --- a/tests/test_pycrypto_keys.py +++ b/tests/test_pycrypto_keys.py @@ -84,7 +84,6 @@ def test_create_rsa_signature(self): pycrypto.create_rsa_signature, '', data) # Check for invalid 'data'. - print(repr(private_rsa)) pycrypto.create_rsa_signature(private_rsa, '') self.assertRaises(tuf.CryptoError, diff --git a/tests/test_updater.py b/tests/test_updater.py index 682ee6a7..cbf6c63a 100755 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -443,8 +443,6 @@ def test_2__import_delegations(self): self.repository_updater.metadata['current']['targets']\ ['delegations']['keys'][existing_keyid]['keyid'] = '123' - print(repr(self.repository_updater.metadata['current']['targets']\ - ['delegations']['keys'][existing_keyid]['keyid'])) self.repository_updater._import_delegations('targets') #self.assertRaises(tuf.Error, self.repository_updater._import_delegations, # 'targets') @@ -1227,7 +1225,7 @@ def test_9__get_target_hash(self): def test_10__hard_check_file_length(self): # Test for exception if file object is not equal to trusted file length. temp_file_object = tuf.util.TempFile() - temp_file_object.write('X') + temp_file_object.write(b'X') temp_file_object.seek(0) self.assertRaises(tuf.DownloadLengthMismatchError, self.repository_updater._hard_check_file_length, @@ -1240,7 +1238,7 @@ def test_10__hard_check_file_length(self): def test_10__soft_check_file_length(self): # Test for exception if file object is not equal to trusted file length. temp_file_object = tuf.util.TempFile() - temp_file_object.write('XXX') + temp_file_object.write(b'XXX') temp_file_object.seek(0) self.assertRaises(tuf.DownloadLengthMismatchError, self.repository_updater._soft_check_file_length, diff --git a/tests/test_util.py b/tests/test_util.py index 5c897a49..b5ce3694 100755 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -241,7 +241,7 @@ def test_A6_tempfile_decompress_temp_file_object(self): # Test decompression of invalid gzip file. temp_file = tuf.util.TempFile() - temp_file.write('bad zip') + temp_file.write(b'bad zip') contents = temp_file.read() self.assertRaises(tuf.DecompressionError, temp_file.decompress_temp_file_object, 'gzip')