mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Merge branch 'generate_switches' of https://github.com/SantiagoTorres/tuf into SantiagoTorres-generate_switches
This commit is contained in:
commit
f26f6dc4b2
17 changed files with 200 additions and 80 deletions
|
|
@ -118,3 +118,11 @@ TUF has four major classes of users: clients, for whom TUF is largely transparen
|
|||
* [Low-level Integration](tuf/client/README.md)
|
||||
|
||||
* [High-level Integration](tuf/interposition/README.md)
|
||||
|
||||
## Acknowledgements
|
||||
|
||||
This material is based upon work supported by the National Science Foundation
|
||||
under Grant No. CNS-1345049 and CNS-0959138. Any opinions, findings, and
|
||||
conclusions or recommendations expressed in this material are those of the
|
||||
author(s) and do not necessarily reflect the views of the National Science
|
||||
Foundation.
|
||||
|
|
|
|||
|
|
@ -17,4 +17,7 @@
|
|||
# http://nvie.com/posts/pin-your-packages/
|
||||
pycrypto==2.6.1
|
||||
pynacl==0.2.3
|
||||
|
||||
# Testing requirements. The rest of the testing dependencies available in
|
||||
# 'tox.ini'
|
||||
tox
|
||||
|
|
|
|||
4
setup.py
4
setup.py
|
|
@ -96,6 +96,10 @@
|
|||
'Programming Language :: Python :: 2',
|
||||
'Programming Language :: Python :: 2.6',
|
||||
'Programming Language :: Python :: 2.7',
|
||||
'Programming Language :: Python :: 3',
|
||||
'Programming Language :: Python :: 3.2',
|
||||
'Programming Language :: Python :: 3.3',
|
||||
'Programming Language :: Python :: 3.4',
|
||||
'Programming Language :: Python :: Implementation :: CPython',
|
||||
'Topic :: Security',
|
||||
'Topic :: Software Development'
|
||||
|
|
|
|||
|
|
@ -29,25 +29,41 @@
|
|||
from tuf.repository_tool import *
|
||||
import tuf.util
|
||||
|
||||
import optparse
|
||||
|
||||
parser = optparse.OptionParser()
|
||||
parser.add_option("-k","--keys", action='store_true', dest="should_generate_keys",
|
||||
help="Generate a new set of keys", default=False)
|
||||
parser.add_option("-d","--dry-run", action='store_true', dest="dry_run",
|
||||
help="Do not write the files, just run", default=False)
|
||||
(options, args) = parser.parse_args()
|
||||
|
||||
|
||||
repository = create_new_repository('repository')
|
||||
|
||||
# Generate and save the top-level role keys, including the delegated roles.
|
||||
# The unit tests should only have to import the keys they need from these
|
||||
# pre-generated key files.
|
||||
root_key_file = 'keystore/root_key'
|
||||
targets_key_file = 'keystore/targets_key'
|
||||
snapshot_key_file = 'keystore/snapshot_key'
|
||||
timestamp_key_file = 'keystore/timestamp_key'
|
||||
delegation_key_file = 'keystore/delegation_key'
|
||||
|
||||
# Generate public and private key files for the top-level roles, and two
|
||||
# delegated roles (these number of keys should be sufficient for most of the
|
||||
# unit tests). Unit tests may generate additional keys, if needed.
|
||||
generate_and_write_rsa_keypair(root_key_file, bits=2048, password='password')
|
||||
generate_and_write_rsa_keypair(targets_key_file, bits=2048, password='password')
|
||||
generate_and_write_rsa_keypair(snapshot_key_file, bits=2048, password='password')
|
||||
generate_and_write_rsa_keypair(timestamp_key_file, bits=2048, password='password')
|
||||
generate_and_write_rsa_keypair(delegation_key_file, bits=2048, password='password')
|
||||
|
||||
if options.should_generate_keys and not options.dry_run:
|
||||
# Generate and save the top-level role keys, including the delegated roles.
|
||||
# The unit tests should only have to import the keys they need from these
|
||||
# pre-generated key files.
|
||||
# Generate public and private key files for the top-level roles, and two
|
||||
# delegated roles (these number of keys should be sufficient for most of the
|
||||
# unit tests). Unit tests may generate additional keys, if needed.
|
||||
generate_and_write_rsa_keypair(root_key_file, bits=2048, password='password')
|
||||
generate_and_write_rsa_keypair(targets_key_file, bits=2048,
|
||||
password='password')
|
||||
generate_and_write_rsa_keypair(snapshot_key_file, bits=2048,
|
||||
password='password')
|
||||
generate_and_write_rsa_keypair(timestamp_key_file, bits=2048,
|
||||
password='password')
|
||||
generate_and_write_rsa_keypair(delegation_key_file, bits=2048,
|
||||
password='password')
|
||||
|
||||
# Import the public keys. These keys are needed so that metadata roles are
|
||||
# assigned verification keys, which clients use to verify the signatures created
|
||||
|
|
@ -88,14 +104,15 @@
|
|||
target3_filepath = 'repository/targets/file3.txt'
|
||||
tuf.util.ensure_parent_dir(target2_filepath)
|
||||
|
||||
with open(target1_filepath, 'wt') as file_object:
|
||||
file_object.write('This is an example target file.')
|
||||
if not options.dry_run:
|
||||
with open(target1_filepath, 'wt') as file_object:
|
||||
file_object.write('This is an example target file.')
|
||||
|
||||
with open(target2_filepath, 'wt') as file_object:
|
||||
file_object.write('This is an another example target file.')
|
||||
with open(target2_filepath, 'wt') as file_object:
|
||||
file_object.write('This is an another example target file.')
|
||||
|
||||
with open(target3_filepath, 'wt') as file_object:
|
||||
file_object.write('This is role1\'s target file.')
|
||||
with open(target3_filepath, 'wt') as file_object:
|
||||
file_object.write('This is role1\'s target file.')
|
||||
|
||||
# Add target files to the top-level 'targets.json' role. These target files
|
||||
# should already exist.
|
||||
|
|
@ -119,15 +136,18 @@
|
|||
repository.targets.compressions = ['gz']
|
||||
|
||||
# Create the actual metadata files, which are saved to 'metadata.staged'.
|
||||
repository.write()
|
||||
if not options.dry_run:
|
||||
repository.write()
|
||||
|
||||
# Move the staged.metadata to 'metadata' and create the client folder. The
|
||||
# client folder, which includes the required directory structure and metadata
|
||||
# files for clients to successfully load an 'tuf.client.updater.py' object.
|
||||
staged_metadata_directory = 'repository/metadata.staged'
|
||||
metadata_directory = 'repository/metadata'
|
||||
shutil.copytree(staged_metadata_directory, metadata_directory)
|
||||
if not options.dry_run:
|
||||
shutil.copytree(staged_metadata_directory, metadata_directory)
|
||||
|
||||
# Create the client files (required directory structure and minimal metadata)
|
||||
# required by the 'tuf.interposition' and 'tuf.client.updater.py' updaters.
|
||||
create_tuf_client_directory('repository', 'client')
|
||||
if not options.dry_run:
|
||||
create_tuf_client_directory('repository', 'client')
|
||||
|
|
|
|||
|
|
@ -89,7 +89,7 @@ def setUpClass(cls):
|
|||
|
||||
# NOTE: Following error is raised if a delay is not applied:
|
||||
# <urlopen error [Errno 111] Connection refused>
|
||||
time.sleep(.2)
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -170,13 +170,28 @@ def test_download_url_to_tempfileobj_and_urls(self):
|
|||
|
||||
self.assertRaises(six.moves.urllib.error.HTTPError,
|
||||
download_file,
|
||||
'http://localhost:'+str(self.PORT)+'/'+self.random_string(),
|
||||
'http://localhost:' + str(self.PORT) + '/' + self.random_string(),
|
||||
self.target_data_length)
|
||||
|
||||
self.assertRaises(six.moves.urllib.error.URLError,
|
||||
download_file,
|
||||
'http://localhost:'+str(self.PORT+1)+'/'+self.random_string(),
|
||||
'http://localhost:' + str(self.PORT+1) + '/' + self.random_string(),
|
||||
self.target_data_length)
|
||||
|
||||
|
||||
|
||||
def test__get_opener(self):
|
||||
# Test normal case.
|
||||
# A simple https server should be used to test the rest of the optional
|
||||
# ssl-related functions of 'tuf.download.py'.
|
||||
fake_cacert = self.make_temp_data_file()
|
||||
|
||||
with open(fake_cacert, 'wt') as file_object:
|
||||
file_object.write('fake cacert')
|
||||
|
||||
tuf.conf.ssl_certificates = fake_cacert
|
||||
tuf.download._get_opener('https')
|
||||
|
||||
|
||||
|
||||
# Run unit test.
|
||||
|
|
|
|||
|
|
@ -92,7 +92,7 @@ def setUpClass(cls):
|
|||
|
||||
# NOTE: Following error is raised if a delay is not applied:
|
||||
# <urlopen error [Errno 111] Connection refused>
|
||||
time.sleep(.5)
|
||||
time.sleep(.8)
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -92,7 +92,7 @@ def setUpClass(cls):
|
|||
|
||||
# NOTE: Following error is raised if a delay is not applied:
|
||||
# <urlopen error [Errno 111] Connection refused>
|
||||
time.sleep(.2)
|
||||
time.sleep(.8)
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -96,7 +96,7 @@ def setUpClass(cls):
|
|||
|
||||
# NOTE: Following error is raised if a delay is not applied:
|
||||
# <urlopen error [Errno 111] Connection refused>
|
||||
time.sleep(.2)
|
||||
time.sleep(.8)
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -97,7 +97,7 @@ def setUpClass(cls):
|
|||
|
||||
# NOTE: Following error is raised if a delay is not applied:
|
||||
# <urlopen error [Errno 111] Connection refused>
|
||||
time.sleep(.2)
|
||||
time.sleep(.8)
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -30,6 +30,7 @@
|
|||
import datetime
|
||||
import logging
|
||||
import tempfile
|
||||
import json
|
||||
import shutil
|
||||
import sys
|
||||
|
||||
|
|
@ -262,6 +263,20 @@ def test_import_ed25519_publickey_from_file(self):
|
|||
|
||||
self.assertRaises(tuf.Error, repo_lib.import_ed25519_publickey_from_file,
|
||||
invalid_keyfile)
|
||||
|
||||
# Invalid public key imported (contains unexpected keytype.)
|
||||
keytype = imported_ed25519_key['keytype']
|
||||
keyval = imported_ed25519_key['keyval']
|
||||
ed25519key_metadata_format = \
|
||||
tuf.keys.format_keyval_to_metadata(keytype, keyval, private=False)
|
||||
|
||||
ed25519key_metadata_format['keytype'] = 'invalid_keytype'
|
||||
with open(ed25519_keypath + '.pub', 'wb') as file_object:
|
||||
file_object.write(json.dumps(ed25519key_metadata_format).encode('utf-8'))
|
||||
|
||||
self.assertRaises(tuf.FormatError,
|
||||
repo_lib.import_ed25519_publickey_from_file,
|
||||
ed25519_keypath + '.pub')
|
||||
|
||||
|
||||
|
||||
|
|
@ -296,6 +311,32 @@ def test_import_ed25519_privatekey_from_file(self):
|
|||
|
||||
self.assertRaises(tuf.Error, repo_lib.import_ed25519_privatekey_from_file,
|
||||
invalid_keyfile, 'pw')
|
||||
|
||||
# Invalid private key imported (contains unexpected keytype.)
|
||||
imported_ed25519_key['keytype'] = 'invalid_keytype'
|
||||
|
||||
# Use 'pycrypto_keys.py' to bypass the key format validation performed by
|
||||
# 'keys.py'.
|
||||
salt, iterations, derived_key = \
|
||||
tuf.pycrypto_keys._generate_derived_key('pw')
|
||||
|
||||
# Store the derived key info in a dictionary, the object expected
|
||||
# by the non-public _encrypt() routine.
|
||||
derived_key_information = {'salt': salt, 'iterations': iterations,
|
||||
'derived_key': derived_key}
|
||||
|
||||
# Convert the key object to json string format and encrypt it with the
|
||||
# derived key.
|
||||
encrypted_key = \
|
||||
tuf.pycrypto_keys._encrypt(json.dumps(imported_ed25519_key),
|
||||
derived_key_information)
|
||||
|
||||
with open(ed25519_keypath, 'wb') as file_object:
|
||||
file_object.write(encrypted_key.encode('utf-8'))
|
||||
|
||||
self.assertRaises(tuf.FormatError,
|
||||
repo_lib.import_ed25519_privatekey_from_file,
|
||||
ed25519_keypath, 'pw')
|
||||
|
||||
|
||||
|
||||
|
|
@ -677,6 +718,12 @@ def test_create_tuf_client_directory(self):
|
|||
repository_directory, client_directory)
|
||||
|
||||
|
||||
def test__check_directory(self):
|
||||
# Test for non-existent directory.
|
||||
self.assertRaises(tuf.Error, repo_lib._check_directory, 'non-existent')
|
||||
|
||||
|
||||
|
||||
# Run the test cases.
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
|
|||
|
|
@ -1103,6 +1103,25 @@ def test_8_remove_obsolete_targets(self):
|
|||
# in 'destination_directory' remains the same.
|
||||
self.repository_updater.remove_obsolete_targets(destination_directory)
|
||||
self.assertTrue(os.listdir(destination_directory), 1)
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
def test_9__get_target_hash(self):
|
||||
# Test normal case.
|
||||
# Test target filepaths with ascii and non-ascii characters.
|
||||
expected_target_hashes = {
|
||||
'/file1.txt': 'e3a3d89eb3b70ce3fbce6017d7b8c12d4abd5635427a0e8a238f53157df85b3d',
|
||||
'/Jalape\xc3\xb1o': '78bfd5c314680545eb48ecad508aceb861f8d6e680f4fe1b791da45c298cda88'
|
||||
}
|
||||
for filepath, target_hash in six.iteritems(expected_target_hashes):
|
||||
self.assertTrue(tuf.formats.RELPATH_SCHEMA.matches(filepath))
|
||||
self.assertTrue(tuf.formats.HASH_SCHEMA.matches(target_hash))
|
||||
self.assertEqual(self.repository_updater._get_target_hash(filepath), target_hash)
|
||||
|
||||
# Test for improperly formatted argument.
|
||||
self.assertRaises(tuf.FormatError, tuf.util.get_target_hash, 8)
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
1
tox.ini
1
tox.ini
|
|
@ -19,6 +19,7 @@ deps =
|
|||
pynacl
|
||||
pycrypto
|
||||
|
||||
|
||||
[testenv:py26]
|
||||
deps =
|
||||
{[testenv]deps}
|
||||
|
|
|
|||
|
|
@ -2448,16 +2448,17 @@ def _visit_child_role(self, child_role, target_filepath):
|
|||
# 'role_name' should have been validated when it was downloaded.
|
||||
# The 'paths' or 'path_hash_prefixes' fields should not be missing,
|
||||
# so we raise a format error here in case they are both missing.
|
||||
raise tuf.FormatError(repr(child_role_name)+' has neither ' \
|
||||
raise tuf.FormatError(repr(child_role_name) + ' has neither ' \
|
||||
'"paths" nor "path_hash_prefixes".')
|
||||
|
||||
if child_role_is_relevant:
|
||||
logger.debug('Child role '+repr(child_role_name)+' has target '+
|
||||
logger.debug('Child role ' + repr(child_role_name) + ' has target ' + \
|
||||
repr(target_filepath))
|
||||
return child_role_name
|
||||
|
||||
else:
|
||||
logger.debug('Child role '+repr(child_role_name)+
|
||||
' does not have target '+repr(target_filepath))
|
||||
logger.debug('Child role ' + repr(child_role_name) + \
|
||||
' does not have target ' + repr(target_filepath))
|
||||
return None
|
||||
|
||||
|
||||
|
|
@ -2495,20 +2496,11 @@ def _get_target_hash(self, target_filepath, hash_function='sha256'):
|
|||
"""
|
||||
|
||||
# Calculate the hash of the filepath to determine which bin to find the
|
||||
# target. The client currently assumes the repository uses
|
||||
# 'hash_function' to generate hashes.
|
||||
|
||||
# target. The client currently assumes the repository (i.e., repository
|
||||
# tool) uses 'hash_function' to generate hashes and UTF-8.
|
||||
digest_object = tuf.hash.digest(hash_function)
|
||||
|
||||
try:
|
||||
digest_object.update(target_filepath)
|
||||
except UnicodeEncodeError:
|
||||
# Sometimes, there are Unicode characters in target paths. We assume a
|
||||
# UTF-8 encoding and try to hash that.
|
||||
digest_object = tuf.hash.digest(hash_function)
|
||||
encoded_target_filepath = target_filepath.encode('utf-8')
|
||||
digest_object.update(encoded_target_filepath)
|
||||
|
||||
encoded_target_filepath = target_filepath.encode('utf-8')
|
||||
digest_object.update(encoded_target_filepath)
|
||||
target_filepath_hash = digest_object.hexdigest()
|
||||
|
||||
return target_filepath_hash
|
||||
|
|
@ -2554,7 +2546,7 @@ def remove_obsolete_targets(self, destination_directory):
|
|||
for target in self.metadata['previous'][role]['targets']:
|
||||
if target not in self.metadata['current'][role]['targets']:
|
||||
# 'target' is only in 'previous', so remove it.
|
||||
logger.warning('Removing obsolete file: '+repr(target)+'.')
|
||||
logger.warning('Removing obsolete file: ' + repr(target) + '.')
|
||||
# Remove the file if it hasn't been removed already.
|
||||
destination = os.path.join(destination_directory, target)
|
||||
try:
|
||||
|
|
@ -2563,7 +2555,7 @@ def remove_obsolete_targets(self, destination_directory):
|
|||
except OSError as e:
|
||||
# If 'filename' already removed, just log it.
|
||||
if e.errno == errno.ENOENT:
|
||||
logger.info('File '+repr(destination)+' was already removed.')
|
||||
logger.info('File ' + repr(destination) + ' was already removed.')
|
||||
|
||||
else:
|
||||
logger.error(str(e))
|
||||
|
|
@ -2722,6 +2714,6 @@ def download_target(self, target, destination_directory):
|
|||
raise
|
||||
|
||||
else:
|
||||
logger.warning(str(target_dirpath)+' does not exist.')
|
||||
logger.warning(repr(target_dirpath) + ' does not exist.')
|
||||
|
||||
target_file_object.move(destination)
|
||||
|
|
|
|||
|
|
@ -461,8 +461,9 @@ def _get_content_length(connection):
|
|||
assert reported_length > -1
|
||||
|
||||
except:
|
||||
logger.exception('Could not get content length about '+str(connection)+
|
||||
' from server!')
|
||||
message = \
|
||||
'Could not get content length about ' + str(connection) + ' from server.'
|
||||
logger.exception(message)
|
||||
reported_length = None
|
||||
|
||||
finally:
|
||||
|
|
|
|||
|
|
@ -231,11 +231,14 @@ def _get_password(prompt='Password: ', confirm=False):
|
|||
# getpass() prompts the user for a password without echoing
|
||||
# the user input.
|
||||
password = getpass.getpass(prompt, sys.stderr)
|
||||
|
||||
if not confirm:
|
||||
return password
|
||||
password2 = getpass.getpass('Confirm: ', sys.stderr)
|
||||
|
||||
if password == password2:
|
||||
return password
|
||||
|
||||
else:
|
||||
print('Mismatch; try again.')
|
||||
|
||||
|
|
@ -246,10 +249,14 @@ def _get_password(prompt='Password: ', confirm=False):
|
|||
def _metadata_is_partially_loaded(rolename, signable, roleinfo):
|
||||
"""
|
||||
Non-public function that determines whether 'rolename' is loaded with
|
||||
at least 1 good signature, but an insufficient threshold (which means
|
||||
'rolename' was written to disk with repository.write_partial(). If 'rolename'
|
||||
is found to be partially loaded, mark it as partially loaded in its
|
||||
'tuf.roledb' roleinfo. This function exists to assist in deciding whether
|
||||
at least zero good signatures, but an insufficient threshold (which means
|
||||
'rolename' was written to disk with repository.write_partial()). A repository
|
||||
maintainer may write partial metadata without including a valid signature.
|
||||
Howerver, the final repository.write() must include a threshold number of
|
||||
signatures.
|
||||
|
||||
If 'rolename' is found to be partially loaded, mark it as partially loaded in
|
||||
its 'tuf.roledb' roleinfo. This function exists to assist in deciding whether
|
||||
a role's version number should be incremented when write() or write_parital()
|
||||
is called. Return True if 'rolename' was partially loaded, False otherwise.
|
||||
"""
|
||||
|
|
@ -259,7 +266,7 @@ def _metadata_is_partially_loaded(rolename, signable, roleinfo):
|
|||
status = tuf.sig.get_signature_status(signable, rolename)
|
||||
|
||||
if len(status['good_sigs']) < status['threshold'] and \
|
||||
len(status['good_sigs']) >= 1:
|
||||
len(status['good_sigs']) >= 0:
|
||||
return True
|
||||
|
||||
else:
|
||||
|
|
@ -299,7 +306,7 @@ def _check_directory(directory):
|
|||
|
||||
# Check if the directory exists.
|
||||
if not os.path.isdir(directory):
|
||||
raise tuf.Error(repr(directory)+' directory does not exist.')
|
||||
raise tuf.Error(repr(directory) + ' directory does not exist.')
|
||||
|
||||
directory = os.path.abspath(directory)
|
||||
|
||||
|
|
@ -326,14 +333,14 @@ def _check_role_keys(rolename):
|
|||
|
||||
# Raise an exception for an invalid threshold of public keys.
|
||||
if total_keyids < threshold:
|
||||
message = repr(rolename)+' role contains '+repr(total_keyids)+' / '+ \
|
||||
repr(threshold)+' public keys.'
|
||||
message = repr(rolename) + ' role contains ' + \
|
||||
repr(total_keyids) + ' / ' + repr(threshold) + ' public keys.'
|
||||
raise tuf.InsufficientKeysError(message)
|
||||
|
||||
# Raise an exception for an invalid threshold of signing keys.
|
||||
if total_signatures == 0 and total_signing_keys < threshold:
|
||||
message = repr(rolename)+' role contains '+repr(total_signing_keys)+' / '+ \
|
||||
repr(threshold)+' signing keys.'
|
||||
message = repr(rolename) + ' role contains ' + \
|
||||
repr(total_signing_keys) + ' / ' + repr(threshold) + ' signing keys.'
|
||||
raise tuf.InsufficientKeysError(message)
|
||||
|
||||
|
||||
|
|
@ -494,7 +501,7 @@ def _strip_consistent_snapshot_digest(metadata_filename, consistent_snapshot):
|
|||
embeded_digest = basename[:basename.find('.')]
|
||||
|
||||
# Ensure the digest, including the period, is stripped.
|
||||
basename = basename[basename.find('.')+1:]
|
||||
basename = basename[basename.find('.') + 1:]
|
||||
|
||||
metadata_filename = os.path.join(dirname, basename)
|
||||
|
||||
|
|
@ -754,7 +761,7 @@ def generate_and_write_rsa_keypair(filepath, bits=DEFAULT_RSA_KEY_BITS,
|
|||
tuf.formats.RSAKEYBITS_SCHEMA.check_match(bits)
|
||||
|
||||
# If the caller does not provide a password argument, prompt for one.
|
||||
if password is None:
|
||||
if password is None: # pragma: no cover
|
||||
message = 'Enter a password for the RSA key file: '
|
||||
password = _get_password(message, confirm=True)
|
||||
|
||||
|
|
@ -779,7 +786,7 @@ def generate_and_write_rsa_keypair(filepath, bits=DEFAULT_RSA_KEY_BITS,
|
|||
file_object.write(public.encode('utf-8'))
|
||||
|
||||
# The temporary file is closed after the final move.
|
||||
file_object.move(filepath+'.pub')
|
||||
file_object.move(filepath + '.pub')
|
||||
|
||||
# Write the private key in encrypted PEM format to '<filepath>'.
|
||||
# Unlike the public key file, the private key does not have a file
|
||||
|
|
@ -834,7 +841,7 @@ def import_rsa_privatekey_from_file(filepath, password=None):
|
|||
# If the caller does not provide a password argument, prompt for one.
|
||||
# Password confirmation disabled here, which should ideally happen only
|
||||
# when creating encrypted key files (i.e., improve usability).
|
||||
if password is None:
|
||||
if password is None: # pragma: no cover
|
||||
message = 'Enter a password for the encrypted RSA file: '
|
||||
password = _get_password(message, confirm=False)
|
||||
|
||||
|
|
@ -955,7 +962,7 @@ def generate_and_write_ed25519_keypair(filepath, password=None):
|
|||
tuf.formats.PATH_SCHEMA.check_match(filepath)
|
||||
|
||||
# If the caller does not provide a password argument, prompt for one.
|
||||
if password is None:
|
||||
if password is None: # pragma: no cover
|
||||
message = 'Enter a password for the ED25519 key: '
|
||||
password = _get_password(message, confirm=True)
|
||||
|
||||
|
|
@ -986,7 +993,7 @@ def generate_and_write_ed25519_keypair(filepath, password=None):
|
|||
file_object.write(json.dumps(ed25519key_metadata_format).encode('utf-8'))
|
||||
|
||||
# The temporary file is closed after the final move.
|
||||
file_object.move(filepath+'.pub')
|
||||
file_object.move(filepath + '.pub')
|
||||
|
||||
# Write the encrypted key string, conformant to
|
||||
# 'tuf.formats.ENCRYPTEDKEY_SCHEMA', to '<filepath>'.
|
||||
|
|
@ -1034,9 +1041,11 @@ def import_ed25519_publickey_from_file(filepath):
|
|||
ed25519_key_metadata = tuf.util.load_json_file(filepath)
|
||||
ed25519_key = tuf.keys.format_metadata_to_key(ed25519_key_metadata)
|
||||
|
||||
# Raise an exception if an unexpected key type is imported.
|
||||
if ed25519_key['keytype'] != 'ed25519':
|
||||
message = 'Invalid key type loaded: '+repr(ed25519_key['keytype'])
|
||||
# Raise an exception if an unexpected key type is imported.
|
||||
# Redundant validation of 'keytype'. 'tuf.keys.format_metadata_to_key()'
|
||||
# should have fully validated 'ed25519_key_metadata'.
|
||||
if ed25519_key['keytype'] != 'ed25519': # pragma: no cover
|
||||
message = 'Invalid key type loaded: ' + repr(ed25519_key['keytype'])
|
||||
raise tuf.FormatError(message)
|
||||
|
||||
return ed25519_key
|
||||
|
|
@ -1093,7 +1102,7 @@ def import_ed25519_privatekey_from_file(filepath, password=None):
|
|||
# If the caller does not provide a password argument, prompt for one.
|
||||
# Password confirmation disabled here, which should ideally happen only
|
||||
# when creating encrypted key files (i.e., improve usability).
|
||||
if password is None:
|
||||
if password is None: # pragma: no cover
|
||||
message = 'Enter a password for the encrypted ED25519 key: '
|
||||
password = _get_password(message, confirm=False)
|
||||
|
||||
|
|
@ -1115,7 +1124,7 @@ def import_ed25519_privatekey_from_file(filepath, password=None):
|
|||
|
||||
# Raise an exception if an unexpected key type is imported.
|
||||
if key_object['keytype'] != 'ed25519':
|
||||
message = 'Invalid key type loaded: '+repr(key_object['keytype'])
|
||||
message = 'Invalid key type loaded: ' + repr(key_object['keytype'])
|
||||
raise tuf.FormatError(message)
|
||||
|
||||
return key_object
|
||||
|
|
@ -1223,7 +1232,7 @@ def get_metadata_fileinfo(filename):
|
|||
tuf.formats.PATH_SCHEMA.check_match(filename)
|
||||
|
||||
if not os.path.isfile(filename):
|
||||
message = repr(filename)+' is not a file.'
|
||||
message = repr(filename) + ' is not a file.'
|
||||
raise tuf.Error(message)
|
||||
|
||||
# Note: 'filehashes' is a dictionary of the form
|
||||
|
|
@ -1330,7 +1339,7 @@ def generate_root_metadata(version, expiration_date, consistent_snapshot):
|
|||
|
||||
# If a top-level role is missing from 'tuf.roledb.py', raise an exception.
|
||||
if not tuf.roledb.role_exists(rolename):
|
||||
raise tuf.Error(repr(rolename)+' not in "tuf.roledb".')
|
||||
raise tuf.Error(repr(rolename) + ' not in "tuf.roledb".')
|
||||
|
||||
# Keep track of the keys loaded to avoid duplicates.
|
||||
keyids = []
|
||||
|
|
@ -1469,7 +1478,7 @@ def generate_targets_metadata(targets_directory, target_files, version,
|
|||
# Ensure all target files listed in 'target_files' exist. If just one of
|
||||
# these files does not exist, raise an exception.
|
||||
if not os.path.exists(target_path):
|
||||
message = repr(target_path)+' cannot be read. Unable to generate '+ \
|
||||
message = repr(target_path) + ' cannot be read. Unable to generate '+ \
|
||||
'targets metadata.'
|
||||
raise tuf.Error(message)
|
||||
|
||||
|
|
@ -1685,10 +1694,10 @@ def generate_timestamp_metadata(snapshot_filename, version,
|
|||
compressed_fileinfo = get_metadata_fileinfo(compressed_filename)
|
||||
|
||||
except:
|
||||
logger.warning('Cannot get fileinfo about '+repr(compressed_filename))
|
||||
logger.warning('Cannot get fileinfo about ' + repr(compressed_filename))
|
||||
|
||||
else:
|
||||
logger.info('Including fileinfo about '+repr(compressed_filename))
|
||||
logger.info('Including fileinfo about ' + repr(compressed_filename))
|
||||
fileinfo[SNAPSHOT_FILENAME + '.' + file_extension] = compressed_fileinfo
|
||||
|
||||
# Generate the timestamp metadata object.
|
||||
|
|
@ -1754,7 +1763,7 @@ def sign_metadata(metadata_object, keyids, filename):
|
|||
|
||||
# Load the signing key.
|
||||
key = tuf.keydb.get_key(keyid)
|
||||
logger.info('Signing '+repr(filename)+' with '+key['keyid'])
|
||||
logger.info('Signing ' + repr(filename) + ' with ' + key['keyid'])
|
||||
|
||||
# Create a new signature list. If 'keyid' is encountered, do not add it
|
||||
# to the new list.
|
||||
|
|
@ -1772,7 +1781,7 @@ def sign_metadata(metadata_object, keyids, filename):
|
|||
signable['signatures'].append(signature)
|
||||
|
||||
else:
|
||||
logger.warning('Private key unset. Skipping: '+repr(keyid))
|
||||
logger.warning('Private key unset. Skipping: ' + repr(keyid))
|
||||
|
||||
else:
|
||||
raise tuf.Error('The keydb contains a key with an invalid key type.')
|
||||
|
|
@ -2095,8 +2104,8 @@ def _log_status(rolename, signable):
|
|||
|
||||
status = tuf.sig.get_signature_status(signable, rolename)
|
||||
|
||||
message = repr(rolename)+' role contains '+ repr(len(status['good_sigs']))+\
|
||||
' / '+repr(status['threshold'])+' signatures.'
|
||||
message = repr(rolename) + ' role contains ' + repr(len(status['good_sigs']))+\
|
||||
' / ' + repr(status['threshold']) + ' signatures.'
|
||||
logger.info(message)
|
||||
|
||||
|
||||
|
|
@ -2173,6 +2182,7 @@ def create_tuf_client_directory(repository_directory, client_directory):
|
|||
message = 'Cannot create a fresh client metadata directory: '+ \
|
||||
repr(client_metadata_directory)+'. Already exists.'
|
||||
raise tuf.RepositoryError(message)
|
||||
|
||||
else:
|
||||
raise
|
||||
|
||||
|
|
|
|||
|
|
@ -2675,7 +2675,7 @@ def load_repository(repository_directory):
|
|||
roleinfo['paths'] = list(metadata_object['targets'].keys())
|
||||
roleinfo['delegations'] = metadata_object['delegations']
|
||||
|
||||
if os.path.exists(metadata_path+'.gz'):
|
||||
if os.path.exists(metadata_path + '.gz'):
|
||||
roleinfo['compressions'].append('gz')
|
||||
|
||||
# The roleinfo of 'metadata_name' should have been initialized with
|
||||
|
|
|
|||
Loading…
Reference in a new issue