mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Merge branch 'develop' of https://github.com/theupdateframework/tuf into develop
Conflicts: tests/test_hash.py tests/test_keydb.py tuf/formats.py tuf/keydb.py tuf/repository_tool.py
This commit is contained in:
commit
c5fd17ab3e
15 changed files with 546 additions and 85 deletions
27
README.md
27
README.md
|
|
@ -81,6 +81,33 @@ TUF specification document is also available:
|
|||
* [The Update Framework Specification](docs/tuf-spec.txt?raw=true)
|
||||
|
||||
|
||||
##Installation
|
||||
```Bash
|
||||
pip - installing and managing Python packages (recommended):
|
||||
|
||||
# Installing from Python Package Index (https://pypi.python.org/pypi).
|
||||
$ pip install tuf
|
||||
|
||||
# Installing from local source archive.
|
||||
$ pip install <path to archive>
|
||||
|
||||
# Or from the root directory of the unpacked archive.
|
||||
$ pip install .
|
||||
```
|
||||
|
||||
### Installing optional requirements (i.e., after installing tuf).
|
||||
```Bash
|
||||
# The optional `tuf[tools]` can be installed by users that wish to generate
|
||||
# TUF repository files, such as metadata, cryptographic keys, and signatures.
|
||||
# Whereas the basic install can only verify ed25519 signatures and is intended
|
||||
# for sofware updater clients, `tuf[tools]` provides repository maintainers
|
||||
# secure ed25519 key and signature generation with PyNaCl / libsodium.
|
||||
|
||||
# The TUF tools also enable general-purpose cryptography with PyCrypto. Software
|
||||
# updaters that want to support verification of RSASSA-PSS signatures must require
|
||||
# their clients to install `tuf[tools]`.
|
||||
$ pip install tuf[tools]
|
||||
```
|
||||
|
||||
##Using TUF
|
||||
|
||||
|
|
|
|||
|
|
@ -437,8 +437,23 @@ def test_TargetsFile(self):
|
|||
|
||||
self.assertTrue(TARGETS_SCHEMA.matches(make_metadata(version, expires,
|
||||
filedict, delegations)))
|
||||
self.assertTrue(TARGETS_SCHEMA.matches(make_metadata(version, expires, filedict)))
|
||||
|
||||
metadata = make_metadata(version, expires, filedict, delegations)
|
||||
self.assertTrue(isinstance(from_metadata(metadata), tuf.formats.TargetsFile))
|
||||
|
||||
# Test conditions for different combination of required arguments (i.e.,
|
||||
# a filedict or delegations argument is required.)
|
||||
metadata = make_metadata(version, expires, filedict)
|
||||
self.assertTrue(isinstance(from_metadata(metadata), tuf.formats.TargetsFile))
|
||||
|
||||
metadata = make_metadata(version, expires, delegations=delegations)
|
||||
self.assertTrue(isinstance(from_metadata(metadata), tuf.formats.TargetsFile))
|
||||
|
||||
# Directly instantiating a TargetsFile object.
|
||||
tuf.formats.TargetsFile(version, expires)
|
||||
tuf.formats.TargetsFile(version, expires, filedict)
|
||||
tuf.formats.TargetsFile(version, expires, delegations=delegations)
|
||||
|
||||
# Test conditions for invalid arguments.
|
||||
bad_version = 'eight'
|
||||
|
|
@ -456,6 +471,21 @@ def test_TargetsFile(self):
|
|||
self.assertRaises(tuf.Error, make_metadata, version, expires)
|
||||
|
||||
self.assertRaises(tuf.FormatError, from_metadata, 123)
|
||||
|
||||
|
||||
|
||||
def test_MirrorsFile(self):
|
||||
# Test normal case.
|
||||
version = 8
|
||||
expires = '1985-10-21T13:20:00Z'
|
||||
|
||||
mirrors_file = tuf.formats.MirrorsFile(version, expires)
|
||||
|
||||
make_metadata = tuf.formats.MirrorsFile.make_metadata
|
||||
from_metadata = tuf.formats.MirrorsFile.from_metadata
|
||||
|
||||
self.assertRaises(NotImplementedError, make_metadata)
|
||||
self.assertRaises(NotImplementedError, from_metadata, mirrors_file)
|
||||
|
||||
|
||||
|
||||
|
|
@ -505,9 +535,10 @@ def test_parse_base64(self):
|
|||
self.assertTrue(isinstance(tuf.formats.parse_base64(base64), six.binary_type))
|
||||
|
||||
# Test conditions for invalid arguments.
|
||||
self.assertRaises(tuf.FormatError, tuf.formats.format_base64, 123)
|
||||
self.assertRaises(tuf.FormatError, tuf.formats.format_base64, True)
|
||||
self.assertRaises(tuf.FormatError, tuf.formats.format_base64, ['123'])
|
||||
self.assertRaises(tuf.FormatError, tuf.formats.parse_base64, 123)
|
||||
self.assertRaises(tuf.FormatError, tuf.formats.parse_base64, True)
|
||||
self.assertRaises(tuf.FormatError, tuf.formats.parse_base64, ['123'])
|
||||
self.assertRaises(tuf.FormatError, tuf.formats.parse_base64, '/')
|
||||
|
||||
|
||||
|
||||
|
|
@ -566,6 +597,7 @@ def test_make_role_metadata(self):
|
|||
keyids = ['123abc', 'abc123']
|
||||
threshold = 2
|
||||
paths = ['path1/', 'path2']
|
||||
path_hash_prefixes = ['000', '003']
|
||||
name = '123'
|
||||
|
||||
ROLE_SCHEMA = tuf.formats.ROLE_SCHEMA
|
||||
|
|
@ -575,7 +607,9 @@ def test_make_role_metadata(self):
|
|||
self.assertTrue(ROLE_SCHEMA.matches(make_role(keyids, threshold, name=name)))
|
||||
self.assertTrue(ROLE_SCHEMA.matches(make_role(keyids, threshold, paths=paths)))
|
||||
self.assertTrue(ROLE_SCHEMA.matches(make_role(keyids, threshold, name=name, paths=paths)))
|
||||
|
||||
self.assertTrue(ROLE_SCHEMA.matches(make_role(keyids, threshold, name=name,
|
||||
path_hash_prefixes=path_hash_prefixes)))
|
||||
|
||||
# Test conditions for invalid arguments.
|
||||
bad_keyids = 'bad'
|
||||
bad_threshold = 'bad'
|
||||
|
|
@ -597,8 +631,9 @@ def test_make_role_metadata(self):
|
|||
self.assertRaises(tuf.FormatError, make_role, keyids, bad_threshold, name=name, paths=paths)
|
||||
self.assertRaises(tuf.FormatError, make_role, keyids, threshold, name=bad_name, paths=paths)
|
||||
self.assertRaises(tuf.FormatError, make_role, keyids, threshold, name=name, paths=bad_paths)
|
||||
|
||||
|
||||
|
||||
# 'paths' and 'path_hash_prefixes' cannot both be specified.
|
||||
self.assertRaises(tuf.FormatError, make_role, keyids, threshold, name, paths, path_hash_prefixes)
|
||||
|
||||
def test_get_role_class(self):
|
||||
# Test conditions for valid arguments.
|
||||
|
|
@ -688,8 +723,10 @@ def test_encode_canonical(self):
|
|||
self.assertEqual('[]', encode([]))
|
||||
self.assertEqual('{"A":[99]}', encode({"A": [99]}))
|
||||
self.assertEqual('{"x":3,"y":2}', encode({"x": 3, "y": 2}))
|
||||
|
||||
self.assertEqual('{"x":3,"y":null}', encode({"x": 3, "y": None}))
|
||||
|
||||
# Condition where 'encode()' sends the result the callable
|
||||
# Condition where 'encode()' sends the result to the callable
|
||||
# 'output'.
|
||||
self.assertEqual(None, encode([1, 2, 3], output))
|
||||
self.assertEqual('[1,2,3]', ''.join(result))
|
||||
|
|
@ -700,6 +737,7 @@ def test_encode_canonical(self):
|
|||
self.assertRaises(tuf.FormatError, encode, {"x": 8.0})
|
||||
self.assertRaises(tuf.FormatError, encode, 8.0, output)
|
||||
|
||||
self.assertRaises(tuf.FormatError, encode, {"x": tuf.FormatError})
|
||||
|
||||
|
||||
# Run unit test.
|
||||
|
|
|
|||
|
|
@ -13,7 +13,7 @@
|
|||
See LICENSE for licensing information.
|
||||
|
||||
<Purpose>
|
||||
Unit test for hash.py.
|
||||
Unit test for 'hash.py'.
|
||||
"""
|
||||
|
||||
# Help with Python 3 compatibility, where the print statement is a function, an
|
||||
|
|
|
|||
|
|
@ -160,6 +160,9 @@ def test_remove_key(self):
|
|||
|
||||
# Test for 'keyid' not in keydb.
|
||||
self.assertRaises(tuf.UnknownKeyError, tuf.keydb.remove_key, keyid)
|
||||
|
||||
# Test condition for unknown key argument.
|
||||
self.assertRaises(tuf.UnknownKeyError, tuf.keydb.remove_key, '1')
|
||||
|
||||
# Test conditions for arguments with invalid formats.
|
||||
self.assertRaises(tuf.FormatError, tuf.keydb.remove_key, None)
|
||||
|
|
@ -185,14 +188,17 @@ def test_create_keydb_from_root_metadata(self):
|
|||
'Targets': {'keyids': [keyid2], 'threshold': 1}}
|
||||
version = 8
|
||||
consistent_snapshot = False
|
||||
expires = '1985-10-21T01:21:00Z'
|
||||
|
||||
expires = '1985-10-21T01:21:00Z'
|
||||
|
||||
tuf.keydb.add_key(rsakey)
|
||||
root_metadata = tuf.formats.RootFile.make_metadata(version,
|
||||
expires,
|
||||
keydict, roledict,
|
||||
consistent_snapshot)
|
||||
self.assertEqual(None, tuf.keydb.create_keydb_from_root_metadata(root_metadata))
|
||||
|
||||
tuf.keydb.create_keydb_from_root_metadata(root_metadata)
|
||||
|
||||
# Ensure 'keyid' and 'keyid2' were added to the keydb database.
|
||||
self.assertEqual(rsakey, tuf.keydb.get_key(keyid))
|
||||
self.assertEqual(rsakey2, tuf.keydb.get_key(keyid2))
|
||||
|
|
|
|||
76
tests/test_log.py
Executable file
76
tests/test_log.py
Executable file
|
|
@ -0,0 +1,76 @@
|
|||
|
||||
"""
|
||||
<Program Name>
|
||||
test_log.py
|
||||
|
||||
<Authors>
|
||||
Vladimir Diaz <vladimir.v.diaz@gmail.com>
|
||||
|
||||
<Started>
|
||||
May 1, 2014.
|
||||
|
||||
<Copyright>
|
||||
See LICENSE for licensing information.
|
||||
|
||||
<Purpose>
|
||||
Unit test for 'log.py'.
|
||||
"""
|
||||
|
||||
import logging
|
||||
import unittest
|
||||
|
||||
import tuf
|
||||
import tuf.log
|
||||
|
||||
logger = logging.getLogger('tuf.test_log')
|
||||
|
||||
log_levels = [logging.CRITICAL, logging.ERROR, logging.WARNING,
|
||||
logging.INFO, logging.DEBUG]
|
||||
|
||||
|
||||
class TestLog(unittest.TestCase):
|
||||
|
||||
|
||||
|
||||
|
||||
def test_set_log_level(self):
|
||||
# Test normal case.
|
||||
global log_levels
|
||||
|
||||
tuf.log.set_log_level()
|
||||
self.assertTrue(logger.isEnabledFor(logging.DEBUG))
|
||||
|
||||
for level in log_levels:
|
||||
tuf.log.set_log_level(level)
|
||||
self.assertTrue(logger.isEnabledFor(level))
|
||||
|
||||
# Test for improperly formatted argument.
|
||||
self.assertRaises(tuf.FormatError, tuf.log.set_log_level, '123')
|
||||
|
||||
# Test for invalid argument.
|
||||
self.assertRaises(tuf.FormatError, tuf.log.set_log_level, 51)
|
||||
|
||||
|
||||
|
||||
def test_set_filehandler_log_level(self):
|
||||
pass
|
||||
|
||||
|
||||
def test_set_console_log_level(self):
|
||||
pass
|
||||
|
||||
|
||||
|
||||
def test_add_console_handler(self):
|
||||
pass
|
||||
|
||||
|
||||
|
||||
def test_remove_console_handler(self):
|
||||
pass
|
||||
|
||||
|
||||
|
||||
# Run unit test.
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
@ -264,6 +264,7 @@ def test_write_and_write_partial(self):
|
|||
repository.write(consistent_snapshot=True)
|
||||
repo_tool.load_repository(repository_directory)
|
||||
|
||||
# Test
|
||||
|
||||
# Test improperly formatted arguments.
|
||||
self.assertRaises(tuf.FormatError, repository.write, 3, False)
|
||||
|
|
@ -274,14 +275,21 @@ def test_write_and_write_partial(self):
|
|||
def test_get_filepaths_in_directory(self):
|
||||
# Test normal case.
|
||||
# Use the pre-generated metadata directory for testing.
|
||||
metadata_directory = os.path.join('repository_data',
|
||||
'repository', 'metadata')
|
||||
|
||||
|
||||
# Test improperly formatted arguments.
|
||||
# Set 'repo' reference to improve readability.
|
||||
repo = repo_tool.Repository
|
||||
metadata_directory = os.path.join('repository_data',
|
||||
'repository', 'metadata')
|
||||
# Verify the expected filenames. get_filepaths_in_directory() returns
|
||||
# a list of absolute paths.
|
||||
metadata_files = repo.get_filepaths_in_directory(metadata_directory)
|
||||
expected_files = ['root.json', 'targets.json', 'targets.json.gz',
|
||||
'snapshot.json', 'timestamp.json']
|
||||
basenames = []
|
||||
for filepath in metadata_files:
|
||||
basenames.append(os.path.basename(filepath))
|
||||
self.assertEqual(sorted(expected_files), sorted(basenames))
|
||||
|
||||
# Test improperly formatted arguments.
|
||||
self.assertRaises(tuf.FormatError, repo.get_filepaths_in_directory,
|
||||
3, recursive_walk=False, followlinks=False)
|
||||
self.assertRaises(tuf.FormatError, repo.get_filepaths_in_directory,
|
||||
|
|
@ -290,6 +298,9 @@ def test_get_filepaths_in_directory(self):
|
|||
metadata_directory, recursive_walk=False, followlinks=3)
|
||||
|
||||
# Test invalid directory argument.
|
||||
# A non-directory.
|
||||
self.assertRaises(tuf.Error, repo.get_filepaths_in_directory,
|
||||
os.path.join(metadata_directory, 'root.json'))
|
||||
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
|
||||
nonexistent_directory = os.path.join(temporary_directory, 'nonexistent/')
|
||||
self.assertRaises(tuf.Error, repo.get_filepaths_in_directory,
|
||||
|
|
@ -375,6 +386,17 @@ def test_expiration(self):
|
|||
expiration = self.metadata.expiration
|
||||
self.assertTrue(isinstance(expiration, datetime.datetime))
|
||||
|
||||
# test a setter with microseconds, we are forcing the microseconds value
|
||||
expiration = datetime.datetime.today() + datetime.timedelta(weeks = 1)
|
||||
# we force the microseconds value if we are unlucky enough to get a 0
|
||||
if expiration.microsecond == 0:
|
||||
expiration = expiration.replace(microsecond = 1)
|
||||
|
||||
new_expiration = self.metadata.expiration
|
||||
self.assertTrue(isinstance(new_expiration, datetime.datetime))
|
||||
|
||||
# check that the expiration value is truncated
|
||||
self.assertTrue(new_expiration.microsecond == 0)
|
||||
|
||||
# Test improperly formatted datetime.
|
||||
try:
|
||||
|
|
@ -763,6 +785,29 @@ def test_init(self):
|
|||
self.assertRaises(tuf.FormatError, repo_tool.Targets, 'targets_directory/', 3)
|
||||
self.assertRaises(tuf.FormatError, repo_tool.Targets, 'targets_directory/',
|
||||
'targets', 3)
|
||||
|
||||
|
||||
|
||||
def test_call(self):
|
||||
# Test normal case.
|
||||
# Perform a delegation so that a delegated role can be accessed and tested
|
||||
# through __call__(). Example: {targets_object}('role1').
|
||||
keystore_directory = os.path.join('repository_data', 'keystore')
|
||||
public_keypath = os.path.join(keystore_directory, 'root_key.pub')
|
||||
public_key = repo_tool.import_rsa_publickey_from_file(public_keypath)
|
||||
target1_filepath = os.path.join(self.targets_directory, 'file1.txt')
|
||||
|
||||
# Create Targets() object to be tested.
|
||||
targets_object = repo_tool.Targets(self.targets_directory)
|
||||
targets_object.delegate('role1', [public_key], [target1_filepath])
|
||||
|
||||
self.assertTrue(isinstance(targets_object('role1'), repo_tool.Targets))
|
||||
|
||||
# Test invalid (i.e., non-delegated) rolename argument.
|
||||
self.assertRaises(tuf.UnknownRoleError, targets_object, 'unknown_role')
|
||||
|
||||
# Test improperly formatted argument.
|
||||
self.assertRaises(tuf.FormatError, targets_object, 1)
|
||||
|
||||
|
||||
|
||||
|
|
@ -852,8 +897,16 @@ def test_add_target(self):
|
|||
# Test invalid filepath argument (i.e., non-existent or invalid file.)
|
||||
self.assertRaises(tuf.Error, self.targets_object.add_target,
|
||||
'non-existent.txt')
|
||||
|
||||
# Not under the repository's targets directory.
|
||||
self.assertRaises(tuf.Error, self.targets_object.add_target,
|
||||
self.temporary_directory)
|
||||
|
||||
# Not a file (i.e., a valid path, but a directory.)
|
||||
test_directory = os.path.join(self.targets_directory, 'test_directory')
|
||||
os.mkdir(test_directory)
|
||||
self.assertRaises(tuf.Error, self.targets_object.add_target,
|
||||
test_directory)
|
||||
|
||||
|
||||
|
||||
|
|
@ -1047,6 +1100,64 @@ def test_delegate_hashed_bins(self):
|
|||
|
||||
|
||||
|
||||
def test_add_target_to_bin(self):
|
||||
# Test normal case.
|
||||
# Delegate the hashed bins so that add_target_to_bin() can be tested.
|
||||
keystore_directory = os.path.join('repository_data', 'keystore')
|
||||
public_keypath = os.path.join(keystore_directory, 'targets_key.pub')
|
||||
public_key = repo_tool.import_rsa_publickey_from_file(public_keypath)
|
||||
target1_filepath = os.path.join(self.targets_directory, 'file1.txt')
|
||||
|
||||
# Set needed arguments by delegate_hashed_bins().
|
||||
public_keys = [public_key]
|
||||
list_of_targets = [target1_filepath]
|
||||
|
||||
# Delegate to hashed bins. The target filepath to be tested is expected
|
||||
# to contain a hash prefix of 'e', so it should be added to the
|
||||
# 'targets/e' role.
|
||||
self.targets_object.delegate_hashed_bins(list_of_targets, public_keys,
|
||||
number_of_bins=16)
|
||||
|
||||
# Ensure each hashed bin initially contains zero targets.
|
||||
for delegation in self.targets_object.delegations:
|
||||
self.assertTrue(target1_filepath not in delegation.target_files)
|
||||
|
||||
# Add 'target1_filepath' and verify that the relative path of
|
||||
# 'target1_filepath' is added to the correct bin.
|
||||
self.targets_object.add_target_to_bin(target1_filepath)
|
||||
for delegation in self.targets_object.delegations:
|
||||
if delegation.rolename == 'targets/e':
|
||||
self.assertTrue('/file1.txt' in delegation.target_files)
|
||||
|
||||
else:
|
||||
self.assertFalse('/file1.txt' in delegation.target_files)
|
||||
|
||||
# Test for non-existent delegations and hashed bins.
|
||||
empty_targets_role = repo_tool.Targets(self.targets_directory, 'empty')
|
||||
|
||||
self.assertRaises(tuf.Error, empty_targets_role.add_target_to_bin,
|
||||
target1_filepath)
|
||||
|
||||
# Non-bin delegation, although it has a correct hashed bin name.
|
||||
empty_targets_role.delegate('e', [public_key], [target1_filepath])
|
||||
self.assertRaises(tuf.Error, empty_targets_role.add_target_to_bin,
|
||||
target1_filepath)
|
||||
|
||||
# Test for a required hashed bin that does not exist.
|
||||
self.targets_object.revoke('e')
|
||||
self.assertRaises(tuf.Error, self.targets_object.add_target_to_bin,
|
||||
target1_filepath)
|
||||
|
||||
# Test improperly formatted argument.
|
||||
self.assertRaises(tuf.FormatError,
|
||||
self.targets_object.add_target_to_bin, 3)
|
||||
|
||||
# Invalid target file path argument.
|
||||
self.assertRaises(tuf.Error,
|
||||
self.targets_object.add_target_to_bin, '/non-existent')
|
||||
|
||||
|
||||
|
||||
def test_add_restricted_paths(self):
|
||||
# Test normal case.
|
||||
# Perform a delegation so that add_restricted_paths() has a child role
|
||||
|
|
@ -1064,6 +1175,10 @@ def test_add_restricted_paths(self):
|
|||
threshold, restricted_paths=None,
|
||||
path_hash_prefixes=None)
|
||||
|
||||
# Delegate an extra role for test coverage (i.e., check that restricted
|
||||
# paths are not added to a child role not requested.)
|
||||
self.targets_object.delegate('junk_role', public_keys, [])
|
||||
|
||||
restricted_path = os.path.join(self.targets_directory, 'tuf_files')
|
||||
os.mkdir(restricted_path)
|
||||
restricted_paths = [restricted_path]
|
||||
|
|
@ -1094,6 +1209,11 @@ def test_add_restricted_paths(self):
|
|||
# Non-existent 'restricted_paths'.
|
||||
self.assertRaises(tuf.Error, self.targets_object.add_restricted_paths,
|
||||
['/non-existent'], 'tuf')
|
||||
|
||||
# Directory not under the repository's targets directory.
|
||||
repository_directory = os.path.join('repository_data', 'repository')
|
||||
self.assertRaises(tuf.Error, self.targets_object.add_restricted_paths,
|
||||
[repository_directory], 'tuf')
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -55,6 +55,15 @@ def tearDown(self):
|
|||
|
||||
def test_get_signature_status_no_role(self):
|
||||
signable = {'signed' : 'test', 'signatures' : []}
|
||||
|
||||
# A valid, but empty signature status
|
||||
sig_status = tuf.sig.get_signature_status(signable)
|
||||
self.assertTrue(tuf.formats.SIGNATURESTATUS_SCHEMA.matches(sig_status))
|
||||
|
||||
# A valid signable, but non-existent role argument.
|
||||
self.assertRaises(tuf.UnknownRoleError, tuf.sig.get_signature_status,
|
||||
signable, 'unknown_role')
|
||||
|
||||
# Should verify we are not adding a duplicate signature
|
||||
# when doing the following action. Here we know 'signable'
|
||||
# has only one signature so it's okay.
|
||||
|
|
@ -66,6 +75,10 @@ def test_get_signature_status_no_role(self):
|
|||
# No specific role we're considering.
|
||||
sig_status = tuf.sig.get_signature_status(signable, None)
|
||||
|
||||
# Non-existent role.
|
||||
self.assertRaises(tuf.UnknownRoleError, tuf.sig.get_signature_status,
|
||||
signable, 'unknown_role')
|
||||
|
||||
self.assertEqual(0, sig_status['threshold'])
|
||||
self.assertEqual([KEYS[0]['keyid']], sig_status['good_sigs'])
|
||||
self.assertEqual([], sig_status['bad_sigs'])
|
||||
|
|
@ -353,6 +366,9 @@ def test_generate_rsa_signature(self):
|
|||
self.assertEqual(1, len(signable['signatures']))
|
||||
signature = signable['signatures'][0]
|
||||
self.assertEqual(KEYS[0]['keyid'], signature['keyid'])
|
||||
|
||||
returned_signature = tuf.sig.generate_rsa_signature(signable['signed'], KEYS[0])
|
||||
self.assertTrue(tuf.formats.SIGNATURE_SCHEMA.matches(returned_signature))
|
||||
|
||||
signable['signatures'].append(tuf.keys.create_signature(
|
||||
KEYS[1], signable['signed']))
|
||||
|
|
@ -361,6 +377,7 @@ def test_generate_rsa_signature(self):
|
|||
signature = signable['signatures'][1]
|
||||
self.assertEqual(KEYS[1]['keyid'], signature['keyid'])
|
||||
|
||||
|
||||
|
||||
def test_may_need_new_keys(self):
|
||||
# One untrusted key in 'signable'.
|
||||
|
|
|
|||
2
tox.ini
2
tox.ini
|
|
@ -12,7 +12,7 @@ changedir = tests
|
|||
|
||||
commands =
|
||||
coverage run --source tuf aggregate_tests.py
|
||||
coverage report -m
|
||||
coverage report -m
|
||||
|
||||
deps =
|
||||
coverage
|
||||
|
|
|
|||
|
|
@ -43,10 +43,15 @@ Type "help", "copyright", "credits" or "license" for more information.
|
|||
>>> from tuf.repository_tool import *
|
||||
>>> repository = load_repository("path/to/repository")
|
||||
```
|
||||
Note that *tuf.repository_tool.py* is not used in TUF integrations. The
|
||||
Note that **tuf.repository_tool.py** is not used in TUF integrations. The
|
||||
**tuf.interposition** package and **tuf.client.updater** module assist in
|
||||
integrating TUF with a software updater.
|
||||
|
||||
The repository tool requires additional cryptographic libraries and may be
|
||||
installed as follows:
|
||||
```Bash
|
||||
$ pip install tuf[tools]
|
||||
```
|
||||
|
||||
### Keys ###
|
||||
|
||||
|
|
|
|||
|
|
@ -235,6 +235,11 @@
|
|||
# An ED25519 raw signature, which must be 64 bytes.
|
||||
ED25519SIGNATURE_SCHEMA = SCHEMA.LengthBytes(64)
|
||||
|
||||
# Required installation libraries expected by the repository tools and other
|
||||
# cryptography modules.
|
||||
REQUIRED_LIBRARIES_SCHEMA = SCHEMA.ListOf(SCHEMA.OneOf(
|
||||
[SCHEMA.String('general'), SCHEMA.String('ed25519'), SCHEMA.String('rsa')]))
|
||||
|
||||
# An ed25519 TUF key.
|
||||
ED25519KEY_SCHEMA = SCHEMA.Object(
|
||||
object_name = 'ED25519KEY_SCHEMA',
|
||||
|
|
@ -665,6 +670,7 @@ def make_metadata(version, expiration_date, filedict=None, delegations=None):
|
|||
result = {'_type' : 'Targets'}
|
||||
result['version'] = version
|
||||
result['expires'] = expiration_date
|
||||
result['targets'] = {}
|
||||
if filedict is not None:
|
||||
result['targets'] = filedict
|
||||
if delegations is not None:
|
||||
|
|
@ -1297,7 +1303,7 @@ def encode_canonical(object, output_function=None):
|
|||
try:
|
||||
_encode_canonical(object, output_function)
|
||||
|
||||
except TypeError as e:
|
||||
except (TypeError, tuf.FormatError) as e:
|
||||
message = 'Could not encode '+repr(object)+': '+str(e)
|
||||
raise tuf.FormatError(message)
|
||||
|
||||
|
|
|
|||
|
|
@ -102,14 +102,14 @@ def create_keydb_from_root_metadata(root_metadata):
|
|||
try:
|
||||
add_key(key_dict, keyid)
|
||||
|
||||
except tuf.KeyAlreadyExistsError as e:
|
||||
logger.warning(e)
|
||||
continue
|
||||
|
||||
# 'tuf.Error' raised if keyid does not match the keyid for 'rsakey_dict'.
|
||||
except tuf.Error as e:
|
||||
logger.error(e)
|
||||
continue
|
||||
|
||||
except tuf.KeyAlreadyExistsError as e:
|
||||
logger.warning(e)
|
||||
continue
|
||||
|
||||
else:
|
||||
logger.warning('Root Metadata file contains a key with an invalid keytype.')
|
||||
|
|
|
|||
89
tuf/keys.py
89
tuf/keys.py
|
|
@ -205,9 +205,8 @@ def generate_rsa_key(bits=_DEFAULT_RSA_KEY_BITS):
|
|||
tuf.formats.RSAKEYBITS_SCHEMA.check_match(bits)
|
||||
|
||||
# Raise 'tuf.UnsupportedLibraryError' if the following libraries, specified in
|
||||
# 'tuf.conf', are unsupported or unavailable:
|
||||
# 'tuf.conf.RSA_CRYPTO_LIBRARY' and 'tuf.conf.ED25519_CRYPTO_LIBRARY'.
|
||||
_check_crypto_libraries()
|
||||
# 'tuf.conf', are unsupported or unavailable: 'tuf.conf.RSA_CRYPTO_LIBRARY'.
|
||||
check_crypto_libraries(['rsa'])
|
||||
|
||||
# Begin building the RSA key dictionary.
|
||||
rsakey_dict = {}
|
||||
|
|
@ -289,8 +288,8 @@ def generate_ed25519_key():
|
|||
|
||||
# Raise 'tuf.UnsupportedLibraryError' if the following libraries, specified
|
||||
# in 'tuf.conf', are unsupported or unavailable:
|
||||
# 'tuf.conf.RSA_CRYPTO_LIBRARY' and 'tuf.conf.ED25519_CRYPTO_LIBRARY'.
|
||||
_check_crypto_libraries()
|
||||
# 'tuf.conf.ED25519_CRYPTO_LIBRARY'.
|
||||
check_crypto_libraries(['ed25519'])
|
||||
|
||||
# Begin building the ED25519 key dictionary.
|
||||
ed25519_key = {}
|
||||
|
|
@ -511,46 +510,78 @@ def _get_keyid(keytype, key_value):
|
|||
|
||||
|
||||
|
||||
def _check_crypto_libraries():
|
||||
""" Ensure all the crypto libraries specified in tuf.conf are available. """
|
||||
def check_crypto_libraries(required_libraries):
|
||||
"""
|
||||
<Purpose>
|
||||
Public function that ensures the cryptography libraries specified in
|
||||
'tuf.conf' are supported and available for each 'required_libraries'.
|
||||
|
||||
<Arguments>
|
||||
required_libraries:
|
||||
A list of library strings to validate. One, or multiple, strings from
|
||||
['rsa', 'ed25519', 'general'] can be specified.
|
||||
|
||||
<Exceptions>
|
||||
tuf.UnsupportedLibraryError, if the 'required_libraries' and the libraries
|
||||
specified in 'tuf.conf' are not supported or unavailable.
|
||||
|
||||
<Side Effects>
|
||||
Validates the libraries set in 'tuf.conf'.
|
||||
|
||||
<Returns>
|
||||
None.
|
||||
"""
|
||||
|
||||
# Does 'required_libraries' have the correct format?
|
||||
# This check will ensure 'required_libraries' has the appropriate number
|
||||
# of objects and object types, and that all dict keys are properly named.
|
||||
# Raise 'tuf.FormatError' if the check fails.
|
||||
tuf.formats.REQUIRED_LIBRARIES_SCHEMA.check_match(required_libraries)
|
||||
|
||||
# The checks below all raise 'tuf.UnsupportedLibraryError' if the RSA and
|
||||
# ED25519 crypto libraries specified in 'tuf.conf.py' are not supported or
|
||||
# unavailable. The appropriate error message is added to the exception.
|
||||
# The funcions of this module that depend on user-installed crypto libraries
|
||||
# should call this private function to ensure the called routine does not fail
|
||||
# with unpredictable exceptions in the event of a missing library.
|
||||
# The supported and available lists checked are populated when 'tuf.keys.py'
|
||||
# is imported.
|
||||
if _RSA_CRYPTO_LIBRARY not in _SUPPORTED_RSA_CRYPTO_LIBRARIES:
|
||||
# The checks below all raise 'tuf.UnsupportedLibraryError' if the general,
|
||||
# RSA, and ED25519 crypto libraries specified in 'tuf.conf.py' are not
|
||||
# supported or unavailable. The appropriate error message is added to the
|
||||
# exception. The funcions of this module that depend on user-installed
|
||||
# crypto libraries should call this private function to ensure the called
|
||||
# routine does not fail with unpredictable exceptions in the event of a
|
||||
# missing library. The supported and available lists checked are populated
|
||||
# when 'tuf.keys.py' is imported.
|
||||
|
||||
if 'rsa' in required_libraries and _RSA_CRYPTO_LIBRARY not in \
|
||||
_SUPPORTED_RSA_CRYPTO_LIBRARIES:
|
||||
message = 'The '+repr(_RSA_CRYPTO_LIBRARY)+' crypto library specified'+ \
|
||||
' in "tuf.conf.RSA_CRYPTO_LIBRARY" is not supported.\n'+ \
|
||||
'Supported crypto libraries: '+repr(_SUPPORTED_RSA_CRYPTO_LIBRARIES)+'.'
|
||||
raise tuf.UnsupportedLibraryError(message)
|
||||
|
||||
if _ED25519_CRYPTO_LIBRARY not in _SUPPORTED_ED25519_CRYPTO_LIBRARIES:
|
||||
if 'ed25519' in required_libraries and _ED25519_CRYPTO_LIBRARY not in \
|
||||
_SUPPORTED_ED25519_CRYPTO_LIBRARIES:
|
||||
message = 'The '+repr(_ED25519_CRYPTO_LIBRARY)+' crypto library specified'+\
|
||||
' in "tuf.conf.ED25519_CRYPTO_LIBRARY" is not supported.\n'+ \
|
||||
'Supported crypto libraries: '+repr(_SUPPORTED_ED25519_CRYPTO_LIBRARIES)+'.'
|
||||
raise tuf.UnsupportedLibraryError(message)
|
||||
|
||||
if _GENERAL_CRYPTO_LIBRARY not in _SUPPORTED_GENERAL_CRYPTO_LIBRARIES:
|
||||
if 'general' in required_libraries and _GENERAL_CRYPTO_LIBRARY not in \
|
||||
_SUPPORTED_GENERAL_CRYPTO_LIBRARIES:
|
||||
message = 'The '+repr(_GENERAL_CRYPTO_LIBRARY)+' crypto library specified'+\
|
||||
' in "tuf.conf.GENERAL_CRYPTO_LIBRARY" is not supported.\n'+ \
|
||||
'Supported crypto libraries: '+repr(_SUPPORTED_GENERAL_CRYPTO_LIBRARIES)+'.'
|
||||
raise tuf.UnsupportedLibraryError(message)
|
||||
|
||||
if _RSA_CRYPTO_LIBRARY not in _available_crypto_libraries:
|
||||
if 'rsa' in required_libraries and _RSA_CRYPTO_LIBRARY not in \
|
||||
_available_crypto_libraries:
|
||||
message = 'The '+repr(_RSA_CRYPTO_LIBRARY)+' crypto library specified'+ \
|
||||
' in "tuf.conf.RSA_CRYPTO_LIBRARY" could not be imported.'
|
||||
raise tuf.UnsupportedLibraryError(message)
|
||||
|
||||
if _ED25519_CRYPTO_LIBRARY not in _available_crypto_libraries:
|
||||
if 'ed25519' in required_libraries and _ED25519_CRYPTO_LIBRARY not in \
|
||||
_available_crypto_libraries:
|
||||
message = 'The '+repr(_ED25519_CRYPTO_LIBRARY)+' crypto library specified'+\
|
||||
' in "tuf.conf.ED25519_CRYPTO_LIBRARY" could not be imported.'
|
||||
raise tuf.UnsupportedLibraryError(message)
|
||||
|
||||
if _GENERAL_CRYPTO_LIBRARY not in _available_crypto_libraries:
|
||||
if 'general' in required_libraries and _GENERAL_CRYPTO_LIBRARY not in \
|
||||
_available_crypto_libraries:
|
||||
message = 'The '+repr(_GENERAL_CRYPTO_LIBRARY)+' crypto library specified'+\
|
||||
' in "tuf.conf.GENERAL_CRYPTO_LIBRARY" could not be imported.'
|
||||
raise tuf.UnsupportedLibraryError(message)
|
||||
|
|
@ -636,8 +667,8 @@ def create_signature(key_dict, data):
|
|||
|
||||
# Raise 'tuf.UnsupportedLibraryError' if the following libraries, specified
|
||||
# in 'tuf.conf', are unsupported or unavailable:
|
||||
# 'tuf.conf.RSA_CRYPTO_LIBRARY' and 'tuf.conf.ED25519_CRYPTO_LIBRARY'.
|
||||
_check_crypto_libraries()
|
||||
# 'tuf.conf.RSA_CRYPTO_LIBRARY' or 'tuf.conf.ED25519_CRYPTO_LIBRARY'.
|
||||
check_crypto_libraries([key_dict['keytype']])
|
||||
|
||||
# Signing the 'data' object requires a private key.
|
||||
# The 'RSASSA-PSS' (i.e., PyCrypto module) and 'ed25519' (i.e., PyNaCl and the
|
||||
|
|
@ -893,8 +924,8 @@ def import_rsakey_from_encrypted_pem(encrypted_pem, password):
|
|||
|
||||
# Raise 'tuf.UnsupportedLibraryError' if the following libraries, specified in
|
||||
# 'tuf.conf', are unsupported or unavailable:
|
||||
# 'tuf.conf.RSA_CRYPTO_LIBRARY' and 'tuf.conf.ED25519_CRYPTO_LIBRARY'.
|
||||
_check_crypto_libraries()
|
||||
# 'tuf.conf.RSA_CRYPTO_LIBRARY' and 'tuf.conf.GENERAL_CRYPTO_LIBRARY'.
|
||||
check_crypto_libraries(['rsa', 'general'])
|
||||
|
||||
# Begin building the RSA key dictionary.
|
||||
rsakey_dict = {}
|
||||
|
|
@ -1076,7 +1107,7 @@ def encrypt_key(key_object, password):
|
|||
# Raise 'tuf.UnsupportedLibraryError' if the following libraries, specified in
|
||||
# 'tuf.conf', are unsupported or unavailable:
|
||||
# 'tuf.conf.GENERAL_CRYPTO_LIBRARY'.
|
||||
_check_crypto_libraries()
|
||||
check_crypto_libraries(['general'])
|
||||
|
||||
# Encrypted string of 'key_object'. The encrypted string may be safely saved
|
||||
# to a file and stored offline.
|
||||
|
|
@ -1172,7 +1203,7 @@ def decrypt_key(encrypted_key, passphrase):
|
|||
# Raise 'tuf.UnsupportedLibraryError' if the following libraries, specified in
|
||||
# 'tuf.conf', are unsupported or unavailable:
|
||||
# 'tuf.conf.GENERAL_CRYPTO_LIBRARY'.
|
||||
_check_crypto_libraries()
|
||||
check_crypto_libraries(['general'])
|
||||
|
||||
# Store and return the decrypted key object.
|
||||
key_object = None
|
||||
|
|
@ -1255,8 +1286,8 @@ def create_rsa_encrypted_pem(private_key, passphrase):
|
|||
|
||||
# Raise 'tuf.UnsupportedLibraryError' if the following libraries, specified in
|
||||
# 'tuf.conf', are unsupported or unavailable:
|
||||
# 'tuf.conf.RSA_CRYPTO_LIBRARY'.
|
||||
_check_crypto_libraries()
|
||||
# 'tuf.conf.RSA_CRYPTO_LIBRARY' and 'tuf.conf.GENERAL_CRYPTO_LIBRARY'.
|
||||
check_crypto_libraries(['rsa', 'general'])
|
||||
|
||||
encrypted_pem = None
|
||||
|
||||
|
|
|
|||
15
tuf/log.py
15
tuf/log.py
|
|
@ -175,7 +175,8 @@ def filter(self, record):
|
|||
def set_log_level(log_level=_DEFAULT_LOG_LEVEL):
|
||||
"""
|
||||
<Purpose>
|
||||
Allow the default log level to be overridden.
|
||||
Allow the default log level to be overridden. If 'log_level' is not
|
||||
provided, log level defaults to 'logging.DEBUG'.
|
||||
|
||||
<Arguments>
|
||||
log_level:
|
||||
|
|
@ -205,7 +206,8 @@ def set_log_level(log_level=_DEFAULT_LOG_LEVEL):
|
|||
def set_filehandler_log_level(log_level=_DEFAULT_FILE_LOG_LEVEL):
|
||||
"""
|
||||
<Purpose>
|
||||
Allow the default file handler log level to be overridden.
|
||||
Allow the default file handler log level to be overridden. If 'log_level'
|
||||
is not provided, log level defaults to 'logging.DEBUG'.
|
||||
|
||||
<Arguments>
|
||||
log_level:
|
||||
|
|
@ -235,7 +237,8 @@ def set_filehandler_log_level(log_level=_DEFAULT_FILE_LOG_LEVEL):
|
|||
def set_console_log_level(log_level=_DEFAULT_CONSOLE_LOG_LEVEL):
|
||||
"""
|
||||
<Purpose>
|
||||
Allow the default log level for console messages to be overridden.
|
||||
Allow the default log level for console messages to be overridden. If
|
||||
'log_level' is not provided, log level defaults to 'logging.INFO'.
|
||||
|
||||
<Arguments>
|
||||
log_level:
|
||||
|
|
@ -262,6 +265,7 @@ def set_console_log_level(log_level=_DEFAULT_CONSOLE_LOG_LEVEL):
|
|||
|
||||
if console_handler is not None:
|
||||
console_handler.setLevel(log_level)
|
||||
|
||||
else:
|
||||
message = 'The console handler has not been set with add_console_handler().'
|
||||
raise tuf.Error(message)
|
||||
|
|
@ -302,14 +306,18 @@ def add_console_handler(log_level=_DEFAULT_CONSOLE_LOG_LEVEL):
|
|||
# Set the console handler for the logger. The built-in console handler will
|
||||
# log messages to 'sys.stderr' and capture 'log_level' messages.
|
||||
console_handler = logging.StreamHandler()
|
||||
|
||||
# Get our filter for the console handler.
|
||||
console_filter = ConsoleFilter()
|
||||
console_format_string = '%(message)s'
|
||||
formatter = logging.Formatter(console_format_string)
|
||||
|
||||
console_handler.setLevel(log_level)
|
||||
console_handler.setFormatter(formatter)
|
||||
console_handler.addFilter(console_filter)
|
||||
logger.addHandler(console_handler)
|
||||
logger.debug('Added a console handler.')
|
||||
|
||||
else:
|
||||
logger.warning('We already have a console handler.')
|
||||
|
||||
|
|
@ -343,5 +351,6 @@ def remove_console_handler():
|
|||
logger.removeHandler(console_handler)
|
||||
console_handler = None
|
||||
logger.debug('Removed a console handler.')
|
||||
|
||||
else:
|
||||
logger.warning('We do not have a console handler.')
|
||||
|
|
|
|||
|
|
@ -119,6 +119,15 @@
|
|||
TIMESTAMP_EXPIRES_WARN_SECONDS = 86400
|
||||
|
||||
|
||||
try:
|
||||
tuf.keys.check_crypto_libraries(['rsa', 'ed25519', 'general'])
|
||||
|
||||
except tuf.UnsupportedLibraryError as e:
|
||||
message = 'Warning: The repository and developer tools require additional' + \
|
||||
' libraries and can be installed as follows:\n $ pip install tuf[tools]'
|
||||
logger.warn(message)
|
||||
|
||||
|
||||
class Repository(object):
|
||||
"""
|
||||
<Purpose>
|
||||
|
|
@ -231,8 +240,8 @@ def write(self, write_partial=False, consistent_snapshot=False):
|
|||
tuf.formats.BOOLEAN_SCHEMA.check_match(consistent_snapshot)
|
||||
|
||||
# At this point the tuf.keydb and tuf.roledb stores must be fully
|
||||
# populated, otherwise write() throwns a 'tuf.Repository' exception if
|
||||
# any of the top-level roles are missing signatures, keys, etc.
|
||||
# populated, otherwise write() throwns a 'tuf.UnsignedMetadataError'
|
||||
# exception if any of the top-level roles are missing signatures, keys, etc.
|
||||
|
||||
# Write the metadata files of all the delegated roles. Ensure target paths
|
||||
# are allowed, metadata is valid and properly signed, and required files and
|
||||
|
|
@ -432,23 +441,23 @@ def status(self):
|
|||
except tuf.UnsignedMetadataError as e:
|
||||
insufficient_signatures.append(delegated_role)
|
||||
|
||||
# Print the verification results of the delegated roles and return
|
||||
# Log the verification results of the delegated roles and return
|
||||
# immediately after each invalid case.
|
||||
if len(insufficient_keys):
|
||||
message = \
|
||||
'Delegated roles with insufficient keys:\n'+repr(insufficient_keys)
|
||||
print(message)
|
||||
logger.info(message)
|
||||
return
|
||||
|
||||
if len(insufficient_signatures):
|
||||
message = \
|
||||
'Delegated roles with insufficient signatures:\n'+\
|
||||
repr(insufficient_signatures)
|
||||
print(message)
|
||||
logger.info(message)
|
||||
return
|
||||
|
||||
# Verify the top-level roles and print the results.
|
||||
_print_status_of_top_level_roles(targets_directory, metadata_directory)
|
||||
# Verify the top-level roles and log the results.
|
||||
_log_status_of_top_level_roles(targets_directory, metadata_directory)
|
||||
|
||||
finally:
|
||||
shutil.rmtree(temp_repository_directory, ignore_errors=True)
|
||||
|
|
@ -1131,7 +1140,8 @@ def expiration(self, datetime_object):
|
|||
tuf.Error, if 'datetime_object' has already expired.
|
||||
|
||||
<Side Effects>
|
||||
Modifies the expiration attribute of the Repository object.
|
||||
Modifies the expiration attribute of the Repository object.
|
||||
The datetime given will be truncated to microseconds = 0
|
||||
|
||||
<Returns>
|
||||
None.
|
||||
|
|
@ -1143,6 +1153,10 @@ def expiration(self, datetime_object):
|
|||
message = repr(datetime_object) + ' is not a datetime.datetime() object.'
|
||||
raise tuf.FormatError(message)
|
||||
|
||||
# truncate the microseconds value to produce a correct schema string
|
||||
# of the form yyyy-mm-ddThh:mm:ssZ
|
||||
datetime_object = datetime_object.replace(microsecond = 0)
|
||||
|
||||
# Ensure the expiration has not already passed.
|
||||
current_datetime_object = \
|
||||
tuf.formats.unix_timestamp_to_datetime(int(time.time()))
|
||||
|
|
@ -1553,11 +1567,14 @@ def __call__(self, rolename):
|
|||
<Exceptions>
|
||||
tuf.FormatError, if the arguments are improperly formatted.
|
||||
|
||||
tuf.UnknownRoleError, if 'rolename' has not been delegated by this
|
||||
Targets object.
|
||||
|
||||
<Side Effects>
|
||||
Modifies the roleinfo of the targets role in 'tuf.roledb'.
|
||||
|
||||
<Returns>
|
||||
None.
|
||||
The Targets object of 'rolename'.
|
||||
"""
|
||||
|
||||
# Do the arguments have the correct format?
|
||||
|
|
@ -1568,6 +1585,7 @@ def __call__(self, rolename):
|
|||
|
||||
if rolename in self._delegated_roles:
|
||||
return self._delegated_roles[rolename]
|
||||
|
||||
else:
|
||||
message = repr(rolename)+' has not been delegated by '+repr(self.rolename)
|
||||
raise tuf.UnknownRoleError(message)
|
||||
|
|
@ -1812,6 +1830,7 @@ def add_targets(self, list_of_targets):
|
|||
|
||||
if os.path.isfile(filepath):
|
||||
relative_list_of_targets.append(filepath[targets_directory_length:])
|
||||
|
||||
else:
|
||||
message = repr(filepath)+' is not a valid file.'
|
||||
raise tuf.Error(message)
|
||||
|
|
@ -2331,6 +2350,109 @@ def delegate_hashed_bins(self, list_of_targets, keys_of_hashed_bins,
|
|||
|
||||
|
||||
|
||||
def add_target_to_bin(self, target_filepath):
|
||||
"""
|
||||
<Purpose>
|
||||
Add the fileinfo of 'target_filepath' to the expected hashed bin if
|
||||
the bin is available. The hashed bin should have been created by
|
||||
{targets_role}.delegate_hashed_bins(). Assuming the target filepath
|
||||
falls under the repository's targets directory, determine the filepath's
|
||||
hash prefix, locate the expected bin (if any), and then add the fileinfo
|
||||
to the expected bin. Example: 'targets/foo.tar.gz' may be added to
|
||||
the 'targets/unclaimed/58-5f.json' role's list of targets by calling this
|
||||
method.
|
||||
|
||||
>>>
|
||||
>>>
|
||||
>>>
|
||||
|
||||
<Arguments>
|
||||
target_filepath:
|
||||
The filepath of the target to be added to a hashed bin. The filepath
|
||||
must fall under repository's targets directory.
|
||||
|
||||
<Exceptions>
|
||||
tuf.FormatError, if 'target_filepath' is improperly formatted.
|
||||
|
||||
tuf.Error, if 'target_filepath' cannot be added to a hashed bin
|
||||
(e.g., an invalid target filepath, or the expected hashed bin does not
|
||||
exist.)
|
||||
|
||||
<Side Effects>
|
||||
The fileinfo of 'target_filepath' is added to a hashed bin of this Targets
|
||||
object.
|
||||
|
||||
<Returns>
|
||||
None.
|
||||
"""
|
||||
|
||||
# Do the arguments have the correct format?
|
||||
# Ensure the arguments have the appropriate number of objects and object
|
||||
# types, and that all dict keys are properly named.
|
||||
# Raise 'tuf.FormatError' if there is a mismatch.
|
||||
tuf.formats.PATH_SCHEMA.check_match(target_filepath)
|
||||
|
||||
# Determine the prefix length of any one of the hashed bins. The prefix
|
||||
# length is not stored in the roledb, so it must be determined here by
|
||||
# inspecting one of path hash prefixes listed.
|
||||
roleinfo = tuf.roledb.get_roleinfo(self.rolename)
|
||||
prefix_length = 0
|
||||
delegation = None
|
||||
|
||||
# Set 'delegation' if this Targets role has performed any delegations.
|
||||
if len(roleinfo['delegations']['roles']):
|
||||
delegation = roleinfo['delegations']['roles'][0]
|
||||
|
||||
else:
|
||||
raise tuf.Error(self.rolename + ' has not delegated to any roles.')
|
||||
|
||||
# Set 'prefix_length' if this Targets object has delegated to hashed bins,
|
||||
# otherwise raise an exception.
|
||||
if 'path_hash_prefixes' in delegation and len(delegation['path_hash_prefixes']):
|
||||
prefix_length = len(delegation['path_hash_prefixes'][0])
|
||||
|
||||
else:
|
||||
raise tuf.Error(self.rolename + ' has not delegated to hashed bins.')
|
||||
|
||||
# Ensure the filepath falls under the repository's targets directory.
|
||||
filepath = os.path.abspath(target_filepath)
|
||||
if not filepath.startswith(self._targets_directory + os.sep):
|
||||
message = repr(filepath)+' is not under the Repository\'s targets '+\
|
||||
'directory: '+repr(self._targets_directory)
|
||||
raise tuf.Error(message)
|
||||
|
||||
# Determine the hash prefix of 'target_path' by computing the digest of
|
||||
# its path relative to the targets directory. Example:
|
||||
# '{repository_root}/targets/file1.txt' -> '/file1.txt'.
|
||||
relative_path = filepath[len(self._targets_directory):]
|
||||
digest_object = tuf.hash.digest(algorithm=HASH_FUNCTION)
|
||||
digest_object.update(relative_path)
|
||||
path_hash = digest_object.hexdigest()
|
||||
path_hash_prefix = path_hash[:prefix_length]
|
||||
|
||||
# Search for 'path_hash_prefix', and if found, extract the hashed bin's
|
||||
# rolename. The hashed bin name is needed so that 'target_filepath' can be
|
||||
# added to the Targets object of the hashed bin.
|
||||
hashed_bin_name = None
|
||||
for delegation in roleinfo['delegations']['roles']:
|
||||
if path_hash_prefix in delegation['path_hash_prefixes']:
|
||||
hashed_bin_name = delegation['name']
|
||||
break
|
||||
|
||||
else:
|
||||
continue
|
||||
|
||||
# 'self._delegated_roles' is keyed by relative rolenames, so update
|
||||
# 'hashed_bin_name'.
|
||||
if hashed_bin_name is not None:
|
||||
hashed_bin_name = hashed_bin_name[len(self.rolename)+1:]
|
||||
self._delegated_roles[hashed_bin_name].add_target(target_filepath)
|
||||
|
||||
else:
|
||||
raise tuf.Error(target_filepath + ' cannot be added to any bins.')
|
||||
|
||||
|
||||
|
||||
@property
|
||||
def delegations(self):
|
||||
"""
|
||||
|
|
@ -2405,10 +2527,10 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial,
|
|||
root_filename = filenames['root']
|
||||
targets_filename = filenames['targets']
|
||||
metadata = generate_snapshot_metadata(metadata_directory,
|
||||
roleinfo['version'],
|
||||
roleinfo['expires'], root_filename,
|
||||
targets_filename,
|
||||
consistent_snapshot)
|
||||
roleinfo['version'],
|
||||
roleinfo['expires'], root_filename,
|
||||
targets_filename,
|
||||
consistent_snapshot)
|
||||
|
||||
_log_warning_if_expires_soon(SNAPSHOT_FILENAME, roleinfo['expires'],
|
||||
SNAPSHOT_EXPIRES_WARN_SECONDS)
|
||||
|
|
@ -2473,13 +2595,13 @@ def _generate_and_write_metadata(rolename, metadata_filename, write_partial,
|
|||
|
||||
|
||||
|
||||
def _print_status_of_top_level_roles(targets_directory, metadata_directory):
|
||||
def _log_status_of_top_level_roles(targets_directory, metadata_directory):
|
||||
"""
|
||||
Non-public function that prints whether any of the top-level roles contain an
|
||||
Non-public function that logs whether any of the top-level roles contain an
|
||||
invalid number of public and private keys, or an insufficient threshold of
|
||||
signatures. Considering that the top-level metadata have to be verified in
|
||||
the expected root -> targets -> snapshot -> timestamp order, this function
|
||||
prints the error message and returns as soon as a required metadata file is
|
||||
logs the error message and returns as soon as a required metadata file is
|
||||
found to be invalid. It is assumed here that the delegated roles have been
|
||||
written and verified. Example output:
|
||||
|
||||
|
|
@ -2507,8 +2629,8 @@ def _print_status_of_top_level_roles(targets_directory, metadata_directory):
|
|||
try:
|
||||
_check_role_keys(rolename)
|
||||
|
||||
except tuf.InsufficientKeysError as e:
|
||||
print(str(e))
|
||||
except tuf.InsufficientKeysError, e:
|
||||
logger.info(str(e))
|
||||
return
|
||||
|
||||
# Do the top-level roles contain a valid threshold of signatures? Top-level
|
||||
|
|
@ -2518,13 +2640,13 @@ def _print_status_of_top_level_roles(targets_directory, metadata_directory):
|
|||
signable, root_filename = \
|
||||
_generate_and_write_metadata('root', root_filename, False,
|
||||
targets_directory, metadata_directory)
|
||||
_print_status('root', signable)
|
||||
_log_status('root', signable)
|
||||
|
||||
# 'tuf.UnsignedMetadataError' raised if metadata contains an invalid threshold
|
||||
# of signatures. Print the valid/threshold message, where valid < threshold.
|
||||
# of signatures. log the valid/threshold message, where valid < threshold.
|
||||
except tuf.UnsignedMetadataError as e:
|
||||
signable = e[1]
|
||||
_print_status('root', signable)
|
||||
_log_status('root', signable)
|
||||
return
|
||||
|
||||
# Verify the metadata of the Targets role.
|
||||
|
|
@ -2532,11 +2654,11 @@ def _print_status_of_top_level_roles(targets_directory, metadata_directory):
|
|||
signable, targets_filename = \
|
||||
_generate_and_write_metadata('targets', targets_filename, False,
|
||||
targets_directory, metadata_directory)
|
||||
_print_status('targets', signable)
|
||||
_log_status('targets', signable)
|
||||
|
||||
except tuf.UnsignedMetadataError as e:
|
||||
signable = e[1]
|
||||
_print_status('targets', signable)
|
||||
_log_status('targets', signable)
|
||||
return
|
||||
|
||||
# Verify the metadata of the snapshot role.
|
||||
|
|
@ -2546,11 +2668,11 @@ def _print_status_of_top_level_roles(targets_directory, metadata_directory):
|
|||
_generate_and_write_metadata('snapshot', snapshot_filename, False,
|
||||
targets_directory, metadata_directory,
|
||||
False, filenames)
|
||||
_print_status('snapshot', signable)
|
||||
_log_status('snapshot', signable)
|
||||
|
||||
except tuf.UnsignedMetadataError as e:
|
||||
signable = e[1]
|
||||
_print_status('snapshot', signable)
|
||||
_log_status('snapshot', signable)
|
||||
return
|
||||
|
||||
# Verify the metadata of the Timestamp role.
|
||||
|
|
@ -2560,19 +2682,19 @@ def _print_status_of_top_level_roles(targets_directory, metadata_directory):
|
|||
_generate_and_write_metadata('timestamp', snapshot_filename, False,
|
||||
targets_directory, metadata_directory,
|
||||
False, filenames)
|
||||
_print_status('timestamp', signable)
|
||||
_log_status('timestamp', signable)
|
||||
|
||||
except tuf.UnsignedMetadataError as e:
|
||||
signable = e[1]
|
||||
_print_status('timestamp', signable)
|
||||
_log_status('timestamp', signable)
|
||||
return
|
||||
|
||||
|
||||
|
||||
|
||||
def _print_status(rolename, signable):
|
||||
def _log_status(rolename, signable):
|
||||
"""
|
||||
Non-public function prints the number of (good/threshold) signatures of
|
||||
Non-public function logs the number of (good/threshold) signatures of
|
||||
'rolename'.
|
||||
"""
|
||||
|
||||
|
|
@ -2580,7 +2702,7 @@ def _print_status(rolename, signable):
|
|||
|
||||
message = repr(rolename)+' role contains '+ repr(len(status['good_sigs']))+\
|
||||
' / '+repr(status['threshold'])+' signatures.'
|
||||
print(message)
|
||||
logger.info(message)
|
||||
|
||||
|
||||
|
||||
|
|
@ -2588,7 +2710,7 @@ def _print_status(rolename, signable):
|
|||
|
||||
def _prompt(message, result_type=str):
|
||||
"""
|
||||
Non-public function that prompts the user for input by printing 'message',
|
||||
Non-public function that prompts the user for input by loging 'message',
|
||||
converting the input to 'result_type', and returning the value to the
|
||||
caller.
|
||||
"""
|
||||
|
|
|
|||
|
|
@ -122,6 +122,7 @@ def get_signature_status(signable, role=None):
|
|||
# Identify unrecognized key.
|
||||
try:
|
||||
key = tuf.keydb.get_key(keyid)
|
||||
|
||||
except tuf.UnknownKeyError:
|
||||
unknown_sigs.append(keyid)
|
||||
continue
|
||||
|
|
@ -129,6 +130,7 @@ def get_signature_status(signable, role=None):
|
|||
# Identify key using an unknown key signing method.
|
||||
try:
|
||||
valid_sig = tuf.keys.verify_signature(key, signature, signed)
|
||||
|
||||
except tuf.UnknownMethodError:
|
||||
unknown_method_sigs.append(keyid)
|
||||
continue
|
||||
|
|
@ -157,8 +159,10 @@ def get_signature_status(signable, role=None):
|
|||
if role is not None:
|
||||
try:
|
||||
threshold = tuf.roledb.get_role_threshold(role)
|
||||
|
||||
except tuf.UnknownRoleError:
|
||||
raise
|
||||
|
||||
else:
|
||||
threshold = 0
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue