Moved util.py, now it is common

This commit is contained in:
Artiom Baloian 2016-11-02 09:49:37 -04:00
parent b793c81739
commit b0156944bb
27 changed files with 226 additions and 1221 deletions

View file

@ -29,7 +29,7 @@
import stat
from tuf.repository_tool import *
import tuf.util
import tuf.ssl_crypto.util
parser = optparse.OptionParser()
@ -95,11 +95,11 @@
# Create the target files (downloaded by clients) whose file size and digest
# are specified in the 'targets.json' file.
target1_filepath = 'repository/targets/file1.txt'
tuf.util.ensure_parent_dir(target1_filepath)
tuf.ssl_crypto.util.ensure_parent_dir(target1_filepath)
target2_filepath = 'repository/targets/file2.txt'
tuf.util.ensure_parent_dir(target2_filepath)
tuf.ssl_crypto.util.ensure_parent_dir(target2_filepath)
target3_filepath = 'repository/targets/file3.txt'
tuf.util.ensure_parent_dir(target2_filepath)
tuf.ssl_crypto.util.ensure_parent_dir(target2_filepath)
if not options.dry_run:
with open(target1_filepath, 'wt') as file_object:

View file

@ -20,7 +20,7 @@
import optparse
import os
import tuf.util
import tuf.ssl_crypto.util
from tuf.developer_tool import *
parser = optparse.OptionParser()
@ -59,11 +59,11 @@
# Create the target files (downloaded by clients) whose file size and digest
# are specified in the 'targets.json' file.
target1_filepath = 'project/targets/file1.txt'
tuf.util.ensure_parent_dir(target1_filepath)
tuf.ssl_crypto.util.ensure_parent_dir(target1_filepath)
target2_filepath = 'project/targets/file2.txt'
tuf.util.ensure_parent_dir(target2_filepath)
tuf.ssl_crypto.util.ensure_parent_dir(target2_filepath)
target3_filepath = 'project/targets/file3.txt'
tuf.util.ensure_parent_dir(target2_filepath)
tuf.ssl_crypto.util.ensure_parent_dir(target2_filepath)
if not options.dry_run:
with open(target1_filepath, 'wt') as file_object:

View file

@ -52,7 +52,7 @@
import tuf
import tuf.tufformats
import tuf.util
import tuf.ssl_crypto.util
import tuf.roledb
import tuf.keydb
import tuf.log
@ -181,7 +181,7 @@ def test_without_tuf(self):
target_path = os.path.join(self.repository_directory, 'targets', 'file1.txt')
client_target_path = os.path.join(self.client_directory, 'file1.txt')
self.assertFalse(os.path.exists(client_target_path))
length, hashes = tuf.util.get_file_details(target_path)
length, hashes = tuf.ssl_crypto.util.get_file_details(target_path)
fileinfo = tuf.tufformats.make_fileinfo(length, hashes)
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
@ -189,19 +189,19 @@ def test_without_tuf(self):
six.moves.urllib.request.urlretrieve(url_file, client_target_path)
self.assertTrue(os.path.exists(client_target_path))
length, hashes = tuf.util.get_file_details(client_target_path)
length, hashes = tuf.ssl_crypto.util.get_file_details(client_target_path)
download_fileinfo = tuf.tufformats.make_fileinfo(length, hashes)
self.assertEqual(fileinfo, download_fileinfo)
# Test: Download a target file that has been modified by an attacker.
with open(target_path, 'wt') as file_object:
file_object.write('add malicious content.')
length, hashes = tuf.util.get_file_details(target_path)
length, hashes = tuf.ssl_crypto.util.get_file_details(target_path)
malicious_fileinfo = tuf.tufformats.make_fileinfo(length, hashes)
six.moves.urllib.request.urlretrieve(url_file, client_target_path)
length, hashes = tuf.util.get_file_details(client_target_path)
length, hashes = tuf.ssl_crypto.util.get_file_details(client_target_path)
download_fileinfo = tuf.tufformats.make_fileinfo(length, hashes)
# Verify 'download_fileinfo' is unequal to the original trusted version.
@ -259,12 +259,12 @@ def test_with_tuf_and_metadata_tampering(self):
# An attacker also tries to add the malicious target's length and digest
# to its metadata file.
length, hashes = tuf.util.get_file_details(target_path)
length, hashes = tuf.ssl_crypto.util.get_file_details(target_path)
metadata_path = \
os.path.join(self.repository_directory, 'metadata', 'targets.json')
metadata = tuf.util.load_json_file(metadata_path)
metadata = tuf.ssl_crypto.util.load_json_file(metadata_path)
metadata['signed']['targets']['/file1.txt']['hashes'] = hashes
metadata['signed']['targets']['/file1.txt']['length'] = length

View file

@ -55,7 +55,7 @@
import tuf
import tuf.tufformats
import tuf.util
import tuf.ssl_crypto.util
import tuf.log
import tuf.client.updater as updater
import tuf.unittest_toolbox as unittest_toolbox
@ -185,7 +185,7 @@ def test_without_tuf(self):
target_path = os.path.join(self.repository_directory, 'targets', 'file1.txt')
client_target_path = os.path.join(self.client_directory, 'file1.txt')
self.assertFalse(os.path.exists(client_target_path))
length, hashes = tuf.util.get_file_details(target_path)
length, hashes = tuf.ssl_crypto.util.get_file_details(target_path)
fileinfo = tuf.tufformats.make_fileinfo(length, hashes)
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
@ -193,7 +193,7 @@ def test_without_tuf(self):
six.moves.urllib.request.urlretrieve(url_file, client_target_path)
self.assertTrue(os.path.exists(client_target_path))
length, hashes = tuf.util.get_file_details(client_target_path)
length, hashes = tuf.ssl_crypto.util.get_file_details(client_target_path)
download_fileinfo = tuf.tufformats.make_fileinfo(length, hashes)
self.assertEqual(fileinfo, download_fileinfo)
@ -202,7 +202,7 @@ def test_without_tuf(self):
with open(target_path, 'r+t') as file_object:
original_content = file_object.read()
file_object.write(original_content+('append large amount of data' * 100000))
large_length, hashes = tuf.util.get_file_details(target_path)
large_length, hashes = tuf.ssl_crypto.util.get_file_details(target_path)
malicious_fileinfo = tuf.tufformats.make_fileinfo(large_length, hashes)
# Is the modified file actually larger?
@ -210,7 +210,7 @@ def test_without_tuf(self):
six.moves.urllib.request.urlretrieve(url_file, client_target_path)
length, hashes = tuf.util.get_file_details(client_target_path)
length, hashes = tuf.ssl_crypto.util.get_file_details(client_target_path)
download_fileinfo = tuf.tufformats.make_fileinfo(length, hashes)
# Verify 'download_fileinfo' is unequal to the original trusted version.
@ -234,10 +234,10 @@ def test_with_tuf(self):
# Verify the client's downloaded file matches the repository's.
target_path = os.path.join(self.repository_directory, 'targets', 'file1.txt')
length, hashes = tuf.util.get_file_details(client_target_path)
length, hashes = tuf.ssl_crypto.util.get_file_details(client_target_path)
fileinfo = tuf.tufformats.make_fileinfo(length, hashes)
length, hashes = tuf.util.get_file_details(client_target_path)
length, hashes = tuf.ssl_crypto.util.get_file_details(client_target_path)
download_fileinfo = tuf.tufformats.make_fileinfo(length, hashes)
self.assertEqual(fileinfo, download_fileinfo)
@ -248,7 +248,7 @@ def test_with_tuf(self):
file_object.write(original_content+('append large amount of data' * 10000))
# Is the modified file actually larger?
large_length, hashes = tuf.util.get_file_details(target_path)
large_length, hashes = tuf.ssl_crypto.util.get_file_details(target_path)
self.assertTrue(large_length > length)
os.remove(client_target_path)
@ -257,7 +257,7 @@ def test_with_tuf(self):
# A large amount of data has been appended to the original content. The
# extra data appended should be discarded by the client, so the downloaded
# file size and hash should not have changed.
length, hashes = tuf.util.get_file_details(client_target_path)
length, hashes = tuf.ssl_crypto.util.get_file_details(client_target_path)
download_fileinfo = tuf.tufformats.make_fileinfo(length, hashes)
self.assertEqual(fileinfo, download_fileinfo)
@ -265,16 +265,16 @@ def test_with_tuf(self):
timestamp_path = os.path.join(self.repository_directory, 'metadata',
'timestamp.json')
original_length, hashes = tuf.util.get_file_details(timestamp_path)
original_length, hashes = tuf.ssl_crypto.util.get_file_details(timestamp_path)
with open(timestamp_path, 'r+') as file_object:
timestamp_content = tuf.util.load_json_file(timestamp_path)
timestamp_content = tuf.ssl_crypto.util.load_json_file(timestamp_path)
large_data = 'LargeTimestamp' * 10000
timestamp_content['signed']['_type'] = large_data
json.dump(timestamp_content, file_object, indent=1, sort_keys=True)
modified_length, hashes = tuf.util.get_file_details(timestamp_path)
modified_length, hashes = tuf.ssl_crypto.util.get_file_details(timestamp_path)
self.assertTrue(modified_length > original_length)
# Does the TUF client download the upper limit of an unsafely fetched

View file

@ -56,7 +56,7 @@
import unittest2 as unittest
import tuf.tufformats
import tuf.util
import tuf.ssl_crypto.util
import tuf.log
import tuf.client.updater as updater
import tuf.roledb
@ -189,9 +189,9 @@ def test_with_tuf(self):
'role1.json')
file1_filepath = os.path.join(self.repository_directory, 'targets',
'file1.txt')
length, hashes = tuf.util.get_file_details(file1_filepath)
length, hashes = tuf.ssl_crypto.util.get_file_details(file1_filepath)
role1_metadata = tuf.util.load_json_file(role1_filepath)
role1_metadata = tuf.ssl_crypto.util.load_json_file(role1_filepath)
role1_metadata['signed']['targets']['/file2.txt'] = {}
role1_metadata['signed']['targets']['/file2.txt']['hashes'] = hashes
role1_metadata['signed']['targets']['/file2.txt']['length'] = length

View file

@ -60,7 +60,7 @@
import tuf.ssl_crypto.formats
import tuf.tufformats
import tuf.util
import tuf.ssl_crypto.util
import tuf.log
import tuf.client.updater as updater
import tuf.repository_tool as repo_tool
@ -211,7 +211,7 @@ def test_without_tuf(self):
timestamp_path = os.path.join(self.repository_directory, 'metadata',
'timestamp.json')
timestamp_metadata = tuf.util.load_json_file(timestamp_path)
timestamp_metadata = tuf.ssl_crypto.util.load_json_file(timestamp_path)
expiry_time = time.time() - 10
expires = tuf.tufformats.unix_timestamp_to_datetime(int(expiry_time))
expires = expires.isoformat() + 'Z'
@ -229,7 +229,7 @@ def test_without_tuf(self):
'timestamp.json')
shutil.copy(timestamp_path, client_timestamp_path)
length, hashes = tuf.util.get_file_details(timestamp_path)
length, hashes = tuf.ssl_crypto.util.get_file_details(timestamp_path)
fileinfo = tuf.tufformats.make_fileinfo(length, hashes)
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
@ -237,7 +237,7 @@ def test_without_tuf(self):
six.moves.urllib.request.urlretrieve(url_file, client_timestamp_path)
length, hashes = tuf.util.get_file_details(client_timestamp_path)
length, hashes = tuf.ssl_crypto.util.get_file_details(client_timestamp_path)
download_fileinfo = tuf.tufformats.make_fileinfo(length, hashes)
# Verify 'download_fileinfo' is equal to the current local file.

View file

@ -34,7 +34,7 @@
import json
import tuf
import tuf.util
import tuf.ssl_crypto.util
from simple_settings import settings
import tuf.log
import tuf.interposition.updater as updater

View file

@ -30,6 +30,7 @@
import tuf
import tuf.log
import tuf.pycrypto_keys
import tuf.ssl_crypto.formats
import tuf.tufformats
import tuf.keys

View file

@ -53,7 +53,7 @@
import unittest2 as unittest
import tuf.tufformats
import tuf.util
import tuf.ssl_crypto.util
import tuf.log
import tuf.client.updater as updater
import tuf.repository_tool as repo_tool

View file

@ -54,7 +54,7 @@
import unittest2 as unittest
import tuf.tufformats
import tuf.util
import tuf.ssl_crypto.util
import tuf.log
import tuf.client.updater as updater
import tuf.repository_tool as repo_tool
@ -205,7 +205,7 @@ def test_without_tuf(self):
# The fileinfo of the previous version is saved to verify that it is indeed
# accepted by the non-TUF client.
length, hashes = tuf.util.get_file_details(backup_timestamp)
length, hashes = tuf.ssl_crypto.util.get_file_details(backup_timestamp)
previous_fileinfo = tuf.tufformats.make_fileinfo(length, hashes)
# Modify the timestamp file on the remote repository.
@ -227,7 +227,7 @@ def test_without_tuf(self):
# Save the fileinfo of the new version generated to verify that it is
# saved by the client.
length, hashes = tuf.util.get_file_details(timestamp_path)
length, hashes = tuf.ssl_crypto.util.get_file_details(timestamp_path)
new_fileinfo = tuf.tufformats.make_fileinfo(length, hashes)
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
@ -237,7 +237,7 @@ def test_without_tuf(self):
six.moves.urllib.request.urlretrieve(url_file, client_timestamp_path)
length, hashes = tuf.util.get_file_details(client_timestamp_path)
length, hashes = tuf.ssl_crypto.util.get_file_details(client_timestamp_path)
download_fileinfo = tuf.tufformats.make_fileinfo(length, hashes)
# Verify 'download_fileinfo' is equal to the new version.
@ -249,7 +249,7 @@ def test_without_tuf(self):
six.moves.urllib.request.urlretrieve(url_file, client_timestamp_path)
length, hashes = tuf.util.get_file_details(client_timestamp_path)
length, hashes = tuf.ssl_crypto.util.get_file_details(client_timestamp_path)
download_fileinfo = tuf.tufformats.make_fileinfo(length, hashes)
# Verify 'download_fileinfo' is equal to the previous version.
@ -276,7 +276,7 @@ def test_with_tuf(self):
# The fileinfo of the previous version is saved to verify that it is indeed
# accepted by the non-TUF client.
length, hashes = tuf.util.get_file_details(backup_timestamp)
length, hashes = tuf.ssl_crypto.util.get_file_details(backup_timestamp)
previous_fileinfo = tuf.tufformats.make_fileinfo(length, hashes)
# Modify the timestamp file on the remote repository.
@ -298,7 +298,7 @@ def test_with_tuf(self):
# Save the fileinfo of the new version generated to verify that it is
# saved by the client.
length, hashes = tuf.util.get_file_details(timestamp_path)
length, hashes = tuf.ssl_crypto.util.get_file_details(timestamp_path)
new_fileinfo = tuf.tufformats.make_fileinfo(length, hashes)
# Refresh top-level metadata, including 'timestamp.json'. Installation of
@ -307,7 +307,7 @@ def test_with_tuf(self):
client_timestamp_path = os.path.join(self.client_directory, 'metadata',
'current', 'timestamp.json')
length, hashes = tuf.util.get_file_details(client_timestamp_path)
length, hashes = tuf.ssl_crypto.util.get_file_details(client_timestamp_path)
download_fileinfo = tuf.tufformats.make_fileinfo(length, hashes)
# Verify 'download_fileinfo' is equal to the new version.

View file

@ -425,7 +425,7 @@ def test_generate_root_metadata(self):
# Load the root metadata provided in 'tuf/tests/repository_data/'.
root_filepath = os.path.join('repository_data', 'repository',
'metadata', 'root.json')
root_signable = tuf.util.load_json_file(root_filepath)
root_signable = tuf.ssl_crypto.util.load_json_file(root_filepath)
# generate_root_metadata() expects the top-level roles and keys to be
# available in 'tuf.keydb' and 'tuf.roledb'.
@ -472,7 +472,7 @@ def test_generate_targets_metadata(self):
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
targets_directory = os.path.join(temporary_directory, 'targets')
file1_path = os.path.join(targets_directory, 'file.txt')
tuf.util.ensure_parent_dir(file1_path)
tuf.ssl_crypto.util.ensure_parent_dir(file1_path)
with open(file1_path, 'wt') as file_object:
file_object.write('test file.')
@ -677,9 +677,9 @@ def test_sign_metadata(self):
keystore_path = os.path.join('repository_data',
'keystore')
root_filename = os.path.join(metadata_path, 'root.json')
root_metadata = tuf.util.load_json_file(root_filename)['signed']
root_metadata = tuf.ssl_crypto.util.load_json_file(root_filename)['signed']
targets_filename = os.path.join(metadata_path, 'targets.json')
targets_metadata = tuf.util.load_json_file(targets_filename)['signed']
targets_metadata = tuf.ssl_crypto.util.load_json_file(targets_filename)['signed']
tuf.keydb.create_keydb_from_root_metadata(root_metadata)
tuf.roledb.create_roledb_from_root_metadata(root_metadata)
@ -735,7 +735,7 @@ def test_write_metadata_file(self):
metadata_directory = os.path.join('repository_data',
'repository', 'metadata')
root_filename = os.path.join(metadata_directory, 'root.json')
root_signable = tuf.util.load_json_file(root_filename)
root_signable = tuf.ssl_crypto.util.load_json_file(root_filename)
output_filename = os.path.join(temporary_directory, 'root.json')
compression_algorithms = ['gz']
@ -833,7 +833,7 @@ def test_write_metadata_file(self):
def test__write_compressed_metadata(self):
# Test for invalid 'compressed_filename' argument and set
# 'write_new_metadata' to False.
file_object = tuf.util.TempFile()
file_object = tuf.ssl_crypto.util.TempFile()
existing_filename = os.path.join('repository_data', 'repository',
'metadata', 'root.json')
@ -845,7 +845,7 @@ def test__write_compressed_metadata(self):
version_number=8)
# Test writing of compressed metadata when consistent snapshots is enabled.
file_object = tuf.util.TempFile()
file_object = tuf.ssl_crypto.util.TempFile()
shutil.copy(existing_filename, os.path.join(self.temporary_directory, '8.root.json.gz'))
shutil.copy(existing_filename, os.path.join(self.temporary_directory, '8.root.json.zip'))
shutil.copy(existing_filename, os.path.join(self.temporary_directory, 'root.json.zip'))
@ -922,7 +922,7 @@ def test__generate_and_write_metadata(self):
# Load the root metadata provided in 'tuf/tests/repository_data/'.
root_filepath = os.path.join('repository_data', 'repository',
'metadata', 'root.json')
root_signable = tuf.util.load_json_file(root_filepath)
root_signable = tuf.ssl_crypto.util.load_json_file(root_filepath)
# _generate_and_write_metadata() expects the top-level roles
# (specifically 'snapshot') and keys to be available in 'tuf.roledb'.
@ -937,7 +937,7 @@ def test__generate_and_write_metadata(self):
'targets.json')
obsolete_metadata = os.path.join(metadata_directory,
'obsolete_role.json')
tuf.util.ensure_parent_dir(obsolete_metadata)
tuf.ssl_crypto.util.ensure_parent_dir(obsolete_metadata)
shutil.copyfile(targets_metadata, obsolete_metadata)
# Verify that obsolete metadata (a metadata file exists on disk, but the
@ -959,7 +959,7 @@ def test__generate_and_write_metadata(self):
snapshot_filepath = os.path.join('repository_data', 'repository',
'metadata', 'snapshot.json')
snapshot_signable = tuf.util.load_json_file(snapshot_filepath)
snapshot_signable = tuf.ssl_crypto.util.load_json_file(snapshot_filepath)
tuf.roledb.remove_role('obsolete_role')
self.assertTrue(os.path.exists(os.path.join(metadata_directory,
'obsolete_role.json')))
@ -979,7 +979,7 @@ def test__delete_obsolete_metadata(self):
os.makedirs(metadata_directory)
snapshot_filepath = os.path.join('repository_data', 'repository',
'metadata', 'snapshot.json')
snapshot_signable = tuf.util.load_json_file(snapshot_filepath)
snapshot_signable = tuf.ssl_crypto.util.load_json_file(snapshot_filepath)
# Create role metadata that should not exist in snapshot.json.
role1_filepath = os.path.join('repository_data', 'repository',
@ -1014,7 +1014,7 @@ def test__load_top_level_metadata(self):
# Add a duplicate signature to the Root file for testing purposes).
root_file = os.path.join(metadata_directory, 'root.json')
signable = tuf.util.load_json_file(os.path.join(metadata_directory, 'root.json'))
signable = tuf.ssl_crypto.util.load_json_file(os.path.join(metadata_directory, 'root.json'))
signable['signatures'].append(signable['signatures'][0])
repo_lib.write_metadata_file(signable, root_file, 8, ['gz'], False)
@ -1062,7 +1062,7 @@ def test__remove_invalid_and_duplicate_signatures(self):
# signatures). First load a valid signable (in this case, the root role).
root_filepath = os.path.join('repository_data', 'repository',
'metadata', 'root.json')
root_signable = tuf.util.load_json_file(root_filepath)
root_signable = tuf.ssl_crypto.util.load_json_file(root_filepath)
key_filepath = os.path.join('repository_data', 'keystore', 'root_key')
root_rsa_key = repo_lib.import_rsa_privatekey_from_file(key_filepath,
'password')

View file

@ -227,7 +227,7 @@ def test_writeall(self):
# Verify that the expected metadata is written.
for role in ['root.json', 'targets.json', 'snapshot.json', 'timestamp.json']:
role_filepath = os.path.join(metadata_directory, role)
role_signable = tuf.util.load_json_file(role_filepath)
role_signable = tuf.ssl_crypto.util.load_json_file(role_filepath)
# Raise 'tuf.ssl_commons.exceptions.FormatError' if 'role_signable' is an invalid signable.
tuf.tufformats.check_signable_object_format(role_signable)
@ -240,7 +240,7 @@ def test_writeall(self):
# Verify the 'role1.json' delegation is also written.
role1_filepath = os.path.join(metadata_directory, 'role1.json')
role1_signable = tuf.util.load_json_file(role1_filepath)
role1_signable = tuf.ssl_crypto.util.load_json_file(role1_filepath)
tuf.tufformats.check_signable_object_format(role1_signable)
# Verify that an exception is *not* raised for multiple
@ -709,7 +709,7 @@ def test_add_signature(self):
metadata_directory = os.path.join('repository_data',
'repository', 'metadata')
root_filepath = os.path.join(metadata_directory, 'root.json')
root_signable = tuf.util.load_json_file(root_filepath)
root_signable = tuf.ssl_crypto.util.load_json_file(root_filepath)
signatures = root_signable['signatures']
# Add the first signature from the list, as only one is needed.
@ -737,7 +737,7 @@ def test_remove_signature(self):
metadata_directory = os.path.join('repository_data',
'repository', 'metadata')
root_filepath = os.path.join(metadata_directory, 'root.json')
root_signable = tuf.util.load_json_file(root_filepath)
root_signable = tuf.ssl_crypto.util.load_json_file(root_filepath)
signatures = root_signable['signatures']
self.metadata.add_signature(signatures[0])
@ -752,7 +752,7 @@ def test_remove_signature(self):
# Test invalid signature argument (i.e., signature that has not been added.)
# Load an unused signature to be tested.
targets_filepath = os.path.join(metadata_directory, 'targets.json')
targets_signable = tuf.util.load_json_file(targets_filepath)
targets_signable = tuf.ssl_crypto.util.load_json_file(targets_filepath)
signatures = targets_signable['signatures']
self.assertRaises(tuf.ssl_commons.exceptions.Error, self.metadata.remove_signature, signatures[0])
@ -768,7 +768,7 @@ def test_signatures(self):
metadata_directory = os.path.join('repository_data',
'repository', 'metadata')
root_filepath = os.path.join(metadata_directory, 'root.json')
root_signable = tuf.util.load_json_file(root_filepath)
root_signable = tuf.ssl_crypto.util.load_json_file(root_filepath)
signatures = root_signable['signatures']
# Add the first signature from the list, as only need one is needed.

View file

@ -188,8 +188,8 @@ def test_root_role_versioning(self):
root_filepath = os.path.join(metadata_directory, 'root.json')
root_1_filepath = os.path.join(metadata_directory, '1.root.json')
root_2_filepath = os.path.join(metadata_directory, '2.root.json')
old_root_signable = tuf.util.load_json_file(root_filepath)
root_1_signable = tuf.util.load_json_file(root_1_filepath)
old_root_signable = tuf.ssl_crypto.util.load_json_file(root_filepath)
root_1_signable = tuf.ssl_crypto.util.load_json_file(root_1_filepath)
# Make a change to the root keys
repository.root.add_verification_key(targets_pubkey)
@ -197,8 +197,8 @@ def test_root_role_versioning(self):
repository.root.threshold = 2
repository.writeall()
new_root_signable = tuf.util.load_json_file(root_filepath)
root_2_signable = tuf.util.load_json_file(root_2_filepath)
new_root_signable = tuf.ssl_crypto.util.load_json_file(root_filepath)
root_2_signable = tuf.ssl_crypto.util.load_json_file(root_2_filepath)
for role_signable in [old_root_signable, new_root_signable, root_1_signable, root_2_signable]:
# Raise 'tuf.ssl_commons.exceptions.FormatError' if 'role_signable' is an invalid signable.

View file

@ -57,7 +57,7 @@
import unittest2 as unittest
import tuf.tufformats
import tuf.util
import tuf.ssl_crypto.util
import tuf.log
import tuf.client.updater as updater
import tuf.unittest_toolbox as unittest_toolbox

View file

@ -64,7 +64,8 @@
import unittest2 as unittest
import tuf
import tuf.util
import tuf.ssl_commons.exceptions
import tuf.ssl_crypto.util
from simple_settings import settings
import tuf.log
import tuf.tufformats
@ -287,7 +288,7 @@ def test_1__load_metadata_from_file(self):
# compare it against the loaded metadata by '_load_metadata_from_file()'.
role1_filepath = \
os.path.join(self.client_metadata_current, 'role1.json')
role1_meta = tuf.util.load_json_file(role1_filepath)
role1_meta = tuf.ssl_crypto.util.load_json_file(role1_filepath)
# Load the 'role1.json' file with _load_metadata_from_file, which should
# store the loaded metadata in the 'self.repository_updater.metadata'
@ -376,7 +377,7 @@ def test_1__update_versioninfo(self):
# on the repository. Load Snapshot to extract Root's version number
# and compare it against the one loaded by 'self.repository_updater'.
snapshot_filepath = os.path.join(self.client_metadata_current, 'snapshot.json')
snapshot_signable = tuf.util.load_json_file(snapshot_filepath)
snapshot_signable = tuf.ssl_crypto.util.load_json_file(snapshot_filepath)
targets_versioninfo = snapshot_signable['signed']['meta']['targets.json']
# Verify that the manually loaded version number of root.json matches
@ -410,7 +411,7 @@ def test_1__update_fileinfo(self):
self.assertEqual(len(fileinfo_dict), 1)
self.assertTrue(tuf.ssl_crypto.formats.FILEDICT_SCHEMA.matches(fileinfo_dict))
root_filepath = os.path.join(self.client_metadata_current, 'root.json')
length, hashes = tuf.util.get_file_details(root_filepath)
length, hashes = tuf.ssl_crypto.util.get_file_details(root_filepath)
root_fileinfo = tuf.tufformats.make_fileinfo(length, hashes)
self.assertTrue('root.json' in fileinfo_dict)
self.assertEqual(fileinfo_dict['root.json'], root_fileinfo)
@ -431,7 +432,7 @@ def test_1__update_fileinfo(self):
def test_2__fileinfo_has_changed(self):
# Verify that the method returns 'False' if file info was not changed.
root_filepath = os.path.join(self.client_metadata_current, 'root.json')
length, hashes = tuf.util.get_file_details(root_filepath)
length, hashes = tuf.ssl_crypto.util.get_file_details(root_filepath)
root_fileinfo = tuf.tufformats.make_fileinfo(length, hashes)
self.assertFalse(self.repository_updater._fileinfo_has_changed('root.json',
root_fileinfo))
@ -496,7 +497,7 @@ def test_2__import_delegations(self):
# Verify that keydb dictionary was updated.
role1_signable = \
tuf.util.load_json_file(os.path.join(self.client_metadata_current,
tuf.ssl_crypto.util.load_json_file(os.path.join(self.client_metadata_current,
'role1.json'))
keyids = []
for signature in role1_signable['signatures']:
@ -541,7 +542,7 @@ def test_2__import_delegations(self):
def test_2__versioninfo_has_been_updated(self):
# Verify that the method returns 'False' if a versioninfo was not changed.
snapshot_filepath = os.path.join(self.client_metadata_current, 'snapshot.json')
snapshot_signable = tuf.util.load_json_file(snapshot_filepath)
snapshot_signable = tuf.ssl_crypto.util.load_json_file(snapshot_filepath)
targets_versioninfo = snapshot_signable['signed']['meta']['targets.json']
self.assertFalse(self.repository_updater._versioninfo_has_been_updated('targets.json',
@ -660,7 +661,7 @@ def test_3__update_metadata(self):
targets_versioninfo['version'])
self.assertTrue('targets' in self.repository_updater.metadata['current'])
targets_signable = tuf.util.load_json_file(targets_filepath)
targets_signable = tuf.ssl_crypto.util.load_json_file(targets_filepath)
loaded_targets_version = targets_signable['signed']['version']
self.assertEqual(targets_versioninfo['version'], loaded_targets_version)
@ -929,7 +930,7 @@ def test_5_targets_of_role(self):
# as available on the server-side version of the role.
role1_filepath = os.path.join(self.repository_directory, 'metadata',
'role1.json')
role1_signable = tuf.util.load_json_file(role1_filepath)
role1_signable = tuf.ssl_crypto.util.load_json_file(role1_filepath)
expected_targets = role1_signable['signed']['targets']
@ -1079,7 +1080,7 @@ def test_6_download_target(self):
download_filepath = \
os.path.join(destination_directory, target_filepath1.lstrip('/'))
self.assertTrue(os.path.exists(download_filepath))
length, hashes = tuf.util.get_file_details(download_filepath, settings.REPOSITORY_HASH_ALGORITHMS)
length, hashes = tuf.ssl_crypto.util.get_file_details(download_filepath, settings.REPOSITORY_HASH_ALGORITHMS)
download_targetfileinfo = tuf.tufformats.make_fileinfo(length, hashes)
# Add any 'custom' data from the repository's target fileinfo to the
@ -1218,7 +1219,7 @@ def test_7_updated_targets(self):
repository.targets.remove_target(target1)
length, hashes = tuf.util.get_file_details(target1)
length, hashes = tuf.ssl_crypto.util.get_file_details(target1)
repository.targets.add_target(target1)
repository.targets.load_signing_key(self.role_keys['targets']['private'])
@ -1227,7 +1228,7 @@ def test_7_updated_targets(self):
with open(target1, 'a') as file_object:
file_object.write('append extra text')
length, hashes = tuf.util.get_file_details(target1)
length, hashes = tuf.ssl_crypto.util.get_file_details(target1)
repository.targets.add_target(target1)
repository.targets.load_signing_key(self.role_keys['targets']['private'])
@ -1335,7 +1336,7 @@ def test_9__get_target_hash(self):
def test_10__hard_check_file_length(self):
# Test for exception if file object is not equal to trusted file length.
temp_file_object = tuf.util.TempFile()
temp_file_object = tuf.ssl_crypto.util.TempFile()
temp_file_object.write(b'X')
temp_file_object.seek(0)
self.assertRaises(tuf.ssl_commons.exceptions.DownloadLengthMismatchError,
@ -1348,7 +1349,7 @@ def test_10__hard_check_file_length(self):
def test_10__soft_check_file_length(self):
# Test for exception if file object is not equal to trusted file length.
temp_file_object = tuf.util.TempFile()
temp_file_object = tuf.ssl_crypto.util.TempFile()
temp_file_object.write(b'XXX')
temp_file_object.seek(0)
self.assertRaises(tuf.ssl_commons.exceptions.DownloadLengthMismatchError,

View file

@ -56,7 +56,7 @@
import unittest2 as unittest
import tuf
import tuf.util
import tuf.ssl_crypto.util
from simple_settings import settings
import tuf.log
import tuf.tufformats

View file

@ -37,7 +37,7 @@
import tuf
import tuf.log
import tuf.ssl_crypto.hash
import tuf.util
import tuf.ssl_crypto.util
from simple_settings import settings
import tuf.unittest_toolbox as unittest_toolbox
@ -50,7 +50,7 @@ class TestUtil(unittest_toolbox.Modified_TestCase):
def setUp(self):
unittest_toolbox.Modified_TestCase.setUp(self)
self.temp_fileobj = tuf.util.TempFile()
self.temp_fileobj = tuf.ssl_crypto.util.TempFile()
@ -86,10 +86,10 @@ def _extract_tempfile_directory(self, config_temp_dir=None):
# Patching 'tempfile.TemporaryFile()' (by substituting
# temfile.TemporaryFile() with tempfile.mkstemp()) in order to get the
# directory of the stored tempfile object.
saved_tempfile_TemporaryFile = tuf.util.tempfile.NamedTemporaryFile
tuf.util.tempfile.NamedTemporaryFile = tempfile.mkstemp
_temp_fileobj = tuf.util.TempFile()
tuf.util.tempfile.NamedTemporaryFile = saved_tempfile_TemporaryFile
saved_tempfile_TemporaryFile = tuf.ssl_crypto.util.tempfile.NamedTemporaryFile
tuf.ssl_crypto.util.tempfile.NamedTemporaryFile = tempfile.mkstemp
_temp_fileobj = tuf.ssl_crypto.util.TempFile()
tuf.ssl_crypto.util.tempfile.NamedTemporaryFile = saved_tempfile_TemporaryFile
junk, _tempfilepath = _temp_fileobj.temporary_file
_tempfile_dir = os.path.dirname(_tempfilepath)
@ -109,14 +109,14 @@ def test_A2_tempfile_init(self):
# Test: Expected input verification.
# Assumed 'settings.temporary_directory' is 'None' initially.
temp_file = tuf.util.TempFile()
temp_file = tuf.ssl_crypto.util.TempFile()
temp_file_directory = os.path.dirname(temp_file.temporary_file.name)
self.assertEqual(tempfile.gettempdir(), temp_file_directory)
saved_temporary_directory = settings.temporary_directory
temp_directory = self.make_temp_directory()
settings.temporary_directory = temp_directory
temp_file = tuf.util.TempFile()
temp_file = tuf.ssl_crypto.util.TempFile()
temp_file_directory = os.path.dirname(temp_file.temporary_file.name)
self.assertEqual(temp_directory, temp_file_directory)
@ -243,7 +243,7 @@ def test_A6_tempfile_decompress_temp_file_object(self):
self.temp_fileobj.decompress_temp_file_object, 'gzip')
# Test decompression of invalid gzip file.
temp_file = tuf.util.TempFile()
temp_file = tuf.ssl_crypto.util.TempFile()
temp_file.write(b'bad zip')
contents = temp_file.read()
self.assertRaises(tuf.ssl_commons.exceptions.DecompressionError,
@ -263,7 +263,7 @@ def test_B1_get_file_details(self):
file_length = os.path.getsize(filepath)
# Test: Expected input.
self.assertEqual(tuf.util.get_file_details(filepath), (file_length, file_hash))
self.assertEqual(tuf.ssl_crypto.util.get_file_details(filepath), (file_length, file_hash))
# Test: Incorrect input.
bogus_inputs = [self.random_string(), 1234, [self.random_string()],
@ -271,9 +271,9 @@ def test_B1_get_file_details(self):
for bogus_input in bogus_inputs:
if isinstance(bogus_input, six.string_types):
self.assertRaises(tuf.ssl_commons.exceptions.Error, tuf.util.get_file_details, bogus_input)
self.assertRaises(tuf.ssl_commons.exceptions.Error, tuf.ssl_crypto.util.get_file_details, bogus_input)
else:
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.get_file_details, bogus_input)
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.get_file_details, bogus_input)
@ -283,10 +283,10 @@ def test_B2_ensure_parent_dir(self):
for parent_dir in [existing_parent_dir, non_existing_parent_dir, 12, [3]]:
if isinstance(parent_dir, six.string_types):
tuf.util.ensure_parent_dir(os.path.join(parent_dir, 'a.txt'))
tuf.ssl_crypto.util.ensure_parent_dir(os.path.join(parent_dir, 'a.txt'))
self.assertTrue(os.path.isdir(parent_dir))
else:
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.ensure_parent_dir, parent_dir)
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.ensure_parent_dir, parent_dir)
@ -294,7 +294,7 @@ def test_B3_file_in_confined_directories(self):
# Goal: Provide invalid input for 'filepath' and 'confined_directories'.
# Include inputs like: '[1, 2, "a"]' and such...
# Reference to 'file_in_confined_directories()' to improve readability.
in_confined_directory = tuf.util.file_in_confined_directories
in_confined_directory = tuf.ssl_crypto.util.file_in_confined_directories
list_of_confined_directories = ['a', 12, {'a':'a'}, [1]]
list_of_filepaths = [12, ['a'], {'a':'a'}, 'a']
for bogus_confined_directory in list_of_confined_directories:
@ -323,25 +323,25 @@ def test_B3_file_in_confined_directories(self):
def test_B4_import_json(self):
self.assertTrue('json' in sys.modules)
json_module = tuf.util.import_json()
json_module = tuf.ssl_crypto.util.import_json()
self.assertTrue(json_module is not None)
# Test import_json() when 'util._json_moduel' is non-None.
tuf.util._json_module = 'junk_module'
self.assertEqual(tuf.util.import_json(), 'junk_module')
tuf.ssl_crypto.util._json_module = 'junk_module'
self.assertEqual(tuf.ssl_crypto.util.import_json(), 'junk_module')
def test_B5_load_json_string(self):
# Test normal case.
data = ['a', {'b': ['c', None, 30.3, 29]}]
json_string = tuf.util.json.dumps(data)
self.assertEqual(data, tuf.util.load_json_string(json_string))
json_string = tuf.ssl_crypto.util.json.dumps(data)
self.assertEqual(data, tuf.ssl_crypto.util.load_json_string(json_string))
# Test invalid arguments.
self.assertRaises(tuf.ssl_commons.exceptions.Error, tuf.util.load_json_string, 8)
self.assertRaises(tuf.ssl_commons.exceptions.Error, tuf.ssl_crypto.util.load_json_string, 8)
invalid_json_string = {'a': tuf.ssl_commons.exceptions.FormatError}
self.assertRaises(tuf.ssl_commons.exceptions.Error, tuf.util.load_json_string, invalid_json_string)
self.assertRaises(tuf.ssl_commons.exceptions.Error, tuf.ssl_crypto.util.load_json_string, invalid_json_string)
@ -349,25 +349,25 @@ def test_B6_load_json_file(self):
data = ['a', {'b': ['c', None, 30.3, 29]}]
filepath = self.make_temp_file()
fileobj = open(filepath, 'wt')
tuf.util.json.dump(data, fileobj)
tuf.ssl_crypto.util.json.dump(data, fileobj)
fileobj.close()
self.assertEqual(data, tuf.util.load_json_file(filepath))
self.assertEqual(data, tuf.ssl_crypto.util.load_json_file(filepath))
# Test a gzipped file.
compressed_filepath = self._compress_existing_file(filepath)
self.assertEqual(data, tuf.util.load_json_file(compressed_filepath))
self.assertEqual(data, tuf.ssl_crypto.util.load_json_file(compressed_filepath))
# Improperly formatted arguments.
for bogus_arg in [1, [b'a'], {'a':b'b'}]:
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.load_json_file, bogus_arg)
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.load_json_file, bogus_arg)
# Non-existent path.
self.assertRaises(IOError, tuf.util.load_json_file, 'non-existent.json')
self.assertRaises(IOError, tuf.ssl_crypto.util.load_json_file, 'non-existent.json')
# Invalid JSON content.
with open(filepath, 'a') as filepath:
filepath.write('junk data')
self.assertRaises(tuf.ssl_commons.exceptions.Error, tuf.util.load_json_file, filepath)
self.assertRaises(tuf.ssl_commons.exceptions.Error, tuf.ssl_crypto.util.load_json_file, filepath)
@ -381,10 +381,10 @@ def test_C1_get_target_hash(self):
for filepath, target_hash in six.iteritems(expected_target_hashes):
self.assertTrue(tuf.ssl_crypto.formats.RELPATH_SCHEMA.matches(filepath))
self.assertTrue(tuf.ssl_crypto.formats.HASH_SCHEMA.matches(target_hash))
self.assertEqual(tuf.util.get_target_hash(filepath), target_hash)
self.assertEqual(tuf.ssl_crypto.util.get_target_hash(filepath), target_hash)
# Test for improperly formatted argument.
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.get_target_hash, 8)
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.get_target_hash, 8)
@ -415,21 +415,21 @@ def test_C2_find_delegated_role(self):
]
self.assertTrue(tuf.ssl_crypto.formats.ROLELIST_SCHEMA.matches(role_list))
self.assertEqual(tuf.util.find_delegated_role(role_list, 'targets/tuf'), 1)
self.assertEqual(tuf.util.find_delegated_role(role_list, 'targets/warehouse'), 0)
self.assertEqual(tuf.ssl_crypto.util.find_delegated_role(role_list, 'targets/tuf'), 1)
self.assertEqual(tuf.ssl_crypto.util.find_delegated_role(role_list, 'targets/warehouse'), 0)
# Test for non-existent role. 'find_delegated_role()' returns 'None'
# if the role is not found.
self.assertEqual(tuf.util.find_delegated_role(role_list, 'targets/non-existent'),
self.assertEqual(tuf.ssl_crypto.util.find_delegated_role(role_list, 'targets/non-existent'),
None)
# Test improperly formatted arguments.
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.find_delegated_role, 8, role_list)
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.find_delegated_role, 8, 'targets/tuf')
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.find_delegated_role, 8, role_list)
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.find_delegated_role, 8, 'targets/tuf')
# Test duplicate roles.
role_list.append(role_list[1])
self.assertRaises(tuf.ssl_commons.exceptions.RepositoryError, tuf.util.find_delegated_role, role_list,
self.assertRaises(tuf.ssl_commons.exceptions.RepositoryError, tuf.ssl_crypto.util.find_delegated_role, role_list,
'targets/tuf')
# Test missing 'name' attribute (optional, but required by
@ -437,7 +437,7 @@ def test_C2_find_delegated_role(self):
# Delete the duplicate role, and the remaining role's 'name' attribute.
del role_list[2]
del role_list[0]['name']
self.assertRaises(tuf.ssl_commons.exceptions.RepositoryError, tuf.util.find_delegated_role, role_list,
self.assertRaises(tuf.ssl_commons.exceptions.RepositoryError, tuf.ssl_crypto.util.find_delegated_role, role_list,
'targets/warehouse')
@ -450,38 +450,38 @@ def test_C3_paths_are_consistent_with_hash_prefixes(self):
# Ensure the paths of 'list_of_targets' each have the expected path hash
# prefix listed in 'path_hash_prefixes'.
for filepath in list_of_targets:
self.assertTrue(tuf.util.get_target_hash(filepath)[0:4] in path_hash_prefixes)
self.assertTrue(tuf.ssl_crypto.util.get_target_hash(filepath)[0:4] in path_hash_prefixes)
self.assertTrue(tuf.util.paths_are_consistent_with_hash_prefixes(list_of_targets,
self.assertTrue(tuf.ssl_crypto.util.paths_are_consistent_with_hash_prefixes(list_of_targets,
path_hash_prefixes))
extra_invalid_prefix = ['e3a3', '8fae', 'd543', '0000']
self.assertTrue(tuf.util.paths_are_consistent_with_hash_prefixes(list_of_targets,
self.assertTrue(tuf.ssl_crypto.util.paths_are_consistent_with_hash_prefixes(list_of_targets,
extra_invalid_prefix))
# Test improperly formatted arguments.
self.assertRaises(tuf.ssl_commons.exceptions.FormatError,
tuf.util.paths_are_consistent_with_hash_prefixes, 8,
tuf.ssl_crypto.util.paths_are_consistent_with_hash_prefixes, 8,
path_hash_prefixes)
self.assertRaises(tuf.ssl_commons.exceptions.FormatError,
tuf.util.paths_are_consistent_with_hash_prefixes,
tuf.ssl_crypto.util.paths_are_consistent_with_hash_prefixes,
list_of_targets, 8)
self.assertRaises(tuf.ssl_commons.exceptions.FormatError,
tuf.util.paths_are_consistent_with_hash_prefixes,
tuf.ssl_crypto.util.paths_are_consistent_with_hash_prefixes,
list_of_targets, ['zza1'])
# Test invalid list of targets.
bad_target_path = '/file5.txt'
self.assertTrue(tuf.util.get_target_hash(bad_target_path)[0:4] not in
self.assertTrue(tuf.ssl_crypto.util.get_target_hash(bad_target_path)[0:4] not in
path_hash_prefixes)
self.assertFalse(tuf.util.paths_are_consistent_with_hash_prefixes([bad_target_path],
self.assertFalse(tuf.ssl_crypto.util.paths_are_consistent_with_hash_prefixes([bad_target_path],
path_hash_prefixes))
# Add invalid target path to 'list_of_targets'.
list_of_targets.append(bad_target_path)
self.assertFalse(tuf.util.paths_are_consistent_with_hash_prefixes(list_of_targets,
self.assertFalse(tuf.ssl_crypto.util.paths_are_consistent_with_hash_prefixes(list_of_targets,
path_hash_prefixes))
@ -515,42 +515,42 @@ def test_C4_ensure_all_targets_allowed(self):
}
self.assertTrue(tuf.ssl_crypto.formats.DELEGATIONS_SCHEMA.matches(parent_delegations))
tuf.util.ensure_all_targets_allowed(rolename, list_of_targets,
tuf.ssl_crypto.util.ensure_all_targets_allowed(rolename, list_of_targets,
parent_delegations)
# The target files of 'targets' are always allowed. 'list_of_targets' and
# 'parent_delegations' are not checked in this case.
tuf.util.ensure_all_targets_allowed('targets', list_of_targets,
tuf.ssl_crypto.util.ensure_all_targets_allowed('targets', list_of_targets,
parent_delegations)
# Test improperly formatted arguments.
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.ensure_all_targets_allowed,
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.ensure_all_targets_allowed,
8, list_of_targets, parent_delegations)
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.ensure_all_targets_allowed,
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.ensure_all_targets_allowed,
rolename, 8, parent_delegations)
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.ensure_all_targets_allowed,
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.ensure_all_targets_allowed,
rolename, list_of_targets, 8)
# Test for invalid 'rolename', which has not been delegated by its parent,
# 'targets'.
self.assertRaises(tuf.ssl_commons.exceptions.RepositoryError, tuf.util.ensure_all_targets_allowed,
self.assertRaises(tuf.ssl_commons.exceptions.RepositoryError, tuf.ssl_crypto.util.ensure_all_targets_allowed,
'targets/non-delegated_rolename', list_of_targets,
parent_delegations)
# Test for target file that is not allowed by the parent role.
self.assertRaises(tuf.ssl_commons.exceptions.ForbiddenTargetError, tuf.util.ensure_all_targets_allowed,
self.assertRaises(tuf.ssl_commons.exceptions.ForbiddenTargetError, tuf.ssl_crypto.util.ensure_all_targets_allowed,
'targets/warehouse', ['file1.zip'], parent_delegations)
self.assertRaises(tuf.ssl_commons.exceptions.ForbiddenTargetError, tuf.util.ensure_all_targets_allowed,
self.assertRaises(tuf.ssl_commons.exceptions.ForbiddenTargetError, tuf.ssl_crypto.util.ensure_all_targets_allowed,
'targets/warehouse', ['file1.txt', 'bad-README.txt'],
parent_delegations)
# Test for required attributes.
# Missing 'paths' attribute.
del parent_delegations['roles'][0]['paths']
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.ensure_all_targets_allowed,
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.ensure_all_targets_allowed,
'targets/warehouse', list_of_targets, parent_delegations)
# Test 'path_hash_prefixes' attribute.
@ -558,15 +558,15 @@ def test_C4_ensure_all_targets_allowed(self):
parent_delegations['roles'][0]['path_hash_prefixes'] = path_hash_prefixes
# Test normal case for 'path_hash_prefixes'.
tuf.util.ensure_all_targets_allowed('targets/warehouse', list_of_targets,
tuf.ssl_crypto.util.ensure_all_targets_allowed('targets/warehouse', list_of_targets,
parent_delegations)
# Test target file with a path_hash_prefix that is not allowed in its
# parent role.
path_hash_prefix = tuf.util.get_target_hash('file5.txt')[0:4]
path_hash_prefix = tuf.ssl_crypto.util.get_target_hash('file5.txt')[0:4]
self.assertTrue(path_hash_prefix not in parent_delegations['roles'][0]
['path_hash_prefixes'])
self.assertRaises(tuf.ssl_commons.exceptions.ForbiddenTargetError, tuf.util.ensure_all_targets_allowed,
self.assertRaises(tuf.ssl_commons.exceptions.ForbiddenTargetError, tuf.ssl_crypto.util.ensure_all_targets_allowed,
'targets/warehouse', ['file5.txt'], parent_delegations)
@ -584,7 +584,7 @@ def test_c6_get_compressed_length(self):
self.temp_fileobj.write(b'hello world')
self.assertTrue(self.temp_fileobj.get_compressed_length() == 11)
temp_file = tuf.util.TempFile()
temp_file = tuf.ssl_crypto.util.TempFile()
@ -592,25 +592,25 @@ def test_digests_are_equal(self):
digest = 'e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855'
# Normal case: test for digests that are equal.
self.assertTrue(tuf.util.digests_are_equal(digest, digest))
self.assertTrue(tuf.ssl_crypto.util.digests_are_equal(digest, digest))
# Normal case: test for digests that are unequal.
self.assertFalse(tuf.util.digests_are_equal(digest, '0a8df1'))
self.assertFalse(tuf.ssl_crypto.util.digests_are_equal(digest, '0a8df1'))
# Test for invalid arguments.
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.digests_are_equal, 7,
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.digests_are_equal, 7,
digest)
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.util.digests_are_equal, digest,
self.assertRaises(tuf.ssl_commons.exceptions.FormatError, tuf.ssl_crypto.util.digests_are_equal, digest,
7)
# Test that digests_are_equal() takes the same amount of time to compare
# equal and unequal arguments.
runtime = timeit.timeit('digests_are_equal("ab8df", "ab8df")',
setup='from tuf.util import digests_are_equal',
setup='from tuf.ssl_crypto.util import digests_are_equal',
number=100000)
runtime2 = timeit.timeit('digests_are_equal("ab8df", "1b8df")',
setup='from tuf.util import digests_are_equal',
setup='from tuf.ssl_crypto.util import digests_are_equal',
number=100000)
runtime3 = timeit.timeit('"ab8df" == "ab8df"', number=100000)

View file

@ -126,7 +126,7 @@
import tuf.mirrors
import tuf.roledb
import tuf.sig
import tuf.util
import tuf.ssl_crypto.util
import six
import iso8601
@ -423,7 +423,7 @@ def _load_metadata_from_file(self, metadata_set, metadata_role):
# Load the file. The loaded object should conform to
# 'tuf.ssl_crypto.formats.SIGNABLE_SCHEMA'.
try:
metadata_signable = tuf.util.load_json_file(metadata_filepath)
metadata_signable = tuf.ssl_crypto.util.load_json_file(metadata_filepath)
# Although the metadata file may exist locally, it may not
# be a valid json file. On the next refresh cycle, it will be
@ -718,7 +718,7 @@ def _update_root_metadata(self, current_root_metadata, compression_algorithm=Non
settings.DEFAULT_ROOT_REQUIRED_LENGTH, None,
compression_algorithm=compression_algorithm)
latest_root_metadata = \
tuf.util.load_json_string(latest_root_metadata_file.read().decode('utf-8'))
tuf.ssl_crypto.util.load_json_string(latest_root_metadata_file.read().decode('utf-8'))
next_version = current_root_metadata['version'] + 1
@ -750,7 +750,7 @@ def _check_hashes(self, file_object, trusted_hashes):
<Arguments>
file_object:
A 'tuf.util.TempFile' file-like object. 'file_object' ensures that a
A 'tuf.ssl_crypto.util.TempFile' file-like object. 'file_object' ensures that a
read() without a size argument properly reads the entire file.
trusted_hashes:
@ -795,7 +795,7 @@ def _hard_check_file_length(self, file_object, trusted_file_length):
<Arguments>
file_object:
A 'tuf.util.TempFile' file-like object. 'file_object' ensures that a
A 'tuf.ssl_crypto.util.TempFile' file-like object. 'file_object' ensures that a
read() without a size argument properly reads the entire file.
trusted_file_length:
@ -812,7 +812,7 @@ def _hard_check_file_length(self, file_object, trusted_file_length):
None.
"""
# Read the entire contents of 'file_object', a 'tuf.util.TempFile' file-like
# Read the entire contents of 'file_object', a 'tuf.ssl_crypto.util.TempFile' file-like
# object that ensures the entire file is read.
observed_length = len(file_object.read())
@ -835,14 +835,14 @@ def _soft_check_file_length(self, file_object, trusted_file_length):
"""
<Purpose>
Non-public method that checks the trusted file length of a
'tuf.util.TempFile' file-like object. The length of the file must be less
'tuf.ssl_crypto.util.TempFile' file-like object. The length of the file must be less
than or equal to the expected length. This is a deliberately redundant
implementation designed to complement
tuf.download._check_downloaded_length().
<Arguments>
file_object:
A 'tuf.util.TempFile' file-like object. 'file_object' ensures that a
A 'tuf.ssl_crypto.util.TempFile' file-like object. 'file_object' ensures that a
read() without a size argument properly reads the entire file.
trusted_file_length:
@ -859,7 +859,7 @@ def _soft_check_file_length(self, file_object, trusted_file_length):
None.
"""
# Read the entire contents of 'file_object', a 'tuf.util.TempFile' file-like
# Read the entire contents of 'file_object', a 'tuf.ssl_crypto.util.TempFile' file-like
# object that ensures the entire file is read.
observed_length = len(file_object.read())
@ -907,7 +907,7 @@ def _get_target_file(self, target_filepath, file_length, file_hashes):
a temporary file and returned.
<Returns>
A 'tuf.util.TempFile' file-like object containing the target.
A 'tuf.ssl_crypto.util.TempFile' file-like object containing the target.
"""
# Define a callable function that is passed as an argument to _get_file()
@ -946,7 +946,7 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object,
<Arguments>
metadata_file_object:
A 'tuf.util.TempFile' instance containing the metadata file.
A 'tuf.ssl_crypto.util.TempFile' instance containing the metadata file.
'metadata_file_object' ensures the entire file is returned with read().
metadata_role:
@ -980,7 +980,7 @@ def _verify_uncompressed_metadata_file(self, metadata_file_object,
metadata = metadata_file_object.read().decode('utf-8')
try:
metadata_signable = tuf.util.load_json_string(metadata)
metadata_signable = tuf.ssl_crypto.util.load_json_string(metadata)
except Exception as exception:
raise tuf.ssl_commons.exceptions.InvalidMetadataJSONError(exception)
@ -1048,7 +1048,7 @@ def _get_metadata_file(self, metadata_role, remote_filename,
file and returned.
<Returns>
A 'tuf.util.TempFile' file-like object containing the metadata.
A 'tuf.ssl_crypto.util.TempFile' file-like object containing the metadata.
"""
file_mirrors = tuf.mirrors.get_list_of_mirrors('meta', remote_filename,
@ -1073,7 +1073,7 @@ def _get_metadata_file(self, metadata_role, remote_filename,
# 'file_object' is also verified if decompressed above (i.e., the
# uncompressed version).
metadata_signable = \
tuf.util.load_json_string(file_object.read().decode('utf-8'))
tuf.ssl_crypto.util.load_json_string(file_object.read().decode('utf-8'))
# If the version number is unspecified, ensure that the version number
# downloaded is greater than the currently trusted version number for
@ -1164,7 +1164,7 @@ def _get_file(self, filepath, verify_file_function, file_type,
The relative metadata or target filepath.
verify_file_function:
A callable function that expects a 'tuf.util.TempFile' file-like object
A callable function that expects a 'tuf.ssl_crypto.util.TempFile' file-like object
and raises an exception if the file is invalid. Target files and
uncompressed versions of metadata may be verified with
'verify_file_function'.
@ -1202,7 +1202,7 @@ def _get_file(self, filepath, verify_file_function, file_type,
file and returned.
<Returns>
A 'tuf.util.TempFile' file-like object containing the metadata or target.
A 'tuf.ssl_crypto.util.TempFile' file-like object containing the metadata or target.
"""
file_mirrors = tuf.mirrors.get_list_of_mirrors(file_type, filepath,
@ -1347,7 +1347,7 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None,
current_filepath = os.path.join(self.metadata_directory['current'],
metadata_filename)
current_filepath = os.path.abspath(current_filepath)
tuf.util.ensure_parent_dir(current_filepath)
tuf.ssl_crypto.util.ensure_parent_dir(current_filepath)
previous_filepath = os.path.join(self.metadata_directory['previous'],
metadata_filename)
@ -1355,14 +1355,14 @@ def _update_metadata(self, metadata_role, upperbound_filelength, version=None,
if os.path.exists(current_filepath):
# Previous metadata might not exist, say when delegations are added.
tuf.util.ensure_parent_dir(previous_filepath)
tuf.ssl_crypto.util.ensure_parent_dir(previous_filepath)
shutil.move(current_filepath, previous_filepath)
# Next, move the verified updated metadata file to the 'current' directory.
# Note that the 'move' method comes from tuf.util's TempFile class.
# 'metadata_file_object' is an instance of tuf.util.TempFile.
# Note that the 'move' method comes from tuf.ssl_crypto.util's TempFile class.
# 'metadata_file_object' is an instance of tuf.ssl_crypto.util.TempFile.
metadata_signable = \
tuf.util.load_json_string(metadata_file_object.read().decode('utf-8'))
tuf.ssl_crypto.util.load_json_string(metadata_file_object.read().decode('utf-8'))
if compression_algorithm == 'gzip':
current_uncompressed_filepath = \
@ -1834,7 +1834,7 @@ def _update_fileinfo(self, metadata_filename):
# Extract the file information from the actual file and save it
# to the fileinfo store.
file_length, hashes = tuf.util.get_file_details(current_filepath)
file_length, hashes = tuf.ssl_crypto.util.get_file_details(current_filepath)
metadata_fileinfo = tuf.tufformats.make_fileinfo(file_length, hashes)
self.fileinfo[metadata_filename] = metadata_fileinfo
@ -1879,7 +1879,7 @@ def _move_current_to_previous(self, metadata_role):
# Move the current path to the previous path.
if os.path.exists(current_filepath):
tuf.util.ensure_parent_dir(previous_filepath)
tuf.ssl_crypto.util.ensure_parent_dir(previous_filepath)
os.rename(current_filepath, previous_filepath)
@ -2536,7 +2536,7 @@ def _visit_child_role(self, child_role, target_filepath, parent_delegations):
# Is the child role allowed by its parent role to specify this path
# in its metadata?
try:
tuf.util.ensure_all_targets_allowed(child_role_name, [target_filepath],
tuf.ssl_crypto.util.ensure_all_targets_allowed(child_role_name, [target_filepath],
parent_delegations)
except tuf.ssl_commons.exceptions.ForbiddenTargetError:

View file

@ -38,7 +38,7 @@
import tuf
import tuf.tufformats
import tuf.util
import tuf.ssl_crypto.util
import tuf.keydb
import tuf.roledb
import tuf.keys
@ -265,7 +265,7 @@ def write(self, write_partial=False):
# Ensure the parent directories of 'metadata_filepath' exist, otherwise an
# IO exception is raised if 'metadata_filepath' is written to a
# sub-directory.
tuf.util.ensure_parent_dir(delegated_filename)
tuf.ssl_crypto.util.ensure_parent_dir(delegated_filename)
_generate_and_write_metadata(delegated_rolename, delegated_filename,
write_partial, self._targets_directory,
@ -816,7 +816,7 @@ def load_project(project_directory, prefix='', new_targets_location=None):
config_filename = os.path.join(project_directory, PROJECT_FILENAME)
try:
project_configuration = tuf.util.load_json_file(config_filename)
project_configuration = tuf.ssl_crypto.util.load_json_file(config_filename)
tuf.ssl_crypto.formats.PROJECT_CFG_SCHEMA.check_match(project_configuration)
except (OSError, IOError, tuf.ssl_commons.exceptions.FormatError):
@ -863,7 +863,7 @@ def load_project(project_directory, prefix='', new_targets_location=None):
# Load the project's metadata.
targets_metadata_path = os.path.join(project_directory, metadata_directory,
project_filename)
signable = tuf.util.load_json_file(targets_metadata_path)
signable = tuf.ssl_crypto.util.load_json_file(targets_metadata_path)
tuf.tufformats.check_signable_object_format(signable)
targets_metadata = signable['signed']
@ -929,7 +929,7 @@ def load_project(project_directory, prefix='', new_targets_location=None):
signable = None
try:
signable = tuf.util.load_json_file(metadata_path)
signable = tuf.ssl_crypto.util.load_json_file(metadata_path)
except (ValueError, IOError, tuf.ssl_commons.exceptions.Error):
raise

View file

@ -17,7 +17,7 @@
length of a downloaded file has to match the hash and length supplied by the
metadata of that file. The downloaded file is technically a file-like object
that will automatically destroys itself once closed. Note that the file-like
object, 'tuf.util.TempFile', is returned by the '_download_file()' function.
object, 'tuf.ssl_crypto.util.TempFile', is returned by the '_download_file()' function.
"""
# Help with Python 3 compatibility, where the print statement is a function, an
@ -39,7 +39,7 @@
import tuf
from simple_settings import settings
import tuf.ssl_crypto.hash
import tuf.util
import tuf.ssl_crypto.util
import tuf.tufformats
import six
@ -65,7 +65,7 @@ def safe_download(url, required_length):
tuf.download.unsafe_download() may be called if an upper download limit is
preferred.
'tuf.util.TempFile', the file-like object returned, is used instead of
'tuf.ssl_crypto.util.TempFile', the file-like object returned, is used instead of
regular tempfile object because of additional functionality provided, such
as handling compressed metadata and automatically closing files after
moving to final destination.
@ -80,7 +80,7 @@ def safe_download(url, required_length):
limit.
<Side Effects>
A 'tuf.util.TempFile' object is created on disk to store the contents of
A 'tuf.ssl_crypto.util.TempFile' object is created on disk to store the contents of
'url'.
<Exceptions>
@ -92,7 +92,7 @@ def safe_download(url, required_length):
Any other unforeseen runtime exception.
<Returns>
A 'tuf.util.TempFile' file-like object that points to the contents of 'url'.
A 'tuf.ssl_crypto.util.TempFile' file-like object that points to the contents of 'url'.
"""
# Do all of the arguments have the appropriate format?
@ -129,7 +129,7 @@ def unsafe_download(url, required_length):
tuf.download.safe_download() may be called if an exact download limit is
preferred.
'tuf.util.TempFile', the file-like object returned, is used instead of
'tuf.ssl_crypto.util.TempFile', the file-like object returned, is used instead of
regular tempfile object because of additional functionality provided, such
as handling compressed metadata and automatically closing files after
moving to final destination.
@ -144,7 +144,7 @@ def unsafe_download(url, required_length):
limit.
<Side Effects>
A 'tuf.util.TempFile' object is created on disk to store the contents of
A 'tuf.ssl_crypto.util.TempFile' object is created on disk to store the contents of
'url'.
<Exceptions>
@ -156,7 +156,7 @@ def unsafe_download(url, required_length):
Any other unforeseen runtime exception.
<Returns>
A 'tuf.util.TempFile' file-like object that points to the contents of 'url'.
A 'tuf.ssl_crypto.util.TempFile' file-like object that points to the contents of 'url'.
"""
# Do all of the arguments have the appropriate format?
@ -191,8 +191,8 @@ def _download_file(url, required_length, STRICT_REQUIRED_LENGTH=True):
opens a connection to 'url' and downloads the file while ensuring its
length and hashes match 'required_hashes' and 'required_length'.
tuf.util.TempFile is used instead of regular tempfile object because of
additional functionality provided by 'tuf.util.TempFile'.
tuf.ssl_crypto.util.TempFile is used instead of regular tempfile object because of
additional functionality provided by 'tuf.ssl_crypto.util.TempFile'.
<Arguments>
url:
@ -208,7 +208,7 @@ def _download_file(url, required_length, STRICT_REQUIRED_LENGTH=True):
timestamp metadata, which has no signed required_length.
<Side Effects>
A 'tuf.util.TempFile' object is created on disk to store the contents of
A 'tuf.ssl_crypto.util.TempFile' object is created on disk to store the contents of
'url'.
<Exceptions>
@ -220,7 +220,7 @@ def _download_file(url, required_length, STRICT_REQUIRED_LENGTH=True):
Any other unforeseen runtime exception.
<Returns>
A 'tuf.util.TempFile' file-like object that points to the contents of 'url'.
A 'tuf.ssl_crypto.util.TempFile' file-like object that points to the contents of 'url'.
"""
# Do all of the arguments have the appropriate format?
@ -236,7 +236,7 @@ def _download_file(url, required_length, STRICT_REQUIRED_LENGTH=True):
# This is the temporary file that we will return to contain the contents of
# the downloaded file.
temp_file = tuf.util.TempFile()
temp_file = tuf.ssl_crypto.util.TempFile()
try:
# Open the connection to the remote file.

View file

@ -138,6 +138,7 @@
import tuf.ssl_crypto.hash
# Perform format checks of argument objects.
import tuf.pycrypto_keys
import tuf.ssl_crypto.formats
import tuf.tufformats

View file

@ -28,7 +28,7 @@
import os
import tuf
import tuf.util
import tuf.ssl_crypto.util
import tuf.tufformats
import six
@ -89,12 +89,12 @@ def get_list_of_mirrors(file_type, file_path, mirrors_dict):
'Supported file types: '+repr(_SUPPORTED_FILE_TYPES)
raise tuf.ssl_commons.exceptions.Error(message)
# Reference to 'tuf.util.file_in_confined_directories()' (improve readability).
# Reference to 'tuf.ssl_crypto.util.file_in_confined_directories()' (improve readability).
# This function checks whether a mirror should serve a file to the client.
# A client may be confined to certain paths on a repository mirror
# when fetching target files. This field may be set by the client when
# the repository mirror is added to the 'tuf.client.updater.Updater' object.
in_confined_directory = tuf.util.file_in_confined_directories
in_confined_directory = tuf.ssl_crypto.util.file_in_confined_directories
list_of_mirrors = []
for mirror_name, mirror_info in six.iteritems(mirrors_dict):

View file

@ -136,7 +136,7 @@
from simple_settings import settings
# Import routine to process key files containing JSON data.
import tuf.util
import tuf.ssl_crypto.util
# Recommended RSA key sizes:
# http://www.emc.com/emc-plus/rsa-labs/historical/twirl-and-rsa-key-size.htm#table1
@ -828,7 +828,7 @@ def decrypt_key(encrypted_key, password):
# Raise 'tuf.ssl_commons.exceptions.Error' if 'json_data' cannot be deserialized to a valid
# 'tuf.ssl_crypto.formats.ANYKEY_SCHEMA' key object.
key_object = tuf.util.load_json_string(json_data.decode())
key_object = tuf.ssl_crypto.util.load_json_string(json_data.decode())
return key_object
@ -991,7 +991,7 @@ def _decrypt(file_contents, password):
generated_hmac = binascii.hexlify(generated_hmac_object.finalize())
if not tuf.util.digests_are_equal(generated_hmac.decode(), hmac):
if not tuf.ssl_crypto.util.digests_are_equal(generated_hmac.decode(), hmac):
raise tuf.ssl_commons.exceptions.CryptoError('Decryption failed.')
# Construct a Cipher object, with the key and iv.

View file

@ -111,13 +111,13 @@
import tuf.ssl_crypto.hash
# Perform object format-checking.
import tuf.tufformats
#import tuf.tufformats
# Extract the cryptography library settings.
from simple_settings import settings
# Import key files containing json data.
import tuf.util
import tuf.ssl_crypto.util
# Recommended RSA key sizes:
# http://www.emc.com/emc-plus/rsa-labs/historical/twirl-and-rsa-key-size.htm#table1
@ -766,7 +766,7 @@ def decrypt_key(encrypted_key, password):
# Raise 'tuf.ssl_commons.exceptions.Error' if 'json_data' cannot be deserialized to a valid
# 'tuf.ssl_crypto.formats.ANYKEY_SCHEMA' key object.
key_object = tuf.util.load_json_string(json_data.decode())
key_object = tuf.ssl_crypto.util.load_json_string(json_data.decode())
return key_object
@ -931,7 +931,7 @@ def _decrypt(file_contents, password):
Crypto.Hash.SHA256)
generated_hmac = generated_hmac_object.hexdigest()
if not tuf.util.digests_are_equal(generated_hmac, hmac):
if not tuf.ssl_crypto.util.digests_are_equal(generated_hmac, hmac):
raise tuf.ssl_commons.exceptions.CryptoError('Decryption failed.')
# The following decryption routine assumes 'ciphertext' was encrypted with

View file

@ -44,7 +44,7 @@
import tuf
import tuf.ssl_crypto.formats
import tuf.tufformats
import tuf.util
import tuf.ssl_crypto.util
import tuf.keydb
import tuf.roledb
import tuf.keys
@ -607,7 +607,7 @@ def _load_top_level_metadata(repository, top_level_filenames):
if os.path.exists(root_filename):
# Initialize the key and role metadata of the top-level roles.
signable = tuf.util.load_json_file(root_filename)
signable = tuf.ssl_crypto.util.load_json_file(root_filename)
tuf.tufformats.check_signable_object_format(signable)
root_metadata = signable['signed']
tuf.keydb.create_keydb_from_root_metadata(root_metadata)
@ -654,7 +654,7 @@ def _load_top_level_metadata(repository, top_level_filenames):
# Load 'timestamp.json'. A Timestamp role file without a version number is
# always written.
if os.path.exists(timestamp_filename):
signable = tuf.util.load_json_file(timestamp_filename)
signable = tuf.ssl_crypto.util.load_json_file(timestamp_filename)
timestamp_metadata = signable['signed']
for signature in signable['signatures']:
repository.timestamp.add_signature(signature, mark_role_as_dirty=False)
@ -696,7 +696,7 @@ def _load_top_level_metadata(repository, top_level_filenames):
snapshot_filename = os.path.join(dirname, str(snapshot_version) + '.' + basename + METADATA_EXTENSION)
if os.path.exists(snapshot_filename):
signable = tuf.util.load_json_file(snapshot_filename)
signable = tuf.ssl_crypto.util.load_json_file(snapshot_filename)
tuf.tufformats.check_signable_object_format(signable)
snapshot_metadata = signable['signed']
@ -735,7 +735,7 @@ def _load_top_level_metadata(repository, top_level_filenames):
targets_filename = os.path.join(dirname, str(targets_version) + '.' + basename)
if os.path.exists(targets_filename):
signable = tuf.util.load_json_file(targets_filename)
signable = tuf.ssl_crypto.util.load_json_file(targets_filename)
tuf.tufformats.check_signable_object_format(signable)
targets_metadata = signable['signed']
@ -881,11 +881,11 @@ def generate_and_write_rsa_keypair(filepath, bits=DEFAULT_RSA_KEY_BITS,
# Write public key (i.e., 'public', which is in PEM format) to
# '<filepath>.pub'. If the parent directory of filepath does not exist,
# create it (and all its parent directories, if necessary).
tuf.util.ensure_parent_dir(filepath)
tuf.ssl_crypto.util.ensure_parent_dir(filepath)
# Create a tempororary file, write the contents of the public key, and move
# to final destination.
file_object = tuf.util.TempFile()
file_object = tuf.ssl_crypto.util.TempFile()
file_object.write(public.encode('utf-8'))
# The temporary file is closed after the final move.
@ -894,7 +894,7 @@ def generate_and_write_rsa_keypair(filepath, bits=DEFAULT_RSA_KEY_BITS,
# Write the private key in encrypted PEM format to '<filepath>'.
# Unlike the public key file, the private key does not have a file
# extension.
file_object = tuf.util.TempFile()
file_object = tuf.ssl_crypto.util.TempFile()
file_object.write(encrypted_pem.encode('utf-8'))
file_object.move(filepath)
@ -1088,11 +1088,11 @@ def generate_and_write_ed25519_keypair(filepath, password=None):
# Write the public key, conformant to 'tuf.ssl_crypto.formats.KEY_SCHEMA', to
# '<filepath>.pub'.
tuf.util.ensure_parent_dir(filepath)
tuf.ssl_crypto.util.ensure_parent_dir(filepath)
# Create a tempororary file, write the contents of the public key, and move
# to final destination.
file_object = tuf.util.TempFile()
file_object = tuf.ssl_crypto.util.TempFile()
file_object.write(json.dumps(ed25519key_metadata_format).encode('utf-8'))
# The temporary file is closed after the final move.
@ -1100,7 +1100,7 @@ def generate_and_write_ed25519_keypair(filepath, password=None):
# Write the encrypted key string, conformant to
# 'tuf.ssl_crypto.formats.ENCRYPTEDKEY_SCHEMA', to '<filepath>'.
file_object = tuf.util.TempFile()
file_object = tuf.ssl_crypto.util.TempFile()
file_object.write(encrypted_key.encode('utf-8'))
file_object.move(filepath)
@ -1141,7 +1141,7 @@ def import_ed25519_publickey_from_file(filepath):
# ED25519 key objects are saved in json and metadata format. Return the
# loaded key object in tuf.ssl_crypto.formats.ED25519KEY_SCHEMA' format that also
# includes the keyid.
ed25519_key_metadata = tuf.util.load_json_file(filepath)
ed25519_key_metadata = tuf.ssl_crypto.util.load_json_file(filepath)
ed25519_key, junk = tuf.keys.format_metadata_to_key(ed25519_key_metadata)
# Raise an exception if an unexpected key type is imported.
@ -1349,7 +1349,7 @@ def get_metadata_fileinfo(filename, custom=None):
# file information, such as the file's author, version/revision
# numbers, etc.
filesize, filehashes = \
tuf.util.get_file_details(filename, settings.REPOSITORY_HASH_ALGORITHMS)
tuf.ssl_crypto.util.get_file_details(filename, settings.REPOSITORY_HASH_ALGORITHMS)
return tuf.tufformats.make_fileinfo(filesize, filehashes, custom=custom)
@ -1425,7 +1425,7 @@ def get_target_hash(target_filepath):
The hash of 'target_filepath'.
"""
return tuf.util.get_target_hash(target_filepath)
return tuf.ssl_crypto.util.get_target_hash(target_filepath)
@ -1746,7 +1746,7 @@ def generate_snapshot_metadata(metadata_directory, version, expiration_date,
# available delegated roles on the repository.
fileinfodict = {}
root_path = os.path.join(metadata_directory, root_filename + '.json')
length, hashes = tuf.util.get_file_details(root_path)
length, hashes = tuf.ssl_crypto.util.get_file_details(root_path)
root_version = get_metadata_versioninfo('root')
fileinfodict[ROOT_FILENAME] = tuf.tufformats.make_fileinfo(length, hashes, version=root_version['version'])
fileinfodict[TARGETS_FILENAME] = get_metadata_versioninfo(targets_filename)
@ -1831,7 +1831,7 @@ def generate_timestamp_metadata(snapshot_filename, version, expiration_date):
# Retrieve the versioninfo of the Snapshot metadata file.
snapshot_fileinfo = {}
length, hashes = tuf.util.get_file_details(snapshot_filename)
length, hashes = tuf.ssl_crypto.util.get_file_details(snapshot_filename)
snapshot_version = get_metadata_versioninfo('snapshot')
snapshot_fileinfo[SNAPSHOT_FILENAME] = \
tuf.tufformats.make_fileinfo(length, hashes, version=snapshot_version['version'])
@ -2012,11 +2012,11 @@ def write_metadata_file(metadata, filename, version_number,
# versions. To avoid partial metadata from being written, 'metadata' is
# first written to a temporary location (i.e., 'file_object') and then
# moved to 'filename'.
file_object = tuf.util.TempFile()
file_object = tuf.ssl_crypto.util.TempFile()
# Serialize 'metadata' to the file-like object and then write
# 'file_object' to disk. The dictionary keys of 'metadata' are sorted
# and indentation is used. The 'tuf.util.TempFile' file-like object is
# and indentation is used. The 'tuf.ssl_crypto.util.TempFile' file-like object is
# automically closed after the final move.
file_object.write(file_content)
@ -2077,7 +2077,7 @@ def write_metadata_file(metadata, filename, version_number,
continue
elif compression_algorithm == 'gz':
file_object = tuf.util.TempFile()
file_object = tuf.ssl_crypto.util.TempFile()
compressed_filename = filename + '.gz'
# Instantiate a gzip object, but save compressed content to
@ -2127,7 +2127,7 @@ def _write_compressed_metadata(file_object, compressed_filename,
file_object.move(compressed_filename)
# The temporary file must be closed if 'file_object.move()' is not used.
# tuf.util.TempFile() automatically closes the temp file when move() is
# tuf.ssl_crypto.util.TempFile() automatically closes the temp file when move() is
# called
else:
file_object.close_temp_file()
@ -2151,7 +2151,7 @@ def _write_compressed_metadata(file_object, compressed_filename,
else:
logger.debug('Skipping compression extension: ' + repr(compression_extension))
# Move the 'tuf.util.TempFile' object to one of the filenames so that it is
# Move the 'tuf.ssl_crypto.util.TempFile' object to one of the filenames so that it is
# saved and the temporary file closed.
if not os.path.exists(consistent_filename):
logger.debug('Saving ' + repr(consistent_filename))

View file

@ -40,7 +40,7 @@
import tuf
import tuf.tufformats
import tuf.util
import tuf.ssl_crypto.util
import tuf.keydb
import tuf.roledb
import tuf.keys
@ -2939,7 +2939,7 @@ def load_repository(repository_directory):
signable = None
try:
signable = tuf.util.load_json_file(metadata_path)
signable = tuf.ssl_crypto.util.load_json_file(metadata_path)
except (tuf.ssl_commons.exceptions.Error, ValueError, IOError):
logger.debug('Tried to load metadata with invalid JSON'

View file

@ -1,998 +0,0 @@
"""
<Program Name>
util.py
<Author>
Konstantin Andrianov
<Started>
March 24, 2012. Derived from original util.py written by Geremy Condra.
<Copyright>
See LICENSE for licensing information.
<Purpose>
Provides utility services. This module supplies utility functions such as:
get_file_details() that computes the length and hash of a file, import_json
that tries to import a working json module, load_json_* functions, and a
TempFile class that generates a file-like object for temporary storage, etc.
"""
# Help with Python 3 compatibility, where the print statement is a function, an
# implicit relative import is invalid, and the '/' operator performs true
# division. Example: print 'hello world' raises a 'SyntaxError' exception.
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import os
import sys
import gzip
import shutil
import logging
import tempfile
import fnmatch
import tuf
import tuf.ssl_crypto.hash
from simple_settings import settings
import tuf.tufformats
import six
# The algorithm used by the repository to generate the digests of the
# target filepaths, which are included in metadata files and may be prepended
# to the filenames of consistent snapshots.
HASH_FUNCTION = 'sha256'
# See 'log.py' to learn how logging is handled in TUF.
logger = logging.getLogger('tuf.util')
class TempFile(object):
"""
<Purpose>
A high-level temporary file that cleans itself up or can be manually
cleaned up. This isn't a complete file-like object. The file functions
that are supported make additional common-case safe assumptions. There
are additional functions that aren't part of file-like objects. TempFile
is used in the download.py module to temporarily store downloaded data while
all security checks (file hashes/length) are performed.
"""
def _default_temporary_directory(self, prefix):
"""__init__ helper."""
try:
self.temporary_file = tempfile.NamedTemporaryFile(prefix=prefix)
except OSError as err: # pragma: no cover
logger.critical('Cannot create a system temporary directory: '+repr(err))
raise tuf.ssl_commons.exceptions.Error(err)
def __init__(self, prefix='tuf_temp_'):
"""
<Purpose>
Initializes TempFile.
<Arguments>
prefix:
A string argument to be used with tempfile.NamedTemporaryFile function.
<Exceptions>
tuf.ssl_commons.exceptions.Error on failure to load temp dir.
<Return>
None.
"""
self._compression = None
# If compression is set then the original file is saved in 'self._orig_file'.
self._orig_file = None
temp_dir = settings.temporary_directory
if temp_dir is not None and tuf.ssl_crypto.formats.PATH_SCHEMA.matches(temp_dir):
try:
self.temporary_file = tempfile.NamedTemporaryFile(prefix=prefix,
dir=temp_dir)
except OSError as err:
logger.error('Temp file in ' + temp_dir + ' failed: '+repr(err))
logger.error('Will attempt to use system default temp dir.')
self._default_temporary_directory(prefix)
else:
self._default_temporary_directory(prefix)
def get_compressed_length(self):
"""
<Purpose>
Get the compressed length of the file. This will be correct information
even when the file is read as an uncompressed one.
<Arguments>
None.
<Exceptions>
OSError.
<Return>
Nonnegative integer representing compressed file size.
"""
# Even if we read a compressed file with the gzip standard library module,
# the original file will remain compressed.
return os.stat(self.temporary_file.name).st_size
def flush(self):
"""
<Purpose>
Flushes buffered output for the file.
<Arguments>
None.
<Exceptions>
None.
<Return>
None.
"""
self.temporary_file.flush()
def read(self, size=None):
"""
<Purpose>
Read specified number of bytes. If size is not specified then the whole
file is read and the file pointer is placed at the beginning of the file.
<Arguments>
size:
Number of bytes to be read.
<Exceptions>
tuf.ssl_commons.exceptions.FormatError: if 'size' is invalid.
<Return>
String of data.
"""
if size is None:
self.temporary_file.seek(0)
data = self.temporary_file.read()
self.temporary_file.seek(0)
return data
else:
if not (isinstance(size, int) and size > 0):
raise tuf.ssl_commons.exceptions.FormatError
return self.temporary_file.read(size)
def write(self, data, auto_flush=True):
"""
<Purpose>
Writes a data string to the file.
<Arguments>
data:
A string containing some data.
auto_flush:
Boolean argument, if set to 'True', all data will be flushed from
internal buffer.
<Exceptions>
None.
<Return>
None.
"""
self.temporary_file.write(data)
if auto_flush:
self.flush()
def move(self, destination_path):
"""
<Purpose>
Copies 'self.temporary_file' to a non-temp file at 'destination_path' and
closes 'self.temporary_file' so that it is removed.
<Arguments>
destination_path:
Path to store the file in.
<Exceptions>
None.
<Return>
None.
"""
self.flush()
self.seek(0)
destination_file = open(destination_path, 'wb')
shutil.copyfileobj(self.temporary_file, destination_file)
# Force the destination file to be written to disk from Python's internal and
# the operation system's buffers. os.fsync() should follow flush().
destination_file.flush()
os.fsync(destination_file.fileno())
destination_file.close()
# 'self.close()' closes temporary file which destroys itself.
self.close_temp_file()
def seek(self, *args):
"""
<Purpose>
Set file's current position.
<Arguments>
*args:
(*-operator): unpacking argument list is used
because seek method accepts two args: offset and whence. If whence is
not specified, its default is 0. Indicate offset to set the file's
current position. Refer to the python manual for more info.
<Exceptions>
None.
<Return>
None.
"""
self.temporary_file.seek(*args)
def decompress_temp_file_object(self, compression):
"""
<Purpose>
To decompress a compressed temp file object. Decompression is performed
on a temp file object that is compressed, this occurs after downloading
a compressed file. For instance if a compressed version of some meta
file in the repository is downloaded, the temp file containing the
compressed meta file will be decompressed using this function.
Note that after calling this method, write() can no longer be called.
meta.json.gz
|...[download]
temporary_file (containing meta.json.gz)
/ \
temporary_file _orig_file
containing meta.json containing meta.json.gz
(decompressed data)
<Arguments>
compression:
A string indicating the type of compression that was used to compress
a file. Only gzip is allowed.
<Exceptions>
tuf.ssl_commons.exceptions.FormatError: If 'compression' is improperly formatted.
tuf.ssl_commons.exceptions.Error: If an invalid compression is given.
tuf.ssl_commons.exceptions.DecompressionError: If the compression failed for any reason.
<Side Effects>
'self._orig_file' is used to store the original data of 'temporary_file'.
<Return>
None.
"""
# Does 'compression' have the correct format?
# Raise 'tuf.ssl_commons.exceptions.FormatError' if there is a mismatch.
tuf.ssl_crypto.formats.NAME_SCHEMA.check_match(compression)
if self._orig_file is not None:
raise tuf.ssl_commons.exceptions.Error('Can only set compression on a TempFile once.')
if compression != 'gzip':
raise tuf.ssl_commons.exceptions.Error('Only gzip compression is supported.')
self.seek(0)
self._compression = compression
self._orig_file = self.temporary_file
try:
gzip_file_object = gzip.GzipFile(fileobj=self.temporary_file, mode='rb')
uncompressed_content = gzip_file_object.read()
self.temporary_file = tempfile.NamedTemporaryFile()
self.temporary_file.write(uncompressed_content)
self.flush()
except Exception as exception:
raise tuf.ssl_commons.exceptions.DecompressionError(exception)
def close_temp_file(self):
"""
<Purpose>
Closes the temporary file object. 'close_temp_file' mimics usual
file.close(), however temporary file destroys itself when
'close_temp_file' is called. Further if compression is set, second
temporary file instance 'self._orig_file' is also closed so that no open
temporary files are left open.
<Arguments>
None.
<Exceptions>
None.
<Side Effects>
Closes 'self._orig_file'.
<Return>
None.
"""
self.temporary_file.close()
# If compression has been set, we need to explicitly close the original
# file object.
if self._orig_file is not None:
self._orig_file.close()
def get_file_details(filepath, hash_algorithms=['sha256']):
"""
<Purpose>
To get file's length and hash information. The hash is computed using the
sha256 algorithm. This function is used in the signerlib.py and updater.py
modules.
<Arguments>
filepath:
Absolute file path of a file.
hash_algorithms:
<Exceptions>
tuf.ssl_commons.exceptions.FormatError: If hash of the file does not match HASHDICT_SCHEMA.
tuf.ssl_commons.exceptions.Error: If 'filepath' does not exist.
<Returns>
A tuple (length, hashes) describing 'filepath'.
"""
# Making sure that the format of 'filepath' is a path string.
# 'tuf.ssl_commons.exceptions.FormatError' is raised on incorrect format.
tuf.ssl_crypto.formats.PATH_SCHEMA.check_match(filepath)
tuf.ssl_crypto.formats.HASHALGORITHMS_SCHEMA.check_match(hash_algorithms)
# The returned file hashes of 'filepath'.
file_hashes = {}
# Does the path exists?
if not os.path.exists(filepath):
raise tuf.ssl_commons.exceptions.Error('Path ' + repr(filepath) + ' doest not exist.')
filepath = os.path.abspath(filepath)
# Obtaining length of the file.
file_length = os.path.getsize(filepath)
# Obtaining hash of the file.
for algorithm in hash_algorithms:
digest_object = tuf.ssl_crypto.hash.digest_filename(filepath, algorithm)
file_hashes.update({algorithm: digest_object.hexdigest()})
# Performing a format check to ensure 'file_hash' corresponds HASHDICT_SCHEMA.
# Raise 'tuf.ssl_commons.exceptions.FormatError' if there is a mismatch.
tuf.ssl_crypto.formats.HASHDICT_SCHEMA.check_match(file_hashes)
return file_length, file_hashes
def ensure_parent_dir(filename):
"""
<Purpose>
To ensure existence of the parent directory of 'filename'. If the parent
directory of 'name' does not exist, create it.
Example: If 'filename' is '/a/b/c/d.txt', and only the directory '/a/b/'
exists, then directory '/a/b/c/d/' will be created.
<Arguments>
filename:
A path string.
<Exceptions>
tuf.ssl_commons.exceptions.FormatError: If 'filename' is improperly formatted.
<Side Effects>
A directory is created whenever the parent directory of 'filename' does not
exist.
<Return>
None.
"""
# Ensure 'filename' corresponds to 'PATH_SCHEMA'.
# Raise 'tuf.ssl_commons.exceptions.FormatError' on a mismatch.
tuf.ssl_crypto.formats.PATH_SCHEMA.check_match(filename)
# Split 'filename' into head and tail, check if head exists.
directory = os.path.split(filename)[0]
if directory and not os.path.exists(directory):
# mode = 'rwx------'. 448 (decimal) is 700 in octal.
os.makedirs(directory, 448)
def file_in_confined_directories(filepath, confined_directories):
"""
<Purpose>
Check if the directory containing 'filepath' is in the list/tuple of
'confined_directories'.
<Arguments>
filepath:
A string representing the path of a file. The following example path
strings are viewed as files and not directories: 'a/b/c', 'a/b/c.txt'.
confined_directories:
A list, or a tuple, of directory strings.
<Exceptions>
tuf.ssl_commons.exceptions.FormatError: On incorrect format of the input.
<Return>
Boolean. True, if path is either the empty string
or in 'confined_paths'; False, otherwise.
"""
# Do the arguments have the correct format?
# Raise 'tuf.ssl_commons.exceptions.FormatError' if there is a mismatch.
tuf.ssl_crypto.formats.RELPATH_SCHEMA.check_match(filepath)
tuf.ssl_crypto.formats.RELPATHS_SCHEMA.check_match(confined_directories)
for confined_directory in confined_directories:
# The empty string (arbitrarily chosen) signifies the client is confined
# to all directories and subdirectories. No need to check 'filepath'.
if confined_directory == '':
return True
# Normalized paths needed, to account for up-level references, etc.
# TUF clients have the option of setting the list of directories in
# 'confined_directories'.
filepath = os.path.normpath(filepath)
confined_directory = os.path.normpath(confined_directory)
# A TUF client may restrict himself to specific directories on the
# remote repository. The list of paths in 'confined_path', not including
# each path's subdirectories, are the only directories the client will
# download targets from.
if os.path.dirname(filepath) == confined_directory:
return True
return False
def find_delegated_role(roles, delegated_role):
"""
<Purpose>
Find the index, if any, of a role with a given name in a list of roles.
<Arguments>
roles:
The list of roles, each of which must have a 'name' attribute.
delegated_role:
The name of the role to be found in the list of roles.
<Exceptions>
tuf.ssl_commons.exceptions.RepositoryError, if the list of roles has invalid data.
<Side Effects>
No known side effects.
<Returns>
The unique index, an interger, in the list of roles. if 'delegated_role'
does not exist, 'None' is returned.
"""
# Do the arguments have the correct format?
# Ensure the arguments have the appropriate number of objects and object
# types, and that all dict keys are properly named.
# Raise 'tuf.ssl_commons.exceptions.FormatError' if any are improperly formatted.
tuf.ssl_crypto.formats.ROLELIST_SCHEMA.check_match(roles)
tuf.ssl_crypto.formats.ROLENAME_SCHEMA.check_match(delegated_role)
# The index of a role, if any, with the same name.
role_index = None
for index in six.moves.xrange(len(roles)):
role = roles[index]
name = role.get('name')
# This role has no name.
if name is None:
no_name_message = 'Role with no name.'
raise tuf.ssl_commons.exceptions.RepositoryError(no_name_message)
# Does this role have the same name?
else:
# This role has the same name, and...
if name == delegated_role:
# ...it is the only known role with the same name.
if role_index is None:
role_index = index
# ...there are at least two roles with the same name.
else:
duplicate_role_message = 'Duplicate role (' + str(delegated_role) + ').'
raise tuf.ssl_commons.exceptions.RepositoryError(duplicate_role_message)
# This role has a different name.
else:
logger.debug('Skipping delegated role: ' + repr(delegated_role))
return role_index
def ensure_all_targets_allowed(rolename, list_of_targets, parent_delegations):
"""
<Purpose>
Ensure that the list of targets specified by 'rolename' are allowed; this is
determined by inspecting the 'delegations' field of the parent role
of 'rolename'. If a target specified by 'rolename' is not found in the
delegations field of 'metadata_object_of_parent', raise an exception. The
top-level role 'targets' is allowed to list any target file, so this
function does not raise an exception if 'rolename' is 'targets'.
Targets allowed are either exlicitly listed under the 'paths' field, or
match one of the patterns (i.e., Unix shell-style wildcards) listed there.
A parent role may delegate trust to all files under a particular directory,
including files in subdirectories by using wildcards (e.g.,
'/packages/source/Django/*', '/packages/django*.tar.gzip).
Targets listed in hashed bins are also validated (i.e., its calculated path
hash prefix must be delegated by the parent role).
TODO: Should the TUF spec restrict the repository to one particular
algorithm when calcutating path hash prefixes (currently restricted to
SHA256)? Should we allow the repository to specify in the role dictionary
the algorithm used for these generated hashed paths?
<Arguments>
rolename:
The name of the role whose targets must be verified. This is a
role name and should not end in '.json'. Examples: 'root', 'targets',
'unclaimed'.
list_of_targets:
The targets of 'rolename', as listed in targets field of the 'rolename'
metadata. 'list_of_targets' are target paths relative to the targets
directory of the repository. The delegations of the parent role are
checked to verify that the targets of 'list_of_targets' are valid.
parent_delegations:
The parent delegations of 'rolename'. The metadata object stores
the allowed paths and path hash prefixes of child delegations in its
'delegations' attribute.
<Exceptions>
tuf.ssl_commons.exceptions.FormatError:
If any of the arguments are improperly formatted.
tuf.ssl_commons.exceptions.ForbiddenTargetError:
If the targets of 'metadata_role' are not allowed according to
the parent's metadata file. The 'paths' and 'path_hash_prefixes'
attributes are verified.
tuf.ssl_commons.exceptions.RepositoryError:
If the parent of 'rolename' has not made a delegation to 'rolename'.
<Side Effects>
None.
<Returns>
None.
"""
# Do the arguments have the correct format?
# Ensure the arguments have the appropriate number of objects and object
# types, and that all dict keys are properly named.
# Raise 'tuf.ssl_commons.exceptions.FormatError' if any are improperly formatted.
tuf.ssl_crypto.formats.ROLENAME_SCHEMA.check_match(rolename)
tuf.ssl_crypto.formats.RELPATHS_SCHEMA.check_match(list_of_targets)
tuf.ssl_crypto.formats.DELEGATIONS_SCHEMA.check_match(parent_delegations)
# Return if 'rolename' is 'targets'. 'targets' is not a delegated role. Any
# target file listed in 'targets' is allowed.
if rolename == 'targets':
return
# The allowed targets of delegated roles are stored in the parent's metadata
# file. Iterate 'list_of_targets' and confirm they are trusted, or their root
# parent directory exists in the role delegated paths, or path hash prefixes,
# of the parent role. First, locate 'rolename' in the 'roles' attribute of
# 'parent_delegations'.
roles = parent_delegations['roles']
role_index = find_delegated_role(roles, rolename)
# Ensure the delegated role exists prior to extracting trusted paths from
# the parent's 'paths', or trusted path hash prefixes from the parent's
# 'path_hash_prefixes'.
if role_index is not None:
role = roles[role_index]
allowed_child_paths = role.get('paths')
allowed_child_path_hash_prefixes = role.get('path_hash_prefixes')
actual_child_targets = list_of_targets
if allowed_child_path_hash_prefixes is not None:
consistent = paths_are_consistent_with_hash_prefixes
# 'actual_child_targets' (i.e., 'list_of_targets') should have lenth
# greater than zero due to the tuf.format check above.
if not consistent(actual_child_targets,
allowed_child_path_hash_prefixes):
message = repr(rolename) + ' specifies a target that does not' + \
' have a path hash prefix listed in its parent role.'
raise tuf.ssl_commons.exceptions.ForbiddenTargetError(message)
elif allowed_child_paths is not None:
# Check that each delegated target is either explicitly listed or a parent
# directory is found under role['paths'], otherwise raise an exception.
# If the parent role explicitly lists target file paths in 'paths',
# this loop will run in O(n^2), the worst-case. The repository
# maintainer will likely delegate entire directories, and opt for
# explicit file paths if the targets in a directory are delegated to
# different roles/developers.
for child_target in actual_child_targets:
for allowed_child_path in allowed_child_paths:
if fnmatch.fnmatch(child_target, allowed_child_path):
break
else:
raise tuf.ssl_commons.exceptions.ForbiddenTargetError('Role '+repr(rolename)+' specifies'+\
' target '+repr(child_target)+','+\
' which is not an allowed path'+\
' according to the delegations set'+\
' by its parent role.')
else:
# 'role' should have been validated when it was downloaded.
# The 'paths' or 'path_hash_prefixes' attributes should not be missing,
# so raise an error in case this clause is reached.
raise tuf.ssl_commons.exceptions.FormatError(repr(role) + ' did not contain one of ' +\
'the required fields ("paths" or ' +\
'"path_hash_prefixes").')
# Raise an exception if the parent has not delegated to the specified
# 'rolename' child role.
else:
raise tuf.ssl_commons.exceptions.RepositoryError('The parent role has not delegated to '+\
repr(rolename) + '.')
def paths_are_consistent_with_hash_prefixes(paths, path_hash_prefixes):
"""
<Purpose>
Determine whether a list of paths are consistent with their alleged
path hash prefixes. By default, the SHA256 hash function is used.
<Arguments>
paths:
A list of paths for which their hashes will be checked.
path_hash_prefixes:
The list of path hash prefixes with which to check the list of paths.
<Exceptions>
tuf.ssl_commons.exceptions.FormatError:
If the arguments are improperly formatted.
<Side Effects>
No known side effects.
<Returns>
A Boolean indicating whether or not the paths are consistent with the
hash prefix.
"""
# Do the arguments have the correct format?
# Ensure the arguments have the appropriate number of objects and object
# types, and that all dict keys are properly named.
# Raise 'tuf.ssl_commons.exceptions.FormatError' if any are improperly formatted.
tuf.ssl_crypto.formats.RELPATHS_SCHEMA.check_match(paths)
tuf.ssl_crypto.formats.PATH_HASH_PREFIXES_SCHEMA.check_match(path_hash_prefixes)
# Assume that 'paths' and 'path_hash_prefixes' are inconsistent until
# proven otherwise.
consistent = False
# The format checks above ensure the 'paths' and 'path_hash_prefix' lists
# have lengths greater than zero.
for path in paths:
path_hash = get_target_hash(path)
# Assume that every path is inconsistent until proven otherwise.
consistent = False
for path_hash_prefix in path_hash_prefixes:
if path_hash.startswith(path_hash_prefix):
consistent = True
break
# This path has no matching path_hash_prefix. Stop looking further.
if not consistent:
break
return consistent
def get_target_hash(target_filepath):
"""
<Purpose>
Compute the hash of 'target_filepath'. This is useful in conjunction with
the "path_hash_prefixes" attribute in a delegated targets role, which
tells us which paths it is implicitly responsible for.
The repository may optionally organize targets into hashed bins to ease
target delegations and role metadata management. The use of consistent
hashing allows for a uniform distribution of targets into bins.
<Arguments>
target_filepath:
The path to the target file on the repository. This will be relative to
the 'targets' (or equivalent) directory on a given mirror.
<Exceptions>
None.
<Side Effects>
None.
<Returns>
The hash of 'target_filepath'.
"""
# Does 'target_filepath' have the correct format?
# Ensure the arguments have the appropriate number of objects and object
# types, and that all dict keys are properly named.
# Raise 'tuf.ssl_commons.exceptions.FormatError' if there is a mismatch.
tuf.ssl_crypto.formats.RELPATH_SCHEMA.check_match(target_filepath)
# Calculate the hash of the filepath to determine which bin to find the
# target. The client currently assumes the repository uses
# 'HASH_FUNCTION' to generate hashes and 'utf-8'.
digest_object = tuf.ssl_crypto.hash.digest(HASH_FUNCTION)
encoded_target_filepath = target_filepath.encode('utf-8')
digest_object.update(encoded_target_filepath)
target_filepath_hash = digest_object.hexdigest()
return target_filepath_hash
_json_module = None
def import_json():
"""
<Purpose>
Tries to import json module. We used to fall back to the simplejson module,
but we have dropped support for that module. We are keeping this interface
intact for backwards compatibility.
<Arguments>
None.
<Exceptions>
ImportError: on failure to import the json module.
<Side Effects>
None.
<Return>
json module
"""
global _json_module
if _json_module is not None:
return _json_module
else:
try:
module = __import__('json')
# The 'json' module is available in Python > 2.6, and thus this exception
# should not occur in all supported Python installations (> 2.6) of TUF.
except ImportError: #pragma: no cover
raise ImportError('Could not import the json module')
else:
_json_module = module
return module
json = import_json()
def load_json_string(data):
"""
<Purpose>
Deserialize 'data' (JSON string) to a Python object.
<Arguments>
data:
A JSON string.
<Exceptions>
tuf.ssl_commons.exceptions.Error, if 'data' cannot be deserialized to a Python object.
<Side Effects>
None.
<Returns>
Deserialized object. For example, a dictionary.
"""
deserialized_object = None
try:
deserialized_object = json.loads(data)
except TypeError:
message = 'Invalid JSON string: ' + repr(data)
raise tuf.ssl_commons.exceptions.Error(message)
except ValueError:
message = 'Cannot deserialize to a Python object: ' + repr(data)
raise tuf.ssl_commons.exceptions.Error(message)
else:
return deserialized_object
def load_json_file(filepath):
"""
<Purpose>
Deserialize a JSON object from a file containing the object.
<Arguments>
filepath:
Absolute path of JSON file.
<Exceptions>
tuf.ssl_commons.exceptions.FormatError: If 'filepath' is improperly formatted.
tuf.ssl_commons.exceptions.Error: If 'filepath' cannot be deserialized to a Python object.
IOError: If there are runtime IO exceptions.
<Side Effects>
None.
<Return>
Deserialized object. For example, a dictionary.
"""
# Making sure that the format of 'filepath' is a path string.
# tuf.ssl_commons.exceptions.FormatError is raised on incorrect format.
tuf.ssl_crypto.formats.PATH_SCHEMA.check_match(filepath)
deserialized_object = None
# The file is mostly likely gzipped.
if filepath.endswith('.gz'):
logger.debug('gzip.open(' + str(filepath) + ')')
fileobject = six.StringIO(gzip.open(filepath).read().decode('utf-8'))
else:
logger.debug('open(' + str(filepath) + ')')
fileobject = open(filepath)
try:
deserialized_object = json.load(fileobject)
except (ValueError, TypeError):
raise tuf.ssl_commons.exceptions.Error('Cannot deserialize to a Python object: ' + repr(filepath))
else:
fileobject.close()
return deserialized_object
finally:
fileobject.close()
def digests_are_equal(digest1, digest2):
"""
<Purpose>
While protecting against timing attacks, compare the hexadecimal arguments
and determine if they are equal.
<Arguments>
digest1:
The first hexadecimal string value to compare.
digest2:
The second hexadecimal string value to compare.
<Exceptions>
tuf.ssl_commons.exceptions.FormatError: If the arguments are improperly formatted.
<Side Effects>
None.
<Return>
Return True if 'digest1' is equal to 'digest2', False otherwise.
"""
# Ensure the arguments have the appropriate number of objects and object
# types, and that all dict keys are properly named.
# Raise 'tuf.ssl_commons.exceptions.FormatError' if there is a mismatch.
tuf.ssl_crypto.formats.HEX_SCHEMA.check_match(digest1)
tuf.ssl_crypto.formats.HEX_SCHEMA.check_match(digest2)
if len(digest1) != len(digest2):
return False
are_equal = True
for element in range(len(digest1)):
if digest1[element] != digest2[element]:
are_equal = False
return are_equal