python-tuf/tests/test_updater_root_rotation_integration.py
Vladimir Diaz 6b2dfc4abc
Fix test_updater_root_rotation_integration following change to repo_lib.py
Signed-off-by: Vladimir Diaz <vladimir.v.diaz@gmail.com>
2018-04-26 13:47:55 -04:00

386 lines
16 KiB
Python
Executable file

#!/usr/bin/env python
# Copyright 2016 - 2017, New York University and the TUF contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
"""
<Program Name>
test_updater_root_rotation_integration.py
<Author>
Evan Cordell.
<Started>
August 8, 2016.
<Copyright>
See LICENSE-MIT OR LICENSE for licensing information.
<Purpose>
'test_updater_root_rotation.py' provides a collection of methods that test
root key rotation in the example client.
<Methodology>
Test cases here should follow a specific order (i.e., independent methods are
tested before dependent methods). More accurately, least dependent methods
are tested before most dependent methods. There is no reason to rewrite or
construct other methods that replicate already-tested methods solely for
testing purposes. This is possible because the 'unittest.TestCase' class
guarantees the order of unit tests. The 'test_something_A' method would
be tested before 'test_something_B'. To ensure the expected order of tests,
a number is placed after 'test' and before methods name like so:
'test_1_check_directory'. The number is a measure of dependence, where 1 is
less dependent than 2.
"""
# Help with Python 3 compatibility, where the print statement is a function, an
# implicit relative import is invalid, and the '/' operator performs true
# division. Example: print 'hello world' raises a 'SyntaxError' exception.
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import os
import time
import shutil
import copy
import tempfile
import logging
import random
import subprocess
import sys
import unittest
import tuf
import tuf.log
import tuf.keydb
import tuf.roledb
import tuf.exceptions
import tuf.repository_tool as repo_tool
import tuf.unittest_toolbox as unittest_toolbox
import tuf.client.updater as updater
import securesystemslib
import six
logger = logging.getLogger('tuf.test_updater_root_rotation_integration')
repo_tool.disable_console_log_messages()
class TestUpdater(unittest_toolbox.Modified_TestCase):
@classmethod
def setUpClass(cls):
# setUpClass() is called before tests in an individual class are executed.
# Create a temporary directory to store the repository, metadata, and target
# files. 'temporary_directory' must be deleted in TearDownModule() so that
# temporary files are always removed, even when exceptions occur.
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
# Launch a SimpleHTTPServer (serves files in the current directory). Test
# cases will request metadata and target files that have been pre-generated
# in 'tuf/tests/repository_data', which will be served by the
# SimpleHTTPServer launched here. The test cases of 'test_updater.py'
# assume the pre-generated metadata files have a specific structure, such
# as a delegated role 'targets/role1', three target files, five key files,
# etc.
cls.SERVER_PORT = random.randint(30000, 45000)
command = ['python', 'simple_server.py', str(cls.SERVER_PORT)]
cls.server_process = subprocess.Popen(command, stderr=subprocess.PIPE)
logger.info('\n\tServer process started.')
logger.info('\tServer process id: '+str(cls.server_process.pid))
logger.info('\tServing on port: '+str(cls.SERVER_PORT))
cls.url = 'http://localhost:'+str(cls.SERVER_PORT) + os.path.sep
# NOTE: Following error is raised if a delay is not applied:
# <urlopen error [Errno 111] Connection refused>
time.sleep(1)
@classmethod
def tearDownClass(cls):
# tearDownModule() is called after all the tests have run.
# http://docs.python.org/2/library/unittest.html#class-and-module-fixtures
# Remove the temporary repository directory, which should contain all the
# metadata, targets, and key files generated for the test cases.
shutil.rmtree(cls.temporary_directory)
# Kill the SimpleHTTPServer process.
if cls.server_process.returncode is None:
logger.info('\tServer process ' + str(cls.server_process.pid) + ' terminated.')
cls.server_process.kill()
def setUp(self):
# We are inheriting from custom class.
unittest_toolbox.Modified_TestCase.setUp(self)
self.repository_name = 'test_repository1'
# Copy the original repository files provided in the test folder so that
# any modifications made to repository files are restricted to the copies.
# The 'repository_data' directory is expected to exist in 'tuf.tests/'.
original_repository_files = os.path.join(os.getcwd(), 'repository_data')
temporary_repository_root = \
self.make_temp_directory(directory=self.temporary_directory)
# The original repository, keystore, and client directories will be copied
# for each test case.
original_repository = os.path.join(original_repository_files, 'repository')
original_keystore = os.path.join(original_repository_files, 'keystore')
original_client = os.path.join(original_repository_files, 'client')
# Save references to the often-needed client repository directories.
# Test cases need these references to access metadata and target files.
self.repository_directory = \
os.path.join(temporary_repository_root, 'repository')
self.keystore_directory = \
os.path.join(temporary_repository_root, 'keystore')
self.client_directory = os.path.join(temporary_repository_root, 'client')
self.client_metadata = os.path.join(self.client_directory,
self.repository_name, 'metadata')
self.client_metadata_current = os.path.join(self.client_metadata, 'current')
self.client_metadata_previous = os.path.join(self.client_metadata, 'previous')
# Copy the original 'repository', 'client', and 'keystore' directories
# to the temporary repository the test cases can use.
shutil.copytree(original_repository, self.repository_directory)
shutil.copytree(original_client, self.client_directory)
shutil.copytree(original_keystore, self.keystore_directory)
# 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'.
repository_basepath = self.repository_directory[len(os.getcwd()):]
url_prefix = \
'http://localhost:' + str(self.SERVER_PORT) + repository_basepath
# Setting 'tuf.settings.repository_directory' with the temporary client
# directory copied from the original repository files.
tuf.settings.repositories_directory = self.client_directory
self.repository_mirrors = {'mirror1': {'url_prefix': url_prefix,
'metadata_path': 'metadata',
'targets_path': 'targets',
'confined_target_dirs': ['']}}
# Creating a repository instance. The test cases will use this client
# updater to refresh metadata, fetch target files, etc.
self.repository_updater = updater.Updater(self.repository_name,
self.repository_mirrors)
# Metadata role keys are needed by the test cases to make changes to the
# repository (e.g., adding a new target file to 'targets.json' and then
# requesting a refresh()).
self.role_keys = _load_role_keys(self.keystore_directory)
def tearDown(self):
# We are inheriting from custom class.
unittest_toolbox.Modified_TestCase.tearDown(self)
tuf.roledb.clear_roledb(clear_all=True)
tuf.keydb.clear_keydb(clear_all=True)
# UNIT TESTS.
def test_root_rotation(self):
repository = repo_tool.load_repository(self.repository_directory)
repository.root.threshold = 2
repository.snapshot.load_signing_key(self.role_keys['snapshot']['private'])
repository.timestamp.load_signing_key(self.role_keys['timestamp']['private'])
# Errors, not enough signing keys to satisfy root's threshold.
self.assertRaises(tuf.exceptions.UnsignedMetadataError, repository.writeall)
repository.root.add_verification_key(self.role_keys['role1']['public'])
repository.root.load_signing_key(self.role_keys['root']['private'])
repository.root.load_signing_key(self.role_keys['role1']['private'])
repository.writeall()
repository.root.add_verification_key(self.role_keys['snapshot']['public'])
repository.root.load_signing_key(self.role_keys['snapshot']['private'])
repository.root.threshold = 3
repository.writeall()
# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))
self.repository_updater.refresh()
def test_root_rotation_missing_keys(self):
repository = repo_tool.load_repository(self.repository_directory)
# A partially written root.json (threshold = 1, and not signed in this
# case) causes an invalid root chain later.
repository.snapshot.load_signing_key(self.role_keys['snapshot']['private'])
repository.timestamp.load_signing_key(self.role_keys['timestamp']['private'])
repository.write('root')
repository.write('snapshot')
repository.write('timestamp')
# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))
# Create a new, valid root.json.
repository.root.threshold = 2
repository.root.add_verification_key(self.role_keys['role1']['public'])
repository.root.load_signing_key(self.role_keys['root']['private'])
repository.root.load_signing_key(self.role_keys['role1']['private'])
repository.writeall()
repository.root.add_verification_key(self.role_keys['snapshot']['public'])
repository.root.load_signing_key(self.role_keys['snapshot']['private'])
repository.root.threshold = 3
repository.writeall()
# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))
try:
self.repository_updater.refresh()
except tuf.exceptions.NoWorkingMirrorError as exception:
for mirror_url, mirror_error in six.iteritems(exception.mirror_errors):
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
url_file = os.path.join(url_prefix, 'metadata', '2.root.json')
# Verify that '2.root.json' is the culprit.
self.assertEqual(url_file.replace('\\', '/'), mirror_url)
self.assertTrue(isinstance(mirror_error,
securesystemslib.exceptions.BadSignatureError))
def test_root_rotation_unmet_threshold(self):
repository = repo_tool.load_repository(self.repository_directory)
# Add verification keys
repository.root.add_verification_key(self.role_keys['root']['public'])
repository.root.add_verification_key(self.role_keys['role1']['public'])
repository.targets.add_verification_key(self.role_keys['targets']['public'])
repository.snapshot.add_verification_key(self.role_keys['snapshot']['public'])
repository.timestamp.add_verification_key(self.role_keys['timestamp']['public'])
repository.snapshot.load_signing_key(self.role_keys['snapshot']['private'])
repository.timestamp.load_signing_key(self.role_keys['timestamp']['private'])
# Add signing keys
repository.root.load_signing_key(self.role_keys['root']['private'])
repository.root.load_signing_key(self.role_keys['role1']['private'])
# Set root threshold
repository.root.threshold = 2
repository.writeall()
# Unload Root's previous signing keys to ensure that these keys are not
# used by mistake.
repository.root.unload_signing_key(self.role_keys['role1']['private'])
repository.root.unload_signing_key(self.role_keys['root']['private'])
# Add new verification key
repository.root.add_verification_key(self.role_keys['snapshot']['public'])
# Remove one of the original signing keys
repository.root.remove_verification_key(self.role_keys['role1']['public'])
# Set the threshold for the new Root file, but note that the previous
# threshold of 2 must still be met.
repository.root.threshold = 1
repository.root.load_signing_key(self.role_keys['role1']['private'])
repository.root.load_signing_key(self.role_keys['snapshot']['private'])
repository.snapshot.load_signing_key(self.role_keys['snapshot']['private'])
repository.timestamp.load_signing_key(self.role_keys['timestamp']['private'])
# We use write() rather than writeall() because the latter should fail due
# to the missing self.role_keys['root'] signature.
repository.write('root', increment_version_number=True)
repository.write('snapshot', increment_version_number=True)
repository.write('timestamp', increment_version_number=True)
# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))
# The following refresh should fail because root must be signed by the
# previous self.role_keys['root'] key, which wasn't loaded.
self.assertRaises(tuf.exceptions.NoWorkingMirrorError,
self.repository_updater.refresh)
def _load_role_keys(keystore_directory):
# Populating 'self.role_keys' by importing the required public and private
# keys of 'tuf/tests/repository_data/'. The role keys are needed when
# modifying the remote repository used by the test cases in this unit test.
# The pre-generated key files in 'repository_data/keystore' are all encrypted
# with a 'password' passphrase.
EXPECTED_KEYFILE_PASSWORD = 'password'
# Store and return the cryptography keys of the top-level roles, including 1
# delegated role.
role_keys = {}
root_key_file = os.path.join(keystore_directory, 'root_key')
targets_key_file = os.path.join(keystore_directory, 'targets_key')
snapshot_key_file = os.path.join(keystore_directory, 'snapshot_key')
timestamp_key_file = os.path.join(keystore_directory, 'timestamp_key')
delegation_key_file = os.path.join(keystore_directory, 'delegation_key')
role_keys = {'root': {}, 'targets': {}, 'snapshot': {}, 'timestamp': {},
'role1': {}}
# Import the top-level and delegated role public keys.
role_keys['root']['public'] = \
repo_tool.import_rsa_publickey_from_file(root_key_file+'.pub')
role_keys['targets']['public'] = \
repo_tool.import_ed25519_publickey_from_file(targets_key_file+'.pub')
role_keys['snapshot']['public'] = \
repo_tool.import_ed25519_publickey_from_file(snapshot_key_file+'.pub')
role_keys['timestamp']['public'] = \
repo_tool.import_ed25519_publickey_from_file(timestamp_key_file+'.pub')
role_keys['role1']['public'] = \
repo_tool.import_ed25519_publickey_from_file(delegation_key_file+'.pub')
# Import the private keys of the top-level and delegated roles.
role_keys['root']['private'] = \
repo_tool.import_rsa_privatekey_from_file(root_key_file,
EXPECTED_KEYFILE_PASSWORD)
role_keys['targets']['private'] = \
repo_tool.import_ed25519_privatekey_from_file(targets_key_file,
EXPECTED_KEYFILE_PASSWORD)
role_keys['snapshot']['private'] = \
repo_tool.import_ed25519_privatekey_from_file(snapshot_key_file,
EXPECTED_KEYFILE_PASSWORD)
role_keys['timestamp']['private'] = \
repo_tool.import_ed25519_privatekey_from_file(timestamp_key_file,
EXPECTED_KEYFILE_PASSWORD)
role_keys['role1']['private'] = \
repo_tool.import_ed25519_privatekey_from_file(delegation_key_file,
EXPECTED_KEYFILE_PASSWORD)
return role_keys
if __name__ == '__main__':
unittest.main()