mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Replace crypto imports in updater.py
This commit is contained in:
parent
ae59f8ff5b
commit
3600e748a3
1 changed files with 37 additions and 37 deletions
|
|
@ -119,14 +119,14 @@
|
|||
from simple_settings import settings
|
||||
import tuf.download
|
||||
import tuf.formats
|
||||
import tuf.ssl_crypto.hash
|
||||
import tuf.ssl_crypto.keys
|
||||
import tuf.ssl_crypto.keydb
|
||||
import securesystemslib.hash
|
||||
import securesystemslib.keys
|
||||
import tuf.keydb
|
||||
import tuf.log
|
||||
import tuf.mirrors
|
||||
import tuf.roledb
|
||||
import tuf.sig
|
||||
import tuf.ssl_crypto.util
|
||||
import securesystemslib.util
|
||||
|
||||
import six
|
||||
import iso8601
|
||||
|
|
@ -423,7 +423,7 @@ def _load_metadata_from_file(self, metadata_set, metadata_role):
|
|||
# Load the file. The loaded object should conform to
|
||||
# 'tuf.ssl_crypto.formats.SIGNABLE_SCHEMA'.
|
||||
try:
|
||||
metadata_signable = tuf.ssl_crypto.util.load_json_file(metadata_filepath)
|
||||
metadata_signable = securesystemslib.util.load_json_file(metadata_filepath)
|
||||
|
||||
# Although the metadata file may exist locally, it may not
|
||||
# be a valid json file. On the next refresh cycle, it will be
|
||||
|
|
@ -490,7 +490,7 @@ def _rebuild_key_and_role_db(self):
|
|||
# of these files. The metadata files for delegated roles are also not
|
||||
# loaded when the repository is first instantiated. Due to this setup,
|
||||
# reloading delegated roles is not required here.
|
||||
tuf.ssl_crypto.keydb.create_keydb_from_root_metadata(self.metadata['current']['root'],
|
||||
tuf.keydb.create_keydb_from_root_metadata(self.metadata['current']['root'],
|
||||
self.updater_name)
|
||||
tuf.roledb.create_roledb_from_root_metadata(self.metadata['current']['root'],
|
||||
self.updater_name)
|
||||
|
|
@ -538,15 +538,15 @@ def _import_delegations(self, parent_role):
|
|||
# Iterate the keys of the delegated roles of 'parent_role' and load them.
|
||||
for keyid, keyinfo in six.iteritems(keys_info):
|
||||
if keyinfo['keytype'] in ['rsa', 'ed25519']:
|
||||
key, keyids = tuf.ssl_crypto.keys.format_metadata_to_key(keyinfo)
|
||||
key, keyids = securesystemslib.keys.format_metadata_to_key(keyinfo)
|
||||
|
||||
# We specify the keyid to ensure that it's the correct keyid
|
||||
# for the key.
|
||||
try:
|
||||
tuf.ssl_crypto.keydb.add_key(key, keyid, self.updater_name)
|
||||
tuf.keydb.add_key(key, keyid, self.updater_name)
|
||||
for keyid in keyids:
|
||||
key['keyid'] = keyid
|
||||
tuf.ssl_crypto.keydb.add_key(key, keyid=None, repository_name=self.updater_name)
|
||||
tuf.keydb.add_key(key, keyid=None, repository_name=self.updater_name)
|
||||
|
||||
except tuf.ssl_commons.exceptions.KeyAlreadyExistsError:
|
||||
pass
|
||||
|
|
@ -719,7 +719,7 @@ def _update_root_metadata(self, current_root_metadata, compression_algorithm=Non
|
|||
settings.DEFAULT_ROOT_REQUIRED_LENGTH, None,
|
||||
compression_algorithm=compression_algorithm)
|
||||
latest_root_metadata = \
|
||||
tuf.ssl_crypto.util.load_json_string(latest_root_metadata_file.read().decode('utf-8'))
|
||||
securesystemslib.util.load_json_string(latest_root_metadata_file.read().decode('utf-8'))
|
||||
|
||||
|
||||
next_version = current_root_metadata['version'] + 1
|
||||
|
|
@ -751,7 +751,7 @@ def _check_hashes(self, file_object, trusted_hashes):
|
|||
|
||||
<Arguments>
|
||||
file_object:
|
||||
A 'tuf.ssl_crypto.util.TempFile' file-like object. 'file_object' ensures that a
|
||||
A 'securesystemslib.util.TempFile' file-like object. 'file_object' ensures that a
|
||||
read() without a size argument properly reads the entire file.
|
||||
|
||||
trusted_hashes:
|
||||
|
|
@ -763,7 +763,7 @@ def _check_hashes(self, file_object, trusted_hashes):
|
|||
tuf.ssl_commons.exceptions.BadHashError, if the hashes don't match.
|
||||
|
||||
<Side Effects>
|
||||
Hash digest object is created using the 'tuf.ssl_crypto.hash' module.
|
||||
Hash digest object is created using the 'securesystemslib.hash' module.
|
||||
|
||||
<Returns>
|
||||
None.
|
||||
|
|
@ -772,7 +772,7 @@ def _check_hashes(self, file_object, trusted_hashes):
|
|||
# Verify each trusted hash of 'trusted_hashes'. If all are valid, simply
|
||||
# return.
|
||||
for algorithm, trusted_hash in six.iteritems(trusted_hashes):
|
||||
digest_object = tuf.ssl_crypto.hash.digest(algorithm)
|
||||
digest_object = securesystemslib.hash.digest(algorithm)
|
||||
digest_object.update(file_object.read())
|
||||
computed_hash = digest_object.hexdigest()
|
||||
|
||||
|
|
@ -796,7 +796,7 @@ def _hard_check_file_length(self, file_object, trusted_file_length):
|
|||
|
||||
<Arguments>
|
||||
file_object:
|
||||
A 'tuf.ssl_crypto.util.TempFile' file-like object. 'file_object'
|
||||
A 'securesystemslib.util.TempFile' file-like object. 'file_object'
|
||||
ensures that a read() without a size argument properly reads the entire
|
||||
file.
|
||||
|
||||
|
|
@ -814,7 +814,7 @@ def _hard_check_file_length(self, file_object, trusted_file_length):
|
|||
None.
|
||||
"""
|
||||
|
||||
# Read the entire contents of 'file_object', a 'tuf.ssl_crypto.util.TempFile' file-like
|
||||
# Read the entire contents of 'file_object', a 'securesystemslib.util.TempFile' file-like
|
||||
# object that ensures the entire file is read.
|
||||
observed_length = len(file_object.read())
|
||||
|
||||
|
|
@ -837,14 +837,14 @@ def _soft_check_file_length(self, file_object, trusted_file_length):
|
|||
"""
|
||||
<Purpose>
|
||||
Non-public method that checks the trusted file length of a
|
||||
'tuf.ssl_crypto.util.TempFile' file-like object. The length of the file
|
||||
'securesystemslib.util.TempFile' file-like object. The length of the file
|
||||
must be less than or equal to the expected length. This is a deliberately
|
||||
redundant implementation designed to complement
|
||||
tuf.download._check_downloaded_length().
|
||||
|
||||
<Arguments>
|
||||
file_object:
|
||||
A 'tuf.ssl_crypto.util.TempFile' file-like object. 'file_object'
|
||||
A 'securesystemslib.util.TempFile' file-like object. 'file_object'
|
||||
ensures that a read() without a size argument properly reads the entire
|
||||
file.
|
||||
|
||||
|
|
@ -864,7 +864,7 @@ def _soft_check_file_length(self, file_object, trusted_file_length):
|
|||
"""
|
||||
|
||||
# Read the entire contents of 'file_object', a
|
||||
# 'tuf.ssl_crypto.util.TempFile' file-like object that ensures the entire
|
||||
# 'securesystemslib.util.TempFile' file-like object that ensures the entire
|
||||
# file is read.
|
||||
observed_length = len(file_object.read())
|
||||
|
||||
|
|
@ -912,7 +912,7 @@ def _get_target_file(self, target_filepath, file_length, file_hashes):
|
|||
a temporary file and returned.
|
||||
|
||||
<Returns>
|
||||
A 'tuf.ssl_crypto.util.TempFile' file-like object containing the target.
|
||||
A 'securesystemslib.util.TempFile' file-like object containing the target.
|
||||
"""
|
||||
|
||||
# Define a callable function that is passed as an argument to _get_file()
|
||||
|
|
@ -951,7 +951,7 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object,
|
|||
|
||||
<Arguments>
|
||||
metadata_file_object:
|
||||
A 'tuf.ssl_crypto.util.TempFile' instance containing the metadata file.
|
||||
A 'securesystemslib.util.TempFile' instance containing the metadata file.
|
||||
'metadata_file_object' ensures the entire file is returned with read().
|
||||
|
||||
metadata_role:
|
||||
|
|
@ -985,7 +985,7 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object,
|
|||
metadata = metadata_file_object.read().decode('utf-8')
|
||||
|
||||
try:
|
||||
metadata_signable = tuf.ssl_crypto.util.load_json_string(metadata)
|
||||
metadata_signable = securesystemslib.util.load_json_string(metadata)
|
||||
|
||||
except Exception as exception:
|
||||
raise tuf.ssl_commons.exceptions.InvalidMetadataJSONError(exception)
|
||||
|
|
@ -1053,7 +1053,7 @@ def _get_metadata_file(self, metadata_role, remote_filename,
|
|||
file and returned.
|
||||
|
||||
<Returns>
|
||||
A 'tuf.ssl_crypto.util.TempFile' file-like object containing the metadata.
|
||||
A 'securesystemslib.util.TempFile' file-like object containing the metadata.
|
||||
"""
|
||||
|
||||
file_mirrors = tuf.mirrors.get_list_of_mirrors('meta', remote_filename,
|
||||
|
|
@ -1078,7 +1078,7 @@ def _get_metadata_file(self, metadata_role, remote_filename,
|
|||
# 'file_object' is also verified if decompressed above (i.e., the
|
||||
# uncompressed version).
|
||||
metadata_signable = \
|
||||
tuf.ssl_crypto.util.load_json_string(file_object.read().decode('utf-8'))
|
||||
securesystemslib.util.load_json_string(file_object.read().decode('utf-8'))
|
||||
|
||||
# If the version number is unspecified, ensure that the version number
|
||||
# downloaded is greater than the currently trusted version number for
|
||||
|
|
@ -1165,7 +1165,7 @@ def _get_file(self, filepath, verify_file_function, file_type,
|
|||
The relative metadata or target filepath.
|
||||
|
||||
verify_file_function:
|
||||
A callable function that expects a 'tuf.ssl_crypto.util.TempFile'
|
||||
A callable function that expects a 'securesystemslib.util.TempFile'
|
||||
file-like object and raises an exception if the file is invalid.
|
||||
Target files and uncompressed versions of metadata may be verified with
|
||||
'verify_file_function'.
|
||||
|
|
@ -1203,7 +1203,7 @@ def _get_file(self, filepath, verify_file_function, file_type,
|
|||
file and returned.
|
||||
|
||||
<Returns>
|
||||
A 'tuf.ssl_crypto.util.TempFile' file-like object containing the metadata
|
||||
A 'securesystemslib.util.TempFile' file-like object containing the metadata
|
||||
or target.
|
||||
"""
|
||||
|
||||
|
|
@ -1352,7 +1352,7 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None,
|
|||
current_filepath = os.path.join(self.metadata_directory['current'],
|
||||
metadata_filename)
|
||||
current_filepath = os.path.abspath(current_filepath)
|
||||
tuf.ssl_crypto.util.ensure_parent_dir(current_filepath)
|
||||
securesystemslib.util.ensure_parent_dir(current_filepath)
|
||||
|
||||
previous_filepath = os.path.join(self.metadata_directory['previous'],
|
||||
metadata_filename)
|
||||
|
|
@ -1360,14 +1360,14 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None,
|
|||
|
||||
if os.path.exists(current_filepath):
|
||||
# Previous metadata might not exist, say when delegations are added.
|
||||
tuf.ssl_crypto.util.ensure_parent_dir(previous_filepath)
|
||||
securesystemslib.util.ensure_parent_dir(previous_filepath)
|
||||
shutil.move(current_filepath, previous_filepath)
|
||||
|
||||
# Next, move the verified updated metadata file to the 'current' directory.
|
||||
# Note that the 'move' method comes from tuf.ssl_crypto.util's TempFile class.
|
||||
# 'metadata_file_object' is an instance of tuf.ssl_crypto.util.TempFile.
|
||||
# Note that the 'move' method comes from securesystemslib.util's TempFile class.
|
||||
# 'metadata_file_object' is an instance of securesystemslib.util.TempFile.
|
||||
metadata_signable = \
|
||||
tuf.ssl_crypto.util.load_json_string(metadata_file_object.read().decode('utf-8'))
|
||||
securesystemslib.util.load_json_string(metadata_file_object.read().decode('utf-8'))
|
||||
|
||||
if compression_algorithm == 'gzip':
|
||||
current_uncompressed_filepath = \
|
||||
|
|
@ -1841,7 +1841,7 @@ def _update_fileinfo(self, metadata_filename):
|
|||
|
||||
# Extract the file information from the actual file and save it
|
||||
# to the fileinfo store.
|
||||
file_length, hashes = tuf.ssl_crypto.util.get_file_details(current_filepath)
|
||||
file_length, hashes = securesystemslib.util.get_file_details(current_filepath)
|
||||
metadata_fileinfo = tuf.formats.make_fileinfo(file_length, hashes)
|
||||
self.fileinfo[metadata_filename] = metadata_fileinfo
|
||||
|
||||
|
|
@ -1886,7 +1886,7 @@ def _move_current_to_previous(self, metadata_role):
|
|||
|
||||
# Move the current path to the previous path.
|
||||
if os.path.exists(current_filepath):
|
||||
tuf.ssl_crypto.util.ensure_parent_dir(previous_filepath)
|
||||
securesystemslib.util.ensure_parent_dir(previous_filepath)
|
||||
os.rename(current_filepath, previous_filepath)
|
||||
|
||||
|
||||
|
|
@ -2546,8 +2546,8 @@ def _visit_child_role(self, child_role, target_filepath, parent_delegations):
|
|||
# Is the child role allowed by its parent role to specify this path
|
||||
# in its metadata?
|
||||
try:
|
||||
tuf.ssl_crypto.util.ensure_all_targets_allowed(child_role_name, [target_filepath],
|
||||
parent_delegations)
|
||||
securesystemslib.util.ensure_all_targets_allowed(child_role_name,
|
||||
[target_filepath], parent_delegations)
|
||||
|
||||
except tuf.ssl_commons.exceptions.ForbiddenTargetError:
|
||||
logger.debug('Child role ' + repr(child_role_name) + ' has target ' + \
|
||||
|
|
@ -2602,7 +2602,7 @@ def _get_target_hash(self, target_filepath, hash_function='sha256'):
|
|||
# Calculate the hash of the filepath to determine which bin to find the
|
||||
# target. The client currently assumes the repository (i.e., repository
|
||||
# tool) uses 'hash_function' to generate hashes and UTF-8.
|
||||
digest_object = tuf.ssl_crypto.hash.digest(hash_function)
|
||||
digest_object = securesystemslib.hash.digest(hash_function)
|
||||
encoded_target_filepath = target_filepath.encode('utf-8')
|
||||
digest_object.update(encoded_target_filepath)
|
||||
target_filepath_hash = digest_object.hexdigest()
|
||||
|
|
@ -2743,8 +2743,8 @@ def updated_targets(self, targets, destination_directory):
|
|||
for algorithm, digest in six.iteritems(target['fileinfo']['hashes']):
|
||||
digest_object = None
|
||||
try:
|
||||
digest_object = tuf.ssl_crypto.hash.digest_filename(target_filepath,
|
||||
algorithm=algorithm)
|
||||
digest_object = securesystemslib.hash.digest_filename(target_filepath,
|
||||
algorithm=algorithm)
|
||||
|
||||
# This exception would occur if the target does not exist locally.
|
||||
except IOError:
|
||||
|
|
|
|||
Loading…
Reference in a new issue