diff --git a/MANIFEST.in b/MANIFEST.in index f015228a..db697212 100644 --- a/MANIFEST.in +++ b/MANIFEST.in @@ -15,6 +15,7 @@ include tuf/_vendor/ed25519/LICENSE recursive-include docs *.txt recursive-include docs/papers *.pdf recursive-include docs/images *.png +recursive-include tuf/scripts *.py recursive-include examples * recursive-include tests *.py recursive-include tests *.pem diff --git a/setup.py b/setup.py index c53879df..f3af28a2 100755 --- a/setup.py +++ b/setup.py @@ -112,6 +112,7 @@ packages = find_packages(exclude=['tests']), extras_require = extras, scripts = [ - 'tuf/client/basic_client.py' + 'tuf/scripts/basic_client.py', + 'tuf/scripts/tuf.py' ] ) diff --git a/tests/test_interpose_updater.py b/tests/test_interpose_updater.py index 6aa6b4bb..b2c489b1 100755 --- a/tests/test_interpose_updater.py +++ b/tests/test_interpose_updater.py @@ -452,8 +452,10 @@ def test_open(self): self.assertRaises(AttributeError, myUpdater.open, 8) - url = 'http://localhost:8001/targets/file1.txt' - myUpdater.open(url, 'interposition.json') + url = 'http://localhost:8001/targets/file1.txt' + interposition_file = \ + os.path.join(self.temporary_directory, 'interposition.json') + myUpdater.open(url, interposition_file) def test_retrieve(self): @@ -462,7 +464,9 @@ def test_retrieve(self): self.assertRaises(AttributeError, myUpdater.retrieve, 8) test_source_url = 'http://localhost:8001/targets/file1.txt' - myUpdater.retrieve(test_source_url, 'interposition.json') + interposition_file = \ + os.path.join(self.temporary_directory, 'interposition.json') + myUpdater.retrieve(test_source_url, interposition_file) #self.assertRaises(tuf.NoWorkingMirrorError, myUpdater.retrieve, test_source_url) diff --git a/tests/test_keys.py b/tests/test_keys.py index f17763b2..a59f79f6 100755 --- a/tests/test_keys.py +++ b/tests/test_keys.py @@ -51,25 +51,31 @@ def setUpClass(cls): def test_generate_rsa_key(self): - _rsakey_dict = KEYS.generate_rsa_key() + default_rsa_library = KEYS._RSA_CRYPTO_LIBRARY + for rsa_crypto_library in ['pycrypto', 'pyca-cryptography']: + KEYS._RSA_CRYPTO_LIBRARY = rsa_crypto_library + + _rsakey_dict = KEYS.generate_rsa_key() - # Check if the format of the object returned by generate() corresponds - # to RSAKEY_SCHEMA format. - self.assertEqual(None, tuf.formats.RSAKEY_SCHEMA.check_match(_rsakey_dict), - FORMAT_ERROR_MSG) + # Check if the format of the object returned by generate() corresponds + # to RSAKEY_SCHEMA format. + self.assertEqual(None, tuf.formats.RSAKEY_SCHEMA.check_match(_rsakey_dict), + FORMAT_ERROR_MSG) - # Passing a bit value that is <2048 to generate() - should raise - # 'tuf.FormatError'. - self.assertRaises(tuf.FormatError, KEYS.generate_rsa_key, 555) + # Passing a bit value that is <2048 to generate() - should raise + # 'tuf.FormatError'. + self.assertRaises(tuf.FormatError, KEYS.generate_rsa_key, 555) - # Passing a string instead of integer for a bit value. - self.assertRaises(tuf.FormatError, KEYS.generate_rsa_key, 'bits') + # Passing a string instead of integer for a bit value. + self.assertRaises(tuf.FormatError, KEYS.generate_rsa_key, 'bits') - # NOTE if random bit value >=2048 (not 4096) is passed generate(bits) - # does not raise any errors and returns a valid key. - self.assertTrue(tuf.formats.RSAKEY_SCHEMA.matches(KEYS.generate_rsa_key(2048))) - self.assertTrue(tuf.formats.RSAKEY_SCHEMA.matches(KEYS.generate_rsa_key(4096))) + # NOTE if random bit value >=2048 (not 4096) is passed generate(bits) + # does not raise any errors and returns a valid key. + self.assertTrue(tuf.formats.RSAKEY_SCHEMA.matches(KEYS.generate_rsa_key(2048))) + self.assertTrue(tuf.formats.RSAKEY_SCHEMA.matches(KEYS.generate_rsa_key(4096))) + # Reset to originally set RSA crypto library. + KEYS._RSA_CRYPTO_LIBRARY = default_rsa_library def test_format_keyval_to_metadata(self): @@ -176,122 +182,149 @@ def test_helper_get_keyid(self): def test_create_signature(self): - # Creating a signature for 'DATA'. - rsa_signature = KEYS.create_signature(self.rsakey_dict, DATA) - ed25519_signature = KEYS.create_signature(self.ed25519key_dict, DATA) + default_rsa_library = KEYS._RSA_CRYPTO_LIBRARY + for rsa_crypto_library in ['pycrypto', 'pyca-cryptography']: + KEYS._RSA_CRYPTO_LIBRARY = rsa_crypto_library - # Check format of output. - self.assertEqual(None, - tuf.formats.SIGNATURE_SCHEMA.check_match(rsa_signature), - FORMAT_ERROR_MSG) - self.assertEqual(None, - tuf.formats.SIGNATURE_SCHEMA.check_match(ed25519_signature), - FORMAT_ERROR_MSG) + # Creating a signature for 'DATA'. + rsa_signature = KEYS.create_signature(self.rsakey_dict, DATA) + ed25519_signature = KEYS.create_signature(self.ed25519key_dict, DATA) + + # Check format of output. + self.assertEqual(None, + tuf.formats.SIGNATURE_SCHEMA.check_match(rsa_signature), + FORMAT_ERROR_MSG) + self.assertEqual(None, + tuf.formats.SIGNATURE_SCHEMA.check_match(ed25519_signature), + FORMAT_ERROR_MSG) - # Removing private key from 'rsakey_dict' - should raise a TypeError. - private = self.rsakey_dict['keyval']['private'] - self.rsakey_dict['keyval']['private'] = '' - - args = (self.rsakey_dict, DATA) - self.assertRaises(TypeError, KEYS.create_signature, *args) + # Removing private key from 'rsakey_dict' - should raise a TypeError. + private = self.rsakey_dict['keyval']['private'] + self.rsakey_dict['keyval']['private'] = '' + + args = (self.rsakey_dict, DATA) + self.assertRaises(ValueError, KEYS.create_signature, *args) - # Supplying an incorrect number of arguments. - self.assertRaises(TypeError, KEYS.create_signature) - self.rsakey_dict['keyval']['private'] = private + # Supplying an incorrect number of arguments. + self.assertRaises(TypeError, KEYS.create_signature) + self.rsakey_dict['keyval']['private'] = private + KEYS._RSA_CRYPTO_LIBRARY = default_rsa_library def test_verify_signature(self): - # Creating a signature of 'DATA' to be verified. - rsa_signature = KEYS.create_signature(self.rsakey_dict, DATA) - ed25519_signature = KEYS.create_signature(self.ed25519key_dict, DATA) - - # Verifying the 'signature' of 'DATA'. - verified = KEYS.verify_signature(self.rsakey_dict, rsa_signature, DATA) - self.assertTrue(verified, "Incorrect signature.") + default_rsa_library = KEYS._RSA_CRYPTO_LIBRARY + default_available_libraries = KEYS._available_crypto_libraries + for rsa_crypto_library in ['pycrypto', 'pyca-cryptography']: + KEYS._RSA_CRYPTO_LIBRARY = rsa_crypto_library - # Verifying the 'ed25519_signature' of 'DATA'. - verified = KEYS.verify_signature(self.ed25519key_dict, ed25519_signature, DATA) - self.assertTrue(verified, "Incorrect signature.") + # Creating a signature of 'DATA' to be verified. + rsa_signature = KEYS.create_signature(self.rsakey_dict, DATA) + ed25519_signature = KEYS.create_signature(self.ed25519key_dict, DATA) - # Testing an invalid 'rsa_signature'. Same 'rsa_signature' is passed, with - # 'DATA' different than the original 'DATA' that was used - # in creating the 'rsa_signature'. Function should return 'False'. + # Verifying the 'signature' of 'DATA'. + verified = KEYS.verify_signature(self.rsakey_dict, rsa_signature, DATA) + self.assertTrue(verified, "Incorrect signature.") + + # Verifying the 'ed25519_signature' of 'DATA'. + verified = KEYS.verify_signature(self.ed25519key_dict, ed25519_signature, DATA) + self.assertTrue(verified, "Incorrect signature.") + + # Testing an invalid 'rsa_signature'. Same 'rsa_signature' is passed, with + # 'DATA' different than the original 'DATA' that was used + # in creating the 'rsa_signature'. Function should return 'False'. + + # Modifying 'DATA'. + _DATA = '1111' + DATA + '1111' - # Modifying 'DATA'. - _DATA = '1111' + DATA + '1111' - - # Verifying the 'signature' of modified '_DATA'. - verified = KEYS.verify_signature(self.rsakey_dict, rsa_signature, _DATA) - self.assertFalse(verified, - 'Returned \'True\' on an incorrect signature.') + # Verifying the 'signature' of modified '_DATA'. + verified = KEYS.verify_signature(self.rsakey_dict, rsa_signature, _DATA) + self.assertFalse(verified, + 'Returned \'True\' on an incorrect signature.') - # Modifying 'signature' to pass an incorrect method since only - # 'PyCrypto-PKCS#1 PSS' is accepted. - rsa_signature['method'] = 'Biff' + # Modifying 'signature' to pass an incorrect method since only + # 'PyCrypto-PKCS#1 PSS' is accepted. + rsa_signature['method'] = 'Biff' - args = (self.rsakey_dict, rsa_signature, DATA) - self.assertRaises(tuf.UnknownMethodError, KEYS.verify_signature, *args) + args = (self.rsakey_dict, rsa_signature, DATA) + self.assertRaises(tuf.UnknownMethodError, KEYS.verify_signature, *args) - # Passing incorrect number of arguments. - self.assertRaises(TypeError, KEYS.verify_signature) - - # Verify that the pure python 'ed25519' base case (triggered if 'pynacl' is - # unavailable) is executed in tuf.keys.verify_signature(). - KEYS._ED25519_CRYPTO_LIBRARY = 'invalid' - KEYS._available_crypto_libraries = ['invalid'] - verified = KEYS.verify_signature(self.ed25519key_dict, ed25519_signature, DATA) - self.assertTrue(verified, "Incorrect signature.") + # Passing incorrect number of arguments. + self.assertRaises(TypeError, KEYS.verify_signature) - # Reset to the expected available crypto libraries. - KEYS._ED25519_CRYPTO_LIBRARY = 'pynacl' - KEYS._available_crypto_libraries = ['ed25519', 'pycrypto', 'pynacl'] + # Verify that the pure python 'ed25519' base case (triggered if 'pynacl' is + # unavailable) is executed in tuf.keys.verify_signature(). + KEYS._ED25519_CRYPTO_LIBRARY = 'invalid' + KEYS._available_crypto_libraries = ['invalid'] + verified = KEYS.verify_signature(self.ed25519key_dict, ed25519_signature, DATA) + self.assertTrue(verified, "Incorrect signature.") + + # Reset to the expected available crypto libraries. + KEYS._ED25519_CRYPTO_LIBRARY = 'pynacl' + KEYS._available_crypto_libraries = default_available_libraries + KEYS._RSA_CRYPTO_LIBRARY = default_rsa_library + def test_create_rsa_encrypted_pem(self): - # Test valid arguments. - private = self.rsakey_dict['keyval']['private'] - passphrase = 'secret' - encrypted_pem = KEYS.create_rsa_encrypted_pem(private, passphrase) - self.assertTrue(tuf.formats.PEMRSA_SCHEMA.matches(encrypted_pem)) - - # Test improperly formatted arguments. - self.assertRaises(tuf.FormatError, KEYS.create_rsa_encrypted_pem, - 8, passphrase) + default_rsa_library = KEYS._RSA_CRYPTO_LIBRARY + for rsa_crypto_library in ['pycrypto', 'pyca-cryptography']: + KEYS._RSA_CRYPTO_LIBRARY = rsa_crypto_library - self.assertRaises(tuf.FormatError, KEYS.create_rsa_encrypted_pem, - private, 8) + # Test valid arguments. + private = self.rsakey_dict['keyval']['private'] + passphrase = 'secret' + encrypted_pem = KEYS.create_rsa_encrypted_pem(private, passphrase) + self.assertTrue(tuf.formats.PEMRSA_SCHEMA.matches(encrypted_pem)) - # Test for missing required library. - KEYS._RSA_CRYPTO_LIBRARY = 'invalid' - self.assertRaises(tuf.UnsupportedLibraryError, KEYS.create_rsa_encrypted_pem, - private, passphrase) - KEYS._RSA_CRYPTO_LIBRARY = 'pycrypto' - + # Try to import the encryped PEM file. + rsakey = KEYS.import_rsakey_from_encrypted_pem(encrypted_pem, passphrase) + self.assertTrue(tuf.formats.RSAKEY_SCHEMA.matches(rsakey)) + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, KEYS.create_rsa_encrypted_pem, + 8, passphrase) + + self.assertRaises(tuf.FormatError, KEYS.create_rsa_encrypted_pem, + private, 8) + + # Test for missing required library. + KEYS._RSA_CRYPTO_LIBRARY = 'invalid' + self.assertRaises(tuf.UnsupportedLibraryError, KEYS.create_rsa_encrypted_pem, + private, passphrase) + KEYS._RSA_CRYPTO_LIBRARY = 'pycrypto' + KEYS._RSA_CRYPTO_LIBRARY = default_rsa_library + + def test_decrypt_key(self): - # Test valid arguments. - passphrase = 'secret' - encrypted_key = KEYS.encrypt_key(self.rsakey_dict, passphrase).encode('utf-8') - decrypted_key = KEYS.decrypt_key(encrypted_key, passphrase) - - self.assertTrue(tuf.formats.ANYKEY_SCHEMA.matches(decrypted_key)) + default_general_library = KEYS._GENERAL_CRYPTO_LIBRARY + for general_crypto_library in ['pycrypto', 'pyca-cryptography']: + KEYS._GENERAL_CRYPTO_LIBRARY = general_crypto_library - # Test improperly formatted arguments. - self.assertRaises(tuf.FormatError, KEYS.decrypt_key, - 8, passphrase) - - self.assertRaises(tuf.FormatError, KEYS.decrypt_key, - encrypted_key, 8) + # Test valid arguments. + passphrase = 'secret' + encrypted_key = KEYS.encrypt_key(self.rsakey_dict, passphrase).encode('utf-8') + decrypted_key = KEYS.decrypt_key(encrypted_key, passphrase) - # Test for missing required library. - KEYS._GENERAL_CRYPTO_LIBRARY = 'invalid' - self.assertRaises(tuf.UnsupportedLibraryError, KEYS.decrypt_key, - encrypted_key, passphrase) - KEYS._GENERAL_CRYPTO_LIBRARY = 'pycrypto' + self.assertTrue(tuf.formats.ANYKEY_SCHEMA.matches(decrypted_key)) + + # Test improperly formatted arguments. + self.assertRaises(tuf.FormatError, KEYS.decrypt_key, + 8, passphrase) + + self.assertRaises(tuf.FormatError, KEYS.decrypt_key, + encrypted_key, 8) + # Test for missing required library. + KEYS._GENERAL_CRYPTO_LIBRARY = 'invalid' + self.assertRaises(tuf.UnsupportedLibraryError, KEYS.decrypt_key, + encrypted_key, passphrase) + KEYS._GENERAL_CRYPTO_LIBRARY = 'pycrypto' + + KEYS._GENERAL_CRYPTO_LIBRARY = default_general_library # Run the unit tests. diff --git a/tests/test_pyca_crypto_keys.py b/tests/test_pyca_crypto_keys.py index 52e67034..790e0213 100755 --- a/tests/test_pyca_crypto_keys.py +++ b/tests/test_pyca_crypto_keys.py @@ -137,6 +137,21 @@ def test_verify_rsa_signature(self): self.assertEqual(False, crypto_keys.verify_rsa_signature(mismatched_signature, method, public_rsa, data)) + def test__decrypt(self): + # Verify that invalid encrypted file is detected. + self.assertRaises(tuf.CryptoError, crypto_keys._decrypt, + 'bad encrypted file', 'password') + + + + def test_encrypt_key(self): + # Verify that a key argument with a missing private key is rejected. + global public_rsa + + self.assertRaises(tuf.FormatError, crypto_keys.encrypt_key, + public_rsa, 'password') + + # Run the unit tests. if __name__ == '__main__': diff --git a/tests/test_pycrypto_keys.py b/tests/test_pycrypto_keys.py index 8d1f6c36..9c653105 100755 --- a/tests/test_pycrypto_keys.py +++ b/tests/test_pycrypto_keys.py @@ -81,7 +81,7 @@ def test_create_rsa_signature(self): self.assertRaises(tuf.FormatError, pycrypto.create_rsa_signature, 123, data) - self.assertRaises(TypeError, + self.assertRaises(ValueError, pycrypto.create_rsa_signature, '', data) # Check for invalid 'data'. diff --git a/tests/test_repository_lib.py b/tests/test_repository_lib.py index b4109683..4d798248 100755 --- a/tests/test_repository_lib.py +++ b/tests/test_repository_lib.py @@ -436,6 +436,19 @@ def test_generate_root_metadata(self): consistent_snapshot=False) self.assertTrue(tuf.formats.ROOT_SCHEMA.matches(root_metadata)) + root_keyids = tuf.roledb.get_role_keyids('root') + tuf.keydb._keydb_dict['default'][root_keyids[0]]['keytype'] = 'bad_keytype' + self.assertRaises(tuf.Error, repo_lib.generate_root_metadata, 1, + expires, consistent_snapshot=False) + + # Reset the root key's keytype, so that we can next verify that a different + # tuf.Error exception is raised for duplicate keyids. + tuf.keydb._keydb_dict['default'][root_keyids[0]]['keytype'] = 'rsa' + + # Add duplicate keyid to root's roleinfo. + tuf.roledb._roledb_dict['default']['root']['keyids'].append(root_keyids[0]) + self.assertRaises(tuf.Error, repo_lib.generate_root_metadata, 1, + expires, consistent_snapshot=False) # Test improperly formatted arguments. self.assertRaises(tuf.FormatError, repo_lib.generate_root_metadata, @@ -519,6 +532,12 @@ def test_generate_targets_metadata(self): self.assertTrue(len(list_targets_directory) + 1, len(new_list_targets_directory)) + # Verify that an exception is not raised if the target files already exist. + repo_lib.generate_targets_metadata(targets_directory, target_files, + version, expiration_date, delegations, + write_consistent_targets=True) + + # Verify that 'targets_metadata' contains a 'custom' entry (optional) # for 'file.txt'. self.assertTrue('custom' in targets_metadata['targets']['file.txt']) @@ -658,20 +677,22 @@ def test_sign_metadata(self): 'keystore') root_filename = os.path.join(metadata_path, 'root.json') root_metadata = tuf.util.load_json_file(root_filename)['signed'] + targets_filename = os.path.join(metadata_path, 'targets.json') + targets_metadata = tuf.util.load_json_file(targets_filename)['signed'] tuf.keydb.create_keydb_from_root_metadata(root_metadata) tuf.roledb.create_roledb_from_root_metadata(root_metadata) root_keyids = tuf.roledb.get_role_keyids('root') + targets_keyids = tuf.roledb.get_role_keyids('targets') root_private_keypath = os.path.join(keystore_path, 'root_key') root_private_key = \ repo_lib.import_rsa_privatekey_from_file(root_private_keypath, 'password') # Sign with a valid, but not a threshold, key. - targets_private_keypath = os.path.join(keystore_path, 'targets_key') - targets_private_key = \ - repo_lib.import_ed25519_privatekey_from_file(targets_private_keypath, - 'password') + targets_public_keypath = os.path.join(keystore_path, 'targets_key.pub') + targets_public_key = \ + repo_lib.import_ed25519_publickey_from_file(targets_public_keypath) # sign_metadata() expects the private key 'root_metadata' to be in # 'tuf.keydb'. Remove any public keys that may be loaded before @@ -679,17 +700,23 @@ def test_sign_metadata(self): # raised. tuf.keydb.remove_key(root_private_key['keyid']) tuf.keydb.add_key(root_private_key) - tuf.keydb.remove_key(targets_private_key['keyid']) - tuf.keydb.add_key(targets_private_key) + tuf.keydb.remove_key(targets_public_key['keyid']) + tuf.keydb.add_key(targets_public_key) - root_keyids.extend(tuf.roledb.get_role_keyids('targets')) - # Add the snapshot's public key (to test whether non-private keys are - # ignored by sign_metadata()). - root_keyids.extend(tuf.roledb.get_role_keyids('snapshot')) + # Verify that a valid root signable is generated. root_signable = repo_lib.sign_metadata(root_metadata, root_keyids, root_filename) self.assertTrue(tuf.formats.SIGNABLE_SCHEMA.matches(root_signable)) + # Test for an unset private key (in this case, target's). + repo_lib.sign_metadata(targets_metadata, targets_keyids, + targets_filename) + + # Add an invalid keytype to one of the root keys. + root_keyid = root_keyids[0] + tuf.keydb._keydb_dict['default'][root_keyid]['keytype'] = 'bad_keytype' + self.assertRaises(tuf.Error, repo_lib.sign_metadata, root_metadata, + root_keyids, root_filename) # Test improperly formatted arguments. self.assertRaises(tuf.FormatError, repo_lib.sign_metadata, 3, root_keyids, @@ -720,7 +747,20 @@ def test_write_metadata_file(self): consistent_snapshot=False) self.assertTrue(os.path.exists(output_filename)) self.assertTrue(os.path.exists(output_filename + '.gz')) + + # Attempt to over-write the previously written metadata file. An exception + # is not raised in this case, only a debug message is logged. + repo_lib.write_metadata_file(root_signable, output_filename, + version_number, + compression_algorithms, + consistent_snapshot=False) + # Test unknown compression algorithm. + self.assertRaises(tuf.FormatError, repo_lib.write_metadata_file, + root_signable, output_filename, + version_number, + compression_algorithms=['bad_algo'], + consistent_snapshot=False) # Test improperly formatted arguments. self.assertRaises(tuf.FormatError, repo_lib.write_metadata_file, @@ -735,8 +775,40 @@ def test_write_metadata_file(self): self.assertRaises(tuf.FormatError, repo_lib.write_metadata_file, root_signable, output_filename, version_number, compression_algorithms, 3) + + def test__write_compressed_metadata(self): + # Test for invalid 'compressed_filename' argument and set + # 'write_new_metadata' to False. + file_object = tuf.util.TempFile() + existing_filename = os.path.join('repository_data', 'repository', + 'metadata', 'root.json') + + write_new_metadata = False + repo_lib._write_compressed_metadata(file_object, + compressed_filename=existing_filename, + write_new_metadata=write_new_metadata, + consistent_snapshot=False, + version_number=8) + + # Test writing of compressed metadata when consistent snapshots is enabled. + file_object = tuf.util.TempFile() + shutil.copy(existing_filename, os.path.join(self.temporary_directory, '8.root.json.gz')) + shutil.copy(existing_filename, os.path.join(self.temporary_directory, '8.root.json.zip')) + shutil.copy(existing_filename, os.path.join(self.temporary_directory, 'root.json.zip')) + compressed_filename = os.path.join(self.temporary_directory, 'root.json.gz') + + # For testing purposes, add additional compression algorithms to + # repo_lib.SUPPORTED_COMPRESSION_EXTENSIONS. + repo_lib.SUPPORTED_COMPRESSION_EXTENSIONS = ['gz', 'zip', 'bz2'] + repo_lib._write_compressed_metadata(file_object, + compressed_filename=compressed_filename, + write_new_metadata=True, + consistent_snapshot=True, + version_number=8) + repo_lib.SUPPORTED_COMPRESSION_EXTENSIONS = ['gz'] + def test_create_tuf_client_directory(self): # Test normal case. @@ -811,7 +883,7 @@ def test__generate_and_write_metadata(self): repo_lib.METADATA_STAGED_DIRECTORY_NAME) targets_metadata = os.path.join('repository_data', 'repository', 'metadata', 'targets.json') - obsolete_metadata = os.path.join(metadata_directory, 'targets', + obsolete_metadata = os.path.join(metadata_directory, 'obsolete_role.json') tuf.util.ensure_parent_dir(obsolete_metadata) shutil.copyfile(targets_metadata, obsolete_metadata) @@ -825,18 +897,108 @@ def test__generate_and_write_metadata(self): tuf.formats.unix_timestamp_to_datetime(int(time.time() + 86400)) expiration = expiration.isoformat() + 'Z' targets_roleinfo['expires'] = expiration - tuf.roledb.add_role('targets/obsolete_role', targets_roleinfo) + tuf.roledb.add_role('obsolete_role', targets_roleinfo) + + repo_lib._generate_and_write_metadata('obsolete_role', obsolete_metadata, + True, + targets_directory, metadata_directory, + consistent_snapshot=False, + filenames=None, + compression_algorithms=['gz']) snapshot_filepath = os.path.join('repository_data', 'repository', 'metadata', 'snapshot.json') snapshot_signable = tuf.util.load_json_file(snapshot_filepath) - tuf.roledb.remove_role('targets/obsolete_role') + tuf.roledb.remove_role('obsolete_role') self.assertTrue(os.path.exists(os.path.join(metadata_directory, - 'targets/obsolete_role.json'))) + 'obsolete_role.json'))) tuf.repository_lib._delete_obsolete_metadata(metadata_directory, snapshot_signable['signed'], False) - self.assertFalse(os.path.exists(metadata_directory + 'targets/obsolete_role.json')) + self.assertFalse(os.path.exists(metadata_directory + 'obsolete_role.json')) + shutil.copyfile(targets_metadata, obsolete_metadata) + + + + def test__delete_obsolete_metadata(self): + temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) + repository_directory = os.path.join(temporary_directory, 'repository') + metadata_directory = os.path.join(repository_directory, + repo_lib.METADATA_STAGED_DIRECTORY_NAME) + os.makedirs(metadata_directory) + snapshot_filepath = os.path.join('repository_data', 'repository', + 'metadata', 'snapshot.json') + snapshot_signable = tuf.util.load_json_file(snapshot_filepath) + + # Create role metadata that should not exist in snapshot.json. + role1_filepath = os.path.join('repository_data', 'repository', + 'metadata', 'role1.json') + shutil.copyfile(role1_filepath, os.path.join(metadata_directory, 'role2.json')) + + repo_lib._delete_obsolete_metadata(metadata_directory, + snapshot_signable['signed'], + True) + + # Verify what happens for a non-existent metadata directory (a debug message + # is logged). + repo_lib._delete_obsolete_metadata('non-existent', + snapshot_signable['signed'], + True) + + + def test__load_top_level_metadata(self): + tuf.roledb.clear_roledb(clear_all=True) + tuf.keydb.clear_keydb(clear_all=True) + + temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory) + repository_directory = os.path.join(temporary_directory, 'repository') + metadata_directory = os.path.join(repository_directory, + repo_lib.METADATA_STAGED_DIRECTORY_NAME) + targets_directory = os.path.join(repository_directory, + repo_lib.TARGETS_DIRECTORY_NAME) + shutil.copytree(os.path.join('repository_data', 'repository', 'metadata'), + metadata_directory) + shutil.copytree(os.path.join('repository_data', 'repository', 'targets'), + targets_directory) + + # Remove compressed metadata so that we can test for loading of a + # repository with no compression enabled. + for role_file in os.listdir(metadata_directory): + if role_file.endswith('.json.gz'): + role_filename = os.path.join(metadata_directory, role_file) + os.remove(role_filename) + + filenames = repo_lib.get_metadata_filenames(metadata_directory) + repository = repo_tool.create_new_repository(repository_directory) + repo_lib._load_top_level_metadata(repository, filenames) + + # We partially loaded 'role1' via the top-level Targets role. For the + # purposes of this test case (which only loads top-level metadata and no + # delegated metadata), remove this role to avoid issues with partially + # loaded information (e.g., missing 'version' info, signatures, etc.) + tuf.roledb.remove_role('role1') + + # Partially write all top-level roles (we increase the threshold of each + # top-level role so that they are flagged as partially written. + repository.root.threshold = repository.root.threshold + 1 + repository.snapshot.threshold = repository.snapshot.threshold + 1 + repository.targets.threshold = repository.targets.threshold + 1 + repository.timestamp.threshold = repository.timestamp.threshold + 1 + repository.write(write_partial=True) + + repo_lib._load_top_level_metadata(repository, filenames) + + # Attempt to load a repository with missing top-level metadata. + for role_file in os.listdir(metadata_directory): + if role_file.endswith('.json') and not role_file.startswith('root'): + role_filename = os.path.join(metadata_directory, role_file) + os.remove(role_filename) + repo_lib._load_top_level_metadata(repository, filenames) + + # Remove the required Root file and verify that an exception is raised. + os.remove(os.path.join(metadata_directory, 'root.json')) + self.assertRaises(tuf.RepositoryError, repo_lib._load_top_level_metadata, + repository, filenames) diff --git a/tests/test_repository_tool.py b/tests/test_repository_tool.py index 59c50dde..1690057b 100755 --- a/tests/test_repository_tool.py +++ b/tests/test_repository_tool.py @@ -137,7 +137,6 @@ def test_write_and_write_partial(self): metadata_directory = os.path.join(repository_directory, repo_tool.METADATA_STAGED_DIRECTORY_NAME) repository = repo_tool.create_new_repository(repository_directory) - # (1) Load the public and private keys of the top-level roles, and one # delegated role. @@ -220,7 +219,6 @@ def test_write_and_write_partial(self): # (6) Write repository. repository.targets.compressions = ['gz'] repository.write() - # Verify that the expected metadata is written. for role in ['root.json', 'targets.json', 'snapshot.json', 'timestamp.json']: @@ -229,6 +227,8 @@ def test_write_and_write_partial(self): # Raise 'tuf.FormatError' if 'role_signable' is an invalid signable. tuf.formats.check_signable_object_format(role_signable) + + self.assertTrue(os.path.exists(role_filepath)) if role == 'targets.json': compressed_filepath = role_filepath + '.gz' @@ -242,31 +242,41 @@ def test_write_and_write_partial(self): # Verify that an exception is *not* raised for multiple repository.write(). repository.write() - # Verify the status() does not raise an exception. + # Verify that status() does not raise an exception. repository.status() - # Verify status() does not raise 'tuf.InsufficientKeysError' if a top-level - # role does and 'role1' do not contain a threshold of keys. - root_roleinfo = tuf.roledb.get_roleinfo('root') - old_threshold = root_roleinfo['threshold'] - root_roleinfo['threshold'] = 10 + # Verify that status() does not raise 'tuf.InsufficientKeysError' if a + # top-level role does not contain a threshold of keys. + targets_roleinfo = tuf.roledb.get_roleinfo('targets') + old_threshold = targets_roleinfo['threshold'] + targets_roleinfo['threshold'] = 10 + tuf.roledb.update_roleinfo('targets', targets_roleinfo) + repository.status() + + # Restore the original threshold values. + targets_roleinfo = tuf.roledb.get_roleinfo('targets') + targets_roleinfo['threshold'] = old_threshold + tuf.roledb.update_roleinfo('targets', targets_roleinfo) + + # Verify that status() does not raise 'tuf.InsufficientKeysError' if a + # delegated role does not contain a threshold of keys. role1_roleinfo = tuf.roledb.get_roleinfo('role1') old_role1_threshold = role1_roleinfo['threshold'] role1_roleinfo['threshold'] = 10 - tuf.roledb.update_roleinfo('root', root_roleinfo) tuf.roledb.update_roleinfo('role1', role1_roleinfo) repository.status() - - # Restore the original threshold values. - root_roleinfo['threshold'] = old_threshold - tuf.roledb.update_roleinfo('root', root_roleinfo) - role1_roleinfo['threshold'] = old_role1_threshold + + # Restore role1's threshold. + role1_roleinfo = tuf.roledb.get_roleinfo('role1') + role1_roleinfo['threshold'] = old_role1_threshold tuf.roledb.update_roleinfo('role1', role1_roleinfo) # Verify status() does not raise 'tuf.UnsignedMetadataError' if any of the - # the top-level roles and 'role1' are improperly signed. + # the top-level roles. Test that 'root' is improperly signed. repository.root.unload_signing_key(root_privkey) repository.root.load_signing_key(targets_privkey) + repository.status() + repository.targets('role1').unload_signing_key(role1_privkey) repository.targets('role1').load_signing_key(targets_privkey) repository.status() @@ -324,8 +334,14 @@ def test_write_and_write_partial(self): repository.root.load_signing_key(root_privkey) repository.snapshot.load_signing_key(snapshot_privkey) - # Verify that a consistent snapshot can be written and loaded. + # Verify that a consistent snapshot can be written and loaded. The + # 'targets' and 'role1' roles must be be marked as dirty, otherwise + # write() will not create consistent snapshots for them. + repository.mark_dirty(['targets', 'role1']) repository.write(consistent_snapshot=True) + + # Verify that the newly written consistent snapshot can be loaded + # successfully. repo_tool.load_repository(repository_directory) # Test improperly formatted arguments. diff --git a/tests/test_roledb.py b/tests/test_roledb.py index 06a2aa17..6149dd5d 100755 --- a/tests/test_roledb.py +++ b/tests/test_roledb.py @@ -677,7 +677,30 @@ def test_get_dirty_roles(self): # Test for improperly formatted argument. self.assertRaises(tuf.FormatError, tuf.roledb.get_dirty_roles, 123) + + + + def test_mark_dirty(self): + # Add a dirty role to roledb. + rolename = 'targets' + roleinfo1 = {'keyids': ['123'], 'threshold': 1} + tuf.roledb.add_role(rolename, roleinfo1) + rolename2 = 'dirty_role' + roleinfo2 = {'keyids': ['123'], 'threshold': 2} + mark_role_as_dirty = True + tuf.roledb.update_roleinfo(rolename, roleinfo1, mark_role_as_dirty) + # Note: The 'default' repository is searched if the repository name is + # not given to get_dirty_roles(). + self.assertEqual([rolename], tuf.roledb.get_dirty_roles()) + tuf.roledb.mark_dirty(['dirty_role']) + self.assertEqual([rolename2, rolename], sorted(tuf.roledb.get_dirty_roles())) + + # Verify that a role cannot be marked as dirty for a non-existent + # repository. + self.assertRaises(tuf.InvalidNameError, tuf.roledb.mark_dirty, + ['dirty_role'], 'non-existent') + def _test_rolename(self, test_function): diff --git a/tuf/developer_tool.py b/tuf/developer_tool.py index bf71e452..6db9f88d 100755 --- a/tuf/developer_tool.py +++ b/tuf/developer_tool.py @@ -287,11 +287,6 @@ def write(self, write_partial=False): self._targets_directory, self.keys, self._prefix, self.threshold, self.layout_type, self._project_name) - - # Delete the metadata of roles no longer in 'tuf.roledb'. Obsolete roles - # may have been revoked. - _delete_obsolete_metadata(self._metadata_directory, - project_signable['signed'], False) diff --git a/tuf/formats.py b/tuf/formats.py index 0a5b785f..9c257bd9 100755 --- a/tuf/formats.py +++ b/tuf/formats.py @@ -450,6 +450,12 @@ key_schema = RELPATH_SCHEMA, value_schema = CUSTOM_SCHEMA) +# Command argument list, as used by the CLI tool. +# Example: {'keytype': ed25519, 'expires': 365,} +COMMAND_SCHEMA = SCHEMA.DictOf( + key_schema = NAME_SCHEMA, + value_schema = SCHEMA.Any()) + # tuf.roledb ROLEDB_SCHEMA = SCHEMA.Object( object_name = 'ROLEDB_SCHEMA', @@ -986,7 +992,8 @@ def make_signable(object): """ if not isinstance(object, dict) or 'signed' not in object: - return { 'signed' : object, 'signatures' : [] } + return {'signed': object, 'signatures': []} + else: return object diff --git a/tuf/keys.py b/tuf/keys.py index 5afc5d6d..5d4c3ab4 100755 --- a/tuf/keys.py +++ b/tuf/keys.py @@ -1444,8 +1444,8 @@ def create_rsa_encrypted_pem(private_key, passphrase): # Does 'passphrase' have the correct format? tuf.formats.PASSWORD_SCHEMA.check_match(passphrase) - # Raise 'tuf.UnsupportedLibraryError' if the following libraries, specified in - # 'tuf.conf', are unsupported or unavailable: + # Raise 'tuf.UnsupportedLibraryError' if the following libraries, specified + # in 'tuf.conf', are unsupported or unavailable: # 'tuf.conf.GENERAL_CRYPTO_LIBRARY' and 'tuf.conf.RSA_CRYPTO_LIBRARY'. check_crypto_libraries(['rsa', 'general']) diff --git a/tuf/pycrypto_keys.py b/tuf/pycrypto_keys.py index d4d953cb..45081dc8 100755 --- a/tuf/pycrypto_keys.py +++ b/tuf/pycrypto_keys.py @@ -253,7 +253,7 @@ def create_rsa_signature(private_key, data): tuf.FormatError, if 'private_key' is improperly formatted. - TypeError, if 'private_key' is unset. + ValueError, if 'private_key' is unset. tuf.CryptoError, if the signature cannot be generated. @@ -316,7 +316,7 @@ def create_rsa_signature(private_key, data): raise tuf.CryptoError('An RSA signature cannot be generated: ' + str(e)) else: - raise TypeError('The required private key is unset.') + raise ValueError('The required private key is unset.') return signature, method diff --git a/tuf/repository_lib.py b/tuf/repository_lib.py index c0414793..ae3fa760 100755 --- a/tuf/repository_lib.py +++ b/tuf/repository_lib.py @@ -99,7 +99,7 @@ SUPPORTED_COMPRESSION_EXTENSIONS = ['.gz'] # The full list of supported TUF metadata extensions. -METADATA_EXTENSIONS = ['.json'] +METADATA_EXTENSIONS = ['.json.gz', '.json'] def _generate_and_write_metadata(rolename, metadata_filename, write_partial, @@ -209,7 +209,7 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial, filename = write_metadata_file(signable, metadata_filename, metadata['version'], compression_algorithms, consistent_snapshot) - + # The root and timestamp files should also be written without a version # number prepended if 'consistent_snaptshot' is True. Clients may request # a timestamp and root file without knowing their version numbers. @@ -341,7 +341,6 @@ def _check_role_keys(rolename): """ Non-public function that verifies the public and signing keys of 'rolename'. If either contain an invalid threshold of keys, raise an exception. - 'rolename' is the full rolename (e.g., 'targets/unclaimed/django'). """ # Extract the total number of public and private keys of 'rolename' from its @@ -354,15 +353,13 @@ def _check_role_keys(rolename): # Raise an exception for an invalid threshold of public keys. if total_keyids < threshold: - message = repr(rolename) + ' role contains ' + \ - repr(total_keyids) + ' / ' + repr(threshold) + ' public keys.' - raise tuf.InsufficientKeysError(message) + raise tuf.InsufficientKeysError(repr(rolename) + ' role contains' + ' ' + repr(total_keyids) + ' / ' + repr(threshold) + ' public keys.') # Raise an exception for an invalid threshold of signing keys. if total_signatures == 0 and total_signing_keys < threshold: - message = repr(rolename) + ' role contains ' + \ - repr(total_signing_keys) + ' / ' + repr(threshold) + ' signing keys.' - raise tuf.InsufficientKeysError(message) + raise tuf.InsufficientKeysError(repr(rolename) + ' role contains' + ' ' + repr(total_signing_keys) + ' / ' + repr(threshold) + ' signing keys.') @@ -422,48 +419,63 @@ def _delete_obsolete_metadata(metadata_directory, snapshot_metadata, 'repository_tool.py'. Revoked metadata files are not actually deleted until this function is called. Obsolete metadata should *not* be retained in "metadata.staged", otherwise they may be re-loaded by 'load_repository()'. + Note: Obsolete metadata may not always be easily detected (by inspecting top-level metadata during loading) due to partial metadata and top-level metadata that have not been written yet. """ - # Walk the repository's metadata 'targets' sub-directory, where all the - # metadata of delegated roles is stored. - targets_metadata = os.path.join(metadata_directory, 'targets') - - # The 'targets.json' metadata is not visited, only its child delegations. - # The 'targets/unclaimed/django.json' role would be located in the - # '{repository_directory}/metadata/targets/unclaimed/' directory. - if os.path.exists(targets_metadata) and os.path.isdir(targets_metadata): - for directory_path, junk_directories, files in os.walk(targets_metadata): + # Walk the repository's metadata sub-directory, which is where all metadata + # is stored (including delegated roles). The 'django.json' role (e.g., + # delegated by Targets) would be located in the + # '{repository_directory}/metadata/' directory. + if os.path.exists(metadata_directory) and os.path.isdir(metadata_directory): + for directory_path, junk_directories, files in os.walk(metadata_directory): # 'files' here is a list of target file names. for basename in files: metadata_path = os.path.join(directory_path, basename) # Strip the metadata dirname and the leading path separator. - # '{repository_directory}/metadata/targets/unclaimed/django.json' --> - # 'targets/unclaimed/django.json' + # '{repository_directory}/metadata/django.json' --> + # 'django.json' metadata_name = \ metadata_path[len(metadata_directory):].lstrip(os.path.sep) - + # Strip the version number if 'consistent_snapshot' is True. Example: - # '10.django.json' --> 'django.json'. Consistent and non-consistent + # '10.django.json' --> 'django.json'. Consistent and non-consistent # metadata might co-exist if write() and # write(consistent_snapshot=True) are mixed, so ensure only # '.filename' metadata is stripped. embedded_version_number = None - - if metadata_name not in snapshot_metadata['meta']: + + # Should we check if 'consistent_snapshot' is True? It might have been + # set previously, but 'consistent_snapshot' can potentially be False + # now. We'll proceed with the understanding that 'metadata_name' can + # have a prepended version number even though the repository is now + # a non-consistent one. + if metadata_name not in snapshot_metadata['meta']: metadata_name, embedded_version_number = \ _strip_version_number(metadata_name, consistent_snapshot) + + else: + logger.debug(repr(metadata_name) + ' found in the snapshot role.') # Strip filename extensions. The role database does not include the # metadata extension. metadata_name_extension = metadata_name + for metadata_extension in METADATA_EXTENSIONS: if metadata_name.endswith(metadata_extension): metadata_name = metadata_name[:-len(metadata_extension)] - + break + + else: + logger.debug(repr(metadata_name) + ' does not match' + ' supported extension ' + repr(metadata_extension)) + + if metadata_name in ['root', 'targets', 'snapshot', 'timestamp']: + return + # Delete the metadata file if it does not exist in 'tuf.roledb'. # 'repository_tool.py' might have removed 'metadata_name,' # but its metadata file is not actually deleted yet. Do it now. @@ -471,18 +483,14 @@ def _delete_obsolete_metadata(metadata_directory, snapshot_metadata, logger.info('Removing outdated metadata: ' + repr(metadata_path)) os.remove(metadata_path) - # Delete outdated consistent snapshots. Snapshot metadata includes the - # file extension of roles. TODO: Should we leave it up to integrators - # to remove outdated consistent snapshots? - """ - if consistent_snapshot and embedded_version_number is not None: - file_hashes = list(snapshot_metadata['meta'][metadata_name_extension] \ - ['hashes'].values()) - if embedded_digest not in file_hashes: - logger.info('Removing outdated metadata: ' + repr(metadata_path)) - os.remove(metadata_path) - """ + else: + logger.debug('Not removing metadata: ' + repr(metadata_path)) + # TODO: Should we delete outdated consistent snapshots, or does it make + # more sense for integrators to remove outdated consistent snapshots? + + else: + logger.debug('Metadata directory does not exist: ' + repr(metadata_directory)) @@ -518,8 +526,12 @@ def _strip_version_number(metadata_filename, consistent_snapshot): dirname, basename = os.path.split(metadata_filename) version_number, basename = basename.split('.', 1) stripped_metadata_filename = os.path.join(dirname, basename) - - return stripped_metadata_filename, version_number + + if not version_number.isdigit(): + return metadata_filename, '' + + else: + return stripped_metadata_filename, version_number else: return metadata_filename, '' @@ -530,7 +542,7 @@ def _strip_version_number(metadata_filename, consistent_snapshot): def _load_top_level_metadata(repository, top_level_filenames): """ Load the metadata of the Root, Timestamp, Targets, and Snapshot roles. At a - minimum, the Root role must exist and successfully load. + minimum, the Root role must exist and load successfully. """ root_filename = top_level_filenames[ROOT_FILENAME] @@ -546,6 +558,7 @@ def _load_top_level_metadata(repository, top_level_filenames): # Load 'root.json'. A Root role file without a version number is always # written. if os.path.exists(root_filename): + # Initialize the key and role metadata of the top-level roles. signable = tuf.util.load_json_file(root_filename) tuf.formats.check_signable_object_format(signable) @@ -559,9 +572,16 @@ def _load_top_level_metadata(repository, top_level_filenames): for signature in signable['signatures']: if signature not in roleinfo['signatures']: roleinfo['signatures'].append(signature) + + else: + logger.debug('Found a Root signature that is already loaded:' + ' ' + repr(signature)) if os.path.exists(root_filename + '.gz'): roleinfo['compressions'].append('gz') + + else: + logger.debug('A compressed Root file was not found.') # By default, roleinfo['partial_loaded'] of top-level roles should be set # to False in 'create_roledb_from_root_metadata()'. Update this field, if @@ -569,6 +589,9 @@ def _load_top_level_metadata(repository, top_level_filenames): if _metadata_is_partially_loaded('root', signable, roleinfo): roleinfo['partial_loaded'] = True + else: + logger.debug('Root was not partially loaded.') + _log_warning_if_expires_soon(ROOT_FILENAME, roleinfo['expires'], ROOT_EXPIRES_WARN_SECONDS) @@ -578,8 +601,8 @@ def _load_top_level_metadata(repository, top_level_filenames): consistent_snapshot = root_metadata['consistent_snapshot'] else: - message = 'Cannot load the required root file: ' + repr(root_filename) - raise tuf.RepositoryError(message) + raise tuf.RepositoryError('Cannot load the required root file:' + ' ' + repr(root_filename)) # Load 'timestamp.json'. A Timestamp role file without a version number is # always written. @@ -595,9 +618,15 @@ def _load_top_level_metadata(repository, top_level_filenames): roleinfo['version'] = timestamp_metadata['version'] if os.path.exists(timestamp_filename + '.gz'): roleinfo['compressions'].append('gz') + + else: + logger.debug('A compressed Timestamp file was not found.') if _metadata_is_partially_loaded('timestamp', signable, roleinfo): roleinfo['partial_loaded'] = True + + else: + logger.debug('The Timestamp role was not partially loaded.') _log_warning_if_expires_soon(TIMESTAMP_FILENAME, roleinfo['expires'], TIMESTAMP_EXPIRES_WARN_SECONDS) @@ -605,7 +634,7 @@ def _load_top_level_metadata(repository, top_level_filenames): tuf.roledb.update_roleinfo('timestamp', roleinfo, mark_role_as_dirty=False) else: - pass + logger.debug('Cannot load the Timestamp file: ' + repr(timestamp_filename)) # Load 'snapshot.json'. A consistent snapshot.json must be calculated if # 'consistent_snapshot' is True. @@ -633,9 +662,15 @@ def _load_top_level_metadata(repository, top_level_filenames): roleinfo['version'] = snapshot_metadata['version'] if os.path.exists(snapshot_filename + '.gz'): roleinfo['compressions'].append('gz') + + else: + logger.debug('A compressed Snapshot file was not loaded.') if _metadata_is_partially_loaded('snapshot', signable, roleinfo): roleinfo['partial_loaded'] = True + + else: + logger.debug('Snapshot was not partially loaded.') _log_warning_if_expires_soon(SNAPSHOT_FILENAME, roleinfo['expires'], SNAPSHOT_EXPIRES_WARN_SECONDS) @@ -643,7 +678,7 @@ def _load_top_level_metadata(repository, top_level_filenames): tuf.roledb.update_roleinfo('snapshot', roleinfo, mark_role_as_dirty=False) else: - pass + logger.debug('The Snapshot file cannot be loaded: ' + repr(snapshot_filename)) # Load 'targets.json'. A consistent snapshot of the Targets role must be # calculated if 'consistent_snapshot' is True. @@ -669,9 +704,15 @@ def _load_top_level_metadata(repository, top_level_filenames): roleinfo['delegations'] = targets_metadata['delegations'] if os.path.exists(targets_filename + '.gz'): roleinfo['compressions'].append('gz') + + else: + logger.debug('Compressed Targets file cannot be loaded.') if _metadata_is_partially_loaded('targets', signable, roleinfo): roleinfo['partial_loaded'] = True + + else: + logger.debug('Targets file was not partially loaded.') _log_warning_if_expires_soon(TARGETS_FILENAME, roleinfo['expires'], TARGETS_EXPIRES_WARN_SECONDS) @@ -707,7 +748,7 @@ def _load_top_level_metadata(repository, top_level_filenames): tuf.roledb.add_role(rolename, roleinfo) else: - pass + logger.debug('The Targets file cannot be loaded: ' + repr(targets_filename)) return repository, consistent_snapshot @@ -1442,11 +1483,11 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot, # This is not a recognized key. Raise an exception. else: - raise tuf.Error('Unsupported keytype: '+keyid) + raise tuf.Error('Unsupported keytype: ' + keyid) # Do we have a duplicate? if keyid in keyids: - raise tuf.Error('Same keyid listed twice: '+keyid) + raise tuf.Error('Same keyid listed twice: ' + keyid) # Add the loaded keyid for the role being processed. keyids.append(keyid) @@ -1556,9 +1597,9 @@ def generate_targets_metadata(targets_directory, target_files, version, # Ensure all target files listed in 'target_files' exist. If just one of # these files does not exist, raise an exception. if not os.path.exists(target_path): - message = repr(target_path) + ' cannot be read. Unable to generate ' +\ - 'targets metadata.' - raise tuf.Error(message) + raise tuf.Error(repr(target_path) + ' cannot be read.' + ' Unable to generate targets metadata.') + # Add 'custom' if it has been provided. Custom data about the target is # optional and will only be included in metadata (i.e., a 'custom' field in @@ -1580,7 +1621,10 @@ def generate_targets_metadata(targets_directory, target_files, version, if not os.path.exists(digest_target): logger.warning('Hard linking target file to ' + repr(digest_target)) os.link(target_path, digest_target) - + + else: + logger.debug(repr(digest_target) + ' already exists.') + # Generate the targets metadata object. targets_metadata = tuf.formats.TargetsFile.make_metadata(version, expiration_date, @@ -1814,23 +1858,12 @@ def sign_metadata(metadata_object, keyids, filename): # keyid of 'keyids'. signable = tuf.formats.make_signable(metadata_object) - # Sign the metadata with each keyid in 'keyids'. + # Sign the metadata with each keyid in 'keyids'. 'signable' should have + # zero signatures (metadata_object contained none). for keyid in keyids: # Load the signing key. key = tuf.keydb.get_key(keyid) - # TODO logger.info('Signing ' + repr(filename) + ' with ' + key['keyid']) - - # Create a new signature list. If 'keyid' is encountered, do not add it - # to the new list. - signatures = [] - for signature in signable['signatures']: - if not keyid == signature['keyid']: - signatures.append(signature) - - else: - continue - signable['signatures'] = signatures # Generate the signature using the appropriate signing method. if key['keytype'] in SUPPORTED_KEY_TYPES: @@ -1926,55 +1959,38 @@ def write_metadata_file(metadata, filename, version_number, # if re-saving is required. file_content = _get_written_metadata(metadata) - # Verify whether new metadata needs to be written (i.e., has not been - # previously written or has changed. - write_new_metadata = False + # We previously verified whether new metadata needed to be written (i.e., has + # not been previously written or has changed). It is now assumed that the + # caller intends to write changes that have been marked as dirty. - # Has the uncompressed metadata changed? Does it exist? If so, set - # 'write_compressed_version' to 'True' so that it is written. - # Compressed metadata should only be written if it does not exist or the - # uncompressed version has changed). - new_digests = {} - for hash_algorithm in tuf.conf.REPOSITORY_HASH_ALGORITHMS: - digest_object = tuf.hash.digest(hash_algorithm) - digest_object.update(file_content) - new_digests.update({hash_algorithm: digest_object.hexdigest()}) - - try: - file_length_junk, old_digests = tuf.util.get_file_details(written_filename) - if old_digests != new_digests: - write_new_metadata = True + # The 'metadata' object is written to 'file_object', including compressed + # versions. To avoid partial metadata from being written, 'metadata' is + # first written to a temporary location (i.e., 'file_object') and then + # moved to 'filename'. + file_object = tuf.util.TempFile() - # 'tuf.Error' raised if 'filename' does not exist. - except tuf.Error: - write_new_metadata = True - - if write_new_metadata: - # The 'metadata' object is written to 'file_object', including compressed - # versions. To avoid partial metadata from being written, 'metadata' is - # first written to a temporary location (i.e., 'file_object') and then - # moved to 'filename'. - file_object = tuf.util.TempFile() + # Serialize 'metadata' to the file-like object and then write + # 'file_object' to disk. The dictionary keys of 'metadata' are sorted + # and indentation is used. The 'tuf.util.TempFile' file-like object is + # automically closed after the final move. + file_object.write(file_content) + logger.debug('Saving ' + repr(written_filename)) + + file_object.move(written_filename) + + if consistent_snapshot: + dirname, basename = os.path.split(written_filename) - # Serialize 'metadata' to the file-like object and then write - # 'file_object' to disk. The dictionary keys of 'metadata' are sorted - # and indentation is used. The 'tuf.util.TempFile' file-like object is - # automically closed after the final move. - file_object.write(file_content) - logger.debug('Saving ' + repr(written_filename)) - - file_object.move(written_filename) - - if consistent_snapshot: - dirname, basename = os.path.split(written_filename) - - basename = basename.split(METADATA_EXTENSION, 1)[0] - version_and_filename = str(version_number) + '.' + basename + METADATA_EXTENSION - written_consistent_filename = os.path.join(dirname, version_and_filename) + basename = basename.split(METADATA_EXTENSION, 1)[0] + version_and_filename = str(version_number) + '.' + basename + METADATA_EXTENSION + written_consistent_filename = os.path.join(dirname, version_and_filename) + + logger.info('Linking ' + repr(written_consistent_filename)) + os.link(written_filename, written_consistent_filename) + + else: + logger.info('Not linking a consistent filename for: ' + repr(written_filename)) - logger.info('Linking ' + repr(written_consistent_filename)) - os.link(written_filename, written_consistent_filename) - # Generate the compressed versions of 'metadata', if necessary. A compressed # file may be written (without needing to write the uncompressed version) if # the repository maintainer adds compression after writing the uncompressed @@ -2001,15 +2017,20 @@ def write_metadata_file(metadata, filename, version_number, finally: gzip_object.close() - else: - raise tuf.FormatError('Unknown compression algorithm: ' + repr(compressio_algorithm)) + # This else clause should not be reached because the + # 'compression_algorithms' list is validated against the + # COMPRESSIONS_SCHEMA above. + else: # pragma: no cover + raise tuf.FormatError('Unknown compression algorithm:' + ' ' + repr(compression_algorithm)) # Save the compressed version, ensuring an unchanged file is not re-saved. # Re-saving the same compressed version may cause its digest to # unexpectedly change (gzip includes a timestamp) even though content has # not changed. _write_compressed_metadata(file_object, compressed_filename, - write_new_metadata, consistent_snapshot, version_number) + True, consistent_snapshot, + version_number) return written_filename @@ -2029,7 +2050,7 @@ def _write_compressed_metadata(file_object, compressed_filename, # If a consistent snapshot is unneeded, 'file_object' may be simply moved # 'compressed_filename' if not already written. if not consistent_snapshot: - if not os.path.exists(compressed_filename) or write_new_metadata: + if write_new_metadata or not os.path.exists(compressed_filename): file_object.move(compressed_filename) # The temporary file must be closed if 'file_object.move()' is not used. @@ -2038,43 +2059,34 @@ def _write_compressed_metadata(file_object, compressed_filename, else: file_object.close_temp_file() - # Consistent snapshots = True. Ensure the file's digest is included in the + # consistent snapshots = True. Ensure the version number is included in the # compressed filename written, provided it does not already exist. else: compressed_content = file_object.read() - new_digests = [] - consistent_filenames = [] - - # Multiple snapshots may be written if the repository uses multiple - # hash algorithms. Generate the digest of the compressed content. - for hash_algorithm in tuf.conf.REPOSITORY_HASH_ALGORITHMS: - digest_object = tuf.hash.digest(hash_algorithm) - digest_object.update(compressed_content) - new_digests.append(digest_object.hexdigest()) - - # Attach each version number to the compressed consistent snapshot filename. - for new_digest in new_digests: - dirname, basename = os.path.split(compressed_filename) - for compression_extension in SUPPORTED_COMPRESSION_EXTENSIONS: - if basename.endswith(compression_extension): - basename = basename.split(compression_extension, 1)[0] - version_and_filename = str(version_number) + '.' + basename + compression_extension + consistent_filename = None + version_and_filename = None + + # Attach the version number to the compressed, consistent snapshot filename. + dirname, basename = os.path.split(compressed_filename) + + for compression_extension in SUPPORTED_COMPRESSION_EXTENSIONS: + if basename.endswith(compression_extension): + basename = basename.split(compression_extension, 1)[0] + version_and_filename = str(version_number) + '.' + basename + compression_extension + consistent_filename = os.path.join(dirname, version_and_filename) - consistent_filenames.append(os.path.join(dirname, version_and_filename)) + else: + logger.debug('Skipping compression extension: ' + repr(compression_extension)) # Move the 'tuf.util.TempFile' object to one of the filenames so that it is - # saved and the temporary file closed. Any remaining consistent snapshots - # may still need to be copied or linked. - compressed_filename = consistent_filenames.pop() - if not os.path.exists(compressed_filename): - logger.info('Saving ' + repr(compressed_filename)) - file_object.move(compressed_filename) + # saved and the temporary file closed. + if not os.path.exists(consistent_filename): + logger.info('Saving ' + repr(consistent_filename)) + file_object.move(consistent_filename) - # Save any remaining compressed consistent snapshots. - for consistent_filename in consistent_filenames: - if not os.path.exists(consistent_filename): - logger.info('Linking ' + repr(consistent_filename)) - os.link(compressed_filename, consistent_filename) + else: + logger.debug('Skipping already written compressed file:' + ' ' + repr(consistent_filename)) diff --git a/tuf/repository_tool.py b/tuf/repository_tool.py index b8587d59..a1d1682a 100755 --- a/tuf/repository_tool.py +++ b/tuf/repository_tool.py @@ -240,11 +240,10 @@ def write(self, write_partial=False, consistent_snapshot=False, 'timestamp': os.path.join(self._metadata_directory, repo_lib.TIMESTAMP_FILENAME)} snapshot_signable = None - dirty_rolenames = tuf.roledb.get_dirty_roles() for dirty_rolename in dirty_rolenames: - + # Ignore top-level roles, they will be generated later in this method. if dirty_rolename in ['root', 'targets', 'snapshot', 'timestamp']: continue @@ -449,7 +448,31 @@ def dirty_roles(self): """ logger.info('Dirty roles: ' + str(tuf.roledb.get_dirty_roles())) - + + + + def mark_dirty(self, roles): + """ + + Mark the list of 'roles' as dirty. + + + roles: + A list of roles to mark as dirty. on the next write, these roles + will be written to disk. + + + None. + + + None. + + + None. + """ + + tuf.roledb.mark_dirty(roles) + @staticmethod @@ -2835,7 +2858,7 @@ def load_repository(repository_directory): targets_objects = {} loaded_metadata = [] targets_objects['targets'] = repository.targets - + for metadata_role in os.listdir(metadata_directory): metadata_path = os.path.join(metadata_directory, metadata_role) diff --git a/tuf/roledb.py b/tuf/roledb.py index 845cabd9..d0bfb4a1 100755 --- a/tuf/roledb.py +++ b/tuf/roledb.py @@ -428,6 +428,45 @@ def get_dirty_roles(repository_name='default'): +def mark_dirty(roles, repository_name='default'): + """ + + Mark the list of 'roles' as dirty. + + + repository_name: + The name of the repository to get the dirty roles. If not supplied, the + 'default' repository is searched. + + roles: + A list of roles that should be marked as dirty. + + + tuf.FormatError, if the arguments are improperly formatted. + + tuf.InvalidNameError, if 'repository_name' does not exist in the role + database. + + + None. + + + None. + """ + + # Are the arguments properly formatted? If not, raise tuf.FormatError. + tuf.formats.NAMES_SCHEMA.check_match(roles) + tuf.formats.NAME_SCHEMA.check_match(repository_name) + + global _roledb_dict + global _dirty_roles + + if repository_name not in _roledb_dict or repository_name not in _dirty_roles: + raise tuf.InvalidNameError('Repository name does not' ' exist: ' + + repository_name) + + _dirty_roles[repository_name].update(roles) + def role_exists(rolename, repository_name='default'): @@ -872,6 +911,8 @@ def clear_roledb(repository_name='default', clear_all=False): if clear_all: _roledb_dict = {} _roledb_dict['default'] = {} + _dirty_roles = {} + _dirty_roles['default'] = set() return _roledb_dict[repository_name] = {} diff --git a/tuf/client/basic_client.py b/tuf/scripts/basic_client.py similarity index 100% rename from tuf/client/basic_client.py rename to tuf/scripts/basic_client.py diff --git a/tuf/scripts/tuf.py b/tuf/scripts/tuf.py new file mode 100755 index 00000000..f87de6f6 --- /dev/null +++ b/tuf/scripts/tuf.py @@ -0,0 +1,249 @@ +#!/usr/bin/env python + +""" + + tuf.py + + + Vladimir Diaz + + + August 2016. + + + See LICENSE for licensing information. + + + Provide a command line interface to the repository tool + (i.e., tuf.repository_tool.py). This CLI removes the need to write code, + which is required by the repository and developer tools. + + + $ tuf.py --init [--consistent-snapshot=false] + $ tuf.py --gen-key --keytype --keystore [--expires=] + $ tuf.py --add --repo + $ tuf.py --remove --repo + $ tuf.py --snapshot + $ tuf.py --timestamp + $ tuf.py --sign --repo + $ tuf.py --commit + $ tuf.py --regenerate + $ tuf.py --clean --repo + + + --init + + --gen-key + + --add + + --remove + + --snapshot + + --timestamp + + --sign + + --commit + + --regenerate + + --clean + + --verbose: + Set the verbosity level of logging messages. Accepts values 1-5. +""" + +# Help with Python 3 compatibility, where the print statement is a function, an +# implicit relative import is invalid, and the '/' operator performs true +# division. Example: print 'hello world' raises a 'SyntaxError' exception. +from __future__ import print_function +from __future__ import absolute_import +from __future__ import division +from __future__ import unicode_literals + +import sys +import optparse +import logging + +import tuf +import tuf.log +import tuf.formats + +from tuf.repository_tool import * + +# See 'log.py' to learn how logging is handled in TUF. +logger = logging.getLogger('tuf.tuf') + + +def update_repository(repository_path, command, command_arguments): + """ + + Update or create the repository found in 'repository_path'. What to + update is determined by the 'command,' which can correspond to one of the + supported repository tool functions. + + + repository_path: + + command: + + command_arguments: + + + tuf.FormatError, if any of the arugments are improperly formatted. + + + The TUF repository at 'repository_path' is either created or modified. + + + None. + """ + + # Do the arguments have the correct format? + tuf.formats.URL_SCHEMA.check_match(repository_path) + tuf.formats.NAME_SCHEMA.check_match(command) + tuf.formats.COMMAND_SCHEMA.check_match(command_arguments) + + # Set the local repository directory containing all of the metadata files. + tuf.conf.repository_directory = repository_path + + if command == 'init': + repository = create_new_repository(repository_path) + + # Import the root key(s). + try: + if command_arguments['keytype'] == 'ed25519': + repository.root.load_signing_key + + # Write the changes to the staged repository directory. + repository.write(consistent_snapshot=command_arguments['consistent_snapshot']) + + + elif command = 'gen-key': + command_arguments + + +def parse_options(): + """ + + Parse the command-line options and set the logging level + as specified by the user through the --verbose option. + The 'tuf' command expects the repository path to be set by the user. + + Example: + $ python --init ./repository --consistent-snapshot=false --verbose 3 + + If a required option is unset, a parser error is printed and the scripts + exits. + + + None. + + + None. + + + Sets the logging level for TUF logging. + + + A tuple ('options.REPOSITORY_PATH', command, command_arguments). 'command' + 'command_arguments' correspond to a repository tool fuction. + + """ + + parser = optparse.OptionParser() + + # Add the options supported by 'tuf.py' to the option parser. + parser.add_option('--verbose', dest='VERBOSE', type=int, default=2, + help='Set the verbosity level of logging messages.' + 'The lower the setting, the greater the verbosity.') + + parser.add_option('--init', dest='INIT', type='string', default='.', + help='') + + parser.add_option('--gen-key', dest='GEN-KEY', type='string', default='.', + help='') + + parser.add_option('--keytype', dest='KEYTYPE', type='string', default='ed25519', + help='') + + parser.add_option('--expires', dest='EXPIRES', type=int, default=365, + help='') + + parser.add_option('--add', dest='ADD', type='string', default='', + help='') + + parser.add_option('--remove', dest='REMOVE', type='string', default='', + help='') + + parser.add_option('--snapshot', dest='SNAPSHOT', type='string', default='.', + help='') + + parser.add_option('--timestamp', dest='TIMESTAMP', type='string', default='.', + help='') + + parser.add_option('--sign', dest='SIGN', type='string', default='.', + help='') + + parser.add_option('--commit', dest='COMMIT', type='string', default='.', + help='') + + parser.add_option('--regenerate', dest='REGENERATE', type='string', default='.', + help='') + + parser.add_option('--clean', dest='CLEAN', type='string', default='.', + help='') + + options, args = parser.parse_args() + + # Set the logging level. + if options.VERBOSE == 5: + tuf.log.set_log_level(logging.CRITICAL) + + elif options.VERBOSE == 4: + tuf.log.set_log_level(logging.ERROR) + + elif options.VERBOSE == 3: + tuf.log.set_log_level(logging.WARNING) + + elif options.VERBOSE == 2: + tuf.log.set_log_level(logging.INFO) + + elif options.VERBOSE == 1: + tuf.log.set_log_level(logging.DEBUG) + + else: + tuf.log.set_log_level(logging.NOTSET) + + # Ensure the repository path was set by the user. + if options.REPOSITORY_PATH is None: + parser.error('The repository path is unknown.') + + # Return a tuple containing the repository path, command, and command + # arguments needed by the repository tool. + return options.REPOSITORY_PATH, command, command_options + + + +if __name__ == '__main__': + + # Parse the options and set the logging level. + repository_path, command, command_arguments = parse_options() + + # Update the repository depending on the option specified on the command + # line. For example, + # tuf.repository_tool.generate_and_write_ed25519_keypair('./path/to/keystore/root') + # is called if the user invokes: + # $ tuf --gen-key root --keystore ./path/to/keystore --keytype ed25519 + + try: + update_repository(repository_path, command, command_arguments) + + except (tuf.Error) as e: + sys.stderr.write('Error: ' + str(e) + '\n') + sys.exit(1) + + # Successfully updated the local repository. + sys.exit(0)