diff --git a/docs/images/repository_tool-diagram.png b/docs/images/repository_tool-diagram.png index 36d444f9..e8e6f289 100644 Binary files a/docs/images/repository_tool-diagram.png and b/docs/images/repository_tool-diagram.png differ diff --git a/tests/unit/test_repository_tool.py b/tests/unit/test_repository_tool.py new file mode 100755 index 00000000..62dc92f4 --- /dev/null +++ b/tests/unit/test_repository_tool.py @@ -0,0 +1,1820 @@ +""" + + test_repository_tool.py + + + Vladimir Diaz + + + April 7, 2014. + + + See LICENSE for licensing information. + + + Unit test for 'repository_tool.py'. +""" + +import os +import time +import unittest +import logging +import tempfile +import shutil + +import tuf +import tuf.log +import tuf.formats +import tuf.roledb +import tuf.keydb +import tuf.hash +import tuf.repository_tool as repo_tool + +logger = logging.getLogger('tuf.test_repository_tool') + +repo_tool.disable_console_log_messages() + + +class TestRepository(unittest.TestCase): + @classmethod + def setUpClass(cls): + + # setUpClass() is called before tests in an individual class are executed. + + # Create a temporary directory to store the repository, metadata, and target + # files. 'temporary_directory' must be deleted in TearDownClass() so that + # temporary files are always removed, even when exceptions occur. + cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd()) + + + + @classmethod + def tearDownClass(cls): + + # tearDownModule() is called after all the tests have run. + # http://docs.python.org/2/library/unittest.html#class-and-module-fixtures + + # Remove the temporary repository directory, which should contain all the + # metadata, targets, and key files generated for the test cases. + shutil.rmtree(cls.temporary_directory) + + + + def setUp(self): + pass + + + + def tearDown(self): + tuf.roledb.clear_roledb() + tuf.keydb.clear_keydb() + + + def test_init(self): + + # Test normal case. + repository = repo_tool.Repository('repository_directory/', + 'metadata_directory/', + 'targets_directory/') + self.assertTrue(isinstance(repository.root, repo_tool.Root)) + self.assertTrue(isinstance(repository.snapshot, repo_tool.Snapshot)) + self.assertTrue(isinstance(repository.timestamp, repo_tool.Timestamp)) + self.assertTrue(isinstance(repository.targets, repo_tool.Targets)) + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, repo_tool.Repository, 3, + 'metadata_directory/', 'targets_directory') + self.assertRaises(tuf.FormatError, repo_tool.Repository, + 'repository_directory', 3, 'targets_directory') + self.assertRaises(tuf.FormatError, repo_tool.Repository, + 'repository_directory', 'metadata_directory', 3) + + + + def test_write_and_write_partial(self): + # Test creation of a TUF repository. + # + # 1. Load public and private keys. + # 2. Add verification keys. + # 3. Load signing keys. + # 4. Add target files. + # 5. Perform delegation. + # 5. write() + # + # Copy the target files from 'tuf/tests/repository_data' so that write() + # has target fileinfo to include in metadata. + temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) + targets_directory = os.path.join(temporary_directory, 'repository', + repo_tool.TARGETS_DIRECTORY_NAME) + original_targets_directory = os.path.join(os.pardir, 'repository_data', + 'repository', 'targets') + shutil.copytree(original_targets_directory, targets_directory) + + # In this case, create_new_repository() creates the 'repository/' + # sub-directory in 'temporary_directory' if it does not exist. + repository_directory = os.path.join(temporary_directory, 'repository') + metadata_directory = os.path.join(repository_directory, + repo_tool.METADATA_STAGED_DIRECTORY_NAME) + repository = repo_tool.create_new_repository(repository_directory) + + + # (1) Load the public and private keys of the top-level roles, and one + # delegated role. + keystore_directory = os.path.join(os.pardir, 'repository_data', 'keystore') + + # Load the public keys. + root_pubkey_path = os.path.join(keystore_directory, 'root_key.pub') + targets_pubkey_path = os.path.join(keystore_directory, 'targets_key.pub') + snapshot_pubkey_path = os.path.join(keystore_directory, 'snapshot_key.pub') + timestamp_pubkey_path = os.path.join(keystore_directory, 'timestamp_key.pub') + role1_pubkey_path = os.path.join(keystore_directory, 'delegation_key.pub') + + root_pubkey = repo_tool.import_rsa_publickey_from_file(root_pubkey_path) + targets_pubkey = \ + repo_tool.import_rsa_publickey_from_file(targets_pubkey_path) + snapshot_pubkey = \ + repo_tool.import_rsa_publickey_from_file(snapshot_pubkey_path) + timestamp_pubkey = \ + repo_tool.import_rsa_publickey_from_file(timestamp_pubkey_path) + role1_pubkey = repo_tool.import_rsa_publickey_from_file(role1_pubkey_path) + + # Load the private keys. + root_privkey_path = os.path.join(keystore_directory, 'root_key') + targets_privkey_path = os.path.join(keystore_directory, 'targets_key') + snapshot_privkey_path = os.path.join(keystore_directory, 'snapshot_key') + timestamp_privkey_path = os.path.join(keystore_directory, 'timestamp_key') + role1_privkey_path = os.path.join(keystore_directory, 'delegation_key') + + root_privkey = \ + repo_tool.import_rsa_privatekey_from_file(root_privkey_path, 'password') + targets_privkey = \ + repo_tool.import_rsa_privatekey_from_file(targets_privkey_path, + 'password') + snapshot_privkey = \ + repo_tool.import_rsa_privatekey_from_file(snapshot_privkey_path, + 'password') + timestamp_privkey = \ + repo_tool.import_rsa_privatekey_from_file(timestamp_privkey_path, + 'password') + role1_privkey = \ + repo_tool.import_rsa_privatekey_from_file(role1_privkey_path, + 'password') + + + # (2) Add top-level verification keys. + repository.root.add_verification_key(root_pubkey) + repository.targets.add_verification_key(targets_pubkey) + repository.snapshot.add_verification_key(snapshot_pubkey) + + # Verify that repository.write() fails for insufficient threshold + # of signatures (default threshold = 1). + self.assertRaises(tuf.UnsignedMetadataError, repository.write) + + repository.timestamp.add_verification_key(timestamp_pubkey) + + + # (3) Load top-level signing keys. + repository.root.load_signing_key(root_privkey) + repository.targets.load_signing_key(targets_privkey) + repository.snapshot.load_signing_key(snapshot_privkey) + + # Verify that repository.write() fails for insufficient threshold + # of signatures (default threshold = 1). + self.assertRaises(tuf.UnsignedMetadataError, repository.write) + + repository.timestamp.load_signing_key(timestamp_privkey) + + + # (4) Add target files. + target1 = os.path.join(targets_directory, 'file1.txt') + target2 = os.path.join(targets_directory, 'file2.txt') + target3 = os.path.join(targets_directory, 'file3.txt') + repository.targets.add_target(target1) + repository.targets.add_target(target2) + + # (5) Perform delegation. + repository.targets.delegate('role1', [role1_pubkey], [target3]) + repository.targets('role1').load_signing_key(role1_privkey) + + # (6) Write repository. + repository.targets.compressions = ['gz'] + repository.write() + + + # Verify that the expected metadata is written. + for role in ['root.json', 'targets.json', 'snapshot.json', 'timestamp.json']: + role_filepath = os.path.join(metadata_directory, role) + role_signable = tuf.util.load_json_file(role_filepath) + + # Raise 'tuf.FormatError' if 'role_signable' is an invalid signable. + tuf.formats.check_signable_object_format(role_signable) + + if role == 'targets.json': + compressed_filepath = role_filepath + '.gz' + self.assertTrue(os.path.exists(compressed_filepath)) + + # Verify the 'role1.json' delegation is also written. + role1_filepath = os.path.join(metadata_directory, 'targets', 'role1.json') + role1_signable = tuf.util.load_json_file(role1_filepath) + tuf.formats.check_signable_object_format(role1_signable) + + # Verify that an exception is *not* raised for multiple repository.write(). + repository.write() + + # Verify that a write() fails if a repository is loaded and a change + # is made to a role. + repo_tool.load_repository(repository_directory) + + repository.timestamp.expiration = '2084-01-01 12:00:00' + self.assertRaises(tuf.UnsignedMetadataError, repository.write) + + # Verify that a write_partial() is allowed. + repository.write_partial() + + # Next, perform a non-partial write() with consistent snapshots enabled. + # Since the timestamp was modified, load its private key. + repository.timestamp.load_signing_key(timestamp_privkey) + + # Test creation of a consistent snapshot repository. Writing a consistent + # snapshot modifies the Root metadata, which specifies whether a repository + # supports consistent snapshots. Verify that an exception is raised due to + # the missing signatures of Root and Snapshot. + self.assertRaises(tuf.UnsignedMetadataError, repository.write, + False, True) + + # Load the private keys of Root and Snapshot (new version required since + # Root has changed.) + repository.root.load_signing_key(root_privkey) + repository.snapshot.load_signing_key(snapshot_privkey) + + # Verify that consistent snapshot can be written and loaded. + repository.write(consistent_snapshot=True) + repo_tool.load_repository(repository_directory) + + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, repository.write, 3, False) + self.assertRaises(tuf.FormatError, repository.write, False, 3) + + + + def test_get_filepaths_in_directory(self): + # Test normal case. + # Use the pre-generated metadata directory for testing. + metadata_directory = os.path.join(os.pardir, 'repository_data', + 'repository', 'metadata') + + + # Test improperly formatted arguments. + # Set 'repo' reference to improve readability. + repo = repo_tool.Repository + + self.assertRaises(tuf.FormatError, repo.get_filepaths_in_directory, + 3, recursive_walk=False, followlinks=False) + self.assertRaises(tuf.FormatError, repo.get_filepaths_in_directory, + metadata_directory, 3, followlinks=False) + self.assertRaises(tuf.FormatError, repo.get_filepaths_in_directory, + metadata_directory, recursive_walk=False, followlinks=3) + + # Test invalid directory argument. + temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) + nonexistent_directory = os.path.join(temporary_directory, 'nonexistent/') + self.assertRaises(tuf.Error, repo.get_filepaths_in_directory, + nonexistent_directory, recursive_walk=False, + followlinks=False) + + + + + +class TestMetadata(unittest.TestCase): + def setUp(self): + # Inherit from the repo_tool.Metadata() base class. All of the methods + # to be tested in TestMetadata require at least 1 role, so create it here + # and set its roleinfo. + class MetadataRole(repo_tool.Metadata): + + def __init__(self): + super(MetadataRole, self).__init__() + + self._rolename = 'metadata_role' + + # Expire in 86400 seconds (1 day). + expiration = tuf.formats.format_time(time.time() + 86400) + + roleinfo = {'keyids': [], 'signing_keyids': [], 'threshold': 1, + 'signatures': [], 'version': 0, + 'consistent_snapshot': False, + 'compressions': [''], 'expires': expiration, + 'partial_loaded': False} + + tuf.roledb.add_role(self._rolename, roleinfo) + + self.metadata = MetadataRole() + + + + def tearDown(self): + tuf.roledb.clear_roledb() + tuf.keydb.clear_keydb() + self.metadata = None + + + + def test_rolename(self): + base_metadata = repo_tool.Metadata() + + self.assertEqual(base_metadata.rolename, None) + + # Test the sub-classed MetadataRole(). + self.assertEqual(self.metadata.rolename, 'metadata_role') + + + + def test_version(self): + # Test version getter, and the default version number. + self.assertEqual(self.metadata.version, 0) + + # Test version setter, and verify updated version number. + self.metadata.version = 8 + self.assertEqual(self.metadata.version, 8) + + + + def test_threshold(self): + # Test threshold getter, and the default threshold number. + self.assertEqual(self.metadata.threshold, 1) + + # Test threshold setter, and verify updated threshold number. + self.metadata.threshold = 3 + self.assertEqual(self.metadata.threshold, 3) + + + + def test_expiration(self): + # Test expiration getter. + expiration = self.metadata.expiration + self.assertTrue(tuf.formats.TIME_SCHEMA.matches(expiration)) + + # Test expiration setter. + self.metadata.expiration = '2088-01-01 12:00:00' + expiration = self.metadata.expiration + self.assertTrue(tuf.formats.TIME_SCHEMA.matches(expiration)) + + + # Test improperly formatted datetime. + try: + self.metadata.expiration = '3' + except tuf.FormatError: + pass + else: + self.fail('Setter failed to detect improperly formatted datetime.') + + + # Test invalid argument (i.e., expiration has already expired.) + expired_datetime = tuf.formats.format_time(time.time() - 1) + try: + self.metadata.expiration = expired_datetime + except tuf.FormatError: + pass + else: + self.fail('Setter failted to detect an expired datetime.') + + + + def test_keys(self): + # Test default case, where a verification key has not been added. + self.assertEqual(self.metadata.keys, []) + + + # Test keys() getter after a verification key has been loaded. + key_path = os.path.join(os.pardir, 'repository_data', + 'keystore', 'root_key.pub') + key_object = repo_tool.import_rsa_publickey_from_file(key_path) + self.metadata.add_verification_key(key_object) + + keyid = key_object['keyid'] + self.assertEqual([keyid], self.metadata.keys) + + + + def test_signing_keys(self): + # Test default case, where a signing key has not been added. + self.assertEqual(self.metadata.signing_keys, []) + + + # Test signing_keys() getter after a signing key has been loaded. + key_path = os.path.join(os.pardir, 'repository_data', + 'keystore', 'root_key') + key_object = repo_tool.import_rsa_privatekey_from_file(key_path, 'password') + self.metadata.load_signing_key(key_object) + + keyid = key_object['keyid'] + self.assertEqual([keyid], self.metadata.signing_keys) + + + + def test_compressions(self): + # Test default case, where only uncompressed metadata is supported. + self.assertEqual(self.metadata.compressions, ['']) + + # Test compressions getter after a compressions algorithm is added. + self.metadata.compressions = ['gz'] + + self.assertEqual(self.metadata.compressions, ['', 'gz']) + + + # Test improperly formatted argument. + try: + self.metadata.compressions = 3 + except tuf.FormatError: + pass + else: + self.fail('Setter failed to detect improperly formatted compressions') + + + + def test_add_verification_key(self): + # Add verification key and verify with keys() that it was added. + key_path = os.path.join(os.pardir, 'repository_data', + 'keystore', 'root_key.pub') + key_object = repo_tool.import_rsa_publickey_from_file(key_path) + self.metadata.add_verification_key(key_object) + + keyid = key_object['keyid'] + self.assertEqual([keyid], self.metadata.keys) + + + # Test improperly formatted key argument. + self.assertRaises(tuf.FormatError, self.metadata.add_verification_key, 3) + + + + def test_remove_verification_key(self): + # Add verification key so that remove_verifiation_key() can be tested. + key_path = os.path.join(os.pardir, 'repository_data', + 'keystore', 'root_key.pub') + key_object = repo_tool.import_rsa_publickey_from_file(key_path) + self.metadata.add_verification_key(key_object) + + keyid = key_object['keyid'] + self.assertEqual([keyid], self.metadata.keys) + + + # Test successful removal of verification key added above. + self.metadata.remove_verification_key(key_object) + self.assertEqual(self.metadata.keys, []) + + + # Test improperly formatted argument + self.assertRaises(tuf.FormatError, self.metadata.remove_verification_key, 3) + + + # Test non-existent public key argument. + key_path = os.path.join(os.pardir, 'repository_data', + 'keystore', 'targets_key.pub') + unused_key_object = repo_tool.import_rsa_publickey_from_file(key_path) + + self.assertRaises(tuf.Error, self.metadata.remove_verification_key, + unused_key_object) + + + + def test_load_signing_key(self): + # Test normal case. + key_path = os.path.join(os.pardir, 'repository_data', + 'keystore', 'root_key') + key_object = repo_tool.import_rsa_privatekey_from_file(key_path, 'password') + self.metadata.load_signing_key(key_object) + + keyid = key_object['keyid'] + self.assertEqual([keyid], self.metadata.signing_keys) + + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, self.metadata.load_signing_key, 3) + + + # Test non-private key. + key_path = os.path.join(os.pardir, 'repository_data', + 'keystore', 'root_key.pub') + key_object = repo_tool.import_rsa_publickey_from_file(key_path) + self.assertRaises(tuf.Error, self.metadata.load_signing_key, key_object) + + + + def test_unload_signing_key(self): + # Load a signing key so that unload_signing_key() can have a key to unload. + key_path = os.path.join(os.pardir, 'repository_data', + 'keystore', 'root_key') + key_object = repo_tool.import_rsa_privatekey_from_file(key_path, 'password') + self.metadata.load_signing_key(key_object) + + keyid = key_object['keyid'] + self.assertEqual([keyid], self.metadata.signing_keys) + + self.metadata.unload_signing_key(key_object) + + self.assertEqual(self.metadata.signing_keys, []) + + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, self.metadata.unload_signing_key, 3) + + + # Test non-existent key argument. + key_path = os.path.join(os.pardir, 'repository_data', + 'keystore', 'targets_key') + unused_key_object = repo_tool.import_rsa_privatekey_from_file(key_path, + 'password') + + self.assertRaises(tuf.Error, self.metadata.unload_signing_key, + unused_key_object) + + + + def test_add_signature(self): + # Test normal case. + # Load signature list from any of pre-generated metadata; needed for + # testing. + metadata_directory = os.path.join(os.pardir, 'repository_data', + 'repository', 'metadata') + root_filepath = os.path.join(metadata_directory, 'root.json') + root_signable = tuf.util.load_json_file(root_filepath) + signatures = root_signable['signatures'] + + # Add the first signature from the list, as only need one is needed. + self.metadata.add_signature(signatures[0]) + self.assertEqual(signatures, self.metadata.signatures) + + + # Test improperly formatted signature argument. + self.assertRaises(tuf.FormatError, self.metadata.add_signature, 3) + + + + def test_remove_signature(self): + # Test normal case. + # Add a signature so remove_signature() has some signature to remove. + metadata_directory = os.path.join(os.pardir, 'repository_data', + 'repository', 'metadata') + root_filepath = os.path.join(metadata_directory, 'root.json') + root_signable = tuf.util.load_json_file(root_filepath) + signatures = root_signable['signatures'] + self.metadata.add_signature(signatures[0]) + + self.metadata.remove_signature(signatures[0]) + self.assertEqual(self.metadata.signatures, []) + + + # Test improperly formatted signature argument. + self.assertRaises(tuf.FormatError, self.metadata.remove_signature, 3) + + + # Test invalid signature argument (i.e., signature that has not been added.) + # Load an unused signature to be tested. + targets_filepath = os.path.join(metadata_directory, 'targets.json') + targets_signable = tuf.util.load_json_file(targets_filepath) + signatures = targets_signable['signatures'] + + self.assertRaises(tuf.Error, self.metadata.remove_signature, signatures[0]) + + + + def test_signatures(self): + # Test default case, where no signatures have been added yet. + self.assertEqual(self.metadata.signatures, []) + + + # Test getter after adding an example signature. + metadata_directory = os.path.join(os.pardir, 'repository_data', + 'repository', 'metadata') + root_filepath = os.path.join(metadata_directory, 'root.json') + root_signable = tuf.util.load_json_file(root_filepath) + signatures = root_signable['signatures'] + + # Add the first signature from the list, as only need one is needed. + self.metadata.add_signature(signatures[0]) + self.assertEqual(signatures, self.metadata.signatures) + + + +class TestRoot(unittest.TestCase): + def setUp(self): + pass + + + + def tearDown(self): + tuf.roledb.clear_roledb() + tuf.keydb.clear_keydb() + + + + def test_init(self): + + # Test normal case. + # Root() subclasses Metadata(), and creates a 'root' role in 'tuf.roledb'. + root_object = repo_tool.Root() + self.assertTrue(isinstance(root_object, repo_tool.Metadata)) + self.assertTrue(tuf.roledb.role_exists('root')) + + + +class TestTimestamp(unittest.TestCase): + def setUp(self): + pass + + + + def tearDown(self): + tuf.roledb.clear_roledb() + tuf.keydb.clear_keydb() + + + + def test_init(self): + + # Test normal case. + # Timestamp() subclasses Metadata(), and creates a 'timestamp' role in + # 'tuf.roledb'. + timestamp_object = repo_tool.Timestamp() + self.assertTrue(isinstance(timestamp_object, repo_tool.Metadata)) + self.assertTrue(tuf.roledb.role_exists('timestamp')) + + + + + +class TestSnapshot(unittest.TestCase): + def setUp(self): + pass + + + + def tearDown(self): + tuf.roledb.clear_roledb() + tuf.keydb.clear_keydb() + + + + def test_init(self): + + # Test normal case. + # Snapshot() subclasses Metadata(), and creates a 'snapshot' role in + # 'tuf.roledb'. + snapshot_object = repo_tool.Snapshot() + self.assertTrue(isinstance(snapshot_object, repo_tool.Metadata)) + self.assertTrue(tuf.roledb.role_exists('snapshot')) + + + + + +class TestTargets(unittest.TestCase): + @classmethod + def setUpClass(cls): + + # setUpClass() is called before tests in an individual class are executed. + + # Create a temporary directory to store the repository, metadata, and target + # files. 'temporary_directory' must be deleted in TearDownClass() so that + # temporary files are always removed, even when exceptions occur. + cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd()) + + + + @classmethod + def tearDownClass(cls): + + # tearDownModule() is called after all the tests have run. + # http://docs.python.org/2/library/unittest.html#class-and-module-fixtures + + # Remove the temporary repository directory, which should contain all the + # metadata, targets, and key files generated for the test cases. + shutil.rmtree(cls.temporary_directory) + + + + def setUp(self): + temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) + self.targets_directory = os.path.join(temporary_directory, 'repository', + 'targets') + original_targets_directory = os.path.join(os.pardir, 'repository_data', + 'repository', 'targets') + shutil.copytree(original_targets_directory, self.targets_directory) + self.targets_object = repo_tool.Targets(self.targets_directory) + + + + def tearDown(self): + tuf.roledb.clear_roledb() + tuf.keydb.clear_keydb() + self.targets_object = None + + + + def test_init(self): + + # Test normal case. + # Snapshot() subclasses Metadata(), and creates a 'snapshot' role in + # 'tuf.roledb'. + targets_object = repo_tool.Targets('targets_directory/') + self.assertTrue(isinstance(targets_object, repo_tool.Metadata)) + self.assertTrue(tuf.roledb.role_exists('targets')) + + # Custom Targets object rolename. + targets_object = repo_tool.Targets('targets_directory/', 'project') + self.assertTrue(isinstance(targets_object, repo_tool.Metadata)) + self.assertTrue(tuf.roledb.role_exists('project')) + + # Custom roleinfo object (i.e., tuf.formats.ROLEDB_SCHEMA). 'keyids' and + # 'threshold' are required, the rest are optional. + roleinfo = {'keyids': + ['66c4cb5fef5e4d62b7013ef1cab4b8a827a36c14056d5603c3a970e21eb30e6f'], + 'threshold': 8} + self.assertTrue(tuf.formats.ROLEDB_SCHEMA.matches(roleinfo)) + + targets_object = repo_tool.Targets('targets_directory/', 'package', roleinfo) + self.assertTrue(isinstance(targets_object, repo_tool.Metadata)) + self.assertTrue(tuf.roledb.role_exists('package')) + + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, repo_tool.Targets, 3) + self.assertRaises(tuf.FormatError, repo_tool.Targets, 'targets_directory/', 3) + self.assertRaises(tuf.FormatError, repo_tool.Targets, 'targets_directory/', + 'targets', 3) + + + + def test_get_delegated_rolenames(self): + # Test normal case. + # Perform two delegations so that get_delegated_rolenames() has roles to + # return. + keystore_directory = os.path.join(os.pardir, 'repository_data', 'keystore') + public_keypath = os.path.join(keystore_directory, 'root_key.pub') + public_key = repo_tool.import_rsa_publickey_from_file(public_keypath) + target1_filepath = os.path.join(self.targets_directory, 'file1.txt') + target2_filepath = os.path.join(self.targets_directory, 'file2.txt') + + # Set needed arguments by delegate(). + public_keys = [public_key] + threshold = 1 + + self.targets_object.delegate('tuf', public_keys, [target1_filepath], + threshold, restricted_paths=None, + path_hash_prefixes=None) + self.targets_object.delegate('warehouse', public_keys, [target2_filepath], + threshold, restricted_paths=None, + path_hash_prefixes=None) + + # Test that get_delegated_rolenames returns the expected delegations. + expected_delegated_rolenames = ['targets/tuf/', 'targets/warehouse'] + for delegated_rolename in self.targets_object.get_delegated_rolenames(): + delegated_rolename in expected_delegated_rolenames + + + + def test_target_files(self): + # Test normal case. + # Verify the targets object initially contains zero target files. + self.assertEqual(self.targets_object.target_files, []) + + target_filepath = os.path.join(self.targets_directory, 'file1.txt') + self.targets_object.add_target(target_filepath) + + self.assertEqual(len(self.targets_object.target_files), 1) + self.assertTrue('/file1.txt' in self.targets_object.target_files) + + + + def test_delegations(self): + # Test normal case. + # Perform a delegation so that delegations() has a Targets() object to + # return. + keystore_directory = os.path.join(os.pardir, 'repository_data', 'keystore') + public_keypath = os.path.join(keystore_directory, 'root_key.pub') + public_key = repo_tool.import_rsa_publickey_from_file(public_keypath) + target1_filepath = os.path.join(self.targets_directory, 'file1.txt') + + # Set needed arguments by delegate(). + public_keys = [public_key] + rolename = 'tuf' + list_of_targets = [target1_filepath] + threshold = 1 + + self.targets_object.delegate(rolename, public_keys, list_of_targets, + threshold, restricted_paths=None, + path_hash_prefixes=None) + + # Test that a valid Targets() object is returned by delegations(). + for delegated_object in self.targets_object.delegations: + self.assertTrue(isinstance(delegated_object, repo_tool.Targets)) + + + + def test_add_target(self): + # Test normal case. + # Verify the targets object initially contains zero target files. + self.assertEqual(self.targets_object.target_files, []) + + target_filepath = os.path.join(self.targets_directory, 'file1.txt') + self.targets_object.add_target(target_filepath) + + self.assertEqual(len(self.targets_object.target_files), 1) + self.assertTrue('/file1.txt' in self.targets_object.target_files) + + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, self.targets_object.add_target, + 3) + + + # Test invalid filepath argument (i.e., non-existent or invalid file.) + self.assertRaises(tuf.Error, self.targets_object.add_target, + 'non-existent.txt') + self.assertRaises(tuf.Error, self.targets_object.add_target, + self.temporary_directory) + + + + def test_add_targets(self): + # Test normal case. + # Verify the targets object initially contains zero target files. + self.assertEqual(self.targets_object.target_files, []) + + target1_filepath = os.path.join(self.targets_directory, 'file1.txt') + target2_filepath = os.path.join(self.targets_directory, 'file2.txt') + target3_filepath = os.path.join(self.targets_directory, 'file3.txt') + + target_files = [target1_filepath, target2_filepath, target3_filepath] + self.targets_object.add_targets(target_files) + + self.assertEqual(len(self.targets_object.target_files), 3) + self.assertEqual(self.targets_object.target_files, + ['/file1.txt', '/file2.txt', '/file3.txt']) + + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, self.targets_object.add_targets, + 3) + + + # Test invalid filepath argument (i.e., non-existent or invalid file.) + self.assertRaises(tuf.Error, self.targets_object.add_target, + ['non-existent.txt']) + self.assertRaises(tuf.Error, self.targets_object.add_target, + [target1_filepath, target2_filepath, 'non-existent.txt']) + self.assertRaises(tuf.Error, self.targets_object.add_target, + self.temporary_directory) + + + + def test_remove_target(self): + # Test normal case. + # Verify the targets object initially contains zero target files. + self.assertEqual(self.targets_object.target_files, []) + + # Add a target so that remove_target() has something to remove. + target_filepath = os.path.join(self.targets_directory, 'file1.txt') + self.targets_object.add_target(target_filepath) + + # Test remove_target()'s behavior. + self.targets_object.remove_target(target_filepath) + self.assertEqual(self.targets_object.target_files, []) + + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, self.targets_object.remove_target, + 3) + + + # Test invalid filepath argument (i.e., non-existent or invalid file.) + self.assertRaises(tuf.Error, self.targets_object.remove_target, + '/non-existent.txt') + + + + def test_clear_targets(self): + # Test normal case. + # Verify the targets object initially contains zero target files. + self.assertEqual(self.targets_object.target_files, []) + + # Add targets, to be tested by clear_targets(). + target1_filepath = os.path.join(self.targets_directory, 'file1.txt') + target2_filepath = os.path.join(self.targets_directory, 'file2.txt') + self.targets_object.add_targets([target1_filepath, target2_filepath]) + + self.targets_object.clear_targets() + self.assertEqual(self.targets_object.target_files, []) + + + + def test_delegate(self): + # Test normal case. + # Need at least one public key and valid target paths required by + # delegate(). + keystore_directory = os.path.join(os.pardir, 'repository_data', 'keystore') + public_keypath = os.path.join(keystore_directory, 'root_key.pub') + public_key = repo_tool.import_rsa_publickey_from_file(public_keypath) + target1_filepath = os.path.join(self.targets_directory, 'file1.txt') + target2_filepath = os.path.join(self.targets_directory, 'file2.txt') + + + # Set needed arguments by delegate(). + public_keys = [public_key] + rolename = 'tuf' + list_of_targets = [target1_filepath, target2_filepath] + threshold = 1 + restricted_paths = [self.targets_directory] + path_hash_prefixes = ['e3a3', '8fae', 'd543'] + + self.targets_object.delegate(rolename, public_keys, list_of_targets, + threshold, restricted_paths, + path_hash_prefixes) + + self.assertEqual(self.targets_object.get_delegated_rolenames(), + ['targets/tuf']) + + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, self.targets_object.delegate, + 3, public_keys, list_of_targets, threshold, + restricted_paths, path_hash_prefixes) + self.assertRaises(tuf.FormatError, self.targets_object.delegate, + rolename, 3, list_of_targets, threshold, + restricted_paths, path_hash_prefixes) + self.assertRaises(tuf.FormatError, self.targets_object.delegate, + rolename, public_keys, 3, threshold, + restricted_paths, path_hash_prefixes) + self.assertRaises(tuf.FormatError, self.targets_object.delegate, + rolename, public_keys, list_of_targets, '3', + restricted_paths, path_hash_prefixes) + self.assertRaises(tuf.FormatError, self.targets_object.delegate, + rolename, public_keys, list_of_targets, threshold, + 3, path_hash_prefixes) + self.assertRaises(tuf.FormatError, self.targets_object.delegate, + rolename, public_keys, list_of_targets, threshold, + restricted_paths, 3) + + + # Test invalid arguments (e.g., already delegated 'rolename', non-existent + # files, etc.). + # Test duplicate 'rolename' delegation, which should have been delegated + # in the normal case above. + self.assertRaises(tuf.Error, self.targets_object.delegate, + rolename, public_keys, list_of_targets, threshold, + restricted_paths, path_hash_prefixes) + + # Test non-existent target paths. + self.assertRaises(tuf.Error, self.targets_object.delegate, + rolename, public_keys, ['/non-existent'], threshold, + restricted_paths, path_hash_prefixes) + + + + def test_delegate_hashed_bins(self): + # Test normal case. + keystore_directory = os.path.join(os.pardir, 'repository_data', 'keystore') + public_keypath = os.path.join(keystore_directory, 'root_key.pub') + public_key = repo_tool.import_rsa_publickey_from_file(public_keypath) + target1_filepath = os.path.join(self.targets_directory, 'file1.txt') + + # Set needed arguments by delegate_hashed_bins(). + public_keys = [public_key] + list_of_targets = [target1_filepath] + + # Test delegate_hashed_bins() and verify that 16 hashed bins have + # been delegated in the parent's roleinfo. + self.targets_object.delegate_hashed_bins(list_of_targets, public_keys, + number_of_bins=16) + + # The expected child rolenames, since 'number_of_bins' = 16 + delegated_rolenames = ['0', '1', '2', '3', '4', '5', '6', '7', + '8', '9', 'a', 'b', 'c', 'd', 'e', 'f'] + + # Prepend the parent's rolename. + expected_delegated_rolenames = [] + for rolename in delegated_rolenames: + rolename = self.targets_object.rolename + '/' + rolename + expected_delegated_rolenames.append(rolename) + + self.assertEqual(sorted(self.targets_object.get_delegated_rolenames()), + sorted(expected_delegated_rolenames)) + + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, + self.targets_object.delegate_hashed_bins, 3, public_keys, + number_of_bins=1) + self.assertRaises(tuf.FormatError, + self.targets_object.delegate_hashed_bins, + list_of_targets, 3, number_of_bins=1) + self.assertRaises(tuf.FormatError, + self.targets_object.delegate_hashed_bins, + list_of_targets, public_keys, '1') + + + # Test invalid arguments. + # Invalid number of bins, which must be a power of 2. + self.assertRaises(tuf.Error, + self.targets_object.delegate_hashed_bins, + list_of_targets, public_keys, number_of_bins=3) + + # Invalid 'list_of_targets'. + self.assertRaises(tuf.Error, + self.targets_object.delegate_hashed_bins, + ['/non-existent'], public_keys, number_of_bins=3) + + + + def test_add_restricted_paths(self): + # Test normal case. + # Perform a delegation so that add_restricted_paths() has a child role + # to restrict. + keystore_directory = os.path.join(os.pardir, 'repository_data', 'keystore') + public_keypath = os.path.join(keystore_directory, 'root_key.pub') + public_key = repo_tool.import_rsa_publickey_from_file(public_keypath) + + # Set needed arguments by delegate(). + public_keys = [public_key] + rolename = 'tuf' + threshold = 1 + + self.targets_object.delegate(rolename, public_keys, [], + threshold, restricted_paths=None, + path_hash_prefixes=None) + + restricted_path = os.path.join(self.targets_directory, 'tuf_files') + os.mkdir(restricted_path) + restricted_paths = [restricted_path] + self.targets_object.add_restricted_paths(restricted_paths, 'tuf') + + # Retrieve 'targets_object' roleinfo, and verify the roleinfo contains + # the expected restricted paths of the delegated role. Only + # Repository.write() verifies that child target paths are allowed by the + # parent. + targets_object_roleinfo = tuf.roledb.get_roleinfo(self.targets_object.rolename) + + delegated_role = targets_object_roleinfo['delegations']['roles'][0] + self.assertEqual(['/tuf_files/'], delegated_role['paths']) + + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, self.targets_object.add_restricted_paths, + 3, 'tuf') + self.assertRaises(tuf.FormatError, self.targets_object.add_restricted_paths, + restricted_paths, 3) + + + # Test invalid arguments. + # A non-delegated child role. + self.assertRaises(tuf.Error, self.targets_object.add_restricted_paths, + restricted_paths, 'non_delegated_rolename') + + # Non-existent 'restricted_paths'. + self.assertRaises(tuf.Error, self.targets_object.add_restricted_paths, + ['/non-existent'], 'tuf') + + + + def test_revoke(self): + # Test normal case. + # Perform a delegation so that revoke() has a delegation to revoke. + keystore_directory = os.path.join(os.pardir, 'repository_data', 'keystore') + public_keypath = os.path.join(keystore_directory, 'root_key.pub') + public_key = repo_tool.import_rsa_publickey_from_file(public_keypath) + target1_filepath = os.path.join(self.targets_directory, 'file1.txt') + + # Set needed arguments by delegate(). + public_keys = [public_key] + rolename = 'tuf' + list_of_targets = [target1_filepath] + threshold = 1 + + self.targets_object.delegate(rolename, public_keys, list_of_targets, + threshold, restricted_paths=None, + path_hash_prefixes=None) + + # Test revoke() + self.targets_object.revoke('tuf') + self.assertEqual(self.targets_object.get_delegated_rolenames(), []) + + + # Test improperly formatted rolename argument. + self.assertRaises(tuf.FormatError, self.targets_object.revoke, 3) + + + + + +class TestRepositoryToolFunctions(unittest.TestCase): + @classmethod + def setUpClass(cls): + + # setUpClass() is called before tests in an individual class are executed. + + # Create a temporary directory to store the repository, metadata, and target + # files. 'temporary_directory' must be deleted in TearDownClass() so that + # temporary files are always removed, even when exceptions occur. + cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd()) + + + + @classmethod + def tearDownClass(cls): + + # tearDownModule() is called after all the tests have run. + # http://docs.python.org/2/library/unittest.html#class-and-module-fixtures + + # Remove the temporary repository directory, which should contain all the + # metadata, targets, and key files generated for the test cases. + shutil.rmtree(cls.temporary_directory) + + + + def setUp(self): + pass + + + def tearDown(self): + pass + + + + def test_create_new_repository(self): + # Test normal case. + # Setup the temporary repository directories needed by + # create_new_repository(). + temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) + repository_directory = os.path.join(temporary_directory, 'repository') + metadata_directory = os.path.join(repository_directory, + repo_tool.METADATA_STAGED_DIRECTORY_NAME) + targets_directory = os.path.join(repository_directory, + repo_tool.TARGETS_DIRECTORY_NAME) + + repository = repo_tool.create_new_repository(repository_directory) + self.assertTrue(isinstance(repository, repo_tool.Repository)) + + # Verify that the 'repository/', 'repository/metadata', and + # 'repository/targets' directories were created. + self.assertTrue(os.path.exists(repository_directory)) + self.assertTrue(os.path.exists(metadata_directory)) + self.assertTrue(os.path.exists(targets_directory)) + + + # Test that the 'repository' directory is created (along with the other + # sub-directories) when it does not exist yet. The repository tool creates + # the non-existent directory. + shutil.rmtree(repository_directory) + + repository = repo_tool.create_new_repository(repository_directory) + repository = repo_tool.create_new_repository(repository_directory) + self.assertTrue(isinstance(repository, repo_tool.Repository)) + + # Verify that the 'repository/', 'repository/metadata', and + # 'repository/targets' directories were created. + self.assertTrue(os.path.exists(repository_directory)) + self.assertTrue(os.path.exists(metadata_directory)) + self.assertTrue(os.path.exists(targets_directory)) + + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, repo_tool.create_new_repository, 3) + + + + def test_load_repository(self): + # Test normal case. + temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) + original_repository_directory = os.path.join(os.pardir, 'repository_data', + 'repository') + repository_directory = os.path.join(temporary_directory, 'repository') + shutil.copytree(original_repository_directory, repository_directory) + + repository = repo_tool.load_repository(repository_directory) + self.assertTrue(isinstance(repository, repo_tool.Repository)) + + # Verify the expected roles have been loaded. See + # 'tuf/tests/repository_data/repository/'. + expected_roles = \ + ['root', 'targets', 'snapshot', 'timestamp', 'targets/role1'] + for role in tuf.roledb.get_rolenames(): + self.assertTrue(role in expected_roles) + + self.assertTrue(len(repository.root.keys)) + self.assertTrue(len(repository.targets.keys)) + self.assertTrue(len(repository.snapshot.keys)) + self.assertTrue(len(repository.timestamp.keys)) + self.assertTrue(len(repository.targets('role1').keys)) + + # Assumed the targets (tuf/tests/repository_data/) role contains 'file1.txt' + # and 'file2.txt'. + self.assertTrue('/file1.txt' in repository.targets.target_files) + self.assertTrue('/file2.txt' in repository.targets.target_files) + self.assertTrue('/file3.txt' in repository.targets('role1').target_files) + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, repo_tool.load_repository, 3) + + + # Test for invalid 'repository_directory' (i.e., does not contain the + # minimum required metadata. + root_filepath = \ + os.path.join(repository_directory, + repo_tool.METADATA_STAGED_DIRECTORY_NAME, 'root.json') + os.remove(root_filepath) + self.assertRaises(tuf.RepositoryError, repo_tool.load_repository, + repository_directory) + + + + def test_generate_and_write_rsa_keypair(self): + + # Test normal case. + temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) + test_keypath = os.path.join(temporary_directory, 'rsa_key') + + repo_tool.generate_and_write_rsa_keypair(test_keypath, password='pw') + self.assertTrue(os.path.exists(test_keypath)) + self.assertTrue(os.path.exists(test_keypath + '.pub')) + + # Ensure the generated key files are importable. + imported_pubkey = \ + repo_tool.import_rsa_publickey_from_file(test_keypath + '.pub') + self.assertTrue(tuf.formats.RSAKEY_SCHEMA.matches(imported_pubkey)) + + imported_privkey = \ + repo_tool.import_rsa_privatekey_from_file(test_keypath, 'pw') + self.assertTrue(tuf.formats.RSAKEY_SCHEMA.matches(imported_privkey)) + + # Custom 'bits' argument. + os.remove(test_keypath) + os.remove(test_keypath + '.pub') + repo_tool.generate_and_write_rsa_keypair(test_keypath, bits=2048, + password='pw') + self.assertTrue(os.path.exists(test_keypath)) + self.assertTrue(os.path.exists(test_keypath + '.pub')) + + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, repo_tool.generate_and_write_rsa_keypair, + 3, bits=2048, password='pw') + self.assertRaises(tuf.FormatError, repo_tool.generate_and_write_rsa_keypair, + test_keypath, bits='bad', password='pw') + self.assertRaises(tuf.FormatError, repo_tool.generate_and_write_rsa_keypair, + test_keypath, bits=2048, password=3) + + + # Test invalid 'bits' argument. + self.assertRaises(tuf.FormatError, repo_tool.generate_and_write_rsa_keypair, + test_keypath, bits=1024, password='pw') + + + + def test_import_rsa_privatekey_from_file(self): + # Test normal case. + temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) + + # Load one of the pre-generated key files from 'tuf/tests/repository_data'. + # 'password' unlocks the pre-generated key files. + key_filepath = os.path.join(os.pardir, 'repository_data', 'keystore', + 'root_key') + self.assertTrue(os.path.exists(key_filepath)) + + imported_rsa_key = repo_tool.import_rsa_privatekey_from_file(key_filepath, + 'password') + self.assertTrue(tuf.formats.RSAKEY_SCHEMA.matches(imported_rsa_key)) + + + # Test improperly formatted argument. + self.assertRaises(tuf.FormatError, + repo_tool.import_rsa_privatekey_from_file, 3, 'pw') + + + # Test invalid argument. + # Non-existent key file. + nonexistent_keypath = os.path.join(temporary_directory, + 'nonexistent_keypath') + self.assertRaises(IOError, repo_tool.import_rsa_privatekey_from_file, + nonexistent_keypath, 'pw') + + # Invalid key file argument. + invalid_keyfile = os.path.join(temporary_directory, 'invalid_keyfile') + with open(invalid_keyfile, 'wb') as file_object: + file_object.write('bad keyfile') + self.assertRaises(tuf.CryptoError, repo_tool.import_rsa_privatekey_from_file, + invalid_keyfile, 'pw') + + + + def test_import_rsa_publickey_from_file(self): + # Test normal case. + temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) + + # Load one of the pre-generated key files from 'tuf/tests/repository_data'. + key_filepath = os.path.join(os.pardir, 'repository_data', 'keystore', + 'root_key.pub') + self.assertTrue(os.path.exists(key_filepath)) + + imported_rsa_key = repo_tool.import_rsa_publickey_from_file(key_filepath) + self.assertTrue(tuf.formats.RSAKEY_SCHEMA.matches(imported_rsa_key)) + + + # Test improperly formatted argument. + self.assertRaises(tuf.FormatError, + repo_tool.import_rsa_privatekey_from_file, 3) + + + # Test invalid argument. + # Non-existent key file. + nonexistent_keypath = os.path.join(temporary_directory, + 'nonexistent_keypath') + self.assertRaises(IOError, repo_tool.import_rsa_publickey_from_file, + nonexistent_keypath) + + # Invalid key file argument. + invalid_keyfile = os.path.join(temporary_directory, 'invalid_keyfile') + with open(invalid_keyfile, 'wb') as file_object: + file_object.write('bad keyfile') + self.assertRaises(tuf.Error, repo_tool.import_rsa_publickey_from_file, + invalid_keyfile) + + + + def test_generate_and_write_ed25519_keypair(self): + + # Test normal case. + temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) + test_keypath = os.path.join(temporary_directory, 'ed25519_key') + + repo_tool.generate_and_write_ed25519_keypair(test_keypath, password='pw') + self.assertTrue(os.path.exists(test_keypath)) + self.assertTrue(os.path.exists(test_keypath + '.pub')) + + # Ensure the generated key files are importable. + imported_pubkey = \ + repo_tool.import_ed25519_publickey_from_file(test_keypath + '.pub') + self.assertTrue(tuf.formats.ED25519KEY_SCHEMA.matches(imported_pubkey)) + + imported_privkey = \ + repo_tool.import_ed25519_privatekey_from_file(test_keypath, 'pw') + self.assertTrue(tuf.formats.ED25519KEY_SCHEMA.matches(imported_privkey)) + + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, + repo_tool.generate_and_write_ed25519_keypair, + 3, password='pw') + self.assertRaises(tuf.FormatError, repo_tool.generate_and_write_rsa_keypair, + test_keypath, password=3) + + + + def test_import_ed25519_publickey_from_file(self): + # Test normal case. + # Generate ed25519 keys that can be imported. + temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) + ed25519_keypath = os.path.join(temporary_directory, 'ed25519_key') + repo_tool.generate_and_write_ed25519_keypair(ed25519_keypath, password='pw') + + imported_ed25519_key = \ + repo_tool.import_ed25519_publickey_from_file(ed25519_keypath + '.pub') + self.assertTrue(tuf.formats.ED25519KEY_SCHEMA.matches(imported_ed25519_key)) + + + # Test improperly formatted argument. + self.assertRaises(tuf.FormatError, + repo_tool.import_ed25519_publickey_from_file, 3) + + + # Test invalid argument. + # Non-existent key file. + nonexistent_keypath = os.path.join(temporary_directory, + 'nonexistent_keypath') + self.assertRaises(IOError, repo_tool.import_ed25519_publickey_from_file, + nonexistent_keypath) + + # Invalid key file argument. + invalid_keyfile = os.path.join(temporary_directory, 'invalid_keyfile') + with open(invalid_keyfile, 'wb') as file_object: + file_object.write('bad keyfile') + + self.assertRaises(tuf.Error, repo_tool.import_ed25519_publickey_from_file, + invalid_keyfile) + + + + def test_import_ed25519_privatekey_from_file(self): + # Test normal case. + # Generate ed25519 keys that can be imported. + temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) + ed25519_keypath = os.path.join(temporary_directory, 'ed25519_key') + repo_tool.generate_and_write_ed25519_keypair(ed25519_keypath, password='pw') + + imported_ed25519_key = \ + repo_tool.import_ed25519_privatekey_from_file(ed25519_keypath, 'pw') + self.assertTrue(tuf.formats.ED25519KEY_SCHEMA.matches(imported_ed25519_key)) + + + # Test improperly formatted argument. + self.assertRaises(tuf.FormatError, + repo_tool.import_ed25519_privatekey_from_file, 3, 'pw') + + + # Test invalid argument. + # Non-existent key file. + nonexistent_keypath = os.path.join(temporary_directory, + 'nonexistent_keypath') + self.assertRaises(IOError, repo_tool.import_ed25519_privatekey_from_file, + nonexistent_keypath, 'pw') + + # Invalid key file argument. + invalid_keyfile = os.path.join(temporary_directory, 'invalid_keyfile') + with open(invalid_keyfile, 'wb') as file_object: + file_object.write('bad keyfile') + + self.assertRaises(tuf.Error, repo_tool.import_ed25519_privatekey_from_file, + invalid_keyfile, 'pw') + + + + def test_get_metadata_filenames(self): + + # Test normal case. + metadata_directory = os.path.join('metadata/') + filenames = {'root.json': metadata_directory + 'root.json', + 'targets.json': metadata_directory + 'targets.json', + 'snapshot.json': metadata_directory + 'snapshot.json', + 'timestamp.json': metadata_directory + 'timestamp.json'} + + self.assertEqual(filenames, repo_tool.get_metadata_filenames('metadata/')) + + # If a directory argument is not specified, the current working directory + # is used. + metadata_directory = os.getcwd() + filenames = {'root.json': os.path.join(metadata_directory, 'root.json'), + 'targets.json': os.path.join(metadata_directory, 'targets.json'), + 'snapshot.json': os.path.join(metadata_directory, 'snapshot.json'), + 'timestamp.json': os.path.join(metadata_directory, 'timestamp.json')} + self.assertEqual(filenames, repo_tool.get_metadata_filenames()) + + + # Test improperly formatted argument. + self.assertRaises(tuf.FormatError, repo_tool.get_metadata_filenames, 3) + + + + def test_get_metadata_fileinfo(self): + # Test normal case. + temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) + test_filepath = os.path.join(temporary_directory, 'file.txt') + + with open(test_filepath, 'wb') as file_object: + file_object.write('test file') + + # Generate test fileinfo object. It is assumed SHA256 hashes are computed + # by get_metadata_fileinfo(). + file_length = os.path.getsize(test_filepath) + digest_object = tuf.hash.digest_filename(test_filepath) + file_hashes = {'sha256': digest_object.hexdigest()} + fileinfo = {'length': file_length, 'hashes': file_hashes} + self.assertTrue(tuf.formats.FILEINFO_SCHEMA.matches(fileinfo)) + + self.assertEqual(fileinfo, repo_tool.get_metadata_fileinfo(test_filepath)) + + + # Test improperly formatted argument. + self.assertRaises(tuf.FormatError, repo_tool.get_metadata_fileinfo, 3) + + + # Test non-existent file. + nonexistent_filepath = os.path.join(temporary_directory, 'oops.txt') + self.assertRaises(tuf.Error, repo_tool.get_metadata_fileinfo, + nonexistent_filepath) + + + + def test_get_target_hash(self): + # Test normal case. + expected_target_hashes = { + '/file1.txt': 'e3a3d89eb3b70ce3fbce6017d7b8c12d4abd5635427a0e8a238f53157df85b3d', + '/README.txt': '8faee106f1bb69f34aaf1df1e3c2e87d763c4d878cb96b91db13495e32ceb0b0', + '/packages/file2.txt': 'c9c4a5cdd84858dd6a23d98d7e6e6b2aec45034946c16b2200bc317c75415e92' + } + for filepath, target_hash in expected_target_hashes.items(): + self.assertTrue(tuf.formats.RELPATH_SCHEMA.matches(filepath)) + self.assertTrue(tuf.formats.HASH_SCHEMA.matches(target_hash)) + self.assertEqual(repo_tool.get_target_hash(filepath), target_hash) + + # Test for improperly formatted argument. + self.assertRaises(tuf.FormatError, repo_tool.get_target_hash, 8) + + + + def test_generate_root_metadata(self): + # Test normal case. + # Load the root metadata provided in 'tuf/tests/repository_data/'. + root_filepath = os.path.join(os.pardir, 'repository_data', 'repository', + 'metadata', 'root.json') + root_signable = tuf.util.load_json_file(root_filepath) + + # generate_root_metadata() expects the top-level roles and keys to be + # available in 'tuf.keydb' and 'tuf.roledb'. + tuf.roledb.create_roledb_from_root_metadata(root_signable['signed']) + tuf.keydb.create_keydb_from_root_metadata(root_signable['signed']) + + root_metadata = repo_tool.generate_root_metadata(1, '2088-01-01 12:00:00 UTC', + consistent_snapshot=False) + self.assertTrue(tuf.formats.ROOT_SCHEMA.matches(root_metadata)) + + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, repo_tool.generate_root_metadata, + '3', '2088-01-01 12:00:00 UTC', False) + self.assertRaises(tuf.FormatError, repo_tool.generate_root_metadata, + 1, 3, False) + self.assertRaises(tuf.FormatError, repo_tool.generate_root_metadata, + 1, '2088-01-01 12:00:00 UTC', 3) + + # Test for missing required roles and keys. + tuf.roledb.clear_roledb() + tuf.keydb.clear_keydb() + self.assertRaises(tuf.Error, repo_tool.generate_root_metadata, + 1, '2088-01-01 12:00:00 UTC', False) + + + + def test_generate_targets_metadata(self): + # Test normal case. + temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) + targets_directory = os.path.join(temporary_directory, 'targets') + file1_path = os.path.join(targets_directory, 'file.txt') + tuf.util.ensure_parent_dir(file1_path) + + with open(file1_path, 'wb') as file_object: + file_object.write('test file.') + + # Set valid generate_targets_metadata() arguments. + version = 1 + expiration_date = '2088-01-01 12:00:00 UTC' + target_files = ['file.txt'] + + delegations = {"keys": { + "a394c28384648328b16731f81440d72243c77bb44c07c040be99347f0df7d7bf": { + "keytype": "ed25519", + "keyval": { + "public": "3eb81026ded5af2c61fb3d4b272ac53cd1049a810ee88f4df1fc35cdaf918157" + } + } + }, + "roles": [ + { + "keyids": [ + "a394c28384648328b16731f81440d72243c77bb44c07c040be99347f0df7d7bf" + ], + "name": "targets/warehouse", + "paths": [ + "/file1.txt", "/README.txt", '/warehouse/' + ], + "threshold": 1 + } + ] + } + + targets_metadata = \ + repo_tool.generate_targets_metadata(targets_directory, target_files, + version, expiration_date, delegations, + False) + self.assertTrue(tuf.formats.TARGETS_SCHEMA.matches(targets_metadata)) + + # Verify that 'digest.filename' file is saved to 'targets_directory' if + # the 'write_consistent_targets' argument is True. + list_targets_directory = os.listdir(targets_directory) + targets_metadata = \ + repo_tool.generate_targets_metadata(targets_directory, target_files, + version, expiration_date, delegations, + write_consistent_targets=True) + new_list_targets_directory = os.listdir(targets_directory) + + # Verify that 'targets_directory' contains only one extra item. + self.assertTrue(len(list_targets_directory) + 1, + len(new_list_targets_directory)) + + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, repo_tool.generate_targets_metadata, + 3, target_files, version, expiration_date) + self.assertRaises(tuf.FormatError, repo_tool.generate_targets_metadata, + targets_directory, 3, version, expiration_date) + self.assertRaises(tuf.FormatError, repo_tool.generate_targets_metadata, + targets_directory, target_files, '3', expiration_date) + self.assertRaises(tuf.FormatError, repo_tool.generate_targets_metadata, + targets_directory, target_files, version, 3) + + # Improperly formatted 'delegations' and 'write_consistent_targets' + self.assertRaises(tuf.FormatError, repo_tool.generate_targets_metadata, + targets_directory, target_files, version, expiration_date, + 3, False) + self.assertRaises(tuf.FormatError, repo_tool.generate_targets_metadata, + targets_directory, target_files, version, expiration_date, + delegations, 3) + + + # Test invalid 'target_files' argument. + self.assertRaises(tuf.Error, repo_tool.generate_targets_metadata, + targets_directory, ['nonexistent_file.txt'], version, + expiration_date) + + + + + def test_generate_snapshot_metadata(self): + # Test normal case. + temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) + original_repository_path = os.path.join(os.pardir, 'repository_data', + 'repository') + repository_directory = os.path.join(temporary_directory, 'repository') + shutil.copytree(original_repository_path, repository_directory) + metadata_directory = os.path.join(repository_directory, + repo_tool.METADATA_STAGED_DIRECTORY_NAME) + root_filename = os.path.join(metadata_directory, repo_tool.ROOT_FILENAME) + targets_filename = os.path.join(metadata_directory, + repo_tool.TARGETS_FILENAME) + version = 1 + expiration_date = '2088-01-01 12:00:00 UTC' + + snapshot_metadata = \ + repo_tool.generate_snapshot_metadata(metadata_directory, version, + expiration_date, root_filename, + targets_filename, + consistent_snapshot=False) + self.assertTrue(tuf.formats.SNAPSHOT_SCHEMA.matches(snapshot_metadata)) + + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, repo_tool.generate_snapshot_metadata, + 3, version, expiration_date, + root_filename, targets_filename, consistent_snapshot=False) + self.assertRaises(tuf.FormatError, repo_tool.generate_snapshot_metadata, + metadata_directory, '3', expiration_date, + root_filename, targets_filename, consistent_snapshot=False) + self.assertRaises(tuf.FormatError, repo_tool.generate_snapshot_metadata, + metadata_directory, version, 3, + root_filename, targets_filename, consistent_snapshot=False) + self.assertRaises(tuf.FormatError, repo_tool.generate_snapshot_metadata, + metadata_directory, version, expiration_date, + 3, targets_filename, consistent_snapshot=False) + self.assertRaises(tuf.FormatError, repo_tool.generate_snapshot_metadata, + metadata_directory, version, expiration_date, + root_filename, 3, consistent_snapshot=False) + self.assertRaises(tuf.FormatError, repo_tool.generate_snapshot_metadata, + metadata_directory, version, expiration_date, + root_filename, targets_filename, 3) + + + + def test_generate_timestamp_metadata(self): + # Test normal case. + temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) + original_repository_path = os.path.join(os.pardir, 'repository_data', + 'repository') + repository_directory = os.path.join(temporary_directory, 'repository') + shutil.copytree(original_repository_path, repository_directory) + metadata_directory = os.path.join(repository_directory, + repo_tool.METADATA_STAGED_DIRECTORY_NAME) + snapshot_filename = os.path.join(metadata_directory, + repo_tool.SNAPSHOT_FILENAME) + + # Set valid generate_timestamp_metadata() arguments. + version = 1 + expiration_date = '2088-01-01 12:00:00 UTC' + compressions = ['gz'] + + snapshot_metadata = \ + repo_tool.generate_timestamp_metadata(snapshot_filename, version, + expiration_date, compressions) + self.assertTrue(tuf.formats.TIMESTAMP_SCHEMA.matches(snapshot_metadata)) + + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, repo_tool.generate_timestamp_metadata, + 3, version, expiration_date, compressions) + self.assertRaises(tuf.FormatError, repo_tool.generate_timestamp_metadata, + snapshot_filename, '3', expiration_date, compressions) + self.assertRaises(tuf.FormatError, repo_tool.generate_timestamp_metadata, + snapshot_filename, version, 3, compressions) + self.assertRaises(tuf.FormatError, repo_tool.generate_timestamp_metadata, + snapshot_filename, version, expiration_date, 3) + self.assertRaises(tuf.FormatError, repo_tool.generate_timestamp_metadata, + snapshot_filename, version, expiration_date, ['compress']) + + + + + def test_sign_metadata(self): + # Test normal case. + temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) + metadata_path = os.path.join(os.pardir, 'repository_data', + 'repository', 'metadata') + keystore_path = os.path.join(os.pardir, 'repository_data', + 'keystore') + root_filename = os.path.join(metadata_path, 'root.json') + root_metadata = tuf.util.load_json_file(root_filename)['signed'] + + tuf.keydb.create_keydb_from_root_metadata(root_metadata) + tuf.roledb.create_roledb_from_root_metadata(root_metadata) + root_keyids = tuf.roledb.get_role_keyids('root') + + root_private_keypath = os.path.join(keystore_path, 'root_key') + root_private_key = \ + repo_tool.import_rsa_privatekey_from_file(root_private_keypath, + 'password') + + # sign_metadata() expects the private key 'root_metadata' to be in + # 'tuf.keydb'. Remove any public keys that may be loaded before + # adding private key, otherwise a 'tuf.KeyAlreadyExists' exception is + # raised. + tuf.keydb.remove_key(root_private_key['keyid']) + tuf.keydb.add_key(root_private_key) + + root_signable = repo_tool.sign_metadata(root_metadata, root_keyids, + root_filename) + self.assertTrue(tuf.formats.SIGNABLE_SCHEMA.matches(root_signable)) + + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, repo_tool.sign_metadata, 3, root_keyids, + 'root.json') + self.assertRaises(tuf.FormatError, repo_tool.sign_metadata, root_metadata, + 3, 'root.json') + self.assertRaises(tuf.FormatError, repo_tool.sign_metadata, root_metadata, + root_keyids, 3) + + + + def test_write_metadata_file(self): + # Test normal case. + temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) + metadata_directory = os.path.join(os.pardir, 'repository_data', + 'repository', 'metadata') + root_filename = os.path.join(metadata_directory, 'root.json') + root_signable = tuf.util.load_json_file(root_filename) + + output_filename = os.path.join(temporary_directory, 'root.json') + compressions = ['gz'] + + self.assertFalse(os.path.exists(output_filename)) + repo_tool.write_metadata_file(root_signable, output_filename, compressions, + consistent_snapshot=False) + self.assertTrue(os.path.exists(output_filename)) + self.assertTrue(os.path.exists(output_filename + '.gz')) + + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, repo_tool.write_metadata_file, + 3, output_filename, compressions, False) + self.assertRaises(tuf.FormatError, repo_tool.write_metadata_file, + root_signable, 3, compressions, False) + self.assertRaises(tuf.FormatError, repo_tool.write_metadata_file, + root_signable, output_filename, 3, False) + self.assertRaises(tuf.FormatError, repo_tool.write_metadata_file, + root_signable, output_filename, compressions, 3) + + + + def test_create_tuf_client_directory(self): + # Test normal case. + temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) + repository_directory = os.path.join(os.pardir, 'repository_data', + 'repository') + client_directory = os.path.join(temporary_directory, 'client') + + repo_tool.create_tuf_client_directory(repository_directory, client_directory) + + self.assertTrue(os.path.exists(client_directory)) + metadata_directory = os.path.join(client_directory, 'metadata') + current_directory = os.path.join(metadata_directory, 'current') + previous_directory = os.path.join(metadata_directory, 'previous') + self.assertTrue(os.path.exists(client_directory)) + self.assertTrue(os.path.exists(metadata_directory)) + self.assertTrue(os.path.exists(current_directory)) + self.assertTrue(os.path.exists(previous_directory)) + + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, repo_tool.create_tuf_client_directory, + 3, client_directory) + self.assertRaises(tuf.FormatError, repo_tool.create_tuf_client_directory, + repository_directory, 3) + + + # Test invalid argument (i.e., client directory already exists.) + self.assertRaises(tuf.RepositoryError, repo_tool.create_tuf_client_directory, + repository_directory, client_directory) + + +# Run the test cases. +if __name__ == '__main__': + unittest.main() diff --git a/tests/unit/test_updater.py b/tests/unit/test_updater.py index 95476a34..d8e1a5b4 100755 --- a/tests/unit/test_updater.py +++ b/tests/unit/test_updater.py @@ -5,7 +5,7 @@ test_updater.py - Konstantin Andrianov + Konstantin Andrianov. October 15, 2012. diff --git a/tuf/pycrypto_keys.py b/tuf/pycrypto_keys.py index a5b59f03..def95632 100755 --- a/tuf/pycrypto_keys.py +++ b/tuf/pycrypto_keys.py @@ -894,9 +894,15 @@ def _decrypt(file_contents, password): # from 'file_contents'. These five values are delimited by # '_ENCRYPTION_DELIMITER'. This delimiter is arbitrarily chosen and should # not occur in the hexadecimal representations of the fields it is separating. - salt, iterations, hmac, iv, ciphertext = \ - file_contents.split(_ENCRYPTION_DELIMITER) + # Raise 'tuf.CryptoError', if 'file_contents' does not contains the expected + # data layout. + try: + salt, iterations, hmac, iv, ciphertext = \ + file_contents.split(_ENCRYPTION_DELIMITER) + except ValueError: + raise tuf.CryptoError('Invalid encrypted file.') + # Ensure we have the expected raw data for the delimited cryptographic data. salt = binascii.unhexlify(salt) iterations = int(binascii.unhexlify(iterations)) diff --git a/tuf/repository_tool.py b/tuf/repository_tool.py index 69211c4e..5e1a3866 100755 --- a/tuf/repository_tool.py +++ b/tuf/repository_tool.py @@ -188,7 +188,7 @@ def write(self, write_partial=False, consistent_snapshot=False): private keys, etc. - write_partial: + mrite_partial: A boolean indicating whether partial metadata should be written to disk. Partial metadata may be written to allow multiple maintainters to independently sign and update role metadata. write() raises an @@ -878,7 +878,7 @@ def keys(self): A getter method that returns the role's keyids of the keys. The role is expected to eventually contain a threshold of signatures generated - by the private keys of each of the role's keys (returned here as a keyid). + by the private keys of each of the role's keys (returned here as a keyid.) None. @@ -1117,7 +1117,8 @@ def expiration(self, expiration_datetime_utc): 'tuf.formats.DATETIME_SCHEMA'. - tuf.FormatError, if 'expiration_datetime_utc' is improperly formatted. + tuf.FormatError, if 'expiration_datetime_utc' is improperly formatted, + or invalid (e.g., already expired). Modifies the expiration attribute of the Repository object. @@ -1135,7 +1136,7 @@ def expiration(self, expiration_datetime_utc): # Further validate the datetime, such as a correct date, time, expiration. # Convert 'expiration_datetime_utc' to a unix timestamp so that it can be # compared with time.time(). - expiration_datetime_utc = expiration_datetime_utc+' UTC' + expiration_datetime_utc = expiration_datetime_utc + ' UTC' try: unix_timestamp = tuf.formats.parse_time(expiration_datetime_utc) @@ -1479,13 +1480,14 @@ class Targets(Metadata): tuf.FormatError, if the arguments are improperly formatted. - Modifies the roleinfo of the targets role in 'tuf.roledb'. + Modifies the roleinfo of the targets role in 'tuf.roledb', or creates + a default one named 'targets'. None. """ - def __init__(self, targets_directory, rolename, roleinfo=None): + def __init__(self, targets_directory, rolename='targets', roleinfo=None): # Do the arguments have the correct format? # Ensure the arguments have the appropriate number of objects and object @@ -1609,7 +1611,8 @@ def add_restricted_paths(self, list_of_directory_paths, child_rolename): child_rolename: The child delegation that requires an update to its restricted paths, - as listed in the parent role's delegations. + as listed in the parent role's delegations (e.g., 'Django' in + 'targets/unclaimed/Django'). tuf.Error, if a directory path in 'list_of_directory_paths' is not a @@ -2190,8 +2193,9 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins, considered the hash prefix). - tuf.FormatError, if the arguments are improperly formatted, - 'number_of_bins' is not a power of 2, or one of the targets + tuf.FormatError, if the arguments are improperly formatted. + + tuf.Error, if 'number_of_bins' is not a power of 2, or one of the targets in 'list_of_targets' is not located under the repository's targets directory. @@ -2227,7 +2231,7 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins, # (total_hash_prefixes / number_of_bins) hash prefixes. if total_hash_prefixes % number_of_bins != 0: message = 'The "number_of_bins" argument must be a power of 2.' - raise tuf.FormatError(message) + raise tuf.Error(message) logger.info('Creating hashed bin delegations.') logger.info(repr(len(list_of_targets)) + ' total targets.') @@ -2248,7 +2252,7 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins, if not target_path.startswith(self._targets_directory+os.sep): message = 'A path in the list of targets argument is not '+\ 'under the repository\'s targets directory: '+repr(target_path) - raise tuf.FormatError(message) + raise tuf.Error(message) # Determine the hash prefix of 'target_path' by computing the digest of # its path relative to the targets directory. Example: @@ -2767,29 +2771,32 @@ def _delete_obsolete_metadata(metadata_directory, snapshot_metadata, # Strip the digest if 'consistent_snapshot' is True. # Example: 'targets/unclaimed/13df98ab0.django.json' --> - # 'targets/unclaimed/django.json' - metadata_name, embeded_digest = \ - _strip_consistent_snapshot_digest(metadata_name, consistent_snapshot) - + # 'targets/unclaimed/django.json'. Consistent and non-consistent + # metadata might co-exist if write() and write(consistent_snapshot=True) + # are mixed, so ensure only 'digest.filename' metadata is stripped. + embeded_digest = None + if metadata_name not in snapshot_metadata['meta']: + metadata_name, embeded_digest = \ + _strip_consistent_snapshot_digest(metadata_name, consistent_snapshot) + # Strip filename extensions. The role database does not include the # metadata extension. + metadata_name_extension = metadata_name for metadata_extension in METADATA_EXTENSIONS: if metadata_name.endswith(metadata_extension): - metadata_name_without_extension = \ - metadata_name[:-len(metadata_extension)] + metadata_name = metadata_name[:-len(metadata_extension)] # Delete the metadata file if it does not exist in 'tuf.roledb'. - # repository_tool.py might have marked 'metadata_name' as removed, but its - # metadata file is not actually deleted yet. Do it now. - if not tuf.roledb.role_exists(metadata_name_without_extension): + # 'repository_tool.py' might have marked 'metadata_name' as removed, but + # its metadata file is not actually deleted yet. Do it now. + if not tuf.roledb.role_exists(metadata_name): logger.info('Removing outdated metadata: ' + repr(metadata_path)) os.remove(metadata_path) # Delete outdated consistent snapshots. snapshot metadata includes # the file extension of roles. - if consistent_snapshot: - #metadata_name_extension = metadata_name + METADATA_EXTENSION - file_hashes = snapshot_metadata['meta'][metadata_name] \ + if consistent_snapshot and embeded_digest is not None: + file_hashes = snapshot_metadata['meta'][metadata_name_extension] \ ['hashes'].values() if embeded_digest not in file_hashes: logger.info('Removing outdated metadata: ' + repr(metadata_path)) @@ -2956,9 +2963,11 @@ def load_repository(repository_directory): tuf.FormatError, if 'repository_directory' or any of the metadata files - are improperly formatted. Also raised if, at a minimum, the Root role - cannot be found. + are improperly formatted. + tuf.RepositoryError, if the Root role cannot be found. At a minimum, + a repository must contain 'root.json' + All the metadata files found in the repository are loaded and their contents stored in a repository_tool.Repository object. @@ -3379,6 +3388,8 @@ def import_rsa_privatekey_from_file(filepath, password=None): tuf.FormatError, if the arguments are improperly formatted. + tuf.CryptoError, if 'filepath' is not a valid encrypted key file. + The contents of 'filepath' is read, decrypted, and the key stored. @@ -3408,7 +3419,8 @@ def import_rsa_privatekey_from_file(filepath, password=None): with open(filepath, 'rb') as file_object: encrypted_pem = file_object.read() - # Convert 'encrypted_pem' to 'tuf.formats.RSAKEY_SCHEMA' format. + # Convert 'encrypted_pem' to 'tuf.formats.RSAKEY_SCHEMA' format. Raise + # 'tuf.CryptoError' if 'encrypted_pem' is invalid. rsa_key = tuf.keys.import_rsakey_from_encrypted_pem(encrypted_pem, password) return rsa_key @@ -3691,13 +3703,13 @@ def get_metadata_filenames(metadata_directory=None): If 'metadata_directory' is set to 'metadata', the dictionary returned would contain: - filenames = {'root': 'metadata/root.json', - 'targets': 'metadata/targets.json', - 'snapshot': 'metadata/snapshot.json', - 'timestamp': 'metadata/timestamp.json'} + filenames = {'root.json': 'metadata/root.json', + 'targets.json': 'metadata/targets.json', + 'snapshot.json': 'metadata/snapshot.json', + 'timestamp.json': 'metadata/timestamp.json'} - If the metadata directory is not set by the caller, the current - directory is used. + If 'metadata_directory' is not set by the caller, the current directory is + used. metadata_directory: @@ -3714,19 +3726,15 @@ def get_metadata_filenames(metadata_directory=None): metadata files, such as 'root.json' and 'snapshot.json'. """ + if metadata_directory is None: + metadata_directory = os.getcwd() + # Does 'metadata_directory' have the correct format? # Ensure the arguments have the appropriate number of objects and object # types, and that all dict keys are properly named. # Raise 'tuf.FormatError' if there is a mismatch. tuf.formats.PATH_SCHEMA.check_match(metadata_directory) - if metadata_directory is None: - metadata_directory = '.' - - # Does 'metadata_directory' have the correct format? - # Raise 'tuf.FormatError' if there is a mismatch. - tuf.formats.PATH_SCHEMA.check_match(metadata_directory) - # Store the filepaths of the top-level roles, including the # 'metadata_directory' for each one. filenames = {} @@ -3749,13 +3757,14 @@ def get_metadata_filenames(metadata_directory=None): -def get_metadata_file_info(filename): +def get_metadata_fileinfo(filename): """ Retrieve the file information of 'filename'. The object returned conforms to 'tuf.formats.FILEINFO_SCHEMA'. The information generated for 'filename' is stored in metadata files like 'targets.json'. The fileinfo object returned has the form: + fileinfo = {'length': 1024, 'hashes': {'sha256': 1233dfba312, ...}, 'custom': {...}} @@ -3776,7 +3785,7 @@ def get_metadata_file_info(filename): A dictionary conformant to 'tuf.formats.FILEINFO_SCHEMA'. This dictionary contains the length, hashes, and custom data about the - 'filename' metadata file. + 'filename' metadata file. SHA256 hashes are generated by default. """ # Does 'filename' have the correct format? @@ -3831,30 +3840,7 @@ def get_target_hash(target_filepath): The hash of 'target_filepath'. """ - # Does 'target_filepath' have the correct format? - # Ensure the arguments have the appropriate number of objects and object - # types, and that all dict keys are properly named. - # Raise 'tuf.FormatError' if there is a mismatch. - tuf.formats.RELPATH_SCHEMA.check_match(target_filepath) - - # Calculate the hash of the filepath to determine which bin to find the - # target. The client currently assumes the repository uses - # 'HASH_FUNCTION' to generate hashes. - digest_object = tuf.hash.digest(HASH_FUNCTION) - - try: - digest_object.update(target_filepath) - - except UnicodeEncodeError: - # Sometimes, there are Unicode characters in target paths. We assume a - # UTF-8 encoding and try to hash that. - digest_object = tuf.hash.digest(HASH_FUNCTION) - encoded_target_filepath = target_filepath.encode('utf-8') - digest_object.update(encoded_target_filepath) - - target_filepath_hash = digest_object.hexdigest() - - return target_filepath_hash + return tuf.util.get_target_hash(target_filepath) @@ -3878,13 +3864,16 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot): 'tuf.formats.TIME_SCHEMA'. consistent_snapshot: + Boolean. If True, a file digest is expected to be prepended to the + filename of any target file located in the targets directory. Each digest + is stripped from the target filename and listed in the snapshot metadata. tuf.FormatError, if the generated root metadata object could not be generated with the correct format. tuf.Error, if an error is encountered while generating the root - metadata object. + metadata object (e.g., a required top-level role not found in 'tuf.roledb'.) The contents of 'tuf.keydb.py' and 'tuf.roledb.py' are read. @@ -4056,7 +4045,7 @@ def generate_targets_metadata(targets_directory, target_files, version, 'targets metadata.' raise tuf.Error(message) - filedict[relative_targetpath] = get_metadata_file_info(target_path) + filedict[relative_targetpath] = get_metadata_fileinfo(target_path) if write_consistent_targets: for target_digest in filedict[relative_targetpath]['hashes'].values(): @@ -4082,7 +4071,7 @@ def generate_targets_metadata(targets_directory, target_files, version, def generate_snapshot_metadata(metadata_directory, version, expiration_date, root_filename, targets_filename, - consistent_snapshot): + consistent_snapshot=False): """ Create the snapshot metadata. The minimum metadata must exist @@ -4118,7 +4107,7 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date, is stripped from the target filename and listed in the snapshot metadata. - tuf.FormatError, if 'metadata_directory' is improperly formatted. + tuf.FormatError, if the arguments are improperly formatted. tuf.Error, if an error occurred trying to generate the snapshot metadata object. @@ -4139,14 +4128,15 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date, tuf.formats.TIME_SCHEMA.check_match(expiration_date) tuf.formats.PATH_SCHEMA.check_match(root_filename) tuf.formats.PATH_SCHEMA.check_match(targets_filename) + tuf.formats.BOOLEAN_SCHEMA.check_match(consistent_snapshot) metadata_directory = _check_directory(metadata_directory) # Retrieve the fileinfo of 'root.json' and 'targets.json'. This file # information includes data such as file length, hashes of the file, etc. filedict = {} - filedict[ROOT_FILENAME] = get_metadata_file_info(root_filename) - filedict[TARGETS_FILENAME] = get_metadata_file_info(targets_filename) + filedict[ROOT_FILENAME] = get_metadata_fileinfo(root_filename) + filedict[TARGETS_FILENAME] = get_metadata_fileinfo(targets_filename) # Add compressed versions of the 'targets.json' and 'root.json' metadata, # if they exist. @@ -4158,10 +4148,10 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date, # add their file attributes to 'filedict'. if os.path.exists(compressed_root_filename): filedict[ROOT_FILENAME+extension] = \ - get_metadata_file_info(compressed_root_filename) + get_metadata_fileinfo(compressed_root_filename) if os.path.exists(compressed_targets_filename): filedict[TARGETS_FILENAME+extension] = \ - get_metadata_file_info(compressed_targets_filename) + get_metadata_fileinfo(compressed_targets_filename) # Walk the 'targets/' directory and generate the fileinfo of all the role # files found. This information is stored in the 'meta' field of the snapshot @@ -4191,7 +4181,7 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date, # Obsolete role files may still be found. Ensure only roles loaded # in the roledb are included in the snapshot metadata. if tuf.roledb.role_exists(rolename): - filedict[metadata_name] = get_metadata_file_info(metadata_path) + filedict[metadata_name] = get_metadata_fileinfo(metadata_path) # Generate the snapshot metadata object. snapshot_metadata = tuf.formats.SnapshotFile.make_metadata(version, @@ -4254,7 +4244,7 @@ def generate_timestamp_metadata(snapshot_filename, version, # Retrieve the fileinfo of the snapshot metadata file. # This file information contains hashes, file length, custom data, etc. fileinfo = {} - fileinfo[SNAPSHOT_FILENAME] = get_metadata_file_info(snapshot_filename) + fileinfo[SNAPSHOT_FILENAME] = get_metadata_fileinfo(snapshot_filename) # Save the fileinfo of the compressed versions of 'timestamp.json' # in 'fileinfo'. Log the files included in 'fileinfo'. @@ -4264,14 +4254,14 @@ def generate_timestamp_metadata(snapshot_filename, version, compressed_filename = snapshot_filename + '.' + file_extension try: - compressed_fileinfo = get_metadata_file_info(compressed_filename) + compressed_fileinfo = get_metadata_fileinfo(compressed_filename) except: - logger.warn('Cannot get fileinfo about '+str(compressed_filename)) + logger.warn('Cannot get fileinfo about '+repr(compressed_filename)) else: - logger.info('Including fileinfo about '+str(compressed_filename)) - fileinfo[SNAPSHOT_FILENAME+'.'+file_extension] = compressed_fileinfo + logger.info('Including fileinfo about '+repr(compressed_filename)) + fileinfo[SNAPSHOT_FILENAME + '.' + file_extension] = compressed_fileinfo # Generate the timestamp metadata object. timestamp_metadata = tuf.formats.TimestampFile.make_metadata(version, diff --git a/tuf/util.py b/tuf/util.py index a825175f..7c93668d 100755 --- a/tuf/util.py +++ b/tuf/util.py @@ -879,6 +879,8 @@ def load_json_file(filepath): tuf.FormatError: If 'filepath' is improperly formatted. + tuf.Error: If 'filepath' cannot be deserialized to a Python object. + IOError in case of runtime IO exceptions. @@ -892,6 +894,8 @@ def load_json_file(filepath): # tuf.FormatError is raised on incorrect format. tuf.formats.PATH_SCHEMA.check_match(filepath) + deserialized_object = None + # The file is mostly likely gzipped. if filepath.endswith('.gz'): logger.debug('gzip.open('+str(filepath)+')') @@ -902,7 +906,14 @@ def load_json_file(filepath): fileobject = open(filepath) try: - return json.load(fileobject) + deserialized_object = json.load(fileobject) + + except ValueError, TypeError: + message = 'Cannot deserialize to a Python object: '+repr(filepath) + raise tuf.Error(message) + + else: + return deserialized_object finally: fileobject.close()