mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
The repository tools should properly write and load consistent versions of root.json and snapshot.json. Version numbers were previously prepended to these two roles.
1527 lines
57 KiB
Python
Executable file
1527 lines
57 KiB
Python
Executable file
#!/usr/bin/env python
|
|
|
|
"""
|
|
<Program Name>
|
|
test_repository_tool.py
|
|
|
|
<Author>
|
|
Vladimir Diaz <vladimir.v.diaz@gmail.com>
|
|
|
|
<Started>
|
|
April 7, 2014.
|
|
|
|
<Copyright>
|
|
See LICENSE for licensing information.
|
|
|
|
<Purpose>
|
|
Unit test for 'repository_tool.py'.
|
|
"""
|
|
|
|
# Help with Python 3 compatibility, where the print statement is a function, an
|
|
# implicit relative import is invalid, and the '/' operator performs true
|
|
# division. Example: print 'hello world' raises a 'SyntaxError' exception.
|
|
from __future__ import print_function
|
|
from __future__ import absolute_import
|
|
from __future__ import division
|
|
from __future__ import unicode_literals
|
|
|
|
import os
|
|
import time
|
|
import datetime
|
|
import unittest
|
|
import logging
|
|
import tempfile
|
|
import shutil
|
|
import sys
|
|
|
|
# 'unittest2' required for testing under Python < 2.7.
|
|
if sys.version_info >= (2, 7):
|
|
import unittest
|
|
|
|
else:
|
|
import unittest2 as unittest
|
|
|
|
import tuf
|
|
import tuf.log
|
|
import tuf.formats
|
|
import tuf.roledb
|
|
import tuf.keydb
|
|
import tuf.hash
|
|
import tuf.repository_tool as repo_tool
|
|
|
|
import six
|
|
|
|
logger = logging.getLogger('tuf.test_repository_tool')
|
|
|
|
repo_tool.disable_console_log_messages()
|
|
|
|
|
|
class TestRepository(unittest.TestCase):
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
|
|
# setUpClass() is called before tests in an individual class are executed.
|
|
|
|
# Create a temporary directory to store the repository, metadata, and target
|
|
# files. 'temporary_directory' must be deleted in TearDownClass() so that
|
|
# temporary files are always removed, even when exceptions occur.
|
|
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
|
|
|
|
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
|
|
# tearDownModule() is called after all the tests have run.
|
|
# http://docs.python.org/2/library/unittest.html#class-and-module-fixtures
|
|
|
|
# Remove the temporary repository directory, which should contain all the
|
|
# metadata, targets, and key files generated for the test cases.
|
|
shutil.rmtree(cls.temporary_directory)
|
|
|
|
|
|
|
|
def setUp(self):
|
|
pass
|
|
|
|
|
|
|
|
def tearDown(self):
|
|
tuf.roledb.clear_roledb()
|
|
tuf.keydb.clear_keydb()
|
|
|
|
|
|
def test_init(self):
|
|
|
|
# Test normal case.
|
|
repository = repo_tool.Repository('repository_directory/',
|
|
'metadata_directory/',
|
|
'targets_directory/')
|
|
self.assertTrue(isinstance(repository.root, repo_tool.Root))
|
|
self.assertTrue(isinstance(repository.snapshot, repo_tool.Snapshot))
|
|
self.assertTrue(isinstance(repository.timestamp, repo_tool.Timestamp))
|
|
self.assertTrue(isinstance(repository.targets, repo_tool.Targets))
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(tuf.FormatError, repo_tool.Repository, 3,
|
|
'metadata_directory/', 'targets_directory')
|
|
self.assertRaises(tuf.FormatError, repo_tool.Repository,
|
|
'repository_directory', 3, 'targets_directory')
|
|
self.assertRaises(tuf.FormatError, repo_tool.Repository,
|
|
'repository_directory', 'metadata_directory', 3)
|
|
|
|
|
|
|
|
def test_write_and_write_partial(self):
|
|
# Test creation of a TUF repository.
|
|
#
|
|
# 1. Import public and private keys.
|
|
# 2. Add verification keys.
|
|
# 3. Load signing keys.
|
|
# 4. Add target files.
|
|
# 5. Perform delegation.
|
|
# 5. write()
|
|
#
|
|
# Copy the target files from 'tuf/tests/repository_data' so that write()
|
|
# has target fileinfo to include in metadata.
|
|
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
|
|
targets_directory = os.path.join(temporary_directory, 'repository',
|
|
repo_tool.TARGETS_DIRECTORY_NAME)
|
|
original_targets_directory = os.path.join('repository_data',
|
|
'repository', 'targets')
|
|
shutil.copytree(original_targets_directory, targets_directory)
|
|
|
|
# In this case, create_new_repository() creates the 'repository/'
|
|
# sub-directory in 'temporary_directory' if it does not exist.
|
|
repository_directory = os.path.join(temporary_directory, 'repository')
|
|
metadata_directory = os.path.join(repository_directory,
|
|
repo_tool.METADATA_STAGED_DIRECTORY_NAME)
|
|
repository = repo_tool.create_new_repository(repository_directory)
|
|
|
|
|
|
# (1) Load the public and private keys of the top-level roles, and one
|
|
# delegated role.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
|
|
# Load the public keys.
|
|
root_pubkey_path = os.path.join(keystore_directory, 'root_key.pub')
|
|
targets_pubkey_path = os.path.join(keystore_directory, 'targets_key.pub')
|
|
snapshot_pubkey_path = os.path.join(keystore_directory, 'snapshot_key.pub')
|
|
timestamp_pubkey_path = os.path.join(keystore_directory, 'timestamp_key.pub')
|
|
role1_pubkey_path = os.path.join(keystore_directory, 'delegation_key.pub')
|
|
|
|
root_pubkey = repo_tool.import_rsa_publickey_from_file(root_pubkey_path)
|
|
targets_pubkey = \
|
|
repo_tool.import_rsa_publickey_from_file(targets_pubkey_path)
|
|
snapshot_pubkey = \
|
|
repo_tool.import_rsa_publickey_from_file(snapshot_pubkey_path)
|
|
timestamp_pubkey = \
|
|
repo_tool.import_rsa_publickey_from_file(timestamp_pubkey_path)
|
|
role1_pubkey = repo_tool.import_rsa_publickey_from_file(role1_pubkey_path)
|
|
|
|
# Load the private keys.
|
|
root_privkey_path = os.path.join(keystore_directory, 'root_key')
|
|
targets_privkey_path = os.path.join(keystore_directory, 'targets_key')
|
|
snapshot_privkey_path = os.path.join(keystore_directory, 'snapshot_key')
|
|
timestamp_privkey_path = os.path.join(keystore_directory, 'timestamp_key')
|
|
role1_privkey_path = os.path.join(keystore_directory, 'delegation_key')
|
|
|
|
root_privkey = \
|
|
repo_tool.import_rsa_privatekey_from_file(root_privkey_path, 'password')
|
|
targets_privkey = \
|
|
repo_tool.import_rsa_privatekey_from_file(targets_privkey_path,
|
|
'password')
|
|
snapshot_privkey = \
|
|
repo_tool.import_rsa_privatekey_from_file(snapshot_privkey_path,
|
|
'password')
|
|
timestamp_privkey = \
|
|
repo_tool.import_rsa_privatekey_from_file(timestamp_privkey_path,
|
|
'password')
|
|
role1_privkey = \
|
|
repo_tool.import_rsa_privatekey_from_file(role1_privkey_path,
|
|
'password')
|
|
|
|
|
|
# (2) Add top-level verification keys.
|
|
repository.root.add_verification_key(root_pubkey)
|
|
repository.targets.add_verification_key(targets_pubkey)
|
|
repository.snapshot.add_verification_key(snapshot_pubkey)
|
|
|
|
# Verify that repository.write() fails for insufficient threshold
|
|
# of signatures (default threshold = 1).
|
|
self.assertRaises(tuf.UnsignedMetadataError, repository.write)
|
|
|
|
repository.timestamp.add_verification_key(timestamp_pubkey)
|
|
|
|
|
|
# (3) Load top-level signing keys.
|
|
repository.root.load_signing_key(root_privkey)
|
|
repository.targets.load_signing_key(targets_privkey)
|
|
repository.snapshot.load_signing_key(snapshot_privkey)
|
|
|
|
# Verify that repository.write() fails for insufficient threshold
|
|
# of signatures (default threshold = 1).
|
|
self.assertRaises(tuf.UnsignedMetadataError, repository.write)
|
|
|
|
repository.timestamp.load_signing_key(timestamp_privkey)
|
|
|
|
|
|
# (4) Add target files.
|
|
target1 = os.path.join(targets_directory, 'file1.txt')
|
|
target2 = os.path.join(targets_directory, 'file2.txt')
|
|
target3 = os.path.join(targets_directory, 'file3.txt')
|
|
repository.targets.add_target(target1)
|
|
repository.targets.add_target(target2)
|
|
|
|
# (5) Perform delegation.
|
|
repository.targets.delegate('role1', [role1_pubkey], [target3])
|
|
repository.targets('role1').load_signing_key(role1_privkey)
|
|
|
|
# (6) Write repository.
|
|
repository.targets.compressions = ['gz']
|
|
repository.write()
|
|
|
|
|
|
# Verify that the expected metadata is written.
|
|
for role in ['root.json', 'targets.json', 'snapshot.json', 'timestamp.json']:
|
|
role_filepath = os.path.join(metadata_directory, role)
|
|
role_signable = tuf.util.load_json_file(role_filepath)
|
|
|
|
# Raise 'tuf.FormatError' if 'role_signable' is an invalid signable.
|
|
tuf.formats.check_signable_object_format(role_signable)
|
|
|
|
if role == 'targets.json':
|
|
compressed_filepath = role_filepath + '.gz'
|
|
self.assertTrue(os.path.exists(compressed_filepath))
|
|
|
|
# Verify the 'role1.json' delegation is also written.
|
|
role1_filepath = os.path.join(metadata_directory, 'role1.json')
|
|
role1_signable = tuf.util.load_json_file(role1_filepath)
|
|
tuf.formats.check_signable_object_format(role1_signable)
|
|
|
|
# Verify that an exception is *not* raised for multiple repository.write().
|
|
repository.write()
|
|
|
|
# Verify the status() does not raise an exception.
|
|
repository.status()
|
|
|
|
# Verify status() does not raise 'tuf.InsufficientKeysError' if a top-level
|
|
# role does and 'role1' do not contain a threshold of keys.
|
|
root_roleinfo = tuf.roledb.get_roleinfo('root')
|
|
old_threshold = root_roleinfo['threshold']
|
|
root_roleinfo['threshold'] = 10
|
|
role1_roleinfo = tuf.roledb.get_roleinfo('role1')
|
|
old_role1_threshold = role1_roleinfo['threshold']
|
|
role1_roleinfo['threshold'] = 10
|
|
tuf.roledb.update_roleinfo('root', root_roleinfo)
|
|
tuf.roledb.update_roleinfo('role1', role1_roleinfo)
|
|
repository.status()
|
|
|
|
# Restore the original threshold values.
|
|
root_roleinfo['threshold'] = old_threshold
|
|
tuf.roledb.update_roleinfo('root', root_roleinfo)
|
|
role1_roleinfo['threshold'] = old_role1_threshold
|
|
tuf.roledb.update_roleinfo('role1', role1_roleinfo)
|
|
|
|
|
|
# Verify status() does not raise 'tuf.UnsignedMetadataError' if any of the
|
|
# the top-level roles and 'role1' are improperly signed.
|
|
repository.root.unload_signing_key(root_privkey)
|
|
repository.root.load_signing_key(targets_privkey)
|
|
repository.targets('role1').unload_signing_key(role1_privkey)
|
|
repository.targets('role1').load_signing_key(targets_privkey)
|
|
repository.status()
|
|
|
|
# Reset Root and 'targets/role1', and verify Targets.
|
|
repository.root.unload_signing_key(targets_privkey)
|
|
repository.root.load_signing_key(root_privkey)
|
|
repository.targets('role1').unload_signing_key(targets_privkey)
|
|
repository.targets('role1').load_signing_key(role1_privkey)
|
|
repository.targets.unload_signing_key(targets_privkey)
|
|
repository.targets.load_signing_key(snapshot_privkey)
|
|
repository.status()
|
|
|
|
# Reset Targets and verify Snapshot.
|
|
repository.targets.unload_signing_key(snapshot_privkey)
|
|
repository.targets.load_signing_key(targets_privkey)
|
|
repository.snapshot.unload_signing_key(snapshot_privkey)
|
|
repository.snapshot.load_signing_key(timestamp_privkey)
|
|
repository.status()
|
|
|
|
# Reset Snapshot and verify timestamp.
|
|
repository.snapshot.unload_signing_key(timestamp_privkey)
|
|
repository.snapshot.load_signing_key(snapshot_privkey)
|
|
repository.timestamp.unload_signing_key(timestamp_privkey)
|
|
repository.timestamp.load_signing_key(root_privkey)
|
|
repository.status()
|
|
|
|
# Reset Timestamp
|
|
repository.timestamp.unload_signing_key(root_privkey)
|
|
repository.timestamp.load_signing_key(timestamp_privkey)
|
|
|
|
# Verify that a write() fails if a repository is loaded and a change
|
|
# is made to a role.
|
|
repo_tool.load_repository(repository_directory)
|
|
|
|
repository.timestamp.expiration = datetime.datetime(2030, 1, 1, 12, 0)
|
|
self.assertRaises(tuf.UnsignedMetadataError, repository.write)
|
|
|
|
# Verify that a write_partial() is allowed.
|
|
repository.write_partial()
|
|
|
|
# Next, perform a non-partial write() with consistent snapshots enabled.
|
|
# Since the timestamp was modified, load its private key.
|
|
repository.timestamp.load_signing_key(timestamp_privkey)
|
|
|
|
# Test creation of a consistent snapshot repository. Writing a consistent
|
|
# snapshot modifies the Root metadata, which specifies whether a repository
|
|
# supports consistent snapshots. Verify that an exception is raised due to
|
|
# the missing signatures of Root and Snapshot.
|
|
self.assertRaises(tuf.UnsignedMetadataError, repository.write,
|
|
False, True)
|
|
|
|
# Load the private keys of Root and Snapshot (new version required since
|
|
# Root has changed.)
|
|
repository.root.load_signing_key(root_privkey)
|
|
repository.snapshot.load_signing_key(snapshot_privkey)
|
|
|
|
# Verify that a consistent snapshot can be written and loaded.
|
|
repository.write(consistent_snapshot=True)
|
|
repo_tool.load_repository(repository_directory)
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(tuf.FormatError, repository.write, 3, False)
|
|
self.assertRaises(tuf.FormatError, repository.write, False, 3)
|
|
|
|
|
|
|
|
def test_get_filepaths_in_directory(self):
|
|
# Test normal case.
|
|
# Use the pre-generated metadata directory for testing.
|
|
# Set 'repo' reference to improve readability.
|
|
repo = repo_tool.Repository
|
|
metadata_directory = os.path.join('repository_data',
|
|
'repository', 'metadata')
|
|
# Verify the expected filenames. get_filepaths_in_directory() returns
|
|
# a list of absolute paths.
|
|
metadata_files = repo.get_filepaths_in_directory(metadata_directory)
|
|
expected_files = ['root.json', 'root.json.gz', 'targets.json',
|
|
'targets.json.gz', 'snapshot.json', 'snapshot.json.gz',
|
|
'timestamp.json', 'timestamp.json.gz', 'role1.json',
|
|
'role1.json.gz']
|
|
basenames = []
|
|
for filepath in metadata_files:
|
|
basenames.append(os.path.basename(filepath))
|
|
self.assertEqual(sorted(expected_files), sorted(basenames))
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(tuf.FormatError, repo.get_filepaths_in_directory,
|
|
3, recursive_walk=False, followlinks=False)
|
|
self.assertRaises(tuf.FormatError, repo.get_filepaths_in_directory,
|
|
metadata_directory, 3, followlinks=False)
|
|
self.assertRaises(tuf.FormatError, repo.get_filepaths_in_directory,
|
|
metadata_directory, recursive_walk=False, followlinks=3)
|
|
|
|
# Test invalid directory argument.
|
|
# A non-directory.
|
|
self.assertRaises(tuf.Error, repo.get_filepaths_in_directory,
|
|
os.path.join(metadata_directory, 'root.json'))
|
|
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
|
|
nonexistent_directory = os.path.join(temporary_directory, 'nonexistent/')
|
|
self.assertRaises(tuf.Error, repo.get_filepaths_in_directory,
|
|
nonexistent_directory, recursive_walk=False,
|
|
followlinks=False)
|
|
|
|
|
|
|
|
|
|
|
|
class TestMetadata(unittest.TestCase):
|
|
def setUp(self):
|
|
# Inherit from the repo_tool.Metadata() base class. All of the methods
|
|
# to be tested in TestMetadata require at least 1 role, so create it here
|
|
# and set its roleinfo.
|
|
class MetadataRole(repo_tool.Metadata):
|
|
|
|
def __init__(self):
|
|
super(MetadataRole, self).__init__()
|
|
|
|
self._rolename = 'metadata_role'
|
|
|
|
# Expire in 86400 seconds (1 day).
|
|
expiration = \
|
|
tuf.formats.unix_timestamp_to_datetime(int(time.time() + 86400))
|
|
expiration = expiration.isoformat() + 'Z'
|
|
roleinfo = {'keyids': [], 'signing_keyids': [], 'threshold': 1,
|
|
'signatures': [], 'version': 0,
|
|
'consistent_snapshot': False,
|
|
'compressions': [''], 'expires': expiration,
|
|
'partial_loaded': False}
|
|
|
|
tuf.roledb.add_role(self._rolename, roleinfo)
|
|
|
|
self.metadata = MetadataRole()
|
|
|
|
|
|
|
|
def tearDown(self):
|
|
tuf.roledb.clear_roledb()
|
|
tuf.keydb.clear_keydb()
|
|
self.metadata = None
|
|
|
|
|
|
|
|
def test_rolename(self):
|
|
base_metadata = repo_tool.Metadata()
|
|
|
|
self.assertEqual(base_metadata.rolename, None)
|
|
|
|
# Test the sub-classed MetadataRole().
|
|
self.assertEqual(self.metadata.rolename, 'metadata_role')
|
|
|
|
|
|
|
|
def test_version(self):
|
|
# Test version getter, and the default version number.
|
|
self.assertEqual(self.metadata.version, 0)
|
|
|
|
# Test version setter, and verify updated version number.
|
|
self.metadata.version = 8
|
|
self.assertEqual(self.metadata.version, 8)
|
|
|
|
|
|
|
|
def test_threshold(self):
|
|
# Test threshold getter, and the default threshold number.
|
|
self.assertEqual(self.metadata.threshold, 1)
|
|
|
|
# Test threshold setter, and verify updated threshold number.
|
|
self.metadata.threshold = 3
|
|
self.assertEqual(self.metadata.threshold, 3)
|
|
|
|
|
|
|
|
def test_expiration(self):
|
|
# Test expiration getter.
|
|
expiration = self.metadata.expiration
|
|
self.assertTrue(isinstance(expiration, datetime.datetime))
|
|
|
|
# Test expiration setter.
|
|
self.metadata.expiration = datetime.datetime(2030, 1, 1, 12, 0)
|
|
expiration = self.metadata.expiration
|
|
self.assertTrue(isinstance(expiration, datetime.datetime))
|
|
|
|
# test a setter with microseconds, we are forcing the microseconds value
|
|
expiration = datetime.datetime.today() + datetime.timedelta(weeks = 1)
|
|
# we force the microseconds value if we are unlucky enough to get a 0
|
|
if expiration.microsecond == 0:
|
|
expiration = expiration.replace(microsecond = 1)
|
|
|
|
new_expiration = self.metadata.expiration
|
|
self.assertTrue(isinstance(new_expiration, datetime.datetime))
|
|
|
|
# check that the expiration value is truncated
|
|
self.assertTrue(new_expiration.microsecond == 0)
|
|
|
|
# Test improperly formatted datetime.
|
|
try:
|
|
self.metadata.expiration = '3'
|
|
|
|
except tuf.FormatError:
|
|
pass
|
|
|
|
else:
|
|
self.fail('Setter failed to detect improperly formatted datetime.')
|
|
|
|
|
|
# Test invalid argument (i.e., expiration has already expired.)
|
|
expired_datetime = tuf.formats.unix_timestamp_to_datetime(int(time.time() - 1))
|
|
try:
|
|
self.metadata.expiration = expired_datetime
|
|
|
|
except tuf.Error:
|
|
pass
|
|
|
|
else:
|
|
self.fail('Setter failed to detect an expired datetime.')
|
|
|
|
|
|
|
|
def test_keys(self):
|
|
# Test default case, where a verification key has not been added.
|
|
self.assertEqual(self.metadata.keys, [])
|
|
|
|
|
|
# Test keys() getter after a verification key has been loaded.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'root_key.pub')
|
|
key_object = repo_tool.import_rsa_publickey_from_file(key_path)
|
|
self.metadata.add_verification_key(key_object)
|
|
|
|
keyid = key_object['keyid']
|
|
self.assertEqual([keyid], self.metadata.keys)
|
|
|
|
|
|
|
|
def test_signing_keys(self):
|
|
# Test default case, where a signing key has not been added.
|
|
self.assertEqual(self.metadata.signing_keys, [])
|
|
|
|
|
|
# Test signing_keys() getter after a signing key has been loaded.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'root_key')
|
|
key_object = repo_tool.import_rsa_privatekey_from_file(key_path, 'password')
|
|
self.metadata.load_signing_key(key_object)
|
|
|
|
keyid = key_object['keyid']
|
|
self.assertEqual([keyid], self.metadata.signing_keys)
|
|
|
|
|
|
|
|
def test_compressions(self):
|
|
# Test default case, where only uncompressed metadata is supported.
|
|
self.assertEqual(self.metadata.compressions, [''])
|
|
|
|
# Test compressions getter after a compressions algorithm is added.
|
|
self.metadata.compressions = ['gz']
|
|
|
|
self.assertEqual(self.metadata.compressions, ['', 'gz'])
|
|
|
|
|
|
# Test improperly formatted argument.
|
|
try:
|
|
self.metadata.compressions = 3
|
|
except tuf.FormatError:
|
|
pass
|
|
else:
|
|
self.fail('Setter failed to detect improperly formatted compressions')
|
|
|
|
|
|
|
|
def test_add_verification_key(self):
|
|
# Add verification key and verify with keys() that it was added.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'root_key.pub')
|
|
key_object = repo_tool.import_rsa_publickey_from_file(key_path)
|
|
self.metadata.add_verification_key(key_object)
|
|
|
|
keyid = key_object['keyid']
|
|
self.assertEqual([keyid], self.metadata.keys)
|
|
|
|
|
|
# Test improperly formatted key argument.
|
|
self.assertRaises(tuf.FormatError, self.metadata.add_verification_key, 3)
|
|
|
|
|
|
|
|
def test_remove_verification_key(self):
|
|
# Add verification key so that remove_verifiation_key() can be tested.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'root_key.pub')
|
|
key_object = repo_tool.import_rsa_publickey_from_file(key_path)
|
|
self.metadata.add_verification_key(key_object)
|
|
|
|
keyid = key_object['keyid']
|
|
self.assertEqual([keyid], self.metadata.keys)
|
|
|
|
|
|
# Test successful removal of verification key added above.
|
|
self.metadata.remove_verification_key(key_object)
|
|
self.assertEqual(self.metadata.keys, [])
|
|
|
|
|
|
# Test improperly formatted argument
|
|
self.assertRaises(tuf.FormatError, self.metadata.remove_verification_key, 3)
|
|
|
|
|
|
# Test non-existent public key argument.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'targets_key.pub')
|
|
unused_key_object = repo_tool.import_rsa_publickey_from_file(key_path)
|
|
|
|
self.assertRaises(tuf.Error, self.metadata.remove_verification_key,
|
|
unused_key_object)
|
|
|
|
|
|
|
|
def test_load_signing_key(self):
|
|
# Test normal case.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'root_key')
|
|
key_object = repo_tool.import_rsa_privatekey_from_file(key_path, 'password')
|
|
self.metadata.load_signing_key(key_object)
|
|
|
|
keyid = key_object['keyid']
|
|
self.assertEqual([keyid], self.metadata.signing_keys)
|
|
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(tuf.FormatError, self.metadata.load_signing_key, 3)
|
|
|
|
|
|
# Test non-private key.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'root_key.pub')
|
|
key_object = repo_tool.import_rsa_publickey_from_file(key_path)
|
|
self.assertRaises(tuf.Error, self.metadata.load_signing_key, key_object)
|
|
|
|
|
|
|
|
def test_unload_signing_key(self):
|
|
# Load a signing key so that unload_signing_key() can have a key to unload.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'root_key')
|
|
key_object = repo_tool.import_rsa_privatekey_from_file(key_path, 'password')
|
|
self.metadata.load_signing_key(key_object)
|
|
|
|
keyid = key_object['keyid']
|
|
self.assertEqual([keyid], self.metadata.signing_keys)
|
|
|
|
self.metadata.unload_signing_key(key_object)
|
|
|
|
self.assertEqual(self.metadata.signing_keys, [])
|
|
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(tuf.FormatError, self.metadata.unload_signing_key, 3)
|
|
|
|
|
|
# Test non-existent key argument.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'targets_key')
|
|
unused_key_object = repo_tool.import_rsa_privatekey_from_file(key_path,
|
|
'password')
|
|
|
|
self.assertRaises(tuf.Error, self.metadata.unload_signing_key,
|
|
unused_key_object)
|
|
|
|
|
|
|
|
def test_add_signature(self):
|
|
# Test normal case.
|
|
# Load signature list from any of pre-generated metadata; needed for
|
|
# testing.
|
|
metadata_directory = os.path.join('repository_data',
|
|
'repository', 'metadata')
|
|
root_filepath = os.path.join(metadata_directory, 'root.json')
|
|
root_signable = tuf.util.load_json_file(root_filepath)
|
|
signatures = root_signable['signatures']
|
|
|
|
# Add the first signature from the list, as only need one is needed.
|
|
self.metadata.add_signature(signatures[0])
|
|
self.assertEqual(signatures, self.metadata.signatures)
|
|
|
|
|
|
# Test improperly formatted signature argument.
|
|
self.assertRaises(tuf.FormatError, self.metadata.add_signature, 3)
|
|
|
|
|
|
|
|
def test_remove_signature(self):
|
|
# Test normal case.
|
|
# Add a signature so remove_signature() has some signature to remove.
|
|
metadata_directory = os.path.join('repository_data',
|
|
'repository', 'metadata')
|
|
root_filepath = os.path.join(metadata_directory, 'root.json')
|
|
root_signable = tuf.util.load_json_file(root_filepath)
|
|
signatures = root_signable['signatures']
|
|
self.metadata.add_signature(signatures[0])
|
|
|
|
self.metadata.remove_signature(signatures[0])
|
|
self.assertEqual(self.metadata.signatures, [])
|
|
|
|
|
|
# Test improperly formatted signature argument.
|
|
self.assertRaises(tuf.FormatError, self.metadata.remove_signature, 3)
|
|
|
|
|
|
# Test invalid signature argument (i.e., signature that has not been added.)
|
|
# Load an unused signature to be tested.
|
|
targets_filepath = os.path.join(metadata_directory, 'targets.json')
|
|
targets_signable = tuf.util.load_json_file(targets_filepath)
|
|
signatures = targets_signable['signatures']
|
|
|
|
self.assertRaises(tuf.Error, self.metadata.remove_signature, signatures[0])
|
|
|
|
|
|
|
|
def test_signatures(self):
|
|
# Test default case, where no signatures have been added yet.
|
|
self.assertEqual(self.metadata.signatures, [])
|
|
|
|
|
|
# Test getter after adding an example signature.
|
|
metadata_directory = os.path.join('repository_data',
|
|
'repository', 'metadata')
|
|
root_filepath = os.path.join(metadata_directory, 'root.json')
|
|
root_signable = tuf.util.load_json_file(root_filepath)
|
|
signatures = root_signable['signatures']
|
|
|
|
# Add the first signature from the list, as only need one is needed.
|
|
self.metadata.add_signature(signatures[0])
|
|
self.assertEqual(signatures, self.metadata.signatures)
|
|
|
|
|
|
|
|
class TestRoot(unittest.TestCase):
|
|
def setUp(self):
|
|
pass
|
|
|
|
|
|
|
|
def tearDown(self):
|
|
tuf.roledb.clear_roledb()
|
|
tuf.keydb.clear_keydb()
|
|
|
|
|
|
|
|
def test_init(self):
|
|
|
|
# Test normal case.
|
|
# Root() subclasses Metadata(), and creates a 'root' role in 'tuf.roledb'.
|
|
root_object = repo_tool.Root()
|
|
self.assertTrue(isinstance(root_object, repo_tool.Metadata))
|
|
self.assertTrue(tuf.roledb.role_exists('root'))
|
|
|
|
|
|
|
|
class TestTimestamp(unittest.TestCase):
|
|
def setUp(self):
|
|
pass
|
|
|
|
|
|
|
|
def tearDown(self):
|
|
tuf.roledb.clear_roledb()
|
|
tuf.keydb.clear_keydb()
|
|
|
|
|
|
|
|
def test_init(self):
|
|
|
|
# Test normal case.
|
|
# Timestamp() subclasses Metadata(), and creates a 'timestamp' role in
|
|
# 'tuf.roledb'.
|
|
timestamp_object = repo_tool.Timestamp()
|
|
self.assertTrue(isinstance(timestamp_object, repo_tool.Metadata))
|
|
self.assertTrue(tuf.roledb.role_exists('timestamp'))
|
|
|
|
|
|
|
|
|
|
|
|
class TestSnapshot(unittest.TestCase):
|
|
def setUp(self):
|
|
pass
|
|
|
|
|
|
|
|
def tearDown(self):
|
|
tuf.roledb.clear_roledb()
|
|
tuf.keydb.clear_keydb()
|
|
|
|
|
|
|
|
def test_init(self):
|
|
|
|
# Test normal case.
|
|
# Snapshot() subclasses Metadata(), and creates a 'snapshot' role in
|
|
# 'tuf.roledb'.
|
|
snapshot_object = repo_tool.Snapshot()
|
|
self.assertTrue(isinstance(snapshot_object, repo_tool.Metadata))
|
|
self.assertTrue(tuf.roledb.role_exists('snapshot'))
|
|
|
|
|
|
|
|
|
|
|
|
class TestTargets(unittest.TestCase):
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
|
|
# setUpClass() is called before tests in an individual class are executed.
|
|
|
|
# Create a temporary directory to store the repository, metadata, and target
|
|
# files. 'temporary_directory' must be deleted in TearDownClass() so that
|
|
# temporary files are always removed, even when exceptions occur.
|
|
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
|
|
|
|
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
|
|
# tearDownModule() is called after all the tests have run.
|
|
# http://docs.python.org/2/library/unittest.html#class-and-module-fixtures
|
|
|
|
# Remove the temporary repository directory, which should contain all the
|
|
# metadata, targets, and key files generated for the test cases.
|
|
shutil.rmtree(cls.temporary_directory)
|
|
|
|
|
|
|
|
def setUp(self):
|
|
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
|
|
self.targets_directory = os.path.join(temporary_directory, 'repository',
|
|
'targets')
|
|
original_targets_directory = os.path.join('repository_data',
|
|
'repository', 'targets')
|
|
shutil.copytree(original_targets_directory, self.targets_directory)
|
|
self.targets_object = repo_tool.Targets(self.targets_directory)
|
|
|
|
|
|
|
|
def tearDown(self):
|
|
tuf.roledb.clear_roledb()
|
|
tuf.keydb.clear_keydb()
|
|
self.targets_object = None
|
|
|
|
|
|
|
|
def test_init(self):
|
|
|
|
# Test normal case.
|
|
# Snapshot() subclasses Metadata(), and creates a 'snapshot' role in
|
|
# 'tuf.roledb'.
|
|
targets_object = repo_tool.Targets('targets_directory/')
|
|
self.assertTrue(isinstance(targets_object, repo_tool.Metadata))
|
|
self.assertTrue(tuf.roledb.role_exists('targets'))
|
|
|
|
# Custom Targets object rolename.
|
|
targets_object = repo_tool.Targets('targets_directory/', 'project')
|
|
self.assertTrue(isinstance(targets_object, repo_tool.Metadata))
|
|
self.assertTrue(tuf.roledb.role_exists('project'))
|
|
|
|
# Custom roleinfo object (i.e., tuf.formats.ROLEDB_SCHEMA). 'keyids' and
|
|
# 'threshold' are required, the rest are optional.
|
|
roleinfo = {'keyids':
|
|
['66c4cb5fef5e4d62b7013ef1cab4b8a827a36c14056d5603c3a970e21eb30e6f'],
|
|
'threshold': 8}
|
|
self.assertTrue(tuf.formats.ROLEDB_SCHEMA.matches(roleinfo))
|
|
|
|
targets_object = repo_tool.Targets('targets_directory/', 'package', roleinfo)
|
|
self.assertTrue(isinstance(targets_object, repo_tool.Metadata))
|
|
self.assertTrue(tuf.roledb.role_exists('package'))
|
|
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(tuf.FormatError, repo_tool.Targets, 3)
|
|
self.assertRaises(tuf.FormatError, repo_tool.Targets, 'targets_directory/', 3)
|
|
self.assertRaises(tuf.FormatError, repo_tool.Targets, 'targets_directory/',
|
|
'targets', 3)
|
|
|
|
|
|
|
|
def test_call(self):
|
|
# Test normal case.
|
|
# Perform a delegation so that a delegated role can be accessed and tested
|
|
# through __call__(). Example: {targets_object}('role1').
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'root_key.pub')
|
|
public_key = repo_tool.import_rsa_publickey_from_file(public_keypath)
|
|
target1_filepath = os.path.join(self.targets_directory, 'file1.txt')
|
|
|
|
# Create Targets() object to be tested.
|
|
targets_object = repo_tool.Targets(self.targets_directory)
|
|
targets_object.delegate('role1', [public_key], [target1_filepath])
|
|
|
|
self.assertTrue(isinstance(targets_object('role1'), repo_tool.Targets))
|
|
|
|
# Test invalid (i.e., non-delegated) rolename argument.
|
|
self.assertRaises(tuf.UnknownRoleError, targets_object, 'unknown_role')
|
|
|
|
# Test improperly formatted argument.
|
|
self.assertRaises(tuf.FormatError, targets_object, 1)
|
|
|
|
|
|
|
|
def test_get_delegated_rolenames(self):
|
|
# Test normal case.
|
|
# Perform two delegations so that get_delegated_rolenames() has roles to
|
|
# return.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'root_key.pub')
|
|
public_key = repo_tool.import_rsa_publickey_from_file(public_keypath)
|
|
target1_filepath = os.path.join(self.targets_directory, 'file1.txt')
|
|
target2_filepath = os.path.join(self.targets_directory, 'file2.txt')
|
|
|
|
# Set needed arguments by delegate().
|
|
public_keys = [public_key]
|
|
threshold = 1
|
|
|
|
self.targets_object.delegate('tuf', public_keys, [target1_filepath],
|
|
threshold, restricted_paths=None,
|
|
path_hash_prefixes=None)
|
|
self.targets_object.delegate('warehouse', public_keys, [target2_filepath],
|
|
threshold, restricted_paths=None,
|
|
path_hash_prefixes=None)
|
|
|
|
# Test that get_delegated_rolenames returns the expected delegations.
|
|
expected_delegated_rolenames = ['targets/tuf/', 'targets/warehouse']
|
|
for delegated_rolename in self.targets_object.get_delegated_rolenames():
|
|
delegated_rolename in expected_delegated_rolenames
|
|
|
|
|
|
|
|
def test_target_files(self):
|
|
# Test normal case.
|
|
# Verify the targets object initially contains zero target files.
|
|
self.assertEqual(self.targets_object.target_files, {})
|
|
|
|
target_filepath = os.path.join(self.targets_directory, 'file1.txt')
|
|
self.targets_object.add_target(target_filepath)
|
|
|
|
self.assertEqual(len(self.targets_object.target_files), 1)
|
|
self.assertTrue('/file1.txt' in self.targets_object.target_files)
|
|
|
|
|
|
|
|
def test_delegations(self):
|
|
# Test normal case.
|
|
# Perform a delegation so that delegations() has a Targets() object to
|
|
# return.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'root_key.pub')
|
|
public_key = repo_tool.import_rsa_publickey_from_file(public_keypath)
|
|
target1_filepath = os.path.join(self.targets_directory, 'file1.txt')
|
|
|
|
# Set needed arguments by delegate().
|
|
public_keys = [public_key]
|
|
rolename = 'tuf'
|
|
list_of_targets = [target1_filepath]
|
|
threshold = 1
|
|
|
|
self.targets_object.delegate(rolename, public_keys, list_of_targets,
|
|
threshold, backtrack=True,
|
|
restricted_paths=None,
|
|
path_hash_prefixes=None)
|
|
|
|
# Test that a valid Targets() object is returned by delegations().
|
|
for delegated_object in self.targets_object.delegations:
|
|
self.assertTrue(isinstance(delegated_object, repo_tool.Targets))
|
|
|
|
|
|
|
|
def test_add_target(self):
|
|
# Test normal case.
|
|
# Verify the targets object initially contains zero target files.
|
|
self.assertEqual(self.targets_object.target_files, {})
|
|
|
|
target_filepath = os.path.join(self.targets_directory, 'file1.txt')
|
|
self.targets_object.add_target(target_filepath)
|
|
|
|
self.assertEqual(len(self.targets_object.target_files), 1)
|
|
self.assertTrue('/file1.txt' in self.targets_object.target_files)
|
|
|
|
# Test the 'custom' parameter of add_target(), where additional information
|
|
# may be specified for the target.
|
|
target2_filepath = os.path.join(self.targets_directory, 'file2.txt')
|
|
|
|
# The file permission of the target (octal number specifying file access
|
|
# for owner, group, others (e.g., 0755).
|
|
octal_file_permissions = oct(os.stat(target2_filepath).st_mode)[4:]
|
|
custom_file_permissions = {'file_permissions': octal_file_permissions}
|
|
self.targets_object.add_target(target2_filepath, custom_file_permissions)
|
|
|
|
self.assertEqual(len(self.targets_object.target_files), 2)
|
|
self.assertTrue('/file2.txt' in self.targets_object.target_files)
|
|
self.assertEqual(self.targets_object.target_files['/file2.txt'],
|
|
custom_file_permissions)
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(tuf.FormatError, self.targets_object.add_target, 3)
|
|
self.assertRaises(tuf.FormatError, self.targets_object.add_target, 3,
|
|
custom_file_permissions)
|
|
self.assertRaises(tuf.FormatError, self.targets_object.add_target,
|
|
target_filepath, 3)
|
|
|
|
|
|
# Test invalid filepath argument (i.e., non-existent or invalid file.)
|
|
self.assertRaises(tuf.Error, self.targets_object.add_target,
|
|
'non-existent.txt')
|
|
|
|
# Not under the repository's targets directory.
|
|
self.assertRaises(tuf.Error, self.targets_object.add_target,
|
|
self.temporary_directory)
|
|
|
|
# Not a file (i.e., a valid path, but a directory.)
|
|
test_directory = os.path.join(self.targets_directory, 'test_directory')
|
|
os.mkdir(test_directory)
|
|
self.assertRaises(tuf.Error, self.targets_object.add_target, test_directory)
|
|
|
|
|
|
|
|
def test_add_targets(self):
|
|
# Test normal case.
|
|
# Verify the targets object initially contains zero target files.
|
|
self.assertEqual(self.targets_object.target_files, {})
|
|
|
|
target1_filepath = os.path.join(self.targets_directory, 'file1.txt')
|
|
target2_filepath = os.path.join(self.targets_directory, 'file2.txt')
|
|
target3_filepath = os.path.join(self.targets_directory, 'file3.txt')
|
|
|
|
# Add a 'target1_filepath' duplicate for testing purposes
|
|
# ('target1_filepath' should not be added twice.)
|
|
target_files = \
|
|
[target1_filepath, target2_filepath, target3_filepath, target1_filepath]
|
|
self.targets_object.add_targets(target_files)
|
|
|
|
self.assertEqual(len(self.targets_object.target_files), 3)
|
|
self.assertEqual(self.targets_object.target_files,
|
|
{'/file1.txt': {}, '/file2.txt': {}, '/file3.txt': {}})
|
|
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(tuf.FormatError, self.targets_object.add_targets, 3)
|
|
|
|
# Test invalid filepath argument (i.e., non-existent or invalid file.)
|
|
self.assertRaises(tuf.Error, self.targets_object.add_targets,
|
|
['non-existent.txt'])
|
|
self.assertRaises(tuf.Error, self.targets_object.add_targets,
|
|
[target1_filepath, target2_filepath, 'non-existent.txt'])
|
|
self.assertRaises(tuf.Error, self.targets_object.add_targets,
|
|
[self.temporary_directory])
|
|
temp_directory = os.path.join(self.targets_directory, 'temp')
|
|
os.mkdir(temp_directory)
|
|
self.assertRaises(tuf.Error, self.targets_object.add_targets,
|
|
[temp_directory])
|
|
|
|
|
|
|
|
def test_remove_target(self):
|
|
# Test normal case.
|
|
# Verify the targets object initially contains zero target files.
|
|
self.assertEqual(self.targets_object.target_files, {})
|
|
|
|
# Add a target so that remove_target() has something to remove.
|
|
target_filepath = os.path.join(self.targets_directory, 'file1.txt')
|
|
self.targets_object.add_target(target_filepath)
|
|
|
|
# Test remove_target()'s behavior.
|
|
self.targets_object.remove_target(target_filepath)
|
|
self.assertEqual(self.targets_object.target_files, {})
|
|
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(tuf.FormatError, self.targets_object.remove_target, 3)
|
|
|
|
|
|
# Test invalid filepath argument (i.e., non-existent or invalid file.)
|
|
self.assertRaises(tuf.Error, self.targets_object.remove_target,
|
|
'/non-existent.txt')
|
|
# Test for filepath that hasn't been added yet.
|
|
target5_filepath = os.path.join(self.targets_directory, 'file5.txt')
|
|
self.assertRaises(tuf.Error, self.targets_object.remove_target,
|
|
target5_filepath)
|
|
|
|
|
|
|
|
def test_clear_targets(self):
|
|
# Test normal case.
|
|
# Verify the targets object initially contains zero target files.
|
|
self.assertEqual(self.targets_object.target_files, {})
|
|
|
|
# Add targets, to be tested by clear_targets().
|
|
target1_filepath = os.path.join(self.targets_directory, 'file1.txt')
|
|
target2_filepath = os.path.join(self.targets_directory, 'file2.txt')
|
|
self.targets_object.add_targets([target1_filepath, target2_filepath])
|
|
|
|
self.targets_object.clear_targets()
|
|
self.assertEqual(self.targets_object.target_files, {})
|
|
|
|
|
|
|
|
def test_delegate(self):
|
|
# Test normal case.
|
|
# Need at least one public key and valid target paths required by
|
|
# delegate().
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'root_key.pub')
|
|
public_key = repo_tool.import_rsa_publickey_from_file(public_keypath)
|
|
target1_filepath = os.path.join(self.targets_directory, 'file1.txt')
|
|
target2_filepath = os.path.join(self.targets_directory, 'file2.txt')
|
|
|
|
|
|
# Set needed arguments by delegate().
|
|
public_keys = [public_key]
|
|
rolename = 'tuf'
|
|
list_of_targets = [target1_filepath, target2_filepath]
|
|
threshold = 1
|
|
restricted_paths = [self.targets_directory]
|
|
path_hash_prefixes = ['e3a3', '8fae', 'd543']
|
|
|
|
self.targets_object.delegate(rolename, public_keys, list_of_targets,
|
|
threshold, backtrack=True,
|
|
restricted_paths=restricted_paths,
|
|
path_hash_prefixes=path_hash_prefixes)
|
|
|
|
self.assertEqual(self.targets_object.get_delegated_rolenames(),
|
|
['tuf'])
|
|
|
|
# Try to delegate to a role that has already been delegated.
|
|
self.assertRaises(tuf.Error, self.targets_object.delegate, rolename,
|
|
public_keys, list_of_targets, threshold, backtrack=True,
|
|
restricted_paths=restricted_paths,
|
|
path_hash_prefixes=path_hash_prefixes)
|
|
|
|
# Test for targets that do not exist under the targets directory.
|
|
self.targets_object.revoke(rolename)
|
|
self.assertRaises(tuf.Error, self.targets_object.delegate, rolename,
|
|
public_keys, ['non-existent.txt'], threshold,
|
|
backtrack=True, restricted_paths=restricted_paths,
|
|
path_hash_prefixes=path_hash_prefixes)
|
|
|
|
# Test for targets that do not exist under the targets directory.
|
|
self.assertRaises(tuf.Error, self.targets_object.delegate, rolename,
|
|
public_keys, list_of_targets, threshold,
|
|
backtrack=True, restricted_paths=['non-existent.txt'],
|
|
path_hash_prefixes=path_hash_prefixes)
|
|
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(tuf.FormatError, self.targets_object.delegate,
|
|
3, public_keys, list_of_targets, threshold,
|
|
restricted_paths, path_hash_prefixes)
|
|
self.assertRaises(tuf.FormatError, self.targets_object.delegate,
|
|
rolename, 3, list_of_targets, threshold,
|
|
restricted_paths, path_hash_prefixes)
|
|
self.assertRaises(tuf.FormatError, self.targets_object.delegate,
|
|
rolename, public_keys, 3, threshold,
|
|
restricted_paths, path_hash_prefixes)
|
|
self.assertRaises(tuf.FormatError, self.targets_object.delegate,
|
|
rolename, public_keys, list_of_targets, '3',
|
|
restricted_paths, path_hash_prefixes)
|
|
self.assertRaises(tuf.FormatError, self.targets_object.delegate,
|
|
rolename, public_keys, list_of_targets, threshold,
|
|
3, path_hash_prefixes)
|
|
self.assertRaises(tuf.FormatError, self.targets_object.delegate,
|
|
rolename, public_keys, list_of_targets, threshold,
|
|
restricted_paths, 3)
|
|
|
|
|
|
# Test invalid arguments (e.g., already delegated 'rolename', non-existent
|
|
# files, etc.).
|
|
# Test duplicate 'rolename' delegation, which should have been delegated
|
|
# in the normal case above.
|
|
self.assertRaises(tuf.Error, self.targets_object.delegate,
|
|
rolename, public_keys, list_of_targets, threshold,
|
|
restricted_paths, path_hash_prefixes)
|
|
|
|
# Test non-existent target paths.
|
|
self.assertRaises(tuf.Error, self.targets_object.delegate,
|
|
rolename, public_keys, ['/non-existent'], threshold,
|
|
restricted_paths, path_hash_prefixes)
|
|
|
|
|
|
|
|
def test_delegate_hashed_bins(self):
|
|
# Test normal case.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'root_key.pub')
|
|
public_key = repo_tool.import_rsa_publickey_from_file(public_keypath)
|
|
target1_filepath = os.path.join(self.targets_directory, 'file1.txt')
|
|
|
|
# Set needed arguments by delegate_hashed_bins().
|
|
public_keys = [public_key]
|
|
list_of_targets = [target1_filepath]
|
|
|
|
# Test delegate_hashed_bins() and verify that 16 hashed bins have
|
|
# been delegated in the parent's roleinfo.
|
|
self.targets_object.delegate_hashed_bins(list_of_targets, public_keys,
|
|
number_of_bins=16)
|
|
|
|
# The expected child rolenames, since 'number_of_bins' = 16
|
|
delegated_rolenames = ['0', '1', '2', '3', '4', '5', '6', '7',
|
|
'8', '9', 'a', 'b', 'c', 'd', 'e', 'f']
|
|
|
|
self.assertEqual(sorted(self.targets_object.get_delegated_rolenames()),
|
|
sorted(delegated_rolenames))
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(tuf.FormatError,
|
|
self.targets_object.delegate_hashed_bins, 3, public_keys,
|
|
number_of_bins=1)
|
|
self.assertRaises(tuf.FormatError,
|
|
self.targets_object.delegate_hashed_bins,
|
|
list_of_targets, 3, number_of_bins=1)
|
|
self.assertRaises(tuf.FormatError,
|
|
self.targets_object.delegate_hashed_bins,
|
|
list_of_targets, public_keys, '1')
|
|
|
|
|
|
# Test invalid arguments.
|
|
# Invalid number of bins, which must be a power of 2.
|
|
self.assertRaises(tuf.Error,
|
|
self.targets_object.delegate_hashed_bins,
|
|
list_of_targets, public_keys, number_of_bins=3)
|
|
|
|
# Invalid 'list_of_targets'.
|
|
self.assertRaises(tuf.Error,
|
|
self.targets_object.delegate_hashed_bins,
|
|
['/non-existent'], public_keys, number_of_bins=3)
|
|
|
|
|
|
|
|
def test_add_target_to_bin(self):
|
|
# Test normal case.
|
|
# Delegate the hashed bins so that add_target_to_bin() can be tested.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'targets_key.pub')
|
|
public_key = repo_tool.import_rsa_publickey_from_file(public_keypath)
|
|
target1_filepath = os.path.join(self.targets_directory, 'file1.txt')
|
|
|
|
# Set needed arguments by delegate_hashed_bins().
|
|
public_keys = [public_key]
|
|
list_of_targets = [target1_filepath]
|
|
|
|
# Delegate to hashed bins. The target filepath to be tested is expected
|
|
# to contain a hash prefix of 'e', and should be available at:
|
|
# repository.targets('e').
|
|
self.targets_object.delegate_hashed_bins(list_of_targets, public_keys,
|
|
number_of_bins=16)
|
|
|
|
# Ensure each hashed bin initially contains zero targets.
|
|
for delegation in self.targets_object.delegations:
|
|
self.assertTrue(target1_filepath not in delegation.target_files)
|
|
|
|
# Add 'target1_filepath' and verify that the relative path of
|
|
# 'target1_filepath' is added to the correct bin.
|
|
self.targets_object.add_target_to_bin(target1_filepath)
|
|
|
|
for delegation in self.targets_object.delegations:
|
|
if delegation.rolename == 'e':
|
|
self.assertTrue('/file1.txt' in delegation.target_files)
|
|
|
|
else:
|
|
self.assertFalse('/file1.txt' in delegation.target_files)
|
|
|
|
# Test for non-existent delegations and hashed bins.
|
|
empty_targets_role = repo_tool.Targets(self.targets_directory, 'empty')
|
|
|
|
self.assertRaises(tuf.Error, empty_targets_role.add_target_to_bin,
|
|
target1_filepath)
|
|
|
|
# Test for a required hashed bin that does not exist.
|
|
self.targets_object.revoke('e')
|
|
self.assertRaises(tuf.Error, self.targets_object.add_target_to_bin,
|
|
target1_filepath)
|
|
|
|
# Test improperly formatted argument.
|
|
self.assertRaises(tuf.FormatError,
|
|
self.targets_object.add_target_to_bin, 3)
|
|
|
|
# Invalid target file path argument.
|
|
self.assertRaises(tuf.Error,
|
|
self.targets_object.add_target_to_bin, '/non-existent')
|
|
|
|
|
|
|
|
def test_remove_target_from_bin(self):
|
|
# Test normal case.
|
|
# Delegate the hashed bins so that add_target_to_bin() can be tested.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'targets_key.pub')
|
|
public_key = repo_tool.import_rsa_publickey_from_file(public_keypath)
|
|
target1_filepath = os.path.join(self.targets_directory, 'file1.txt')
|
|
|
|
# Set needed arguments by delegate_hashed_bins().
|
|
public_keys = [public_key]
|
|
list_of_targets = [target1_filepath]
|
|
|
|
# Delegate to hashed bins. The target filepath to be tested is expected
|
|
# to contain a hash prefix of 'e', and can be accessed as:
|
|
# repository.targets('e').
|
|
self.targets_object.delegate_hashed_bins(list_of_targets, public_keys,
|
|
number_of_bins=16)
|
|
|
|
# Ensure each hashed bin initially contains zero targets.
|
|
for delegation in self.targets_object.delegations:
|
|
self.assertTrue(target1_filepath not in delegation.target_files)
|
|
|
|
# Add 'target1_filepath' and verify that the relative path of
|
|
# 'target1_filepath' is added to the correct bin.
|
|
self.targets_object.add_target_to_bin(target1_filepath)
|
|
|
|
for delegation in self.targets_object.delegations:
|
|
if delegation.rolename == 'e':
|
|
self.assertTrue('/file1.txt' in delegation.target_files)
|
|
|
|
else:
|
|
self.assertTrue('/file1.txt' not in delegation.target_files)
|
|
|
|
# Test the remove_target_from_bin() method. Verify that 'target1_filepath'
|
|
# has been removed.
|
|
self.targets_object.remove_target_from_bin(target1_filepath)
|
|
|
|
for delegation in self.targets_object.delegations:
|
|
if delegation.rolename == 'e':
|
|
self.assertTrue('/file1.txt' not in delegation.target_files)
|
|
|
|
else:
|
|
self.assertTrue('/file1.txt' not in delegation.target_files)
|
|
|
|
|
|
# Test improperly formatted argument.
|
|
self.assertRaises(tuf.FormatError,
|
|
self.targets_object.remove_target_from_bin, 3)
|
|
|
|
# Invalid target file path argument.
|
|
self.assertRaises(tuf.Error, self.targets_object.remove_target_from_bin,
|
|
'/non-existent')
|
|
|
|
|
|
|
|
def test_add_restricted_paths(self):
|
|
# Test normal case.
|
|
# Perform a delegation so that add_restricted_paths() has a child role
|
|
# to restrict.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'root_key.pub')
|
|
public_key = repo_tool.import_rsa_publickey_from_file(public_keypath)
|
|
|
|
# Set needed arguments by delegate().
|
|
public_keys = [public_key]
|
|
rolename = 'tuf'
|
|
threshold = 1
|
|
|
|
self.targets_object.delegate(rolename, public_keys, [],
|
|
threshold, restricted_paths=None,
|
|
path_hash_prefixes=None)
|
|
|
|
# Delegate an extra role for test coverage (i.e., check that restricted
|
|
# paths are not added to a child role not requested.)
|
|
self.targets_object.delegate('junk_role', public_keys, [])
|
|
|
|
restricted_path = os.path.join(self.targets_directory, 'tuf_files')
|
|
os.mkdir(restricted_path)
|
|
restricted_paths = [restricted_path]
|
|
self.targets_object.add_restricted_paths(restricted_paths, 'tuf')
|
|
|
|
# Retrieve 'targets_object' roleinfo, and verify the roleinfo contains
|
|
# the expected restricted paths of the delegated role. Only
|
|
# Repository.write() verifies that child target paths are allowed by the
|
|
# parent.
|
|
targets_object_roleinfo = tuf.roledb.get_roleinfo(self.targets_object.rolename)
|
|
|
|
delegated_role = targets_object_roleinfo['delegations']['roles'][0]
|
|
self.assertEqual(['/tuf_files/'], delegated_role['paths'])
|
|
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(tuf.FormatError, self.targets_object.add_restricted_paths,
|
|
3, 'tuf')
|
|
self.assertRaises(tuf.FormatError, self.targets_object.add_restricted_paths,
|
|
restricted_paths, 3)
|
|
|
|
|
|
# Test invalid arguments.
|
|
# A non-delegated child role.
|
|
self.assertRaises(tuf.Error, self.targets_object.add_restricted_paths,
|
|
restricted_paths, 'non_delegated_rolename')
|
|
|
|
# Non-existent 'restricted_paths'.
|
|
self.assertRaises(tuf.Error, self.targets_object.add_restricted_paths,
|
|
['/non-existent'], 'tuf')
|
|
|
|
# Directory not under the repository's targets directory.
|
|
repository_directory = os.path.join('repository_data', 'repository')
|
|
self.assertRaises(tuf.Error, self.targets_object.add_restricted_paths,
|
|
[repository_directory], 'tuf')
|
|
|
|
|
|
|
|
def test_revoke(self):
|
|
# Test normal case.
|
|
# Perform a delegation so that revoke() has a delegation to revoke.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'root_key.pub')
|
|
public_key = repo_tool.import_rsa_publickey_from_file(public_keypath)
|
|
target1_filepath = os.path.join(self.targets_directory, 'file1.txt')
|
|
|
|
# Set needed arguments by delegate().
|
|
public_keys = [public_key]
|
|
rolename = 'tuf'
|
|
list_of_targets = [target1_filepath]
|
|
threshold = 1
|
|
|
|
self.targets_object.delegate(rolename, public_keys, list_of_targets,
|
|
threshold, restricted_paths=None,
|
|
path_hash_prefixes=None)
|
|
|
|
# Test revoke()
|
|
self.targets_object.revoke('tuf')
|
|
self.assertEqual(self.targets_object.get_delegated_rolenames(), [])
|
|
|
|
|
|
# Test improperly formatted rolename argument.
|
|
self.assertRaises(tuf.FormatError, self.targets_object.revoke, 3)
|
|
|
|
|
|
|
|
|
|
|
|
class TestRepositoryToolFunctions(unittest.TestCase):
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
|
|
# setUpClass() is called before tests in an individual class are executed.
|
|
|
|
# Create a temporary directory to store the repository, metadata, and target
|
|
# files. 'temporary_directory' must be deleted in TearDownClass() so that
|
|
# temporary files are always removed, even when exceptions occur.
|
|
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
|
|
|
|
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
|
|
# tearDownModule() is called after all the tests have run.
|
|
# http://docs.python.org/2/library/unittest.html#class-and-module-fixtures
|
|
|
|
# Remove the temporary repository directory, which should contain all the
|
|
# metadata, targets, and key files generated for the test cases.
|
|
shutil.rmtree(cls.temporary_directory)
|
|
|
|
|
|
|
|
def setUp(self):
|
|
pass
|
|
|
|
|
|
def tearDown(self):
|
|
pass
|
|
|
|
|
|
|
|
def test_create_new_repository(self):
|
|
# Test normal case.
|
|
# Setup the temporary repository directories needed by
|
|
# create_new_repository().
|
|
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
|
|
repository_directory = os.path.join(temporary_directory, 'repository')
|
|
metadata_directory = os.path.join(repository_directory,
|
|
repo_tool.METADATA_STAGED_DIRECTORY_NAME)
|
|
targets_directory = os.path.join(repository_directory,
|
|
repo_tool.TARGETS_DIRECTORY_NAME)
|
|
|
|
repository = repo_tool.create_new_repository(repository_directory)
|
|
self.assertTrue(isinstance(repository, repo_tool.Repository))
|
|
|
|
# Verify that the 'repository/', 'repository/metadata', and
|
|
# 'repository/targets' directories were created.
|
|
self.assertTrue(os.path.exists(repository_directory))
|
|
self.assertTrue(os.path.exists(metadata_directory))
|
|
self.assertTrue(os.path.exists(targets_directory))
|
|
|
|
|
|
# Test that the 'repository' directory is created (along with the other
|
|
# sub-directories) when it does not exist yet. The repository tool creates
|
|
# the non-existent directory.
|
|
shutil.rmtree(repository_directory)
|
|
|
|
repository = repo_tool.create_new_repository(repository_directory)
|
|
repository = repo_tool.create_new_repository(repository_directory)
|
|
self.assertTrue(isinstance(repository, repo_tool.Repository))
|
|
|
|
# Verify that the 'repository/', 'repository/metadata', and
|
|
# 'repository/targets' directories were created.
|
|
self.assertTrue(os.path.exists(repository_directory))
|
|
self.assertTrue(os.path.exists(metadata_directory))
|
|
self.assertTrue(os.path.exists(targets_directory))
|
|
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(tuf.FormatError, repo_tool.create_new_repository, 3)
|
|
|
|
|
|
|
|
def test_load_repository(self):
|
|
# Test normal case.
|
|
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
|
|
original_repository_directory = os.path.join('repository_data',
|
|
'repository')
|
|
repository_directory = os.path.join(temporary_directory, 'repository')
|
|
shutil.copytree(original_repository_directory, repository_directory)
|
|
|
|
repository = repo_tool.load_repository(repository_directory)
|
|
self.assertTrue(isinstance(repository, repo_tool.Repository))
|
|
|
|
# Verify the expected roles have been loaded. See
|
|
# 'tuf/tests/repository_data/repository/'.
|
|
expected_roles = \
|
|
['root', 'targets', 'snapshot', 'timestamp', 'role1']
|
|
for role in tuf.roledb.get_rolenames():
|
|
self.assertTrue(role in expected_roles)
|
|
|
|
self.assertTrue(len(repository.root.keys))
|
|
self.assertTrue(len(repository.targets.keys))
|
|
self.assertTrue(len(repository.snapshot.keys))
|
|
self.assertTrue(len(repository.timestamp.keys))
|
|
self.assertTrue(len(repository.targets('role1').keys))
|
|
|
|
# Assumed the targets (tuf/tests/repository_data/) role contains 'file1.txt'
|
|
# and 'file2.txt'.
|
|
self.assertTrue('/file1.txt' in repository.targets.target_files)
|
|
self.assertTrue('/file2.txt' in repository.targets.target_files)
|
|
self.assertTrue('/file3.txt' in repository.targets('role1').target_files)
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(tuf.FormatError, repo_tool.load_repository, 3)
|
|
|
|
|
|
# Test for invalid 'repository_directory' (i.e., does not contain the
|
|
# minimum required metadata.
|
|
root_filepath = \
|
|
os.path.join(repository_directory,
|
|
repo_tool.METADATA_STAGED_DIRECTORY_NAME, 'root.json')
|
|
os.remove(root_filepath)
|
|
self.assertRaises(tuf.RepositoryError, repo_tool.load_repository,
|
|
repository_directory)
|
|
|
|
|
|
|
|
# Run the test cases.
|
|
if __name__ == '__main__':
|
|
unittest.main()
|