python-tuf/tests/test_key_revocation_integration.py
Martin Vrachev 028d1bc9f7 Make "utils" import more definite
Currently, we are importing the "utils" module in tests/utils
with "import utils".
This could become a problem when there is another module with
the same general name "utils" and could lead to import mistakes.

Signed-off-by: Martin Vrachev <mvrachev@vmware.com>
2020-11-23 22:17:31 +02:00

503 lines
22 KiB
Python
Executable file

#!/usr/bin/env python
# Copyright 2016 - 2017, New York University and the TUF contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
"""
<Program Name>
test_key_revocation_integration.py
<Author>
Vladimir Diaz.
<Started>
April 28, 2016.
<Copyright>
See LICENSE-MIT OR LICENSE for licensing information.
<Purpose>
Integration test that verifies top-level roles are updated after all of their
keys have been revoked. There are unit tests in 'test_repository_tool.py'
that verify key and role revocation of specific roles, but these should be
expanded to verify key revocations over the span of multiple snapshots of the
repository.
The 'unittest_toolbox.py' module was created to provide additional testing
tools, such as automatically deleting temporary files created in test cases.
For more information on the additional testing tools, see
'tests/unittest_toolbox.py'.
"""
# Help with Python 3 compatibility, where the print statement is a function, an
# implicit relative import is invalid, and the '/' operator performs true
# division. Example: print 'hello world' raises a 'SyntaxError' exception.
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import os
import shutil
import tempfile
import logging
import unittest
import sys
import tuf
import tuf.log
import tuf.roledb
import tuf.keydb
import tuf.repository_tool as repo_tool
import tuf.unittest_toolbox as unittest_toolbox
import tuf.client.updater as updater
from tests import utils
import securesystemslib
import six
logger = logging.getLogger(__name__)
repo_tool.disable_console_log_messages()
class TestKeyRevocation(unittest_toolbox.Modified_TestCase):
@classmethod
def setUpClass(cls):
# Create a temporary directory to store the repository, metadata, and target
# files. 'temporary_directory' must be deleted in TearDownModule() so that
# temporary files are always removed, even when exceptions occur.
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
# Launch a SimpleHTTPServer (serves files in the current directory). Test
# cases will request metadata and target files that have been pre-generated
# in 'tuf/tests/repository_data', which will be served by the
# SimpleHTTPServer launched here. The test cases of
# 'test_key_revocation.py' assume the pre-generated metadata files have a
# specific structure, such as a delegated role, three target files, five
# key files, etc.
cls.server_process_handler = utils.TestServerProcess(log=logger)
@classmethod
def tearDownClass(cls):
# Cleans the resources and flush the logged lines (if any).
cls.server_process_handler.clean()
# Remove the temporary repository directory, which should contain all the
# metadata, targets, and key files generated for the test cases.
shutil.rmtree(cls.temporary_directory)
def setUp(self):
# We are inheriting from custom class.
unittest_toolbox.Modified_TestCase.setUp(self)
self.repository_name = 'test_repository1'
# Copy the original repository files provided in the test folder so that
# any modifications made to repository files are restricted to the copies.
# The 'repository_data' directory is expected to exist in 'tuf.tests/'.
original_repository_files = os.path.join(os.getcwd(), 'repository_data')
temporary_repository_root = \
self.make_temp_directory(directory=self.temporary_directory)
# The original repository, keystore, and client directories will be copied
# for each test case.
original_repository = os.path.join(original_repository_files, 'repository')
original_keystore = os.path.join(original_repository_files, 'keystore')
original_client = os.path.join(original_repository_files, 'client')
# Save references to the often-needed client repository directories.
# Test cases need these references to access metadata and target files.
self.repository_directory = \
os.path.join(temporary_repository_root, 'repository')
self.keystore_directory = \
os.path.join(temporary_repository_root, 'keystore')
self.client_directory = os.path.join(temporary_repository_root, 'client')
self.client_metadata = os.path.join(self.client_directory,
self.repository_name, 'metadata')
self.client_metadata_current = os.path.join(self.client_metadata, 'current')
self.client_metadata_previous = os.path.join(self.client_metadata, 'previous')
# Copy the original 'repository', 'client', and 'keystore' directories
# to the temporary repository the test cases can use.
shutil.copytree(original_repository, self.repository_directory)
shutil.copytree(original_client, self.client_directory)
shutil.copytree(original_keystore, self.keystore_directory)
# 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'.
repository_basepath = self.repository_directory[len(os.getcwd()):]
url_prefix = 'http://localhost:' \
+ str(self.server_process_handler.port) + repository_basepath
# Setting 'tuf.settings.repository_directory' with the temporary client
# directory copied from the original repository files.
tuf.settings.repositories_directory = self.client_directory
self.repository_mirrors = {'mirror1': {'url_prefix': url_prefix,
'metadata_path': 'metadata',
'targets_path': 'targets'}}
# Creating repository instance. The test cases will use this client
# updater to refresh metadata, fetch target files, etc.
self.repository_updater = updater.Updater(self.repository_name,
self.repository_mirrors)
# Metadata role keys are needed by the test cases to make changes to the
# repository (e.g., adding a new target file to 'targets.json' and then
# requesting a refresh()).
self.role_keys = _load_role_keys(self.keystore_directory)
def tearDown(self):
# We are inheriting from custom class.
unittest_toolbox.Modified_TestCase.tearDown(self)
tuf.roledb.clear_roledb(clear_all=True)
tuf.keydb.clear_keydb(clear_all=True)
# Logs stdout and stderr from the sever subprocess.
self.server_process_handler.flush_log()
# UNIT TESTS.
def test_timestamp_key_revocation(self):
# First verify that the Timestamp role is properly signed. Calling
# refresh() should not raise an exception.
self.repository_updater.refresh()
# There should only be one key for Timestamp. Store the keyid to later
# verify that it has been revoked.
timestamp_roleinfo = tuf.roledb.get_roleinfo('timestamp', self.repository_name)
timestamp_keyid = timestamp_roleinfo['keyids']
self.assertEqual(len(timestamp_keyid), 1)
# Remove 'timestamp_keyid' and add a new key. Verify that the client
# detects the removal and addition of keys to the Timestamp role.
repository = repo_tool.load_repository(self.repository_directory)
repository.timestamp.remove_verification_key(self.role_keys['timestamp']['public'])
repository.timestamp.add_verification_key(self.role_keys['snapshot']['public'])
# Root, Snapshot, and Timestamp must be rewritten. Root must be written
# because the timestamp key has changed; Snapshot, because Root has
# changed, and ...
repository.root.load_signing_key(self.role_keys['root']['private'])
repository.snapshot.load_signing_key(self.role_keys['snapshot']['private'])
repository.timestamp.load_signing_key(self.role_keys['snapshot']['private'])
repository.writeall()
# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))
# The client performs a refresh of top-level metadata to get the latest
# changes.
self.repository_updater.refresh()
# Verify that the client is able to recognize that a new set of keys have
# been added to the Timestamp role.
# First, has 'timestamp_keyid' been removed?
timestamp_roleinfo = tuf.roledb.get_roleinfo('timestamp', self.repository_name)
self.assertTrue(timestamp_keyid not in timestamp_roleinfo['keyids'])
# Second, is Timestamp's new key correct? The new key should be Snapshot's.
self.assertEqual(len(timestamp_roleinfo['keyids']), 1)
snapshot_roleinfo = tuf.roledb.get_roleinfo('snapshot', self.repository_name)
self.assertEqual(timestamp_roleinfo['keyids'], snapshot_roleinfo['keyids'])
def test_snapshot_key_revocation(self):
# First verify that the Snapshot role is properly signed. Calling
# refresh() should not raise an exception.
self.repository_updater.refresh()
# There should only be one key for Snapshot. Store the keyid to later
# verify that it has been revoked.
snapshot_roleinfo = tuf.roledb.get_roleinfo('snapshot', self.repository_name)
snapshot_keyid = snapshot_roleinfo['keyids']
self.assertEqual(len(snapshot_keyid), 1)
# Remove 'snapshot_keyid' and add a new key. Verify that the client
# detects the removal and addition of keys to the Snapshot role.
repository = repo_tool.load_repository(self.repository_directory)
repository.snapshot.remove_verification_key(self.role_keys['snapshot']['public'])
repository.snapshot.add_verification_key(self.role_keys['timestamp']['public'])
# Root, Snapshot, and Timestamp must be rewritten. Root must be written
# because the timestamp key has changed; Snapshot, because Root has
# changed, and Timesamp, because it must sign its metadata with a new key.
repository.root.load_signing_key(self.role_keys['root']['private'])
# Note: we added Timestamp's key to the Snapshot role.
repository.snapshot.load_signing_key(self.role_keys['timestamp']['private'])
repository.timestamp.load_signing_key(self.role_keys['timestamp']['private'])
repository.writeall()
# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))
# The client performs a refresh of top-level metadata to get the latest
# changes.
self.repository_updater.refresh()
# Verify that the client is able to recognize that a new set of keys have
# been added to the Snapshot role.
# First, has 'snapshot_keyid' been removed?
snapshot_roleinfo = tuf.roledb.get_roleinfo('snapshot', self.repository_name)
self.assertTrue(snapshot_keyid not in snapshot_roleinfo['keyids'])
# Second, is Snapshot's new key correct? The new key should be
# Timestamp's.
self.assertEqual(len(snapshot_roleinfo['keyids']), 1)
timestamp_roleinfo = tuf.roledb.get_roleinfo('timestamp', self.repository_name)
self.assertEqual(snapshot_roleinfo['keyids'], timestamp_roleinfo['keyids'])
def test_targets_key_revocation(self):
# First verify that the Targets role is properly signed. Calling
# refresh() should not raise an exception.
self.repository_updater.refresh()
# There should only be one key for Targets. Store the keyid to later
# verify that it has been revoked.
targets_roleinfo = tuf.roledb.get_roleinfo('targets', self.repository_name)
targets_keyid = targets_roleinfo['keyids']
self.assertEqual(len(targets_keyid), 1)
# Remove 'targets_keyid' and add a new key. Verify that the client
# detects the removal and addition of keys to the Targets role.
repository = repo_tool.load_repository(self.repository_directory)
repository.targets.remove_verification_key(self.role_keys['targets']['public'])
repository.targets.add_verification_key(self.role_keys['timestamp']['public'])
# Root, Snapshot, and Timestamp must be rewritten. Root must be written
# because the timestamp key has changed; Snapshot, because Root has
# changed, and Timestamp because it must sign its metadata with a new key.
repository.root.load_signing_key(self.role_keys['root']['private'])
# Note: we added Timestamp's key to the Targets role.
repository.targets.load_signing_key(self.role_keys['timestamp']['private'])
repository.snapshot.load_signing_key(self.role_keys['snapshot']['private'])
repository.timestamp.load_signing_key(self.role_keys['timestamp']['private'])
repository.writeall()
# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))
# The client performs a refresh of top-level metadata to get the latest
# changes.
self.repository_updater.refresh()
# Verify that the client is able to recognize that a new set of keys have
# been added to the Targets role.
# First, has 'targets_keyid' been removed?
targets_roleinfo = tuf.roledb.get_roleinfo('targets', self.repository_name)
self.assertTrue(targets_keyid not in targets_roleinfo['keyids'])
# Second, is Targets's new key correct? The new key should be
# Timestamp's.
self.assertEqual(len(targets_roleinfo['keyids']), 1)
timestamp_roleinfo = tuf.roledb.get_roleinfo('timestamp', self.repository_name)
self.assertEqual(targets_roleinfo['keyids'], timestamp_roleinfo['keyids'])
def test_root_key_revocation(self):
# First verify that the Root role is properly signed. Calling
# refresh() should not raise an exception.
self.repository_updater.refresh()
# There should only be one key for Root. Store the keyid to later verify
# that it has been revoked.
root_roleinfo = tuf.roledb.get_roleinfo('root', self.repository_name)
root_keyid = root_roleinfo['keyids']
self.assertEqual(len(root_keyid), 1)
# Remove 'root_keyid' and add a new key. Verify that the client detects
# the removal and addition of keys to the Root file.
repository = repo_tool.load_repository(self.repository_directory)
repository.root.add_verification_key(self.role_keys['snapshot']['public'])
repository.root.add_verification_key(self.role_keys['targets']['public'])
repository.root.add_verification_key(self.role_keys['timestamp']['public'])
# Root, Snapshot, and Timestamp must be rewritten. Root must be written
# because the timestamp key has changed; Snapshot, because Root has
# changed, and Timestamp because it must sign its metadata with a new key.
repository.root.load_signing_key(self.role_keys['snapshot']['private'])
repository.root.load_signing_key(self.role_keys['targets']['private'])
repository.root.load_signing_key(self.role_keys['timestamp']['private'])
# Note: We added the Snapshot, Targets, and Timestampkeys to the Root role.
# The Root's expected private key has not been loaded yet, so that we can
# verify that refresh() correctly raises a
# securesystemslib.exceptions.BadSignatureError exception.
repository.snapshot.load_signing_key(self.role_keys['snapshot']['private'])
repository.timestamp.load_signing_key(self.role_keys['timestamp']['private'])
# Root's version number = 2 after the following writeall().
repository.writeall()
# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))
# Note well: The client should reject the new Root file because the
# repository has revoked the only Root key that the client trusts.
try:
self.repository_updater.refresh()
except tuf.exceptions.NoWorkingMirrorError as exception:
for mirror_exception in exception.mirror_errors.values():
self.assertTrue(isinstance(mirror_exception,
securesystemslib.exceptions.BadSignatureError))
repository.root.add_verification_key(self.role_keys['root']['public'])
repository.root.load_signing_key(self.role_keys['root']['private'])
# root, snapshot, and timestamp should be dirty
repository.dirty_roles()
repository.write('root', increment_version_number=False)
repository.write('snapshot')
repository.write('timestamp')
# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))
# Root's version number = 2...
# The client successfully performs a refresh of top-level metadata to get
# the latest changes.
self.repository_updater.refresh()
self.assertEqual(self.repository_updater.metadata['current']['root']['version'], 2)
# Revoke the snapshot and targets keys (added to root) so that multiple
# snapshots are created. Discontinue signing with the old root key now
# that the client has successfully updated (note: the old Root key
# was revoked, but the repository continued signing with it to allow
# the client to update).
repository.root.remove_verification_key(self.role_keys['root']['public'])
repository.root.unload_signing_key(self.role_keys['root']['private'])
repository.root.remove_verification_key(self.role_keys['snapshot']['public'])
repository.root.unload_signing_key(self.role_keys['snapshot']['private'])
repository.writeall()
# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))
# Root's version number = 3...
self.repository_updater.refresh()
repository.root.remove_verification_key(self.role_keys['targets']['public'])
repository.root.unload_signing_key(self.role_keys['targets']['private'])
# The following should fail because root rotation requires the new Root
# to be signed with the previous self.role_keys['targets'] key.
self.assertRaises(tuf.exceptions.UnsignedMetadataError,
repository.writeall)
repository.root.load_signing_key(self.role_keys['targets']['private'])
repository.writeall()
# Move the staged metadata to the "live" metadata.
shutil.rmtree(os.path.join(self.repository_directory, 'metadata'))
shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'),
os.path.join(self.repository_directory, 'metadata'))
# Root's version number = 4...
self.repository_updater.refresh()
self.assertEqual(self.repository_updater.metadata['current']['root']['version'], 4)
# Verify that the client is able to recognize that a new set of keys have
# been added to the Root role.
# First, has 'root_keyid' been removed?
root_roleinfo = tuf.roledb.get_roleinfo('root', self.repository_name)
self.assertTrue(root_keyid not in root_roleinfo['keyids'])
# Second, is Root's new key correct? The new key should be
# Timestamp's.
self.assertEqual(len(root_roleinfo['keyids']), 1)
timestamp_roleinfo = tuf.roledb.get_roleinfo('timestamp', self.repository_name)
self.assertEqual(root_roleinfo['keyids'], timestamp_roleinfo['keyids'])
def _load_role_keys(keystore_directory):
# Populating 'self.role_keys' by importing the required public and private
# keys of 'tuf/tests/repository_data/'. The role keys are needed when
# modifying the remote repository used by the test cases in this unit test.
# The pre-generated key files in 'repository_data/keystore' are all encrypted with
# a 'password' passphrase.
EXPECTED_KEYFILE_PASSWORD = 'password'
# Store and return the cryptography keys of the top-level roles, including 1
# delegated role.
role_keys = {}
root_key_file = os.path.join(keystore_directory, 'root_key')
targets_key_file = os.path.join(keystore_directory, 'targets_key')
snapshot_key_file = os.path.join(keystore_directory, 'snapshot_key')
timestamp_key_file = os.path.join(keystore_directory, 'timestamp_key')
delegation_key_file = os.path.join(keystore_directory, 'delegation_key')
role_keys = {'root': {}, 'targets': {}, 'snapshot': {}, 'timestamp': {},
'role1': {}}
# Import the top-level and delegated role public keys.
role_keys['root']['public'] = \
repo_tool.import_rsa_publickey_from_file(root_key_file+'.pub')
role_keys['targets']['public'] = \
repo_tool.import_ed25519_publickey_from_file(targets_key_file + '.pub')
role_keys['snapshot']['public'] = \
repo_tool.import_ed25519_publickey_from_file(snapshot_key_file + '.pub')
role_keys['timestamp']['public'] = \
repo_tool.import_ed25519_publickey_from_file(timestamp_key_file + '.pub')
role_keys['role1']['public'] = \
repo_tool.import_ed25519_publickey_from_file(delegation_key_file + '.pub')
# Import the private keys of the top-level and delegated roles.
role_keys['root']['private'] = \
repo_tool.import_rsa_privatekey_from_file(root_key_file,
EXPECTED_KEYFILE_PASSWORD)
role_keys['targets']['private'] = \
repo_tool.import_ed25519_privatekey_from_file(targets_key_file,
EXPECTED_KEYFILE_PASSWORD)
role_keys['snapshot']['private'] = \
repo_tool.import_ed25519_privatekey_from_file(snapshot_key_file,
EXPECTED_KEYFILE_PASSWORD)
role_keys['timestamp']['private'] = \
repo_tool.import_ed25519_privatekey_from_file(timestamp_key_file,
EXPECTED_KEYFILE_PASSWORD)
role_keys['role1']['private'] = \
repo_tool.import_ed25519_privatekey_from_file(delegation_key_file,
EXPECTED_KEYFILE_PASSWORD)
return role_keys
if __name__ == '__main__':
utils.configure_test_logging(sys.argv)
unittest.main()