From b0156944bbe62bb443096144e596367381a4692e Mon Sep 17 00:00:00 2001 From: Artiom Baloian Date: Wed, 2 Nov 2016 09:49:37 -0400 Subject: [PATCH] Moved util.py, now it is common --- tests/repository_data/generate.py | 8 +- .../repository_data/generate_project_data.py | 8 +- tests/test_arbitrary_package_attack.py | 14 +- tests/test_endless_data_attack.py | 24 +- tests/test_extraneous_dependencies_attack.py | 6 +- tests/test_indefinite_freeze_attack.py | 8 +- tests/test_interpose_updater.py | 2 +- tests/test_keys.py | 1 + tests/test_mix_and_match_attack.py | 2 +- tests/test_replay_attack.py | 16 +- tests/test_repository_lib.py | 26 +- tests/test_repository_tool.py | 12 +- tests/test_root_versioning_integration.py | 8 +- tests/test_slow_retrieval_attack.py | 2 +- tests/test_updater.py | 29 +- .../test_updater_root_rotation_integration.py | 2 +- tests/test_util.py | 130 +-- tuf/client/updater.py | 48 +- tuf/developer_tool.py | 10 +- tuf/download.py | 26 +- tuf/keys.py | 1 + tuf/mirrors.py | 6 +- tuf/pyca_crypto_keys.py | 6 +- tuf/pycrypto_keys.py | 8 +- tuf/repository_lib.py | 42 +- tuf/repository_tool.py | 4 +- tuf/util.py | 998 ------------------ 27 files changed, 226 insertions(+), 1221 deletions(-) delete mode 100755 tuf/util.py diff --git a/tests/repository_data/generate.py b/tests/repository_data/generate.py index 3ea31f98..e0b6bd57 100755 --- a/tests/repository_data/generate.py +++ b/tests/repository_data/generate.py @@ -29,7 +29,7 @@ import stat from tuf.repository_tool import * -import tuf.util +import tuf.ssl_crypto.util parser = optparse.OptionParser() @@ -95,11 +95,11 @@ # Create the target files (downloaded by clients) whose file size and digest # are specified in the 'targets.json' file. target1_filepath = 'repository/targets/file1.txt' -tuf.util.ensure_parent_dir(target1_filepath) +tuf.ssl_crypto.util.ensure_parent_dir(target1_filepath) target2_filepath = 'repository/targets/file2.txt' -tuf.util.ensure_parent_dir(target2_filepath) +tuf.ssl_crypto.util.ensure_parent_dir(target2_filepath) target3_filepath = 'repository/targets/file3.txt' -tuf.util.ensure_parent_dir(target2_filepath) +tuf.ssl_crypto.util.ensure_parent_dir(target2_filepath) if not options.dry_run: with open(target1_filepath, 'wt') as file_object: diff --git a/tests/repository_data/generate_project_data.py b/tests/repository_data/generate_project_data.py index 373d00ca..4fa420bb 100755 --- a/tests/repository_data/generate_project_data.py +++ b/tests/repository_data/generate_project_data.py @@ -20,7 +20,7 @@ import optparse import os -import tuf.util +import tuf.ssl_crypto.util from tuf.developer_tool import * parser = optparse.OptionParser() @@ -59,11 +59,11 @@ # Create the target files (downloaded by clients) whose file size and digest # are specified in the 'targets.json' file. target1_filepath = 'project/targets/file1.txt' -tuf.util.ensure_parent_dir(target1_filepath) +tuf.ssl_crypto.util.ensure_parent_dir(target1_filepath) target2_filepath = 'project/targets/file2.txt' -tuf.util.ensure_parent_dir(target2_filepath) +tuf.ssl_crypto.util.ensure_parent_dir(target2_filepath) target3_filepath = 'project/targets/file3.txt' -tuf.util.ensure_parent_dir(target2_filepath) +tuf.ssl_crypto.util.ensure_parent_dir(target2_filepath) if not options.dry_run: with open(target1_filepath, 'wt') as file_object: diff --git a/tests/test_arbitrary_package_attack.py b/tests/test_arbitrary_package_attack.py index e244c8a5..91ba230b 100755 --- a/tests/test_arbitrary_package_attack.py +++ b/tests/test_arbitrary_package_attack.py @@ -52,7 +52,7 @@ import tuf import tuf.tufformats -import tuf.util +import tuf.ssl_crypto.util import tuf.roledb import tuf.keydb import tuf.log @@ -181,7 +181,7 @@ def test_without_tuf(self): target_path = os.path.join(self.repository_directory, 'targets', 'file1.txt') client_target_path = os.path.join(self.client_directory, 'file1.txt') self.assertFalse(os.path.exists(client_target_path)) - length, hashes = tuf.util.get_file_details(target_path) + length, hashes = tuf.ssl_crypto.util.get_file_details(target_path) fileinfo = tuf.tufformats.make_fileinfo(length, hashes) url_prefix = self.repository_mirrors['mirror1']['url_prefix'] @@ -189,19 +189,19 @@ def test_without_tuf(self): six.moves.urllib.request.urlretrieve(url_file, client_target_path) self.assertTrue(os.path.exists(client_target_path)) - length, hashes = tuf.util.get_file_details(client_target_path) + length, hashes = tuf.ssl_crypto.util.get_file_details(client_target_path) download_fileinfo = tuf.tufformats.make_fileinfo(length, hashes) self.assertEqual(fileinfo, download_fileinfo) # Test: Download a target file that has been modified by an attacker. with open(target_path, 'wt') as file_object: file_object.write('add malicious content.') - length, hashes = tuf.util.get_file_details(target_path) + length, hashes = tuf.ssl_crypto.util.get_file_details(target_path) malicious_fileinfo = tuf.tufformats.make_fileinfo(length, hashes) six.moves.urllib.request.urlretrieve(url_file, client_target_path) - length, hashes = tuf.util.get_file_details(client_target_path) + length, hashes = tuf.ssl_crypto.util.get_file_details(client_target_path) download_fileinfo = tuf.tufformats.make_fileinfo(length, hashes) # Verify 'download_fileinfo' is unequal to the original trusted version. @@ -259,12 +259,12 @@ def test_with_tuf_and_metadata_tampering(self): # An attacker also tries to add the malicious target's length and digest # to its metadata file. - length, hashes = tuf.util.get_file_details(target_path) + length, hashes = tuf.ssl_crypto.util.get_file_details(target_path) metadata_path = \ os.path.join(self.repository_directory, 'metadata', 'targets.json') - metadata = tuf.util.load_json_file(metadata_path) + metadata = tuf.ssl_crypto.util.load_json_file(metadata_path) metadata['signed']['targets']['/file1.txt']['hashes'] = hashes metadata['signed']['targets']['/file1.txt']['length'] = length diff --git a/tests/test_endless_data_attack.py b/tests/test_endless_data_attack.py index 90ed12c2..d5e74bdd 100755 --- a/tests/test_endless_data_attack.py +++ b/tests/test_endless_data_attack.py @@ -55,7 +55,7 @@ import tuf import tuf.tufformats -import tuf.util +import tuf.ssl_crypto.util import tuf.log import tuf.client.updater as updater import tuf.unittest_toolbox as unittest_toolbox @@ -185,7 +185,7 @@ def test_without_tuf(self): target_path = os.path.join(self.repository_directory, 'targets', 'file1.txt') client_target_path = os.path.join(self.client_directory, 'file1.txt') self.assertFalse(os.path.exists(client_target_path)) - length, hashes = tuf.util.get_file_details(target_path) + length, hashes = tuf.ssl_crypto.util.get_file_details(target_path) fileinfo = tuf.tufformats.make_fileinfo(length, hashes) url_prefix = self.repository_mirrors['mirror1']['url_prefix'] @@ -193,7 +193,7 @@ def test_without_tuf(self): six.moves.urllib.request.urlretrieve(url_file, client_target_path) self.assertTrue(os.path.exists(client_target_path)) - length, hashes = tuf.util.get_file_details(client_target_path) + length, hashes = tuf.ssl_crypto.util.get_file_details(client_target_path) download_fileinfo = tuf.tufformats.make_fileinfo(length, hashes) self.assertEqual(fileinfo, download_fileinfo) @@ -202,7 +202,7 @@ def test_without_tuf(self): with open(target_path, 'r+t') as file_object: original_content = file_object.read() file_object.write(original_content+('append large amount of data' * 100000)) - large_length, hashes = tuf.util.get_file_details(target_path) + large_length, hashes = tuf.ssl_crypto.util.get_file_details(target_path) malicious_fileinfo = tuf.tufformats.make_fileinfo(large_length, hashes) # Is the modified file actually larger? @@ -210,7 +210,7 @@ def test_without_tuf(self): six.moves.urllib.request.urlretrieve(url_file, client_target_path) - length, hashes = tuf.util.get_file_details(client_target_path) + length, hashes = tuf.ssl_crypto.util.get_file_details(client_target_path) download_fileinfo = tuf.tufformats.make_fileinfo(length, hashes) # Verify 'download_fileinfo' is unequal to the original trusted version. @@ -234,10 +234,10 @@ def test_with_tuf(self): # Verify the client's downloaded file matches the repository's. target_path = os.path.join(self.repository_directory, 'targets', 'file1.txt') - length, hashes = tuf.util.get_file_details(client_target_path) + length, hashes = tuf.ssl_crypto.util.get_file_details(client_target_path) fileinfo = tuf.tufformats.make_fileinfo(length, hashes) - length, hashes = tuf.util.get_file_details(client_target_path) + length, hashes = tuf.ssl_crypto.util.get_file_details(client_target_path) download_fileinfo = tuf.tufformats.make_fileinfo(length, hashes) self.assertEqual(fileinfo, download_fileinfo) @@ -248,7 +248,7 @@ def test_with_tuf(self): file_object.write(original_content+('append large amount of data' * 10000)) # Is the modified file actually larger? - large_length, hashes = tuf.util.get_file_details(target_path) + large_length, hashes = tuf.ssl_crypto.util.get_file_details(target_path) self.assertTrue(large_length > length) os.remove(client_target_path) @@ -257,7 +257,7 @@ def test_with_tuf(self): # A large amount of data has been appended to the original content. The # extra data appended should be discarded by the client, so the downloaded # file size and hash should not have changed. - length, hashes = tuf.util.get_file_details(client_target_path) + length, hashes = tuf.ssl_crypto.util.get_file_details(client_target_path) download_fileinfo = tuf.tufformats.make_fileinfo(length, hashes) self.assertEqual(fileinfo, download_fileinfo) @@ -265,16 +265,16 @@ def test_with_tuf(self): timestamp_path = os.path.join(self.repository_directory, 'metadata', 'timestamp.json') - original_length, hashes = tuf.util.get_file_details(timestamp_path) + original_length, hashes = tuf.ssl_crypto.util.get_file_details(timestamp_path) with open(timestamp_path, 'r+') as file_object: - timestamp_content = tuf.util.load_json_file(timestamp_path) + timestamp_content = tuf.ssl_crypto.util.load_json_file(timestamp_path) large_data = 'LargeTimestamp' * 10000 timestamp_content['signed']['_type'] = large_data json.dump(timestamp_content, file_object, indent=1, sort_keys=True) - modified_length, hashes = tuf.util.get_file_details(timestamp_path) + modified_length, hashes = tuf.ssl_crypto.util.get_file_details(timestamp_path) self.assertTrue(modified_length > original_length) # Does the TUF client download the upper limit of an unsafely fetched diff --git a/tests/test_extraneous_dependencies_attack.py b/tests/test_extraneous_dependencies_attack.py index aa2a5867..483f3b44 100755 --- a/tests/test_extraneous_dependencies_attack.py +++ b/tests/test_extraneous_dependencies_attack.py @@ -56,7 +56,7 @@ import unittest2 as unittest import tuf.tufformats -import tuf.util +import tuf.ssl_crypto.util import tuf.log import tuf.client.updater as updater import tuf.roledb @@ -189,9 +189,9 @@ def test_with_tuf(self): 'role1.json') file1_filepath = os.path.join(self.repository_directory, 'targets', 'file1.txt') - length, hashes = tuf.util.get_file_details(file1_filepath) + length, hashes = tuf.ssl_crypto.util.get_file_details(file1_filepath) - role1_metadata = tuf.util.load_json_file(role1_filepath) + role1_metadata = tuf.ssl_crypto.util.load_json_file(role1_filepath) role1_metadata['signed']['targets']['/file2.txt'] = {} role1_metadata['signed']['targets']['/file2.txt']['hashes'] = hashes role1_metadata['signed']['targets']['/file2.txt']['length'] = length diff --git a/tests/test_indefinite_freeze_attack.py b/tests/test_indefinite_freeze_attack.py index df0cb104..d89b93aa 100755 --- a/tests/test_indefinite_freeze_attack.py +++ b/tests/test_indefinite_freeze_attack.py @@ -60,7 +60,7 @@ import tuf.ssl_crypto.formats import tuf.tufformats -import tuf.util +import tuf.ssl_crypto.util import tuf.log import tuf.client.updater as updater import tuf.repository_tool as repo_tool @@ -211,7 +211,7 @@ def test_without_tuf(self): timestamp_path = os.path.join(self.repository_directory, 'metadata', 'timestamp.json') - timestamp_metadata = tuf.util.load_json_file(timestamp_path) + timestamp_metadata = tuf.ssl_crypto.util.load_json_file(timestamp_path) expiry_time = time.time() - 10 expires = tuf.tufformats.unix_timestamp_to_datetime(int(expiry_time)) expires = expires.isoformat() + 'Z' @@ -229,7 +229,7 @@ def test_without_tuf(self): 'timestamp.json') shutil.copy(timestamp_path, client_timestamp_path) - length, hashes = tuf.util.get_file_details(timestamp_path) + length, hashes = tuf.ssl_crypto.util.get_file_details(timestamp_path) fileinfo = tuf.tufformats.make_fileinfo(length, hashes) url_prefix = self.repository_mirrors['mirror1']['url_prefix'] @@ -237,7 +237,7 @@ def test_without_tuf(self): six.moves.urllib.request.urlretrieve(url_file, client_timestamp_path) - length, hashes = tuf.util.get_file_details(client_timestamp_path) + length, hashes = tuf.ssl_crypto.util.get_file_details(client_timestamp_path) download_fileinfo = tuf.tufformats.make_fileinfo(length, hashes) # Verify 'download_fileinfo' is equal to the current local file. diff --git a/tests/test_interpose_updater.py b/tests/test_interpose_updater.py index 9e347ccf..4f547b1e 100755 --- a/tests/test_interpose_updater.py +++ b/tests/test_interpose_updater.py @@ -34,7 +34,7 @@ import json import tuf -import tuf.util +import tuf.ssl_crypto.util from simple_settings import settings import tuf.log import tuf.interposition.updater as updater diff --git a/tests/test_keys.py b/tests/test_keys.py index b2a7629b..18a9101c 100755 --- a/tests/test_keys.py +++ b/tests/test_keys.py @@ -30,6 +30,7 @@ import tuf import tuf.log +import tuf.pycrypto_keys import tuf.ssl_crypto.formats import tuf.tufformats import tuf.keys diff --git a/tests/test_mix_and_match_attack.py b/tests/test_mix_and_match_attack.py index 423f6eb5..6bfd9537 100755 --- a/tests/test_mix_and_match_attack.py +++ b/tests/test_mix_and_match_attack.py @@ -53,7 +53,7 @@ import unittest2 as unittest import tuf.tufformats -import tuf.util +import tuf.ssl_crypto.util import tuf.log import tuf.client.updater as updater import tuf.repository_tool as repo_tool diff --git a/tests/test_replay_attack.py b/tests/test_replay_attack.py index 83902e2d..b5a31ca4 100755 --- a/tests/test_replay_attack.py +++ b/tests/test_replay_attack.py @@ -54,7 +54,7 @@ import unittest2 as unittest import tuf.tufformats -import tuf.util +import tuf.ssl_crypto.util import tuf.log import tuf.client.updater as updater import tuf.repository_tool as repo_tool @@ -205,7 +205,7 @@ def test_without_tuf(self): # The fileinfo of the previous version is saved to verify that it is indeed # accepted by the non-TUF client. - length, hashes = tuf.util.get_file_details(backup_timestamp) + length, hashes = tuf.ssl_crypto.util.get_file_details(backup_timestamp) previous_fileinfo = tuf.tufformats.make_fileinfo(length, hashes) # Modify the timestamp file on the remote repository. @@ -227,7 +227,7 @@ def test_without_tuf(self): # Save the fileinfo of the new version generated to verify that it is # saved by the client. - length, hashes = tuf.util.get_file_details(timestamp_path) + length, hashes = tuf.ssl_crypto.util.get_file_details(timestamp_path) new_fileinfo = tuf.tufformats.make_fileinfo(length, hashes) url_prefix = self.repository_mirrors['mirror1']['url_prefix'] @@ -237,7 +237,7 @@ def test_without_tuf(self): six.moves.urllib.request.urlretrieve(url_file, client_timestamp_path) - length, hashes = tuf.util.get_file_details(client_timestamp_path) + length, hashes = tuf.ssl_crypto.util.get_file_details(client_timestamp_path) download_fileinfo = tuf.tufformats.make_fileinfo(length, hashes) # Verify 'download_fileinfo' is equal to the new version. @@ -249,7 +249,7 @@ def test_without_tuf(self): six.moves.urllib.request.urlretrieve(url_file, client_timestamp_path) - length, hashes = tuf.util.get_file_details(client_timestamp_path) + length, hashes = tuf.ssl_crypto.util.get_file_details(client_timestamp_path) download_fileinfo = tuf.tufformats.make_fileinfo(length, hashes) # Verify 'download_fileinfo' is equal to the previous version. @@ -276,7 +276,7 @@ def test_with_tuf(self): # The fileinfo of the previous version is saved to verify that it is indeed # accepted by the non-TUF client. - length, hashes = tuf.util.get_file_details(backup_timestamp) + length, hashes = tuf.ssl_crypto.util.get_file_details(backup_timestamp) previous_fileinfo = tuf.tufformats.make_fileinfo(length, hashes) # Modify the timestamp file on the remote repository. @@ -298,7 +298,7 @@ def test_with_tuf(self): # Save the fileinfo of the new version generated to verify that it is # saved by the client. - length, hashes = tuf.util.get_file_details(timestamp_path) + length, hashes = tuf.ssl_crypto.util.get_file_details(timestamp_path) new_fileinfo = tuf.tufformats.make_fileinfo(length, hashes) # Refresh top-level metadata, including 'timestamp.json'. Installation of @@ -307,7 +307,7 @@ def test_with_tuf(self): client_timestamp_path = os.path.join(self.client_directory, 'metadata', 'current', 'timestamp.json') - length, hashes = tuf.util.get_file_details(client_timestamp_path) + length, hashes = tuf.ssl_crypto.util.get_file_details(client_timestamp_path) download_fileinfo = tuf.tufformats.make_fileinfo(length, hashes) # Verify 'download_fileinfo' is equal to the new version. diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py index dfc3b3b4..e07cbb18 100755 --- a/tests/test_repository_lib.py +++ b/tests/test_repository_lib.py @@ -425,7 +425,7 @@ def test_generate_root_metadata(self): # Load the root metadata provided in 'tuf/tests/repository_data/'. root_filepath = os.path.join('repository_data', 'repository', 'metadata', 'root.json') - root_signable = tuf.util.load_json_file(root_filepath) + root_signable = tuf.ssl_crypto.util.load_json_file(root_filepath) # generate_root_metadata() expects the top-level roles and keys to be # available in 'tuf.keydb' and 'tuf.roledb'. @@ -472,7 +472,7 @@ def test_generate_targets_metadata(self): temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) targets_directory = os.path.join(temporary_directory, 'targets') file1_path = os.path.join(targets_directory, 'file.txt') - tuf.util.ensure_parent_dir(file1_path) + tuf.ssl_crypto.util.ensure_parent_dir(file1_path) with open(file1_path, 'wt') as file_object: file_object.write('test file.') @@ -677,9 +677,9 @@ def test_sign_metadata(self): keystore_path = os.path.join('repository_data', 'keystore') root_filename = os.path.join(metadata_path, 'root.json') - root_metadata = tuf.util.load_json_file(root_filename)['signed'] + root_metadata = tuf.ssl_crypto.util.load_json_file(root_filename)['signed'] targets_filename = os.path.join(metadata_path, 'targets.json') - targets_metadata = tuf.util.load_json_file(targets_filename)['signed'] + targets_metadata = tuf.ssl_crypto.util.load_json_file(targets_filename)['signed'] tuf.keydb.create_keydb_from_root_metadata(root_metadata) tuf.roledb.create_roledb_from_root_metadata(root_metadata) @@ -735,7 +735,7 @@ def test_write_metadata_file(self): metadata_directory = os.path.join('repository_data', 'repository', 'metadata') root_filename = os.path.join(metadata_directory, 'root.json') - root_signable = tuf.util.load_json_file(root_filename) + root_signable = tuf.ssl_crypto.util.load_json_file(root_filename) output_filename = os.path.join(temporary_directory, 'root.json') compression_algorithms = ['gz'] @@ -833,7 +833,7 @@ def test_write_metadata_file(self): def test__write_compressed_metadata(self): # Test for invalid 'compressed_filename' argument and set # 'write_new_metadata' to False. - file_object = tuf.util.TempFile() + file_object = tuf.ssl_crypto.util.TempFile() existing_filename = os.path.join('repository_data', 'repository', 'metadata', 'root.json') @@ -845,7 +845,7 @@ def test__write_compressed_metadata(self): version_number=8) # Test writing of compressed metadata when consistent snapshots is enabled. - file_object = tuf.util.TempFile() + file_object = tuf.ssl_crypto.util.TempFile() shutil.copy(existing_filename, os.path.join(self.temporary_directory, '8.root.json.gz')) shutil.copy(existing_filename, os.path.join(self.temporary_directory, '8.root.json.zip')) shutil.copy(existing_filename, os.path.join(self.temporary_directory, 'root.json.zip')) @@ -922,7 +922,7 @@ def test__generate_and_write_metadata(self): # Load the root metadata provided in 'tuf/tests/repository_data/'. root_filepath = os.path.join('repository_data', 'repository', 'metadata', 'root.json') - root_signable = tuf.util.load_json_file(root_filepath) + root_signable = tuf.ssl_crypto.util.load_json_file(root_filepath) # _generate_and_write_metadata() expects the top-level roles # (specifically 'snapshot') and keys to be available in 'tuf.roledb'. @@ -937,7 +937,7 @@ def test__generate_and_write_metadata(self): 'targets.json') obsolete_metadata = os.path.join(metadata_directory, 'obsolete_role.json') - tuf.util.ensure_parent_dir(obsolete_metadata) + tuf.ssl_crypto.util.ensure_parent_dir(obsolete_metadata) shutil.copyfile(targets_metadata, obsolete_metadata) # Verify that obsolete metadata (a metadata file exists on disk, but the @@ -959,7 +959,7 @@ def test__generate_and_write_metadata(self): snapshot_filepath = os.path.join('repository_data', 'repository', 'metadata', 'snapshot.json') - snapshot_signable = tuf.util.load_json_file(snapshot_filepath) + snapshot_signable = tuf.ssl_crypto.util.load_json_file(snapshot_filepath) tuf.roledb.remove_role('obsolete_role') self.assertTrue(os.path.exists(os.path.join(metadata_directory, 'obsolete_role.json'))) @@ -979,7 +979,7 @@ def test__delete_obsolete_metadata(self): os.makedirs(metadata_directory) snapshot_filepath = os.path.join('repository_data', 'repository', 'metadata', 'snapshot.json') - snapshot_signable = tuf.util.load_json_file(snapshot_filepath) + snapshot_signable = tuf.ssl_crypto.util.load_json_file(snapshot_filepath) # Create role metadata that should not exist in snapshot.json. role1_filepath = os.path.join('repository_data', 'repository', @@ -1014,7 +1014,7 @@ def test__load_top_level_metadata(self): # Add a duplicate signature to the Root file for testing purposes). root_file = os.path.join(metadata_directory, 'root.json') - signable = tuf.util.load_json_file(os.path.join(metadata_directory, 'root.json')) + signable = tuf.ssl_crypto.util.load_json_file(os.path.join(metadata_directory, 'root.json')) signable['signatures'].append(signable['signatures'][0]) repo_lib.write_metadata_file(signable, root_file, 8, ['gz'], False) @@ -1062,7 +1062,7 @@ def test__remove_invalid_and_duplicate_signatures(self): # signatures). First load a valid signable (in this case, the root role). root_filepath = os.path.join('repository_data', 'repository', 'metadata', 'root.json') - root_signable = tuf.util.load_json_file(root_filepath) + root_signable = tuf.ssl_crypto.util.load_json_file(root_filepath) key_filepath = os.path.join('repository_data', 'keystore', 'root_key') root_rsa_key = repo_lib.import_rsa_privatekey_from_file(key_filepath, 'password') diff --git a/tests/test_repository_tool.py b/tests/test_repository_tool.py index 3dc2aea6..3871e150 100755 --- a/tests/test_repository_tool.py +++ b/tests/test_repository_tool.py @@ -227,7 +227,7 @@ def test_writeall(self): # Verify that the expected metadata is written. for role in ['root.json', 'targets.json', 'snapshot.json', 'timestamp.json']: role_filepath = os.path.join(metadata_directory, role) - role_signable = tuf.util.load_json_file(role_filepath) + role_signable = tuf.ssl_crypto.util.load_json_file(role_filepath) # Raise 'tuf.ssl_commons.exceptions.FormatError' if 'role_signable' is an invalid signable. tuf.tufformats.check_signable_object_format(role_signable) @@ -240,7 +240,7 @@ def test_writeall(self): # Verify the 'role1.json' delegation is also written. role1_filepath = os.path.join(metadata_directory, 'role1.json') - role1_signable = tuf.util.load_json_file(role1_filepath) + role1_signable = tuf.ssl_crypto.util.load_json_file(role1_filepath) tuf.tufformats.check_signable_object_format(role1_signable) # Verify that an exception is *not* raised for multiple @@ -709,7 +709,7 @@ def test_add_signature(self): metadata_directory = os.path.join('repository_data', 'repository', 'metadata') root_filepath = os.path.join(metadata_directory, 'root.json') - root_signable = tuf.util.load_json_file(root_filepath) + root_signable = tuf.ssl_crypto.util.load_json_file(root_filepath) signatures = root_signable['signatures'] # Add the first signature from the list, as only one is needed. @@ -737,7 +737,7 @@ def test_remove_signature(self): metadata_directory = os.path.join('repository_data', 'repository', 'metadata') root_filepath = os.path.join(metadata_directory, 'root.json') - root_signable = tuf.util.load_json_file(root_filepath) + root_signable = tuf.ssl_crypto.util.load_json_file(root_filepath) signatures = root_signable['signatures'] self.metadata.add_signature(signatures[0]) @@ -752,7 +752,7 @@ def test_remove_signature(self): # Test invalid signature argument (i.e., signature that has not been added.) # Load an unused signature to be tested. targets_filepath = os.path.join(metadata_directory, 'targets.json') - targets_signable = tuf.util.load_json_file(targets_filepath) + targets_signable = tuf.ssl_crypto.util.load_json_file(targets_filepath) signatures = targets_signable['signatures'] self.assertRaises(tuf.ssl_commons.exceptions.Error, self.metadata.remove_signature, signatures[0]) @@ -768,7 +768,7 @@ def test_signatures(self): metadata_directory = os.path.join('repository_data', 'repository', 'metadata') root_filepath = os.path.join(metadata_directory, 'root.json') - root_signable = tuf.util.load_json_file(root_filepath) + root_signable = tuf.ssl_crypto.util.load_json_file(root_filepath) signatures = root_signable['signatures'] # Add the first signature from the list, as only need one is needed. diff --git a/tests/test_root_versioning_integration.py b/tests/test_root_versioning_integration.py index ea493b87..fbe149b5 100755 --- a/tests/test_root_versioning_integration.py +++ b/tests/test_root_versioning_integration.py @@ -188,8 +188,8 @@ def test_root_role_versioning(self): root_filepath = os.path.join(metadata_directory, 'root.json') root_1_filepath = os.path.join(metadata_directory, '1.root.json') root_2_filepath = os.path.join(metadata_directory, '2.root.json') - old_root_signable = tuf.util.load_json_file(root_filepath) - root_1_signable = tuf.util.load_json_file(root_1_filepath) + old_root_signable = tuf.ssl_crypto.util.load_json_file(root_filepath) + root_1_signable = tuf.ssl_crypto.util.load_json_file(root_1_filepath) # Make a change to the root keys repository.root.add_verification_key(targets_pubkey) @@ -197,8 +197,8 @@ def test_root_role_versioning(self): repository.root.threshold = 2 repository.writeall() - new_root_signable = tuf.util.load_json_file(root_filepath) - root_2_signable = tuf.util.load_json_file(root_2_filepath) + new_root_signable = tuf.ssl_crypto.util.load_json_file(root_filepath) + root_2_signable = tuf.ssl_crypto.util.load_json_file(root_2_filepath) for role_signable in [old_root_signable, new_root_signable, root_1_signable, root_2_signable]: # Raise 'tuf.ssl_commons.exceptions.FormatError' if 'role_signable' is an invalid signable. diff --git a/tests/test_slow_retrieval_attack.py b/tests/test_slow_retrieval_attack.py index 8eedd7a7..52d4dd95 100755 --- a/tests/test_slow_retrieval_attack.py +++ b/tests/test_slow_retrieval_attack.py @@ -57,7 +57,7 @@ import unittest2 as unittest import tuf.tufformats -import tuf.util +import tuf.ssl_crypto.util import tuf.log import tuf.client.updater as updater import tuf.unittest_toolbox as unittest_toolbox diff --git a/tests/test_updater.py b/tests/test_updater.py index 2d2eb463..62baf908 100755 --- a/tests/test_updater.py +++ b/tests/test_updater.py @@ -64,7 +64,8 @@ import unittest2 as unittest import tuf -import tuf.util +import tuf.ssl_commons.exceptions +import tuf.ssl_crypto.util from simple_settings import settings import tuf.log import tuf.tufformats @@ -287,7 +288,7 @@ def test_1__load_metadata_from_file(self): # compare it against the loaded metadata by '_load_metadata_from_file()'. role1_filepath = \ os.path.join(self.client_metadata_current, 'role1.json') - role1_meta = tuf.util.load_json_file(role1_filepath) + role1_meta = tuf.ssl_crypto.util.load_json_file(role1_filepath) # Load the 'role1.json' file with _load_metadata_from_file, which should # store the loaded metadata in the 'self.repository_updater.metadata' @@ -376,7 +377,7 @@ def test_1__update_versioninfo(self): # on the repository. Load Snapshot to extract Root's version number # and compare it against the one loaded by 'self.repository_updater'. snapshot_filepath = os.path.join(self.client_metadata_current, 'snapshot.json') - snapshot_signable = tuf.util.load_json_file(snapshot_filepath) + snapshot_signable = tuf.ssl_crypto.util.load_json_file(snapshot_filepath) targets_versioninfo = snapshot_signable['signed']['meta']['targets.json'] # Verify that the manually loaded version number of root.json matches @@ -410,7 +411,7 @@ def test_1__update_fileinfo(self): self.assertEqual(len(fileinfo_dict), 1) self.assertTrue(tuf.ssl_crypto.formats.FILEDICT_SCHEMA.matches(fileinfo_dict)) root_filepath = os.path.join(self.client_metadata_current, 'root.json') - length, hashes = tuf.util.get_file_details(root_filepath) + length, hashes = tuf.ssl_crypto.util.get_file_details(root_filepath) root_fileinfo = tuf.tufformats.make_fileinfo(length, hashes) self.assertTrue('root.json' in fileinfo_dict) self.assertEqual(fileinfo_dict['root.json'], root_fileinfo) @@ -431,7 +432,7 @@ def test_1__update_fileinfo(self): def test_2__fileinfo_has_changed(self): # Verify that the method returns 'False' if file info was not changed. root_filepath = os.path.join(self.client_metadata_current, 'root.json') - length, hashes = tuf.util.get_file_details(root_filepath) + length, hashes = tuf.ssl_crypto.util.get_file_details(root_filepath) root_fileinfo = tuf.tufformats.make_fileinfo(length, hashes) self.assertFalse(self.repository_updater._fileinfo_has_changed('root.json', root_fileinfo)) @@ -496,7 +497,7 @@ def test_2__import_delegations(self): # Verify that keydb dictionary was updated. role1_signable = \ - tuf.util.load_json_file(os.path.join(self.client_metadata_current, + tuf.ssl_crypto.util.load_json_file(os.path.join(self.client_metadata_current, 'role1.json')) keyids = [] for signature in role1_signable['signatures']: @@ -541,7 +542,7 @@ def test_2__import_delegations(self): def test_2__versioninfo_has_been_updated(self): # Verify that the method returns 'False' if a versioninfo was not changed. snapshot_filepath = os.path.join(self.client_metadata_current, 'snapshot.json') - snapshot_signable = tuf.util.load_json_file(snapshot_filepath) + snapshot_signable = tuf.ssl_crypto.util.load_json_file(snapshot_filepath) targets_versioninfo = snapshot_signable['signed']['meta']['targets.json'] self.assertFalse(self.repository_updater._versioninfo_has_been_updated('targets.json', @@ -660,7 +661,7 @@ def test_3__update_metadata(self): targets_versioninfo['version']) self.assertTrue('targets' in self.repository_updater.metadata['current']) - targets_signable = tuf.util.load_json_file(targets_filepath) + targets_signable = tuf.ssl_crypto.util.load_json_file(targets_filepath) loaded_targets_version = targets_signable['signed']['version'] self.assertEqual(targets_versioninfo['version'], loaded_targets_version) @@ -929,7 +930,7 @@ def test_5_targets_of_role(self): # as available on the server-side version of the role. role1_filepath = os.path.join(self.repository_directory, 'metadata', 'role1.json') - role1_signable = tuf.util.load_json_file(role1_filepath) + role1_signable = tuf.ssl_crypto.util.load_json_file(role1_filepath) expected_targets = role1_signable['signed']['targets'] @@ -1079,7 +1080,7 @@ def test_6_download_target(self): download_filepath = \ os.path.join(destination_directory, target_filepath1.lstrip('/')) self.assertTrue(os.path.exists(download_filepath)) - length, hashes = tuf.util.get_file_details(download_filepath, settings.REPOSITORY_HASH_ALGORITHMS) + length, hashes = tuf.ssl_crypto.util.get_file_details(download_filepath, settings.REPOSITORY_HASH_ALGORITHMS) download_targetfileinfo = tuf.tufformats.make_fileinfo(length, hashes) # Add any 'custom' data from the repository's target fileinfo to the @@ -1218,7 +1219,7 @@ def test_7_updated_targets(self): repository.targets.remove_target(target1) - length, hashes = tuf.util.get_file_details(target1) + length, hashes = tuf.ssl_crypto.util.get_file_details(target1) repository.targets.add_target(target1) repository.targets.load_signing_key(self.role_keys['targets']['private']) @@ -1227,7 +1228,7 @@ def test_7_updated_targets(self): with open(target1, 'a') as file_object: file_object.write('append extra text') - length, hashes = tuf.util.get_file_details(target1) + length, hashes = tuf.ssl_crypto.util.get_file_details(target1) repository.targets.add_target(target1) repository.targets.load_signing_key(self.role_keys['targets']['private']) @@ -1335,7 +1336,7 @@ def test_9__get_target_hash(self): def test_10__hard_check_file_length(self): # Test for exception if file object is not equal to trusted file length. - temp_file_object = tuf.util.TempFile() + temp_file_object = tuf.ssl_crypto.util.TempFile() temp_file_object.write(b'X') temp_file_object.seek(0) self.assertRaises(tuf.ssl_commons.exceptions.DownloadLengthMismatchError, @@ -1348,7 +1349,7 @@ def test_10__hard_check_file_length(self): def test_10__soft_check_file_length(self): # Test for exception if file object is not equal to trusted file length. - temp_file_object = tuf.util.TempFile() + temp_file_object = tuf.ssl_crypto.util.TempFile() temp_file_object.write(b'XXX') temp_file_object.seek(0) self.assertRaises(tuf.ssl_commons.exceptions.DownloadLengthMismatchError, diff --git a/tests/test_updater_root_rotation_integration.py b/tests/test_updater_root_rotation_integration.py index 08c972e3..7e7f88e6 100755 --- a/tests/test_updater_root_rotation_integration.py +++ b/tests/test_updater_root_rotation_integration.py @@ -56,7 +56,7 @@ import unittest2 as unittest import tuf -import tuf.util +import tuf.ssl_crypto.util from simple_settings import settings import tuf.log import tuf.tufformats diff --git a/tests/test_util.py b/tests/test_util.py index 0afaf5b6..0f7e4949 100755 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -37,7 +37,7 @@ import tuf import tuf.log import tuf.ssl_crypto.hash -import tuf.util +import tuf.ssl_crypto.util from simple_settings import settings import tuf.unittest_toolbox as unittest_toolbox @@ -50,7 +50,7 @@ class TestUtil(unittest_toolbox.Modified_TestCase): def setUp(self): unittest_toolbox.Modified_TestCase.setUp(self) - self.temp_fileobj = tuf.util.TempFile() + self.temp_fileobj = tuf.ssl_crypto.util.TempFile() @@ -86,10 +86,10 @@ def _extract_tempfile_directory(self, config_temp_dir=None): # Patching 'tempfile.TemporaryFile()' (by substituting # temfile.TemporaryFile() with tempfile.mkstemp()) in order to get the # directory of the stored tempfile object. - saved_tempfile_TemporaryFile = tuf.util.tempfile.NamedTemporaryFile - tuf.util.tempfile.NamedTemporaryFile = tempfile.mkstemp - _temp_fileobj = tuf.util.TempFile() - tuf.util.tempfile.NamedTemporaryFile = saved_tempfile_TemporaryFile + saved_tempfile_TemporaryFile = tuf.ssl_crypto.util.tempfile.NamedTemporaryFile + tuf.ssl_crypto.util.tempfile.NamedTemporaryFile = tempfile.mkstemp + _temp_fileobj = tuf.ssl_crypto.util.TempFile() + tuf.ssl_crypto.util.tempfile.NamedTemporaryFile = saved_tempfile_TemporaryFile junk, _tempfilepath = _temp_fileobj.temporary_file _tempfile_dir = os.path.dirname(_tempfilepath) @@ -109,14 +109,14 @@ def test_A2_tempfile_init(self): # Test: Expected input verification. # Assumed 'settings.temporary_directory' is 'None' initially. - temp_file = tuf.util.TempFile() + temp_file = tuf.ssl_crypto.util.TempFile() temp_file_directory = os.path.dirname(temp_file.temporary_file.name) self.assertEqual(tempfile.gettempdir(), temp_file_directory) saved_temporary_directory = settings.temporary_directory temp_directory = self.make_temp_directory() settings.temporary_directory = temp_directory - temp_file = tuf.util.TempFile() + temp_file = tuf.ssl_crypto.util.TempFile() temp_file_directory = os.path.dirname(temp_file.temporary_file.name) self.assertEqual(temp_directory, temp_file_directory) @@ -243,7 +243,7 @@ def test_A6_tempfile_decompress_temp_file_object(self): self.temp_fileobj.decompress_temp_file_object, 'gzip') # Test decompression of invalid gzip file. - temp_file = tuf.util.TempFile() + temp_file = tuf.ssl_crypto.util.TempFile() temp_file.write(b'bad zip') contents = temp_file.read() self.assertRaises(tuf.ssl_commons.exceptions.DecompressionError, @@ -263,7 +263,7 @@ def test_B1_get_file_details(self): file_length = os.path.getsize(filepath) # Test: Expected input. - self.assertEqual(tuf.util.get_file_details(filepath), (file_length, file_hash)) + self.assertEqual(tuf.ssl_crypto.util.get_file_details(filepath), (file_length, file_hash)) # Test: Incorrect input. bogus_inputs = [self.random_string(), 1234, [self.random_string()], @@ -271,9 +271,9 @@ def test_B1_get_file_details(self): for bogus_input in bogus_inputs: if isinstance(bogus_input, six.string_types): - self.assertRaises(tuf.ssl_commons.exceptions.Error, tuf.util.get_file_details, bogus_input) + self.assertRaises(tuf.ssl_commons.exceptions.Error, tuf.ssl_crypto.util.get_file_details, bogus_input) else: - self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.get_file_details, bogus_input) + self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.get_file_details, bogus_input) @@ -283,10 +283,10 @@ def test_B2_ensure_parent_dir(self): for parent_dir in [existing_parent_dir, non_existing_parent_dir, 12, [3]]: if isinstance(parent_dir, six.string_types): - tuf.util.ensure_parent_dir(os.path.join(parent_dir, 'a.txt')) + tuf.ssl_crypto.util.ensure_parent_dir(os.path.join(parent_dir, 'a.txt')) self.assertTrue(os.path.isdir(parent_dir)) else: - self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.ensure_parent_dir, parent_dir) + self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.ensure_parent_dir, parent_dir) @@ -294,7 +294,7 @@ def test_B3_file_in_confined_directories(self): # Goal: Provide invalid input for 'filepath' and 'confined_directories'. # Include inputs like: '[1, 2, "a"]' and such... # Reference to 'file_in_confined_directories()' to improve readability. - in_confined_directory = tuf.util.file_in_confined_directories + in_confined_directory = tuf.ssl_crypto.util.file_in_confined_directories list_of_confined_directories = ['a', 12, {'a':'a'}, [1]] list_of_filepaths = [12, ['a'], {'a':'a'}, 'a'] for bogus_confined_directory in list_of_confined_directories: @@ -323,25 +323,25 @@ def test_B3_file_in_confined_directories(self): def test_B4_import_json(self): self.assertTrue('json' in sys.modules) - json_module = tuf.util.import_json() + json_module = tuf.ssl_crypto.util.import_json() self.assertTrue(json_module is not None) # Test import_json() when 'util._json_moduel' is non-None. - tuf.util._json_module = 'junk_module' - self.assertEqual(tuf.util.import_json(), 'junk_module') + tuf.ssl_crypto.util._json_module = 'junk_module' + self.assertEqual(tuf.ssl_crypto.util.import_json(), 'junk_module') def test_B5_load_json_string(self): # Test normal case. data = ['a', {'b': ['c', None, 30.3, 29]}] - json_string = tuf.util.json.dumps(data) - self.assertEqual(data, tuf.util.load_json_string(json_string)) + json_string = tuf.ssl_crypto.util.json.dumps(data) + self.assertEqual(data, tuf.ssl_crypto.util.load_json_string(json_string)) # Test invalid arguments. - self.assertRaises(tuf.ssl_commons.exceptions.Error, tuf.util.load_json_string, 8) + self.assertRaises(tuf.ssl_commons.exceptions.Error, tuf.ssl_crypto.util.load_json_string, 8) invalid_json_string = {'a': tuf.ssl_commons.exceptions.FormatError} - self.assertRaises(tuf.ssl_commons.exceptions.Error, tuf.util.load_json_string, invalid_json_string) + self.assertRaises(tuf.ssl_commons.exceptions.Error, tuf.ssl_crypto.util.load_json_string, invalid_json_string) @@ -349,25 +349,25 @@ def test_B6_load_json_file(self): data = ['a', {'b': ['c', None, 30.3, 29]}] filepath = self.make_temp_file() fileobj = open(filepath, 'wt') - tuf.util.json.dump(data, fileobj) + tuf.ssl_crypto.util.json.dump(data, fileobj) fileobj.close() - self.assertEqual(data, tuf.util.load_json_file(filepath)) + self.assertEqual(data, tuf.ssl_crypto.util.load_json_file(filepath)) # Test a gzipped file. compressed_filepath = self._compress_existing_file(filepath) - self.assertEqual(data, tuf.util.load_json_file(compressed_filepath)) + self.assertEqual(data, tuf.ssl_crypto.util.load_json_file(compressed_filepath)) # Improperly formatted arguments. for bogus_arg in [1, [b'a'], {'a':b'b'}]: - self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.load_json_file, bogus_arg) + self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.load_json_file, bogus_arg) # Non-existent path. - self.assertRaises(IOError, tuf.util.load_json_file, 'non-existent.json') + self.assertRaises(IOError, tuf.ssl_crypto.util.load_json_file, 'non-existent.json') # Invalid JSON content. with open(filepath, 'a') as filepath: filepath.write('junk data') - self.assertRaises(tuf.ssl_commons.exceptions.Error, tuf.util.load_json_file, filepath) + self.assertRaises(tuf.ssl_commons.exceptions.Error, tuf.ssl_crypto.util.load_json_file, filepath) @@ -381,10 +381,10 @@ def test_C1_get_target_hash(self): for filepath, target_hash in six.iteritems(expected_target_hashes): self.assertTrue(tuf.ssl_crypto.formats.RELPATH_SCHEMA.matches(filepath)) self.assertTrue(tuf.ssl_crypto.formats.HASH_SCHEMA.matches(target_hash)) - self.assertEqual(tuf.util.get_target_hash(filepath), target_hash) + self.assertEqual(tuf.ssl_crypto.util.get_target_hash(filepath), target_hash) # Test for improperly formatted argument. - self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.get_target_hash, 8) + self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.get_target_hash, 8) @@ -415,21 +415,21 @@ def test_C2_find_delegated_role(self): ] self.assertTrue(tuf.ssl_crypto.formats.ROLELIST_SCHEMA.matches(role_list)) - self.assertEqual(tuf.util.find_delegated_role(role_list, 'targets/tuf'), 1) - self.assertEqual(tuf.util.find_delegated_role(role_list, 'targets/warehouse'), 0) + self.assertEqual(tuf.ssl_crypto.util.find_delegated_role(role_list, 'targets/tuf'), 1) + self.assertEqual(tuf.ssl_crypto.util.find_delegated_role(role_list, 'targets/warehouse'), 0) # Test for non-existent role. 'find_delegated_role()' returns 'None' # if the role is not found. - self.assertEqual(tuf.util.find_delegated_role(role_list, 'targets/non-existent'), + self.assertEqual(tuf.ssl_crypto.util.find_delegated_role(role_list, 'targets/non-existent'), None) # Test improperly formatted arguments. - self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.find_delegated_role, 8, role_list) - self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.find_delegated_role, 8, 'targets/tuf') + self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.find_delegated_role, 8, role_list) + self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.find_delegated_role, 8, 'targets/tuf') # Test duplicate roles. role_list.append(role_list[1]) - self.assertRaises(tuf.ssl_commons.exceptions.RepositoryError, tuf.util.find_delegated_role, role_list, + self.assertRaises(tuf.ssl_commons.exceptions.RepositoryError, tuf.ssl_crypto.util.find_delegated_role, role_list, 'targets/tuf') # Test missing 'name' attribute (optional, but required by @@ -437,7 +437,7 @@ def test_C2_find_delegated_role(self): # Delete the duplicate role, and the remaining role's 'name' attribute. del role_list[2] del role_list[0]['name'] - self.assertRaises(tuf.ssl_commons.exceptions.RepositoryError, tuf.util.find_delegated_role, role_list, + self.assertRaises(tuf.ssl_commons.exceptions.RepositoryError, tuf.ssl_crypto.util.find_delegated_role, role_list, 'targets/warehouse') @@ -450,38 +450,38 @@ def test_C3_paths_are_consistent_with_hash_prefixes(self): # Ensure the paths of 'list_of_targets' each have the expected path hash # prefix listed in 'path_hash_prefixes'. for filepath in list_of_targets: - self.assertTrue(tuf.util.get_target_hash(filepath)[0:4] in path_hash_prefixes) + self.assertTrue(tuf.ssl_crypto.util.get_target_hash(filepath)[0:4] in path_hash_prefixes) - self.assertTrue(tuf.util.paths_are_consistent_with_hash_prefixes(list_of_targets, + self.assertTrue(tuf.ssl_crypto.util.paths_are_consistent_with_hash_prefixes(list_of_targets, path_hash_prefixes)) extra_invalid_prefix = ['e3a3', '8fae', 'd543', '0000'] - self.assertTrue(tuf.util.paths_are_consistent_with_hash_prefixes(list_of_targets, + self.assertTrue(tuf.ssl_crypto.util.paths_are_consistent_with_hash_prefixes(list_of_targets, extra_invalid_prefix)) # Test improperly formatted arguments. self.assertRaises(tuf.ssl_commons.exceptions.FormatError, - tuf.util.paths_are_consistent_with_hash_prefixes, 8, + tuf.ssl_crypto.util.paths_are_consistent_with_hash_prefixes, 8, path_hash_prefixes) self.assertRaises(tuf.ssl_commons.exceptions.FormatError, - tuf.util.paths_are_consistent_with_hash_prefixes, + tuf.ssl_crypto.util.paths_are_consistent_with_hash_prefixes, list_of_targets, 8) self.assertRaises(tuf.ssl_commons.exceptions.FormatError, - tuf.util.paths_are_consistent_with_hash_prefixes, + tuf.ssl_crypto.util.paths_are_consistent_with_hash_prefixes, list_of_targets, ['zza1']) # Test invalid list of targets. bad_target_path = '/file5.txt' - self.assertTrue(tuf.util.get_target_hash(bad_target_path)[0:4] not in + self.assertTrue(tuf.ssl_crypto.util.get_target_hash(bad_target_path)[0:4] not in path_hash_prefixes) - self.assertFalse(tuf.util.paths_are_consistent_with_hash_prefixes([bad_target_path], + self.assertFalse(tuf.ssl_crypto.util.paths_are_consistent_with_hash_prefixes([bad_target_path], path_hash_prefixes)) # Add invalid target path to 'list_of_targets'. list_of_targets.append(bad_target_path) - self.assertFalse(tuf.util.paths_are_consistent_with_hash_prefixes(list_of_targets, + self.assertFalse(tuf.ssl_crypto.util.paths_are_consistent_with_hash_prefixes(list_of_targets, path_hash_prefixes)) @@ -515,42 +515,42 @@ def test_C4_ensure_all_targets_allowed(self): } self.assertTrue(tuf.ssl_crypto.formats.DELEGATIONS_SCHEMA.matches(parent_delegations)) - tuf.util.ensure_all_targets_allowed(rolename, list_of_targets, + tuf.ssl_crypto.util.ensure_all_targets_allowed(rolename, list_of_targets, parent_delegations) # The target files of 'targets' are always allowed. 'list_of_targets' and # 'parent_delegations' are not checked in this case. - tuf.util.ensure_all_targets_allowed('targets', list_of_targets, + tuf.ssl_crypto.util.ensure_all_targets_allowed('targets', list_of_targets, parent_delegations) # Test improperly formatted arguments. - self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.ensure_all_targets_allowed, + self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.ensure_all_targets_allowed, 8, list_of_targets, parent_delegations) - self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.ensure_all_targets_allowed, + self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.ensure_all_targets_allowed, rolename, 8, parent_delegations) - self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.ensure_all_targets_allowed, + self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.ensure_all_targets_allowed, rolename, list_of_targets, 8) # Test for invalid 'rolename', which has not been delegated by its parent, # 'targets'. - self.assertRaises(tuf.ssl_commons.exceptions.RepositoryError, tuf.util.ensure_all_targets_allowed, + self.assertRaises(tuf.ssl_commons.exceptions.RepositoryError, tuf.ssl_crypto.util.ensure_all_targets_allowed, 'targets/non-delegated_rolename', list_of_targets, parent_delegations) # Test for target file that is not allowed by the parent role. - self.assertRaises(tuf.ssl_commons.exceptions.ForbiddenTargetError, tuf.util.ensure_all_targets_allowed, + self.assertRaises(tuf.ssl_commons.exceptions.ForbiddenTargetError, tuf.ssl_crypto.util.ensure_all_targets_allowed, 'targets/warehouse', ['file1.zip'], parent_delegations) - self.assertRaises(tuf.ssl_commons.exceptions.ForbiddenTargetError, tuf.util.ensure_all_targets_allowed, + self.assertRaises(tuf.ssl_commons.exceptions.ForbiddenTargetError, tuf.ssl_crypto.util.ensure_all_targets_allowed, 'targets/warehouse', ['file1.txt', 'bad-README.txt'], parent_delegations) # Test for required attributes. # Missing 'paths' attribute. del parent_delegations['roles'][0]['paths'] - self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.ensure_all_targets_allowed, + self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.ensure_all_targets_allowed, 'targets/warehouse', list_of_targets, parent_delegations) # Test 'path_hash_prefixes' attribute. @@ -558,15 +558,15 @@ def test_C4_ensure_all_targets_allowed(self): parent_delegations['roles'][0]['path_hash_prefixes'] = path_hash_prefixes # Test normal case for 'path_hash_prefixes'. - tuf.util.ensure_all_targets_allowed('targets/warehouse', list_of_targets, + tuf.ssl_crypto.util.ensure_all_targets_allowed('targets/warehouse', list_of_targets, parent_delegations) # Test target file with a path_hash_prefix that is not allowed in its # parent role. - path_hash_prefix = tuf.util.get_target_hash('file5.txt')[0:4] + path_hash_prefix = tuf.ssl_crypto.util.get_target_hash('file5.txt')[0:4] self.assertTrue(path_hash_prefix not in parent_delegations['roles'][0] ['path_hash_prefixes']) - self.assertRaises(tuf.ssl_commons.exceptions.ForbiddenTargetError, tuf.util.ensure_all_targets_allowed, + self.assertRaises(tuf.ssl_commons.exceptions.ForbiddenTargetError, tuf.ssl_crypto.util.ensure_all_targets_allowed, 'targets/warehouse', ['file5.txt'], parent_delegations) @@ -584,7 +584,7 @@ def test_c6_get_compressed_length(self): self.temp_fileobj.write(b'hello world') self.assertTrue(self.temp_fileobj.get_compressed_length() == 11) - temp_file = tuf.util.TempFile() + temp_file = tuf.ssl_crypto.util.TempFile() @@ -592,25 +592,25 @@ def test_digests_are_equal(self): digest = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855' # Normal case: test for digests that are equal. - self.assertTrue(tuf.util.digests_are_equal(digest, digest)) + self.assertTrue(tuf.ssl_crypto.util.digests_are_equal(digest, digest)) # Normal case: test for digests that are unequal. - self.assertFalse(tuf.util.digests_are_equal(digest, '0a8df1')) + self.assertFalse(tuf.ssl_crypto.util.digests_are_equal(digest, '0a8df1')) # Test for invalid arguments. - self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.digests_are_equal, 7, + self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.digests_are_equal, 7, digest) - self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.digests_are_equal, digest, + self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.digests_are_equal, digest, 7) # Test that digests_are_equal() takes the same amount of time to compare # equal and unequal arguments. runtime = timeit.timeit('digests_are_equal("ab8df", "ab8df")', - setup='from tuf.util import digests_are_equal', + setup='from tuf.ssl_crypto.util import digests_are_equal', number=100000) runtime2 = timeit.timeit('digests_are_equal("ab8df", "1b8df")', - setup='from tuf.util import digests_are_equal', + setup='from tuf.ssl_crypto.util import digests_are_equal', number=100000) runtime3 = timeit.timeit('"ab8df" == "ab8df"', number=100000) diff --git a/tuf/client/updater.py b/tuf/client/updater.py index 4e7753c7..fe3537ae 100755 --- a/tuf/client/updater.py +++ b/tuf/client/updater.py @@ -126,7 +126,7 @@ import tuf.mirrors import tuf.roledb import tuf.sig -import tuf.util +import tuf.ssl_crypto.util import six import iso8601 @@ -423,7 +423,7 @@ def _load_metadata_from_file(self, metadata_set, metadata_role): # Load the file. The loaded object should conform to # 'tuf.ssl_crypto.formats.SIGNABLE_SCHEMA'. try: - metadata_signable = tuf.util.load_json_file(metadata_filepath) + metadata_signable = tuf.ssl_crypto.util.load_json_file(metadata_filepath) # Although the metadata file may exist locally, it may not # be a valid json file. On the next refresh cycle, it will be @@ -718,7 +718,7 @@ def _update_root_metadata(self, current_root_metadata, compression_algorithm=Non settings.DEFAULT_ROOT_REQUIRED_LENGTH, None, compression_algorithm=compression_algorithm) latest_root_metadata = \ - tuf.util.load_json_string(latest_root_metadata_file.read().decode('utf-8')) + tuf.ssl_crypto.util.load_json_string(latest_root_metadata_file.read().decode('utf-8')) next_version = current_root_metadata['version'] + 1 @@ -750,7 +750,7 @@ def _check_hashes(self, file_object, trusted_hashes): file_object: - A 'tuf.util.TempFile' file-like object. 'file_object' ensures that a + A 'tuf.ssl_crypto.util.TempFile' file-like object. 'file_object' ensures that a read() without a size argument properly reads the entire file. trusted_hashes: @@ -795,7 +795,7 @@ def _hard_check_file_length(self, file_object, trusted_file_length): file_object: - A 'tuf.util.TempFile' file-like object. 'file_object' ensures that a + A 'tuf.ssl_crypto.util.TempFile' file-like object. 'file_object' ensures that a read() without a size argument properly reads the entire file. trusted_file_length: @@ -812,7 +812,7 @@ def _hard_check_file_length(self, file_object, trusted_file_length): None. """ - # Read the entire contents of 'file_object', a 'tuf.util.TempFile' file-like + # Read the entire contents of 'file_object', a 'tuf.ssl_crypto.util.TempFile' file-like # object that ensures the entire file is read. observed_length = len(file_object.read()) @@ -835,14 +835,14 @@ def _soft_check_file_length(self, file_object, trusted_file_length): """ Non-public method that checks the trusted file length of a - 'tuf.util.TempFile' file-like object. The length of the file must be less + 'tuf.ssl_crypto.util.TempFile' file-like object. The length of the file must be less than or equal to the expected length. This is a deliberately redundant implementation designed to complement tuf.download._check_downloaded_length(). file_object: - A 'tuf.util.TempFile' file-like object. 'file_object' ensures that a + A 'tuf.ssl_crypto.util.TempFile' file-like object. 'file_object' ensures that a read() without a size argument properly reads the entire file. trusted_file_length: @@ -859,7 +859,7 @@ def _soft_check_file_length(self, file_object, trusted_file_length): None. """ - # Read the entire contents of 'file_object', a 'tuf.util.TempFile' file-like + # Read the entire contents of 'file_object', a 'tuf.ssl_crypto.util.TempFile' file-like # object that ensures the entire file is read. observed_length = len(file_object.read()) @@ -907,7 +907,7 @@ def _get_target_file(self, target_filepath, file_length, file_hashes): a temporary file and returned. - A 'tuf.util.TempFile' file-like object containing the target. + A 'tuf.ssl_crypto.util.TempFile' file-like object containing the target. """ # Define a callable function that is passed as an argument to _get_file() @@ -946,7 +946,7 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object, metadata_file_object: - A 'tuf.util.TempFile' instance containing the metadata file. + A 'tuf.ssl_crypto.util.TempFile' instance containing the metadata file. 'metadata_file_object' ensures the entire file is returned with read(). metadata_role: @@ -980,7 +980,7 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object, metadata = metadata_file_object.read().decode('utf-8') try: - metadata_signable = tuf.util.load_json_string(metadata) + metadata_signable = tuf.ssl_crypto.util.load_json_string(metadata) except Exception as exception: raise tuf.ssl_commons.exceptions.InvalidMetadataJSONError(exception) @@ -1048,7 +1048,7 @@ def _get_metadata_file(self, metadata_role, remote_filename, file and returned. - A 'tuf.util.TempFile' file-like object containing the metadata. + A 'tuf.ssl_crypto.util.TempFile' file-like object containing the metadata. """ file_mirrors = tuf.mirrors.get_list_of_mirrors('meta', remote_filename, @@ -1073,7 +1073,7 @@ def _get_metadata_file(self, metadata_role, remote_filename, # 'file_object' is also verified if decompressed above (i.e., the # uncompressed version). metadata_signable = \ - tuf.util.load_json_string(file_object.read().decode('utf-8')) + tuf.ssl_crypto.util.load_json_string(file_object.read().decode('utf-8')) # If the version number is unspecified, ensure that the version number # downloaded is greater than the currently trusted version number for @@ -1164,7 +1164,7 @@ def _get_file(self, filepath, verify_file_function, file_type, The relative metadata or target filepath. verify_file_function: - A callable function that expects a 'tuf.util.TempFile' file-like object + A callable function that expects a 'tuf.ssl_crypto.util.TempFile' file-like object and raises an exception if the file is invalid. Target files and uncompressed versions of metadata may be verified with 'verify_file_function'. @@ -1202,7 +1202,7 @@ def _get_file(self, filepath, verify_file_function, file_type, file and returned. - A 'tuf.util.TempFile' file-like object containing the metadata or target. + A 'tuf.ssl_crypto.util.TempFile' file-like object containing the metadata or target. """ file_mirrors = tuf.mirrors.get_list_of_mirrors(file_type, filepath, @@ -1347,7 +1347,7 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None, current_filepath = os.path.join(self.metadata_directory['current'], metadata_filename) current_filepath = os.path.abspath(current_filepath) - tuf.util.ensure_parent_dir(current_filepath) + tuf.ssl_crypto.util.ensure_parent_dir(current_filepath) previous_filepath = os.path.join(self.metadata_directory['previous'], metadata_filename) @@ -1355,14 +1355,14 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None, if os.path.exists(current_filepath): # Previous metadata might not exist, say when delegations are added. - tuf.util.ensure_parent_dir(previous_filepath) + tuf.ssl_crypto.util.ensure_parent_dir(previous_filepath) shutil.move(current_filepath, previous_filepath) # Next, move the verified updated metadata file to the 'current' directory. - # Note that the 'move' method comes from tuf.util's TempFile class. - # 'metadata_file_object' is an instance of tuf.util.TempFile. + # Note that the 'move' method comes from tuf.ssl_crypto.util's TempFile class. + # 'metadata_file_object' is an instance of tuf.ssl_crypto.util.TempFile. metadata_signable = \ - tuf.util.load_json_string(metadata_file_object.read().decode('utf-8')) + tuf.ssl_crypto.util.load_json_string(metadata_file_object.read().decode('utf-8')) if compression_algorithm == 'gzip': current_uncompressed_filepath = \ @@ -1834,7 +1834,7 @@ def _update_fileinfo(self, metadata_filename): # Extract the file information from the actual file and save it # to the fileinfo store. - file_length, hashes = tuf.util.get_file_details(current_filepath) + file_length, hashes = tuf.ssl_crypto.util.get_file_details(current_filepath) metadata_fileinfo = tuf.tufformats.make_fileinfo(file_length, hashes) self.fileinfo[metadata_filename] = metadata_fileinfo @@ -1879,7 +1879,7 @@ def _move_current_to_previous(self, metadata_role): # Move the current path to the previous path. if os.path.exists(current_filepath): - tuf.util.ensure_parent_dir(previous_filepath) + tuf.ssl_crypto.util.ensure_parent_dir(previous_filepath) os.rename(current_filepath, previous_filepath) @@ -2536,7 +2536,7 @@ def _visit_child_role(self, child_role, target_filepath, parent_delegations): # Is the child role allowed by its parent role to specify this path # in its metadata? try: - tuf.util.ensure_all_targets_allowed(child_role_name, [target_filepath], + tuf.ssl_crypto.util.ensure_all_targets_allowed(child_role_name, [target_filepath], parent_delegations) except tuf.ssl_commons.exceptions.ForbiddenTargetError: diff --git a/tuf/developer_tool.py b/tuf/developer_tool.py index caa92907..e433fa07 100755 --- a/tuf/developer_tool.py +++ b/tuf/developer_tool.py @@ -38,7 +38,7 @@ import tuf import tuf.tufformats -import tuf.util +import tuf.ssl_crypto.util import tuf.keydb import tuf.roledb import tuf.keys @@ -265,7 +265,7 @@ def write(self, write_partial=False): # Ensure the parent directories of 'metadata_filepath' exist, otherwise an # IO exception is raised if 'metadata_filepath' is written to a # sub-directory. - tuf.util.ensure_parent_dir(delegated_filename) + tuf.ssl_crypto.util.ensure_parent_dir(delegated_filename) _generate_and_write_metadata(delegated_rolename, delegated_filename, write_partial, self._targets_directory, @@ -816,7 +816,7 @@ def load_project(project_directory, prefix='', new_targets_location=None): config_filename = os.path.join(project_directory, PROJECT_FILENAME) try: - project_configuration = tuf.util.load_json_file(config_filename) + project_configuration = tuf.ssl_crypto.util.load_json_file(config_filename) tuf.ssl_crypto.formats.PROJECT_CFG_SCHEMA.check_match(project_configuration) except (OSError, IOError, tuf.ssl_commons.exceptions.FormatError): @@ -863,7 +863,7 @@ def load_project(project_directory, prefix='', new_targets_location=None): # Load the project's metadata. targets_metadata_path = os.path.join(project_directory, metadata_directory, project_filename) - signable = tuf.util.load_json_file(targets_metadata_path) + signable = tuf.ssl_crypto.util.load_json_file(targets_metadata_path) tuf.tufformats.check_signable_object_format(signable) targets_metadata = signable['signed'] @@ -929,7 +929,7 @@ def load_project(project_directory, prefix='', new_targets_location=None): signable = None try: - signable = tuf.util.load_json_file(metadata_path) + signable = tuf.ssl_crypto.util.load_json_file(metadata_path) except (ValueError, IOError, tuf.ssl_commons.exceptions.Error): raise diff --git a/tuf/download.py b/tuf/download.py index fec50f8d..cf9ac15e 100755 --- a/tuf/download.py +++ b/tuf/download.py @@ -17,7 +17,7 @@ length of a downloaded file has to match the hash and length supplied by the metadata of that file. The downloaded file is technically a file-like object that will automatically destroys itself once closed. Note that the file-like - object, 'tuf.util.TempFile', is returned by the '_download_file()' function. + object, 'tuf.ssl_crypto.util.TempFile', is returned by the '_download_file()' function. """ # Help with Python 3 compatibility, where the print statement is a function, an @@ -39,7 +39,7 @@ import tuf from simple_settings import settings import tuf.ssl_crypto.hash -import tuf.util +import tuf.ssl_crypto.util import tuf.tufformats import six @@ -65,7 +65,7 @@ def safe_download(url, required_length): tuf.download.unsafe_download() may be called if an upper download limit is preferred. - 'tuf.util.TempFile', the file-like object returned, is used instead of + 'tuf.ssl_crypto.util.TempFile', the file-like object returned, is used instead of regular tempfile object because of additional functionality provided, such as handling compressed metadata and automatically closing files after moving to final destination. @@ -80,7 +80,7 @@ def safe_download(url, required_length): limit. - A 'tuf.util.TempFile' object is created on disk to store the contents of + A 'tuf.ssl_crypto.util.TempFile' object is created on disk to store the contents of 'url'. @@ -92,7 +92,7 @@ def safe_download(url, required_length): Any other unforeseen runtime exception. - A 'tuf.util.TempFile' file-like object that points to the contents of 'url'. + A 'tuf.ssl_crypto.util.TempFile' file-like object that points to the contents of 'url'. """ # Do all of the arguments have the appropriate format? @@ -129,7 +129,7 @@ def unsafe_download(url, required_length): tuf.download.safe_download() may be called if an exact download limit is preferred. - 'tuf.util.TempFile', the file-like object returned, is used instead of + 'tuf.ssl_crypto.util.TempFile', the file-like object returned, is used instead of regular tempfile object because of additional functionality provided, such as handling compressed metadata and automatically closing files after moving to final destination. @@ -144,7 +144,7 @@ def unsafe_download(url, required_length): limit. - A 'tuf.util.TempFile' object is created on disk to store the contents of + A 'tuf.ssl_crypto.util.TempFile' object is created on disk to store the contents of 'url'. @@ -156,7 +156,7 @@ def unsafe_download(url, required_length): Any other unforeseen runtime exception. - A 'tuf.util.TempFile' file-like object that points to the contents of 'url'. + A 'tuf.ssl_crypto.util.TempFile' file-like object that points to the contents of 'url'. """ # Do all of the arguments have the appropriate format? @@ -191,8 +191,8 @@ def _download_file(url, required_length, STRICT_REQUIRED_LENGTH=True): opens a connection to 'url' and downloads the file while ensuring its length and hashes match 'required_hashes' and 'required_length'. - tuf.util.TempFile is used instead of regular tempfile object because of - additional functionality provided by 'tuf.util.TempFile'. + tuf.ssl_crypto.util.TempFile is used instead of regular tempfile object because of + additional functionality provided by 'tuf.ssl_crypto.util.TempFile'. url: @@ -208,7 +208,7 @@ def _download_file(url, required_length, STRICT_REQUIRED_LENGTH=True): timestamp metadata, which has no signed required_length. - A 'tuf.util.TempFile' object is created on disk to store the contents of + A 'tuf.ssl_crypto.util.TempFile' object is created on disk to store the contents of 'url'. @@ -220,7 +220,7 @@ def _download_file(url, required_length, STRICT_REQUIRED_LENGTH=True): Any other unforeseen runtime exception. - A 'tuf.util.TempFile' file-like object that points to the contents of 'url'. + A 'tuf.ssl_crypto.util.TempFile' file-like object that points to the contents of 'url'. """ # Do all of the arguments have the appropriate format? @@ -236,7 +236,7 @@ def _download_file(url, required_length, STRICT_REQUIRED_LENGTH=True): # This is the temporary file that we will return to contain the contents of # the downloaded file. - temp_file = tuf.util.TempFile() + temp_file = tuf.ssl_crypto.util.TempFile() try: # Open the connection to the remote file. diff --git a/tuf/keys.py b/tuf/keys.py index 62a83cb6..bde08bc0 100755 --- a/tuf/keys.py +++ b/tuf/keys.py @@ -138,6 +138,7 @@ import tuf.ssl_crypto.hash # Perform format checks of argument objects. +import tuf.pycrypto_keys import tuf.ssl_crypto.formats import tuf.tufformats diff --git a/tuf/mirrors.py b/tuf/mirrors.py index 8153cb37..b342e73e 100755 --- a/tuf/mirrors.py +++ b/tuf/mirrors.py @@ -28,7 +28,7 @@ import os import tuf -import tuf.util +import tuf.ssl_crypto.util import tuf.tufformats import six @@ -89,12 +89,12 @@ def get_list_of_mirrors(file_type, file_path, mirrors_dict): 'Supported file types: '+repr(_SUPPORTED_FILE_TYPES) raise tuf.ssl_commons.exceptions.Error(message) - # Reference to 'tuf.util.file_in_confined_directories()' (improve readability). + # Reference to 'tuf.ssl_crypto.util.file_in_confined_directories()' (improve readability). # This function checks whether a mirror should serve a file to the client. # A client may be confined to certain paths on a repository mirror # when fetching target files. This field may be set by the client when # the repository mirror is added to the 'tuf.client.updater.Updater' object. - in_confined_directory = tuf.util.file_in_confined_directories + in_confined_directory = tuf.ssl_crypto.util.file_in_confined_directories list_of_mirrors = [] for mirror_name, mirror_info in six.iteritems(mirrors_dict): diff --git a/tuf/pyca_crypto_keys.py b/tuf/pyca_crypto_keys.py index b9ae1327..1e0a01f9 100755 --- a/tuf/pyca_crypto_keys.py +++ b/tuf/pyca_crypto_keys.py @@ -136,7 +136,7 @@ from simple_settings import settings # Import routine to process key files containing JSON data. -import tuf.util +import tuf.ssl_crypto.util # Recommended RSA key sizes: # http://www.emc.com/emc-plus/rsa-labs/historical/twirl-and-rsa-key-size.htm#table1 @@ -828,7 +828,7 @@ def decrypt_key(encrypted_key, password): # Raise 'tuf.ssl_commons.exceptions.Error' if 'json_data' cannot be deserialized to a valid # 'tuf.ssl_crypto.formats.ANYKEY_SCHEMA' key object. - key_object = tuf.util.load_json_string(json_data.decode()) + key_object = tuf.ssl_crypto.util.load_json_string(json_data.decode()) return key_object @@ -991,7 +991,7 @@ def _decrypt(file_contents, password): generated_hmac = binascii.hexlify(generated_hmac_object.finalize()) - if not tuf.util.digests_are_equal(generated_hmac.decode(), hmac): + if not tuf.ssl_crypto.util.digests_are_equal(generated_hmac.decode(), hmac): raise tuf.ssl_commons.exceptions.CryptoError('Decryption failed.') # Construct a Cipher object, with the key and iv. diff --git a/tuf/pycrypto_keys.py b/tuf/pycrypto_keys.py index 0d893a27..2948e61d 100755 --- a/tuf/pycrypto_keys.py +++ b/tuf/pycrypto_keys.py @@ -111,13 +111,13 @@ import tuf.ssl_crypto.hash # Perform object format-checking. -import tuf.tufformats +#import tuf.tufformats # Extract the cryptography library settings. from simple_settings import settings # Import key files containing json data. -import tuf.util +import tuf.ssl_crypto.util # Recommended RSA key sizes: # http://www.emc.com/emc-plus/rsa-labs/historical/twirl-and-rsa-key-size.htm#table1 @@ -766,7 +766,7 @@ def decrypt_key(encrypted_key, password): # Raise 'tuf.ssl_commons.exceptions.Error' if 'json_data' cannot be deserialized to a valid # 'tuf.ssl_crypto.formats.ANYKEY_SCHEMA' key object. - key_object = tuf.util.load_json_string(json_data.decode()) + key_object = tuf.ssl_crypto.util.load_json_string(json_data.decode()) return key_object @@ -931,7 +931,7 @@ def _decrypt(file_contents, password): Crypto.Hash.SHA256) generated_hmac = generated_hmac_object.hexdigest() - if not tuf.util.digests_are_equal(generated_hmac, hmac): + if not tuf.ssl_crypto.util.digests_are_equal(generated_hmac, hmac): raise tuf.ssl_commons.exceptions.CryptoError('Decryption failed.') # The following decryption routine assumes 'ciphertext' was encrypted with diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index 9f00ce55..c56f0fee 100755 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -44,7 +44,7 @@ import tuf import tuf.ssl_crypto.formats import tuf.tufformats -import tuf.util +import tuf.ssl_crypto.util import tuf.keydb import tuf.roledb import tuf.keys @@ -607,7 +607,7 @@ def _load_top_level_metadata(repository, top_level_filenames): if os.path.exists(root_filename): # Initialize the key and role metadata of the top-level roles. - signable = tuf.util.load_json_file(root_filename) + signable = tuf.ssl_crypto.util.load_json_file(root_filename) tuf.tufformats.check_signable_object_format(signable) root_metadata = signable['signed'] tuf.keydb.create_keydb_from_root_metadata(root_metadata) @@ -654,7 +654,7 @@ def _load_top_level_metadata(repository, top_level_filenames): # Load 'timestamp.json'. A Timestamp role file without a version number is # always written. if os.path.exists(timestamp_filename): - signable = tuf.util.load_json_file(timestamp_filename) + signable = tuf.ssl_crypto.util.load_json_file(timestamp_filename) timestamp_metadata = signable['signed'] for signature in signable['signatures']: repository.timestamp.add_signature(signature, mark_role_as_dirty=False) @@ -696,7 +696,7 @@ def _load_top_level_metadata(repository, top_level_filenames): snapshot_filename = os.path.join(dirname, str(snapshot_version) + '.' + basename + METADATA_EXTENSION) if os.path.exists(snapshot_filename): - signable = tuf.util.load_json_file(snapshot_filename) + signable = tuf.ssl_crypto.util.load_json_file(snapshot_filename) tuf.tufformats.check_signable_object_format(signable) snapshot_metadata = signable['signed'] @@ -735,7 +735,7 @@ def _load_top_level_metadata(repository, top_level_filenames): targets_filename = os.path.join(dirname, str(targets_version) + '.' + basename) if os.path.exists(targets_filename): - signable = tuf.util.load_json_file(targets_filename) + signable = tuf.ssl_crypto.util.load_json_file(targets_filename) tuf.tufformats.check_signable_object_format(signable) targets_metadata = signable['signed'] @@ -881,11 +881,11 @@ def generate_and_write_rsa_keypair(filepath, bits=DEFAULT_RSA_KEY_BITS, # Write public key (i.e., 'public', which is in PEM format) to # '.pub'. If the parent directory of filepath does not exist, # create it (and all its parent directories, if necessary). - tuf.util.ensure_parent_dir(filepath) + tuf.ssl_crypto.util.ensure_parent_dir(filepath) # Create a tempororary file, write the contents of the public key, and move # to final destination. - file_object = tuf.util.TempFile() + file_object = tuf.ssl_crypto.util.TempFile() file_object.write(public.encode('utf-8')) # The temporary file is closed after the final move. @@ -894,7 +894,7 @@ def generate_and_write_rsa_keypair(filepath, bits=DEFAULT_RSA_KEY_BITS, # Write the private key in encrypted PEM format to ''. # Unlike the public key file, the private key does not have a file # extension. - file_object = tuf.util.TempFile() + file_object = tuf.ssl_crypto.util.TempFile() file_object.write(encrypted_pem.encode('utf-8')) file_object.move(filepath) @@ -1088,11 +1088,11 @@ def generate_and_write_ed25519_keypair(filepath, password=None): # Write the public key, conformant to 'tuf.ssl_crypto.formats.KEY_SCHEMA', to # '.pub'. - tuf.util.ensure_parent_dir(filepath) + tuf.ssl_crypto.util.ensure_parent_dir(filepath) # Create a tempororary file, write the contents of the public key, and move # to final destination. - file_object = tuf.util.TempFile() + file_object = tuf.ssl_crypto.util.TempFile() file_object.write(json.dumps(ed25519key_metadata_format).encode('utf-8')) # The temporary file is closed after the final move. @@ -1100,7 +1100,7 @@ def generate_and_write_ed25519_keypair(filepath, password=None): # Write the encrypted key string, conformant to # 'tuf.ssl_crypto.formats.ENCRYPTEDKEY_SCHEMA', to ''. - file_object = tuf.util.TempFile() + file_object = tuf.ssl_crypto.util.TempFile() file_object.write(encrypted_key.encode('utf-8')) file_object.move(filepath) @@ -1141,7 +1141,7 @@ def import_ed25519_publickey_from_file(filepath): # ED25519 key objects are saved in json and metadata format. Return the # loaded key object in tuf.ssl_crypto.formats.ED25519KEY_SCHEMA' format that also # includes the keyid. - ed25519_key_metadata = tuf.util.load_json_file(filepath) + ed25519_key_metadata = tuf.ssl_crypto.util.load_json_file(filepath) ed25519_key, junk = tuf.keys.format_metadata_to_key(ed25519_key_metadata) # Raise an exception if an unexpected key type is imported. @@ -1349,7 +1349,7 @@ def get_metadata_fileinfo(filename, custom=None): # file information, such as the file's author, version/revision # numbers, etc. filesize, filehashes = \ - tuf.util.get_file_details(filename, settings.REPOSITORY_HASH_ALGORITHMS) + tuf.ssl_crypto.util.get_file_details(filename, settings.REPOSITORY_HASH_ALGORITHMS) return tuf.tufformats.make_fileinfo(filesize, filehashes, custom=custom) @@ -1425,7 +1425,7 @@ def get_target_hash(target_filepath): The hash of 'target_filepath'. """ - return tuf.util.get_target_hash(target_filepath) + return tuf.ssl_crypto.util.get_target_hash(target_filepath) @@ -1746,7 +1746,7 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date, # available delegated roles on the repository. fileinfodict = {} root_path = os.path.join(metadata_directory, root_filename + '.json') - length, hashes = tuf.util.get_file_details(root_path) + length, hashes = tuf.ssl_crypto.util.get_file_details(root_path) root_version = get_metadata_versioninfo('root') fileinfodict[ROOT_FILENAME] = tuf.tufformats.make_fileinfo(length, hashes, version=root_version['version']) fileinfodict[TARGETS_FILENAME] = get_metadata_versioninfo(targets_filename) @@ -1831,7 +1831,7 @@ def generate_timestamp_metadata(snapshot_filename, version, expiration_date): # Retrieve the versioninfo of the Snapshot metadata file. snapshot_fileinfo = {} - length, hashes = tuf.util.get_file_details(snapshot_filename) + length, hashes = tuf.ssl_crypto.util.get_file_details(snapshot_filename) snapshot_version = get_metadata_versioninfo('snapshot') snapshot_fileinfo[SNAPSHOT_FILENAME] = \ tuf.tufformats.make_fileinfo(length, hashes, version=snapshot_version['version']) @@ -2012,11 +2012,11 @@ def write_metadata_file(metadata, filename, version_number, # versions. To avoid partial metadata from being written, 'metadata' is # first written to a temporary location (i.e., 'file_object') and then # moved to 'filename'. - file_object = tuf.util.TempFile() + file_object = tuf.ssl_crypto.util.TempFile() # Serialize 'metadata' to the file-like object and then write # 'file_object' to disk. The dictionary keys of 'metadata' are sorted - # and indentation is used. The 'tuf.util.TempFile' file-like object is + # and indentation is used. The 'tuf.ssl_crypto.util.TempFile' file-like object is # automically closed after the final move. file_object.write(file_content) @@ -2077,7 +2077,7 @@ def write_metadata_file(metadata, filename, version_number, continue elif compression_algorithm == 'gz': - file_object = tuf.util.TempFile() + file_object = tuf.ssl_crypto.util.TempFile() compressed_filename = filename + '.gz' # Instantiate a gzip object, but save compressed content to @@ -2127,7 +2127,7 @@ def _write_compressed_metadata(file_object, compressed_filename, file_object.move(compressed_filename) # The temporary file must be closed if 'file_object.move()' is not used. - # tuf.util.TempFile() automatically closes the temp file when move() is + # tuf.ssl_crypto.util.TempFile() automatically closes the temp file when move() is # called else: file_object.close_temp_file() @@ -2151,7 +2151,7 @@ def _write_compressed_metadata(file_object, compressed_filename, else: logger.debug('Skipping compression extension: ' + repr(compression_extension)) - # Move the 'tuf.util.TempFile' object to one of the filenames so that it is + # Move the 'tuf.ssl_crypto.util.TempFile' object to one of the filenames so that it is # saved and the temporary file closed. if not os.path.exists(consistent_filename): logger.debug('Saving ' + repr(consistent_filename)) diff --git a/tuf/repository_tool.py b/tuf/repository_tool.py index 9900f579..e752f35f 100755 --- a/tuf/repository_tool.py +++ b/tuf/repository_tool.py @@ -40,7 +40,7 @@ import tuf import tuf.tufformats -import tuf.util +import tuf.ssl_crypto.util import tuf.keydb import tuf.roledb import tuf.keys @@ -2939,7 +2939,7 @@ def load_repository(repository_directory): signable = None try: - signable = tuf.util.load_json_file(metadata_path) + signable = tuf.ssl_crypto.util.load_json_file(metadata_path) except (tuf.ssl_commons.exceptions.Error, ValueError, IOError): logger.debug('Tried to load metadata with invalid JSON' diff --git a/tuf/util.py b/tuf/util.py deleted file mode 100755 index 4d3969af..00000000 --- a/tuf/util.py +++ /dev/null @@ -1,998 +0,0 @@ -""" - - util.py - - - Konstantin Andrianov - - - March 24, 2012. Derived from original util.py written by Geremy Condra. - - - See LICENSE for licensing information. - - - Provides utility services. This module supplies utility functions such as: - get_file_details() that computes the length and hash of a file, import_json - that tries to import a working json module, load_json_* functions, and a - TempFile class that generates a file-like object for temporary storage, etc. -""" - -# Help with Python 3 compatibility, where the print statement is a function, an -# implicit relative import is invalid, and the '/' operator performs true -# division. Example: print 'hello world' raises a 'SyntaxError' exception. -from __future__ import print_function -from __future__ import absolute_import -from __future__ import division -from __future__ import unicode_literals - -import os -import sys -import gzip -import shutil -import logging -import tempfile -import fnmatch - -import tuf -import tuf.ssl_crypto.hash -from simple_settings import settings -import tuf.tufformats -import six - -# The algorithm used by the repository to generate the digests of the -# target filepaths, which are included in metadata files and may be prepended -# to the filenames of consistent snapshots. -HASH_FUNCTION = 'sha256' - -# See 'log.py' to learn how logging is handled in TUF. -logger = logging.getLogger('tuf.util') - - -class TempFile(object): - """ - - A high-level temporary file that cleans itself up or can be manually - cleaned up. This isn't a complete file-like object. The file functions - that are supported make additional common-case safe assumptions. There - are additional functions that aren't part of file-like objects. TempFile - is used in the download.py module to temporarily store downloaded data while - all security checks (file hashes/length) are performed. - """ - - def _default_temporary_directory(self, prefix): - """__init__ helper.""" - try: - self.temporary_file = tempfile.NamedTemporaryFile(prefix=prefix) - - except OSError as err: # pragma: no cover - logger.critical('Cannot create a system temporary directory: '+repr(err)) - raise tuf.ssl_commons.exceptions.Error(err) - - - - def __init__(self, prefix='tuf_temp_'): - """ - - Initializes TempFile. - - - prefix: - A string argument to be used with tempfile.NamedTemporaryFile function. - - - tuf.ssl_commons.exceptions.Error on failure to load temp dir. - - - None. - """ - - self._compression = None - - # If compression is set then the original file is saved in 'self._orig_file'. - self._orig_file = None - temp_dir = settings.temporary_directory - if temp_dir is not None and tuf.ssl_crypto.formats.PATH_SCHEMA.matches(temp_dir): - try: - self.temporary_file = tempfile.NamedTemporaryFile(prefix=prefix, - dir=temp_dir) - except OSError as err: - logger.error('Temp file in ' + temp_dir + ' failed: '+repr(err)) - logger.error('Will attempt to use system default temp dir.') - self._default_temporary_directory(prefix) - - else: - self._default_temporary_directory(prefix) - - - - - - def get_compressed_length(self): - """ - - Get the compressed length of the file. This will be correct information - even when the file is read as an uncompressed one. - - - None. - - - OSError. - - - Nonnegative integer representing compressed file size. - """ - - # Even if we read a compressed file with the gzip standard library module, - # the original file will remain compressed. - return os.stat(self.temporary_file.name).st_size - - - - def flush(self): - """ - - Flushes buffered output for the file. - - - None. - - - None. - - - None. - """ - - self.temporary_file.flush() - - - - - - def read(self, size=None): - """ - - Read specified number of bytes. If size is not specified then the whole - file is read and the file pointer is placed at the beginning of the file. - - - size: - Number of bytes to be read. - - - tuf.ssl_commons.exceptions.FormatError: if 'size' is invalid. - - - String of data. - """ - - if size is None: - self.temporary_file.seek(0) - data = self.temporary_file.read() - self.temporary_file.seek(0) - - return data - - else: - if not (isinstance(size, int) and size > 0): - raise tuf.ssl_commons.exceptions.FormatError - - return self.temporary_file.read(size) - - - - - - def write(self, data, auto_flush=True): - """ - - Writes a data string to the file. - - - data: - A string containing some data. - - auto_flush: - Boolean argument, if set to 'True', all data will be flushed from - internal buffer. - - - None. - - - None. - """ - - self.temporary_file.write(data) - if auto_flush: - self.flush() - - - - def move(self, destination_path): - """ - - Copies 'self.temporary_file' to a non-temp file at 'destination_path' and - closes 'self.temporary_file' so that it is removed. - - - destination_path: - Path to store the file in. - - - None. - - - None. - """ - - self.flush() - self.seek(0) - destination_file = open(destination_path, 'wb') - shutil.copyfileobj(self.temporary_file, destination_file) - - # Force the destination file to be written to disk from Python's internal and - # the operation system's buffers. os.fsync() should follow flush(). - destination_file.flush() - os.fsync(destination_file.fileno()) - destination_file.close() - - # 'self.close()' closes temporary file which destroys itself. - self.close_temp_file() - - - - - - def seek(self, *args): - """ - - Set file's current position. - - - *args: - (*-operator): unpacking argument list is used - because seek method accepts two args: offset and whence. If whence is - not specified, its default is 0. Indicate offset to set the file's - current position. Refer to the python manual for more info. - - - None. - - - None. - """ - - self.temporary_file.seek(*args) - - - - - - def decompress_temp_file_object(self, compression): - """ - - To decompress a compressed temp file object. Decompression is performed - on a temp file object that is compressed, this occurs after downloading - a compressed file. For instance if a compressed version of some meta - file in the repository is downloaded, the temp file containing the - compressed meta file will be decompressed using this function. - Note that after calling this method, write() can no longer be called. - - meta.json.gz - |...[download] - temporary_file (containing meta.json.gz) - / \ - temporary_file _orig_file - containing meta.json containing meta.json.gz - (decompressed data) - - - compression: - A string indicating the type of compression that was used to compress - a file. Only gzip is allowed. - - - tuf.ssl_commons.exceptions.FormatError: If 'compression' is improperly formatted. - - tuf.ssl_commons.exceptions.Error: If an invalid compression is given. - - tuf.ssl_commons.exceptions.DecompressionError: If the compression failed for any reason. - - - 'self._orig_file' is used to store the original data of 'temporary_file'. - - - None. - """ - - # Does 'compression' have the correct format? - # Raise 'tuf.ssl_commons.exceptions.FormatError' if there is a mismatch. - tuf.ssl_crypto.formats.NAME_SCHEMA.check_match(compression) - - if self._orig_file is not None: - raise tuf.ssl_commons.exceptions.Error('Can only set compression on a TempFile once.') - - if compression != 'gzip': - raise tuf.ssl_commons.exceptions.Error('Only gzip compression is supported.') - - self.seek(0) - self._compression = compression - self._orig_file = self.temporary_file - - try: - gzip_file_object = gzip.GzipFile(fileobj=self.temporary_file, mode='rb') - uncompressed_content = gzip_file_object.read() - self.temporary_file = tempfile.NamedTemporaryFile() - self.temporary_file.write(uncompressed_content) - self.flush() - - except Exception as exception: - raise tuf.ssl_commons.exceptions.DecompressionError(exception) - - - - - - def close_temp_file(self): - """ - - Closes the temporary file object. 'close_temp_file' mimics usual - file.close(), however temporary file destroys itself when - 'close_temp_file' is called. Further if compression is set, second - temporary file instance 'self._orig_file' is also closed so that no open - temporary files are left open. - - - None. - - - None. - - - Closes 'self._orig_file'. - - - None. - """ - - self.temporary_file.close() - # If compression has been set, we need to explicitly close the original - # file object. - if self._orig_file is not None: - self._orig_file.close() - - - - - -def get_file_details(filepath, hash_algorithms=['sha256']): - """ - - To get file's length and hash information. The hash is computed using the - sha256 algorithm. This function is used in the signerlib.py and updater.py - modules. - - - filepath: - Absolute file path of a file. - - hash_algorithms: - - - tuf.ssl_commons.exceptions.FormatError: If hash of the file does not match HASHDICT_SCHEMA. - - tuf.ssl_commons.exceptions.Error: If 'filepath' does not exist. - - - A tuple (length, hashes) describing 'filepath'. - """ - - # Making sure that the format of 'filepath' is a path string. - # 'tuf.ssl_commons.exceptions.FormatError' is raised on incorrect format. - tuf.ssl_crypto.formats.PATH_SCHEMA.check_match(filepath) - tuf.ssl_crypto.formats.HASHALGORITHMS_SCHEMA.check_match(hash_algorithms) - - # The returned file hashes of 'filepath'. - file_hashes = {} - - # Does the path exists? - if not os.path.exists(filepath): - raise tuf.ssl_commons.exceptions.Error('Path ' + repr(filepath) + ' doest not exist.') - filepath = os.path.abspath(filepath) - - # Obtaining length of the file. - file_length = os.path.getsize(filepath) - - # Obtaining hash of the file. - for algorithm in hash_algorithms: - digest_object = tuf.ssl_crypto.hash.digest_filename(filepath, algorithm) - file_hashes.update({algorithm: digest_object.hexdigest()}) - - # Performing a format check to ensure 'file_hash' corresponds HASHDICT_SCHEMA. - # Raise 'tuf.ssl_commons.exceptions.FormatError' if there is a mismatch. - tuf.ssl_crypto.formats.HASHDICT_SCHEMA.check_match(file_hashes) - - return file_length, file_hashes - - - - - -def ensure_parent_dir(filename): - """ - - To ensure existence of the parent directory of 'filename'. If the parent - directory of 'name' does not exist, create it. - - Example: If 'filename' is '/a/b/c/d.txt', and only the directory '/a/b/' - exists, then directory '/a/b/c/d/' will be created. - - - filename: - A path string. - - - tuf.ssl_commons.exceptions.FormatError: If 'filename' is improperly formatted. - - - A directory is created whenever the parent directory of 'filename' does not - exist. - - - None. - """ - - # Ensure 'filename' corresponds to 'PATH_SCHEMA'. - # Raise 'tuf.ssl_commons.exceptions.FormatError' on a mismatch. - tuf.ssl_crypto.formats.PATH_SCHEMA.check_match(filename) - - # Split 'filename' into head and tail, check if head exists. - directory = os.path.split(filename)[0] - - if directory and not os.path.exists(directory): - # mode = 'rwx------'. 448 (decimal) is 700 in octal. - os.makedirs(directory, 448) - - - - - -def file_in_confined_directories(filepath, confined_directories): - """ - - Check if the directory containing 'filepath' is in the list/tuple of - 'confined_directories'. - - - filepath: - A string representing the path of a file. The following example path - strings are viewed as files and not directories: 'a/b/c', 'a/b/c.txt'. - - confined_directories: - A list, or a tuple, of directory strings. - - - tuf.ssl_commons.exceptions.FormatError: On incorrect format of the input. - - - Boolean. True, if path is either the empty string - or in 'confined_paths'; False, otherwise. - """ - - # Do the arguments have the correct format? - # Raise 'tuf.ssl_commons.exceptions.FormatError' if there is a mismatch. - tuf.ssl_crypto.formats.RELPATH_SCHEMA.check_match(filepath) - tuf.ssl_crypto.formats.RELPATHS_SCHEMA.check_match(confined_directories) - - for confined_directory in confined_directories: - # The empty string (arbitrarily chosen) signifies the client is confined - # to all directories and subdirectories. No need to check 'filepath'. - if confined_directory == '': - return True - - # Normalized paths needed, to account for up-level references, etc. - # TUF clients have the option of setting the list of directories in - # 'confined_directories'. - filepath = os.path.normpath(filepath) - confined_directory = os.path.normpath(confined_directory) - - # A TUF client may restrict himself to specific directories on the - # remote repository. The list of paths in 'confined_path', not including - # each path's subdirectories, are the only directories the client will - # download targets from. - if os.path.dirname(filepath) == confined_directory: - return True - - return False - - - - - - -def find_delegated_role(roles, delegated_role): - """ - - Find the index, if any, of a role with a given name in a list of roles. - - - roles: - The list of roles, each of which must have a 'name' attribute. - - delegated_role: - The name of the role to be found in the list of roles. - - - tuf.ssl_commons.exceptions.RepositoryError, if the list of roles has invalid data. - - - No known side effects. - - - The unique index, an interger, in the list of roles. if 'delegated_role' - does not exist, 'None' is returned. - """ - - # Do the arguments have the correct format? - # Ensure the arguments have the appropriate number of objects and object - # types, and that all dict keys are properly named. - # Raise 'tuf.ssl_commons.exceptions.FormatError' if any are improperly formatted. - tuf.ssl_crypto.formats.ROLELIST_SCHEMA.check_match(roles) - tuf.ssl_crypto.formats.ROLENAME_SCHEMA.check_match(delegated_role) - - # The index of a role, if any, with the same name. - role_index = None - - for index in six.moves.xrange(len(roles)): - role = roles[index] - name = role.get('name') - - # This role has no name. - if name is None: - no_name_message = 'Role with no name.' - raise tuf.ssl_commons.exceptions.RepositoryError(no_name_message) - - # Does this role have the same name? - else: - # This role has the same name, and... - if name == delegated_role: - # ...it is the only known role with the same name. - if role_index is None: - role_index = index - - # ...there are at least two roles with the same name. - else: - duplicate_role_message = 'Duplicate role (' + str(delegated_role) + ').' - raise tuf.ssl_commons.exceptions.RepositoryError(duplicate_role_message) - - # This role has a different name. - else: - logger.debug('Skipping delegated role: ' + repr(delegated_role)) - - return role_index - - - - - - -def ensure_all_targets_allowed(rolename, list_of_targets, parent_delegations): - """ - - Ensure that the list of targets specified by 'rolename' are allowed; this is - determined by inspecting the 'delegations' field of the parent role - of 'rolename'. If a target specified by 'rolename' is not found in the - delegations field of 'metadata_object_of_parent', raise an exception. The - top-level role 'targets' is allowed to list any target file, so this - function does not raise an exception if 'rolename' is 'targets'. - - Targets allowed are either exlicitly listed under the 'paths' field, or - match one of the patterns (i.e., Unix shell-style wildcards) listed there. - A parent role may delegate trust to all files under a particular directory, - including files in subdirectories by using wildcards (e.g., - '/packages/source/Django/*', '/packages/django*.tar.gzip). - Targets listed in hashed bins are also validated (i.e., its calculated path - hash prefix must be delegated by the parent role). - - TODO: Should the TUF spec restrict the repository to one particular - algorithm when calcutating path hash prefixes (currently restricted to - SHA256)? Should we allow the repository to specify in the role dictionary - the algorithm used for these generated hashed paths? - - - rolename: - The name of the role whose targets must be verified. This is a - role name and should not end in '.json'. Examples: 'root', 'targets', - 'unclaimed'. - - list_of_targets: - The targets of 'rolename', as listed in targets field of the 'rolename' - metadata. 'list_of_targets' are target paths relative to the targets - directory of the repository. The delegations of the parent role are - checked to verify that the targets of 'list_of_targets' are valid. - - parent_delegations: - The parent delegations of 'rolename'. The metadata object stores - the allowed paths and path hash prefixes of child delegations in its - 'delegations' attribute. - - - tuf.ssl_commons.exceptions.FormatError: - If any of the arguments are improperly formatted. - - tuf.ssl_commons.exceptions.ForbiddenTargetError: - If the targets of 'metadata_role' are not allowed according to - the parent's metadata file. The 'paths' and 'path_hash_prefixes' - attributes are verified. - - tuf.ssl_commons.exceptions.RepositoryError: - If the parent of 'rolename' has not made a delegation to 'rolename'. - - - None. - - - None. - """ - - # Do the arguments have the correct format? - # Ensure the arguments have the appropriate number of objects and object - # types, and that all dict keys are properly named. - # Raise 'tuf.ssl_commons.exceptions.FormatError' if any are improperly formatted. - tuf.ssl_crypto.formats.ROLENAME_SCHEMA.check_match(rolename) - tuf.ssl_crypto.formats.RELPATHS_SCHEMA.check_match(list_of_targets) - tuf.ssl_crypto.formats.DELEGATIONS_SCHEMA.check_match(parent_delegations) - - # Return if 'rolename' is 'targets'. 'targets' is not a delegated role. Any - # target file listed in 'targets' is allowed. - if rolename == 'targets': - return - - # The allowed targets of delegated roles are stored in the parent's metadata - # file. Iterate 'list_of_targets' and confirm they are trusted, or their root - # parent directory exists in the role delegated paths, or path hash prefixes, - # of the parent role. First, locate 'rolename' in the 'roles' attribute of - # 'parent_delegations'. - roles = parent_delegations['roles'] - role_index = find_delegated_role(roles, rolename) - - # Ensure the delegated role exists prior to extracting trusted paths from - # the parent's 'paths', or trusted path hash prefixes from the parent's - # 'path_hash_prefixes'. - if role_index is not None: - role = roles[role_index] - allowed_child_paths = role.get('paths') - allowed_child_path_hash_prefixes = role.get('path_hash_prefixes') - actual_child_targets = list_of_targets - - if allowed_child_path_hash_prefixes is not None: - consistent = paths_are_consistent_with_hash_prefixes - - # 'actual_child_targets' (i.e., 'list_of_targets') should have lenth - # greater than zero due to the tuf.format check above. - if not consistent(actual_child_targets, - allowed_child_path_hash_prefixes): - message = repr(rolename) + ' specifies a target that does not' + \ - ' have a path hash prefix listed in its parent role.' - raise tuf.ssl_commons.exceptions.ForbiddenTargetError(message) - - elif allowed_child_paths is not None: - # Check that each delegated target is either explicitly listed or a parent - # directory is found under role['paths'], otherwise raise an exception. - # If the parent role explicitly lists target file paths in 'paths', - # this loop will run in O(n^2), the worst-case. The repository - # maintainer will likely delegate entire directories, and opt for - # explicit file paths if the targets in a directory are delegated to - # different roles/developers. - for child_target in actual_child_targets: - for allowed_child_path in allowed_child_paths: - if fnmatch.fnmatch(child_target, allowed_child_path): - break - - else: - raise tuf.ssl_commons.exceptions.ForbiddenTargetError('Role '+repr(rolename)+' specifies'+\ - ' target '+repr(child_target)+','+\ - ' which is not an allowed path'+\ - ' according to the delegations set'+\ - ' by its parent role.') - - else: - # 'role' should have been validated when it was downloaded. - # The 'paths' or 'path_hash_prefixes' attributes should not be missing, - # so raise an error in case this clause is reached. - raise tuf.ssl_commons.exceptions.FormatError(repr(role) + ' did not contain one of ' +\ - 'the required fields ("paths" or ' +\ - '"path_hash_prefixes").') - - # Raise an exception if the parent has not delegated to the specified - # 'rolename' child role. - else: - raise tuf.ssl_commons.exceptions.RepositoryError('The parent role has not delegated to '+\ - repr(rolename) + '.') - - - - - -def paths_are_consistent_with_hash_prefixes(paths, path_hash_prefixes): - """ - - Determine whether a list of paths are consistent with their alleged - path hash prefixes. By default, the SHA256 hash function is used. - - - paths: - A list of paths for which their hashes will be checked. - - path_hash_prefixes: - The list of path hash prefixes with which to check the list of paths. - - - tuf.ssl_commons.exceptions.FormatError: - If the arguments are improperly formatted. - - - No known side effects. - - - A Boolean indicating whether or not the paths are consistent with the - hash prefix. - """ - - # Do the arguments have the correct format? - # Ensure the arguments have the appropriate number of objects and object - # types, and that all dict keys are properly named. - # Raise 'tuf.ssl_commons.exceptions.FormatError' if any are improperly formatted. - tuf.ssl_crypto.formats.RELPATHS_SCHEMA.check_match(paths) - tuf.ssl_crypto.formats.PATH_HASH_PREFIXES_SCHEMA.check_match(path_hash_prefixes) - - # Assume that 'paths' and 'path_hash_prefixes' are inconsistent until - # proven otherwise. - consistent = False - - # The format checks above ensure the 'paths' and 'path_hash_prefix' lists - # have lengths greater than zero. - for path in paths: - path_hash = get_target_hash(path) - - # Assume that every path is inconsistent until proven otherwise. - consistent = False - - for path_hash_prefix in path_hash_prefixes: - if path_hash.startswith(path_hash_prefix): - consistent = True - break - - # This path has no matching path_hash_prefix. Stop looking further. - if not consistent: - break - - return consistent - - - - - -def get_target_hash(target_filepath): - """ - - Compute the hash of 'target_filepath'. This is useful in conjunction with - the "path_hash_prefixes" attribute in a delegated targets role, which - tells us which paths it is implicitly responsible for. - - The repository may optionally organize targets into hashed bins to ease - target delegations and role metadata management. The use of consistent - hashing allows for a uniform distribution of targets into bins. - - - target_filepath: - The path to the target file on the repository. This will be relative to - the 'targets' (or equivalent) directory on a given mirror. - - - None. - - - None. - - - The hash of 'target_filepath'. - """ - - # Does 'target_filepath' have the correct format? - # Ensure the arguments have the appropriate number of objects and object - # types, and that all dict keys are properly named. - # Raise 'tuf.ssl_commons.exceptions.FormatError' if there is a mismatch. - tuf.ssl_crypto.formats.RELPATH_SCHEMA.check_match(target_filepath) - - # Calculate the hash of the filepath to determine which bin to find the - # target. The client currently assumes the repository uses - # 'HASH_FUNCTION' to generate hashes and 'utf-8'. - digest_object = tuf.ssl_crypto.hash.digest(HASH_FUNCTION) - encoded_target_filepath = target_filepath.encode('utf-8') - digest_object.update(encoded_target_filepath) - target_filepath_hash = digest_object.hexdigest() - - return target_filepath_hash - - - - - -_json_module = None - -def import_json(): - """ - - Tries to import json module. We used to fall back to the simplejson module, - but we have dropped support for that module. We are keeping this interface - intact for backwards compatibility. - - - None. - - - ImportError: on failure to import the json module. - - - None. - - - json module - """ - - global _json_module - - if _json_module is not None: - return _json_module - - else: - try: - module = __import__('json') - - # The 'json' module is available in Python > 2.6, and thus this exception - # should not occur in all supported Python installations (> 2.6) of TUF. - except ImportError: #pragma: no cover - raise ImportError('Could not import the json module') - - else: - _json_module = module - return module - -json = import_json() - - - -def load_json_string(data): - """ - - Deserialize 'data' (JSON string) to a Python object. - - - data: - A JSON string. - - - tuf.ssl_commons.exceptions.Error, if 'data' cannot be deserialized to a Python object. - - - None. - - - Deserialized object. For example, a dictionary. - """ - - deserialized_object = None - - try: - deserialized_object = json.loads(data) - - except TypeError: - message = 'Invalid JSON string: ' + repr(data) - raise tuf.ssl_commons.exceptions.Error(message) - - except ValueError: - message = 'Cannot deserialize to a Python object: ' + repr(data) - raise tuf.ssl_commons.exceptions.Error(message) - - else: - return deserialized_object - - - -def load_json_file(filepath): - """ - - Deserialize a JSON object from a file containing the object. - - - filepath: - Absolute path of JSON file. - - - tuf.ssl_commons.exceptions.FormatError: If 'filepath' is improperly formatted. - - tuf.ssl_commons.exceptions.Error: If 'filepath' cannot be deserialized to a Python object. - - IOError: If there are runtime IO exceptions. - - - None. - - - Deserialized object. For example, a dictionary. - """ - - # Making sure that the format of 'filepath' is a path string. - # tuf.ssl_commons.exceptions.FormatError is raised on incorrect format. - tuf.ssl_crypto.formats.PATH_SCHEMA.check_match(filepath) - - deserialized_object = None - - # The file is mostly likely gzipped. - if filepath.endswith('.gz'): - logger.debug('gzip.open(' + str(filepath) + ')') - fileobject = six.StringIO(gzip.open(filepath).read().decode('utf-8')) - - else: - logger.debug('open(' + str(filepath) + ')') - fileobject = open(filepath) - - try: - deserialized_object = json.load(fileobject) - - except (ValueError, TypeError): - raise tuf.ssl_commons.exceptions.Error('Cannot deserialize to a Python object: ' + repr(filepath)) - - - else: - fileobject.close() - return deserialized_object - - finally: - fileobject.close() - - - -def digests_are_equal(digest1, digest2): - """ - - While protecting against timing attacks, compare the hexadecimal arguments - and determine if they are equal. - - - digest1: - The first hexadecimal string value to compare. - - digest2: - The second hexadecimal string value to compare. - - - tuf.ssl_commons.exceptions.FormatError: If the arguments are improperly formatted. - - - None. - - - Return True if 'digest1' is equal to 'digest2', False otherwise. - """ - - # Ensure the arguments have the appropriate number of objects and object - # types, and that all dict keys are properly named. - # Raise 'tuf.ssl_commons.exceptions.FormatError' if there is a mismatch. - tuf.ssl_crypto.formats.HEX_SCHEMA.check_match(digest1) - tuf.ssl_crypto.formats.HEX_SCHEMA.check_match(digest2) - - if len(digest1) != len(digest2): - return False - - are_equal = True - - for element in range(len(digest1)): - if digest1[element] != digest2[element]: - are_equal = False - - return are_equal