#!/usr/bin/env python # Copyright 2012 - 2017, New York University and the TUF contributors # SPDX-License-Identifier: MIT OR Apache-2.0 """ test_updater.py Konstantin Andrianov. October 15, 2012. March 11, 2014. Refactored to remove mocked modules and old repository tool dependence, use exact repositories, and add realistic retrieval of files. -vladimir.v.diaz See LICENSE-MIT OR LICENSE for licensing information. 'test_updater.py' provides a collection of methods that test the public / non-public methods and functions of 'tuf.client.updater.py'. The 'unittest_toolbox.py' module was created to provide additional testing tools, such as automatically deleting temporary files created in test cases. For more information, see 'tests/unittest_toolbox.py'. Test cases here should follow a specific order (i.e., independent methods are tested before dependent methods). More accurately, least dependent methods are tested before most dependent methods. There is no reason to rewrite or construct other methods that replicate already-tested methods solely for testing purposes. This is possible because the 'unittest.TestCase' class guarantees the order of unit tests. The 'test_something_A' method would be tested before 'test_something_B'. To ensure the expected order of tests, a number is placed after 'test' and before methods name like so: 'test_1_check_directory'. The number is a measure of dependence, where 1 is less dependent than 2. """ # Help with Python 3 compatibility, where the print statement is a function, an # implicit relative import is invalid, and the '/' operator performs true # division. Example: print 'hello world' raises a 'SyntaxError' exception. from __future__ import print_function from __future__ import absolute_import from __future__ import division from __future__ import unicode_literals import os import time import shutil import copy import tempfile import logging import random import subprocess import sys import errno import unittest import tuf import tuf.exceptions import tuf.log import tuf.formats import tuf.keydb import tuf.roledb import tuf.repository_tool as repo_tool import tuf.repository_lib as repo_lib import tuf.unittest_toolbox as unittest_toolbox import tuf.client.updater as updater import securesystemslib import six logger = logging.getLogger('tuf.test_updater') repo_tool.disable_console_log_messages() class TestUpdater(unittest_toolbox.Modified_TestCase): @classmethod def setUpClass(cls): # setUpClass() is called before tests in an individual class are executed. # Create a temporary directory to store the repository, metadata, and target # files. 'temporary_directory' must be deleted in TearDownModule() so that # temporary files are always removed, even when exceptions occur. cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd()) # Launch a SimpleHTTPServer (serves files in the current directory). # Test cases will request metadata and target files that have been # pre-generated in 'tuf/tests/repository_data', which will be served # by the SimpleHTTPServer launched here. The test cases of 'test_updater.py' # assume the pre-generated metadata files have a specific structure, such # as a delegated role 'targets/role1', three target files, five key files, # etc. cls.SERVER_PORT = random.randint(30000, 45000) command = ['python', '-m', 'tuf.scripts.simple_server', str(cls.SERVER_PORT)] cls.server_process = subprocess.Popen(command, stderr=subprocess.PIPE) logger.info('\n\tServer process started.') logger.info('\tServer process id: '+str(cls.server_process.pid)) logger.info('\tServing on port: '+str(cls.SERVER_PORT)) cls.url = 'http://localhost:'+str(cls.SERVER_PORT) + os.path.sep # NOTE: Following error is raised if a delay is not long enough to allow # the server process to set up and start listening: # # or, on Windows: # Failed to establish a new connection: [Errno 111] Connection refused' # While 0.3s has consistently worked on Travis and local builds, it led to # occasional failures in AppVeyor builds, so increasing this to 2s, sadly. time.sleep(2) @classmethod def tearDownClass(cls): # tearDownModule() is called after all the tests have run. # http://docs.python.org/2/library/unittest.html#class-and-module-fixtures # Kill the SimpleHTTPServer process. if cls.server_process.returncode is None: logger.info('\tServer process ' + str(cls.server_process.pid) + ' terminated.') cls.server_process.kill() # Remove the temporary repository directory, which should contain all the # metadata, targets, and key files generated for the test cases. sleep # for a bit to allow the kill'd server process to terminate. time.sleep(.3) shutil.rmtree(cls.temporary_directory) def setUp(self): # We are inheriting from custom class. unittest_toolbox.Modified_TestCase.setUp(self) tuf.roledb.clear_roledb(clear_all=True) tuf.keydb.clear_keydb(clear_all=True) self.repository_name = 'test_repository1' # Copy the original repository files provided in the test folder so that # any modifications made to repository files are restricted to the copies. # The 'repository_data' directory is expected to exist in 'tuf.tests/'. original_repository_files = os.path.join(os.getcwd(), 'repository_data') temporary_repository_root = \ self.make_temp_directory(directory=self.temporary_directory) # The original repository, keystore, and client directories will be copied # for each test case. original_repository = os.path.join(original_repository_files, 'repository') original_keystore = os.path.join(original_repository_files, 'keystore') original_client = os.path.join(original_repository_files, 'client') # Save references to the often-needed client repository directories. # Test cases need these references to access metadata and target files. self.repository_directory = \ os.path.join(temporary_repository_root, 'repository') self.keystore_directory = \ os.path.join(temporary_repository_root, 'keystore') self.client_directory = os.path.join(temporary_repository_root, 'client') self.client_metadata = os.path.join(self.client_directory, self.repository_name, 'metadata') self.client_metadata_current = os.path.join(self.client_metadata, 'current') self.client_metadata_previous = os.path.join(self.client_metadata, 'previous') # Copy the original 'repository', 'client', and 'keystore' directories # to the temporary repository the test cases can use. shutil.copytree(original_repository, self.repository_directory) shutil.copytree(original_client, self.client_directory) shutil.copytree(original_keystore, self.keystore_directory) # 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'. repository_basepath = self.repository_directory[len(os.getcwd()):] url_prefix = \ 'http://localhost:' + str(self.SERVER_PORT) + repository_basepath # Setting 'tuf.settings.repository_directory' with the temporary client # directory copied from the original repository files. tuf.settings.repositories_directory = self.client_directory self.repository_mirrors = {'mirror1': {'url_prefix': url_prefix, 'metadata_path': 'metadata', 'targets_path': 'targets', 'confined_target_dirs': ['']}} # Creating a repository instance. The test cases will use this client # updater to refresh metadata, fetch target files, etc. self.repository_updater = updater.Updater(self.repository_name, self.repository_mirrors) # Metadata role keys are needed by the test cases to make changes to the # repository (e.g., adding a new target file to 'targets.json' and then # requesting a refresh()). self.role_keys = _load_role_keys(self.keystore_directory) def tearDown(self): # We are inheriting from custom class. unittest_toolbox.Modified_TestCase.tearDown(self) tuf.roledb.clear_roledb(clear_all=True) tuf.keydb.clear_keydb(clear_all=True) # UNIT TESTS. def test_1__init__exceptions(self): # The client's repository requires a metadata directory (and the 'current' # and 'previous' sub-directories), and at least the 'root.json' file. # setUp(), called before each test case, instantiates the required updater # objects and keys. The needed objects/data is available in # 'self.repository_updater', 'self.client_directory', etc. # Test: Invalid arguments. # Invalid 'updater_name' argument. String expected. self.assertRaises(securesystemslib.exceptions.FormatError, updater.Updater, 8, self.repository_mirrors) # Invalid 'repository_mirrors' argument. 'tuf.formats.MIRRORDICT_SCHEMA' # expected. self.assertRaises(securesystemslib.exceptions.FormatError, updater.Updater, updater.Updater, 8) # 'tuf.client.updater.py' requires that the client's repositories directory # be configured in 'tuf.settings.py'. tuf.settings.repositories_directory = None self.assertRaises(tuf.exceptions.RepositoryError, updater.Updater, 'test_repository1', self.repository_mirrors) # Restore 'tuf.settings.repositories_directory' to the original client # directory. tuf.settings.repositories_directory = self.client_directory # Test: empty client repository (i.e., no metadata directory). metadata_backup = self.client_metadata + '.backup' shutil.move(self.client_metadata, metadata_backup) self.assertRaises(tuf.exceptions.RepositoryError, updater.Updater, 'test_repository1', self.repository_mirrors) # Restore the client's metadata directory. shutil.move(metadata_backup, self.client_metadata) # Test: repository with only a '{repository_directory}/metadata' directory. # (i.e., missing the required 'current' and 'previous' sub-directories). current_backup = self.client_metadata_current + '.backup' previous_backup = self.client_metadata_previous + '.backup' shutil.move(self.client_metadata_current, current_backup) shutil.move(self.client_metadata_previous, previous_backup) self.assertRaises(tuf.exceptions.RepositoryError, updater.Updater, 'test_repository1', self.repository_mirrors) # Restore the client's previous directory. The required 'current' directory # is still missing. shutil.move(previous_backup, self.client_metadata_previous) # Test: repository with only a '{repository_directory}/metadata/previous' # directory. self.assertRaises(tuf.exceptions.RepositoryError, updater.Updater, 'test_repository1', self.repository_mirrors) # Restore the client's current directory. shutil.move(current_backup, self.client_metadata_current) # Test: repository with a '{repository_directory}/metadata/current' # directory, but the 'previous' directory is missing. shutil.move(self.client_metadata_previous, previous_backup) self.assertRaises(tuf.exceptions.RepositoryError, updater.Updater, 'test_repository1', self.repository_mirrors) shutil.move(previous_backup, self.client_metadata_previous) # Test: repository missing the required 'root.json' file. client_root_file = os.path.join(self.client_metadata_current, 'root.json') backup_root_file = client_root_file + '.backup' shutil.move(client_root_file, backup_root_file) self.assertRaises(tuf.exceptions.RepositoryError, updater.Updater, 'test_repository1', self.repository_mirrors) # Restore the client's 'root.json file. shutil.move(backup_root_file, client_root_file) # Test: Normal 'tuf.client.updater.Updater' instantiation. updater.Updater('test_repository1', self.repository_mirrors) def test_1__load_metadata_from_file(self): # Setup # Get the 'role1.json' filepath. Manually load the role metadata, and # compare it against the loaded metadata by '_load_metadata_from_file()'. role1_filepath = \ os.path.join(self.client_metadata_current, 'role1.json') role1_meta = securesystemslib.util.load_json_file(role1_filepath) # Load the 'role1.json' file with _load_metadata_from_file, which should # store the loaded metadata in the 'self.repository_updater.metadata' # store. self.assertEqual(len(self.repository_updater.metadata['current']), 4) self.repository_updater._load_metadata_from_file('current', 'role1') # Verify that the correct number of metadata objects has been loaded # (i.e., only the 'root.json' file should have been loaded. self.assertEqual(len(self.repository_updater.metadata['current']), 5) # Verify that the content of root metadata is valid. self.assertEqual(self.repository_updater.metadata['current']['role1'], role1_meta['signed']) # Verify that _load_metadata_from_file() doesn't raise an exception for # improperly formatted metadata, and doesn't load the bad file. with open(role1_filepath, 'ab') as file_object: file_object.write(b'bad JSON data') self.repository_updater._load_metadata_from_file('current', 'role1') self.assertEqual(len(self.repository_updater.metadata['current']), 5) # Test if we fail gracefully if we can't deserialize a meta file self.repository_updater._load_metadata_from_file('current', 'empty_file') self.assertFalse('empty_file' in self.repository_updater.metadata['current']) # Test invalid metadata set argument (must be either # 'current' or 'previous'.) self.assertRaises(securesystemslib.exceptions.Error, self.repository_updater._load_metadata_from_file, 'bad_metadata_set', 'role1') def test_1__rebuild_key_and_role_db(self): # Setup root_roleinfo = tuf.roledb.get_roleinfo('root', self.repository_name) root_metadata = self.repository_updater.metadata['current']['root'] root_threshold = root_metadata['roles']['root']['threshold'] number_of_root_keys = len(root_metadata['keys']) self.assertEqual(root_roleinfo['threshold'], root_threshold) # Ensure we add 2 to the number of root keys (actually, the number of root # keys multiplied by the number of keyid hash algorithms), to include the # delegated targets key (+1 for its sha512 keyid). The delegated roles of # 'targets.json' are also loaded when the repository object is # instantiated. self.assertEqual(number_of_root_keys * 2 + 2, len(tuf.keydb._keydb_dict[self.repository_name])) # Test: normal case. self.repository_updater._rebuild_key_and_role_db() root_roleinfo = tuf.roledb.get_roleinfo('root', self.repository_name) self.assertEqual(root_roleinfo['threshold'], root_threshold) # _rebuild_key_and_role_db() will only rebuild the keys and roles specified # in the 'root.json' file, unlike __init__(). Instantiating an updater # object calls both _rebuild_key_and_role_db() and _import_delegations(). self.assertEqual(number_of_root_keys * 2, len(tuf.keydb._keydb_dict[self.repository_name])) # Test: properly updated roledb and keydb dicts if the Root role changes. root_metadata = self.repository_updater.metadata['current']['root'] root_metadata['roles']['root']['threshold'] = 8 root_metadata['keys'].popitem() self.repository_updater._rebuild_key_and_role_db() root_roleinfo = tuf.roledb.get_roleinfo('root', self.repository_name) self.assertEqual(root_roleinfo['threshold'], 8) self.assertEqual(number_of_root_keys * 2 - 2, len(tuf.keydb._keydb_dict[self.repository_name])) def test_1__update_versioninfo(self): # Tests # Verify that the 'self.versioninfo' dictionary is empty (it starts off # empty and is only populated if _update_versioninfo() is called. versioninfo_dict = self.repository_updater.versioninfo self.assertEqual(len(versioninfo_dict), 0) # Load the versioninfo of the top-level Targets role. This action # populates the 'self.versioninfo' dictionary. self.repository_updater._update_versioninfo('targets.json') self.assertEqual(len(versioninfo_dict), 1) self.assertTrue(tuf.formats.FILEINFODICT_SCHEMA.matches(versioninfo_dict)) # The Snapshot role stores the version numbers of all the roles available # on the repository. Load Snapshot to extract Root's version number # and compare it against the one loaded by 'self.repository_updater'. snapshot_filepath = os.path.join(self.client_metadata_current, 'snapshot.json') snapshot_signable = securesystemslib.util.load_json_file(snapshot_filepath) targets_versioninfo = snapshot_signable['signed']['meta']['targets.json'] # Verify that the manually loaded version number of root.json matches # the one loaded by the updater object. self.assertTrue('targets.json' in versioninfo_dict) self.assertEqual(versioninfo_dict['targets.json'], targets_versioninfo) # Verify that 'self.versioninfo' is incremented if another role is updated. self.repository_updater._update_versioninfo('role1.json') self.assertEqual(len(versioninfo_dict), 2) # Verify that 'self.versioninfo' is incremented if a non-existent role is # requested, and has its versioninfo entry set to 'None'. self.repository_updater._update_versioninfo('bad_role.json') self.assertEqual(len(versioninfo_dict), 3) self.assertEqual(versioninfo_dict['bad_role.json'], None) # Verify that the versioninfo specified in Timestamp is used if the Snapshot # role hasn't been downloaded yet. del self.repository_updater.metadata['current']['snapshot'] #self.assertRaises(self.repository_updater._update_versioninfo('snapshot.json')) self.repository_updater._update_versioninfo('snapshot.json') self.assertEqual(versioninfo_dict['snapshot.json']['version'], 1) def test_1__update_fileinfo(self): # Tests # Verify that the 'self.fileinfo' dictionary is empty (its starts off empty # and is only populated if _update_fileinfo() is called. fileinfo_dict = self.repository_updater.fileinfo self.assertEqual(len(fileinfo_dict), 0) # Load the fileinfo of the top-level root role. This populates the # 'self.fileinfo' dictionary. self.repository_updater._update_fileinfo('root.json') self.assertEqual(len(fileinfo_dict), 1) self.assertTrue(tuf.formats.FILEDICT_SCHEMA.matches(fileinfo_dict)) root_filepath = os.path.join(self.client_metadata_current, 'root.json') length, hashes = securesystemslib.util.get_file_details(root_filepath) root_fileinfo = tuf.formats.make_fileinfo(length, hashes) self.assertTrue('root.json' in fileinfo_dict) self.assertEqual(fileinfo_dict['root.json'], root_fileinfo) # Verify that 'self.fileinfo' is incremented if another role is updated. self.repository_updater._update_fileinfo('targets.json') self.assertEqual(len(fileinfo_dict), 2) # Verify that 'self.fileinfo' is inremented if a non-existent role is # requested, and has its fileinfo entry set to 'None'. self.repository_updater._update_fileinfo('bad_role.json') self.assertEqual(len(fileinfo_dict), 3) self.assertEqual(fileinfo_dict['bad_role.json'], None) def test_2__fileinfo_has_changed(self): # Verify that the method returns 'False' if file info was not changed. root_filepath = os.path.join(self.client_metadata_current, 'root.json') length, hashes = securesystemslib.util.get_file_details(root_filepath) root_fileinfo = tuf.formats.make_fileinfo(length, hashes) self.assertFalse(self.repository_updater._fileinfo_has_changed('root.json', root_fileinfo)) # Verify that the method returns 'True' if length or hashes were changed. new_length = 8 new_root_fileinfo = tuf.formats.make_fileinfo(new_length, hashes) self.assertTrue(self.repository_updater._fileinfo_has_changed('root.json', new_root_fileinfo)) # Hashes were changed. new_hashes = {'sha256': self.random_string()} new_root_fileinfo = tuf.formats.make_fileinfo(length, new_hashes) self.assertTrue(self.repository_updater._fileinfo_has_changed('root.json', new_root_fileinfo)) # Verify that _fileinfo_has_changed() returns True if no fileinfo (or set # to None) exists for some role. self.assertTrue(self.repository_updater._fileinfo_has_changed('bad.json', new_root_fileinfo)) saved_fileinfo = self.repository_updater.fileinfo['root.json'] self.repository_updater.fileinfo['root.json'] = None self.assertTrue(self.repository_updater._fileinfo_has_changed('root.json', new_root_fileinfo)) self.repository_updater.fileinfo['root.json'] = saved_fileinfo new_root_fileinfo['hashes']['sha666'] = '666' self.repository_updater._fileinfo_has_changed('root.json', new_root_fileinfo) def test_2__import_delegations(self): # Setup. # In order to test '_import_delegations' the parent of the delegation # has to be in Repository.metadata['current'], but it has to be inserted # there without using '_load_metadata_from_file()' since it calls # '_import_delegations()'. repository_name = self.repository_updater.repository_name tuf.keydb.clear_keydb(repository_name) tuf.roledb.clear_roledb(repository_name) self.assertEqual(len(tuf.roledb._roledb_dict[repository_name]), 0) self.assertEqual(len(tuf.keydb._keydb_dict[repository_name]), 0) self.repository_updater._rebuild_key_and_role_db() self.assertEqual(len(tuf.roledb._roledb_dict[repository_name]), 4) # Take into account the number of keyids algorithms supported by default, # which this test condition expects to be two (sha256 and sha512). self.assertEqual(4 * 2, len(tuf.keydb._keydb_dict[repository_name])) # Test: pass a role without delegations. self.repository_updater._import_delegations('root') # Verify that there was no change to the roledb and keydb dictionaries by # checking the number of elements in the dictionaries. self.assertEqual(len(tuf.roledb._roledb_dict[repository_name]), 4) # Take into account the number of keyid hash algorithms, which this # test condition expects to be two (for sha256 and sha512). self.assertEqual(len(tuf.keydb._keydb_dict[repository_name]), 4 * 2) # Test: normal case, first level delegation. self.repository_updater._import_delegations('targets') self.assertEqual(len(tuf.roledb._roledb_dict[repository_name]), 5) # The number of root keys (times the number of key hash algorithms) + # delegation's key (+1 for its sha512 keyid). self.assertEqual(len(tuf.keydb._keydb_dict[repository_name]), 4 * 2 + 2) # Verify that roledb dictionary was added. self.assertTrue('role1' in tuf.roledb._roledb_dict[repository_name]) # Verify that keydb dictionary was updated. role1_signable = \ securesystemslib.util.load_json_file(os.path.join(self.client_metadata_current, 'role1.json')) keyids = [] for signature in role1_signable['signatures']: keyids.append(signature['keyid']) for keyid in keyids: self.assertTrue(keyid in tuf.keydb._keydb_dict[repository_name]) # Verify that _import_delegations() ignores invalid keytypes in the 'keys' # field of parent role's 'delegations'. existing_keyid = keyids[0] self.repository_updater.metadata['current']['targets']\ ['delegations']['keys'][existing_keyid]['keytype'] = 'bad_keytype' self.repository_updater._import_delegations('targets') # Restore the keytype of 'existing_keyid'. self.repository_updater.metadata['current']['targets']\ ['delegations']['keys'][existing_keyid]['keytype'] = 'ed25519' # Verify that _import_delegations() raises an exception if one of the # delegated keys is malformed. valid_keyval = self.repository_updater.metadata['current']['targets']\ ['delegations']['keys'][existing_keyid]['keyval'] self.repository_updater.metadata['current']['targets']\ ['delegations']['keys'][existing_keyid]['keyval'] = 1 self.assertRaises(securesystemslib.exceptions.FormatError, self.repository_updater._import_delegations, 'targets') self.repository_updater.metadata['current']['targets']\ ['delegations']['keys'][existing_keyid]['keyval'] = valid_keyval # Verify that _import_delegations() raises an exception if one of the # delegated roles is malformed. self.repository_updater.metadata['current']['targets']\ ['delegations']['roles'][0]['name'] = 1 self.assertRaises(securesystemslib.exceptions.FormatError, self.repository_updater._import_delegations, 'targets') def test_2__versioninfo_has_been_updated(self): # Verify that the method returns 'False' if a versioninfo was not changed. snapshot_filepath = os.path.join(self.client_metadata_current, 'snapshot.json') snapshot_signable = securesystemslib.util.load_json_file(snapshot_filepath) targets_versioninfo = snapshot_signable['signed']['meta']['targets.json'] self.assertFalse(self.repository_updater._versioninfo_has_been_updated('targets.json', targets_versioninfo)) # Verify that the method returns 'True' if Root's version number changes. targets_versioninfo['version'] = 8 self.assertTrue(self.repository_updater._versioninfo_has_been_updated('targets.json', targets_versioninfo)) def test_2__move_current_to_previous(self): # Test case will consist of removing a metadata file from client's # '{client_repository}/metadata/previous' directory, executing the method # and then verifying that the 'previous' directory contains the snapshot # file. previous_snapshot_filepath = os.path.join(self.client_metadata_previous, 'snapshot.json') os.remove(previous_snapshot_filepath) self.assertFalse(os.path.exists(previous_snapshot_filepath)) # Verify that the current 'snapshot.json' is moved to the previous directory. self.repository_updater._move_current_to_previous('snapshot') self.assertTrue(os.path.exists(previous_snapshot_filepath)) def test_2__delete_metadata(self): # This test will verify that 'root' metadata is never deleted. When a role # is deleted verify that the file is not present in the # 'self.repository_updater.metadata' dictionary. self.repository_updater._delete_metadata('root') self.assertTrue('root' in self.repository_updater.metadata['current']) self.repository_updater._delete_metadata('timestamp') self.assertFalse('timestamp' in self.repository_updater.metadata['current']) def test_2__ensure_not_expired(self): # This test condition will verify that nothing is raised when a metadata # file has a future expiration date. root_metadata = self.repository_updater.metadata['current']['root'] self.repository_updater._ensure_not_expired(root_metadata, 'root') # 'tuf.exceptions.ExpiredMetadataError' should be raised in this next test condition, # because the expiration_date has expired by 10 seconds. expires = tuf.formats.unix_timestamp_to_datetime(int(time.time() - 10)) expires = expires.isoformat() + 'Z' root_metadata['expires'] = expires # Ensure the 'expires' value of the root file is valid by checking the # the formats of the 'root.json' object. self.assertTrue(tuf.formats.ROOT_SCHEMA.matches(root_metadata)) self.assertRaises(tuf.exceptions.ExpiredMetadataError, self.repository_updater._ensure_not_expired, root_metadata, 'root') def test_3__update_metadata(self): # Setup # _update_metadata() downloads, verifies, and installs the specified # metadata role. Remove knowledge of currently installed metadata and # verify that they are re-installed after calling _update_metadata(). # This is the default metadata that we would create for the timestamp role, # because it has no signed metadata for itself. DEFAULT_TIMESTAMP_FILELENGTH = tuf.settings.DEFAULT_TIMESTAMP_REQUIRED_LENGTH # This is the the upper bound length for Targets metadata. DEFAULT_TARGETS_FILELENGTH = tuf.settings.DEFAULT_TARGETS_REQUIRED_LENGTH # Save the versioninfo of 'targets.json,' needed later when re-installing # with _update_metadata(). targets_versioninfo = \ self.repository_updater.metadata['current']['snapshot']['meta']\ ['targets.json'] # Remove the currently installed metadata from the store and disk. Verify # that the metadata dictionary is re-populated after calling # _update_metadata(). del self.repository_updater.metadata['current']['timestamp'] del self.repository_updater.metadata['current']['targets'] timestamp_filepath = \ os.path.join(self.client_metadata_current, 'timestamp.json') targets_filepath = os.path.join(self.client_metadata_current, 'targets.json') root_filepath = os.path.join(self.client_metadata_current, 'root.json') os.remove(timestamp_filepath) os.remove(targets_filepath) # Test: normal case. # Verify 'timestamp.json' is properly installed. self.assertFalse('timestamp' in self.repository_updater.metadata) logger.info('\nroleinfo: ' + repr(tuf.roledb.get_rolenames(self.repository_name))) self.repository_updater._update_metadata('timestamp', DEFAULT_TIMESTAMP_FILELENGTH) self.assertTrue('timestamp' in self.repository_updater.metadata['current']) os.path.exists(timestamp_filepath) # Verify 'targets.json' is properly installed. self.assertFalse('targets' in self.repository_updater.metadata['current']) self.repository_updater._update_metadata('targets', DEFAULT_TARGETS_FILELENGTH, targets_versioninfo['version']) self.assertTrue('targets' in self.repository_updater.metadata['current']) targets_signable = securesystemslib.util.load_json_file(targets_filepath) loaded_targets_version = targets_signable['signed']['version'] self.assertEqual(targets_versioninfo['version'], loaded_targets_version) # Test: Invalid / untrusted version numbers. # Invalid version number for 'targets.json'. self.assertRaises(tuf.exceptions.NoWorkingMirrorError, self.repository_updater._update_metadata, 'targets', DEFAULT_TARGETS_FILELENGTH, 88) # Verify that the specific exception raised is correct for the previous # case. try: self.repository_updater._update_metadata('targets', DEFAULT_TARGETS_FILELENGTH, 88) except tuf.exceptions.NoWorkingMirrorError as e: for mirror_error in six.itervalues(e.mirror_errors): assert isinstance(mirror_error, securesystemslib.exceptions.BadVersionNumberError) # Verify that the specific exception raised is correct for the previous # case. The version number is checked, so the specific error in # this case should be 'securesystemslib.exceptions.BadVersionNumberError'. try: self.repository_updater._update_metadata('targets', DEFAULT_TARGETS_FILELENGTH, 88) except tuf.exceptions.NoWorkingMirrorError as e: for mirror_error in six.itervalues(e.mirror_errors): assert isinstance(mirror_error, securesystemslib.exceptions.BadVersionNumberError) def test_3__get_metadata_file(self): valid_tuf_version = tuf.formats.TUF_VERSION_NUMBER tuf.formats.TUF_VERSION_NUMBER = '2' repository = repo_tool.load_repository(self.repository_directory) repository.timestamp.load_signing_key(self.role_keys['timestamp']['private']) repository.writeall() # Move the staged metadata to the "live" metadata. shutil.rmtree(os.path.join(self.repository_directory, 'metadata')) shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'), os.path.join(self.repository_directory, 'metadata')) upperbound_filelength = tuf.settings.DEFAULT_TIMESTAMP_REQUIRED_LENGTH try: self.repository_updater._get_metadata_file('timestamp', 'timestamp.json', upperbound_filelength, 1) except tuf.exceptions.NoWorkingMirrorError as e: for mirror_error in six.itervalues(e.mirror_errors): assert isinstance(mirror_error, securesystemslib.exceptions.BadVersionNumberError) # Test for an improperly formatted TUF version number. tuf.formats.TUF_VERSION_NUMBER = 'BAD' repository = repo_tool.load_repository(self.repository_directory) repository.timestamp.load_signing_key(self.role_keys['timestamp']['private']) repository.writeall() # Move the staged metadata to the "live" metadata. shutil.rmtree(os.path.join(self.repository_directory, 'metadata')) shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'), os.path.join(self.repository_directory, 'metadata')) try: self.repository_updater._get_metadata_file('timestamp', 'timestamp.json', upperbound_filelength, 1) except tuf.exceptions.NoWorkingMirrorError as e: for mirror_error in six.itervalues(e.mirror_errors): assert isinstance(mirror_error, securesystemslib.exceptions.FormatError) # Reset the TUF_VERSION_NUMBER so that subsequent unit tests use the # expected value. tuf.formats.TUF_VERSION_NUMBER = valid_tuf_version def test_3__update_metadata_if_changed(self): # Setup. # The client repository is initially loaded with only four top-level roles. # Verify that the metadata store contains the metadata of only these four # roles before updating the metadata of 'targets.json'. self.assertEqual(len(self.repository_updater.metadata['current']), 4) self.assertTrue('targets' in self.repository_updater.metadata['current']) targets_path = os.path.join(self.client_metadata_current, 'targets.json') self.assertTrue(os.path.exists(targets_path)) self.assertEqual(self.repository_updater.metadata['current']['targets']['version'], 1) # Test: normal case. Update 'targets.json'. The version number should not # change. self.repository_updater._update_metadata_if_changed('targets') # Verify the current version of 'targets.json' has not changed. self.assertEqual(self.repository_updater.metadata['current']['targets']['version'], 1) # Modify one target file on the remote repository. repository = repo_tool.load_repository(self.repository_directory) target3 = os.path.join(self.repository_directory, 'targets', 'file3.txt') repository.targets.add_target(target3) repository.root.version = repository.root.version + 1 repository.root.load_signing_key(self.role_keys['root']['private']) repository.targets.load_signing_key(self.role_keys['targets']['private']) repository.snapshot.load_signing_key(self.role_keys['snapshot']['private']) repository.timestamp.load_signing_key(self.role_keys['timestamp']['private']) repository.writeall() # Move the staged metadata to the "live" metadata. shutil.rmtree(os.path.join(self.repository_directory, 'metadata')) shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'), os.path.join(self.repository_directory, 'metadata')) # Update 'targets.json' and verify that the client's current 'targets.json' # has been updated. 'timestamp' and 'snapshot' must be manually updated # so that new 'targets' can be recognized. DEFAULT_TIMESTAMP_FILELENGTH = tuf.settings.DEFAULT_TIMESTAMP_REQUIRED_LENGTH self.repository_updater._update_metadata('timestamp', DEFAULT_TIMESTAMP_FILELENGTH) self.repository_updater._update_metadata_if_changed('snapshot', 'timestamp') self.repository_updater._update_metadata_if_changed('targets') self.repository_updater._update_metadata_if_changed('root') targets_path = os.path.join(self.client_metadata_current, 'targets.json') self.assertTrue(os.path.exists(targets_path)) self.assertTrue(self.repository_updater.metadata['current']['targets']) self.assertEqual(self.repository_updater.metadata['current']['targets']['version'], 2) # Test for an invalid 'referenced_metadata' argument. self.assertRaises(tuf.exceptions.RepositoryError, self.repository_updater._update_metadata_if_changed, 'snapshot', 'bad_role') def test_3__targets_of_role(self): # Setup. # Extract the list of targets from 'targets.json', to be compared to what # is returned by _targets_of_role('targets'). targets_in_metadata = \ self.repository_updater.metadata['current']['targets']['targets'] # Test: normal case. targetinfos_list = self.repository_updater._targets_of_role('targets') # Verify that the list of targets was returned, and that it contains valid # target files. self.assertTrue(tuf.formats.TARGETINFOS_SCHEMA.matches(targetinfos_list)) for targetinfo in targetinfos_list: self.assertTrue((targetinfo['filepath'], targetinfo['fileinfo']) in six.iteritems(targets_in_metadata)) def test_4_refresh(self): # This unit test is based on adding an extra target file to the # server and rebuilding all server-side metadata. All top-level metadata # should be updated when the client calls refresh(). # First verify that an expired root metadata is updated. expired_date = '1960-01-01T12:00:00Z' self.repository_updater.metadata['current']['root']['expires'] = expired_date self.repository_updater.refresh() # Second, verify that expired root metadata is not updated if # 'unsafely_update_root_if_necessary' is explictly set to 'False'. expired_date = '1960-01-01T12:00:00Z' self.repository_updater.metadata['current']['root']['expires'] = expired_date self.assertRaises(tuf.exceptions.ExpiredMetadataError, self.repository_updater.refresh, unsafely_update_root_if_necessary=False) repository = repo_tool.load_repository(self.repository_directory) target3 = os.path.join(self.repository_directory, 'targets', 'file3.txt') repository.targets.add_target(target3) repository.targets.load_signing_key(self.role_keys['targets']['private']) repository.snapshot.load_signing_key(self.role_keys['snapshot']['private']) repository.timestamp.load_signing_key(self.role_keys['timestamp']['private']) repository.writeall() # Move the staged metadata to the "live" metadata. shutil.rmtree(os.path.join(self.repository_directory, 'metadata')) shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'), os.path.join(self.repository_directory, 'metadata')) # Reference 'self.Repository.metadata['current']['targets']'. Ensure # 'target3' is not already specified. targets_metadata = self.repository_updater.metadata['current']['targets'] self.assertFalse(target3 in targets_metadata['targets']) # Verify the expected version numbers of the roles to be modified. self.assertEqual(self.repository_updater.metadata['current']['targets']\ ['version'], 1) self.assertEqual(self.repository_updater.metadata['current']['snapshot']\ ['version'], 1) self.assertEqual(self.repository_updater.metadata['current']['timestamp']\ ['version'], 1) # Test: normal case. 'targes.json' should now specify 'target3', and the # following top-level metadata should have also been updated: # 'snapshot.json' and 'timestamp.json'. self.repository_updater.refresh() # Verify that the client's metadata was updated. targets_metadata = self.repository_updater.metadata['current']['targets'] targets_directory = os.path.join(self.repository_directory, 'targets') target3 = target3[len(targets_directory) + 1:] self.assertTrue(target3 in targets_metadata['targets']) # Verify the expected version numbers of the updated roles. self.assertEqual(self.repository_updater.metadata['current']['targets']\ ['version'], 2) self.assertEqual(self.repository_updater.metadata['current']['snapshot']\ ['version'], 2) self.assertEqual(self.repository_updater.metadata['current']['timestamp']\ ['version'], 2) def test_4__refresh_targets_metadata(self): # Setup. # It is assumed that the client repository has only loaded the top-level # metadata. Refresh the 'targets.json' metadata, including all delegated # roles (i.e., the client should add the missing 'role1.json' metadata. self.assertEqual(len(self.repository_updater.metadata['current']), 4) # Test: normal case. self.repository_updater._refresh_targets_metadata(refresh_all_delegated_roles=True) # Verify that client's metadata files were refreshed successfully. self.assertEqual(len(self.repository_updater.metadata['current']), 6) # Test for non-existing rolename. self.repository_updater._refresh_targets_metadata('bad_rolename', refresh_all_delegated_roles=False) # Test that non-json metadata in Snapshot is ignored. self.repository_updater.metadata['current']['snapshot']['meta']['bad_role.xml'] = {} self.repository_updater._refresh_targets_metadata(refresh_all_delegated_roles=True) def test_5_all_targets(self): # Setup # As with '_refresh_targets_metadata()', # Update top-level metadata before calling one of the "targets" methods, as # recommended by 'updater.py'. self.repository_updater.refresh() # Test: normal case. all_targets = self.repository_updater.all_targets() # Verify format of 'all_targets', it should correspond to # 'TARGETINFOS_SCHEMA'. self.assertTrue(tuf.formats.TARGETINFOS_SCHEMA.matches(all_targets)) # Verify that there is a correct number of records in 'all_targets' list, # and the expected filepaths specified in the metadata. On the targets # directory of the repository, there should be 3 target files (2 of # which are specified by 'targets.json'.) The delegated role 'role1' # specifies 1 target file. The expected total number targets in # 'all_targets' should be 3. self.assertEqual(len(all_targets), 3) target_filepaths = [] for target in all_targets: target_filepaths.append(target['filepath']) self.assertTrue('file1.txt' in target_filepaths) self.assertTrue('file2.txt' in target_filepaths) self.assertTrue('file3.txt' in target_filepaths) def test_5_targets_of_role(self): # Setup # Remove knowledge of 'targets.json' from the metadata store. self.repository_updater.metadata['current']['targets'] # Remove the metadata of the delegated roles. #shutil.rmtree(os.path.join(self.client_metadata, 'targets')) os.remove(os.path.join(self.client_metadata_current, 'targets.json')) # Extract the target files specified by the delegated role, 'role1.json', # as available on the server-side version of the role. role1_filepath = os.path.join(self.repository_directory, 'metadata', 'role1.json') role1_signable = securesystemslib.util.load_json_file(role1_filepath) expected_targets = role1_signable['signed']['targets'] # Test: normal case. targetinfos = self.repository_updater.targets_of_role('role1') # Verify that the expected role files were downloaded and installed. os.path.exists(os.path.join(self.client_metadata_current, 'targets.json')) os.path.exists(os.path.join(self.client_metadata_current, 'targets', 'role1.json')) self.assertTrue('targets' in self.repository_updater.metadata['current']) self.assertTrue('role1' in self.repository_updater.metadata['current']) # Verify that list of targets was returned and that it contains valid # target files. self.assertTrue(tuf.formats.TARGETINFOS_SCHEMA.matches(targetinfos)) for targetinfo in targetinfos: self.assertTrue((targetinfo['filepath'], targetinfo['fileinfo']) in six.iteritems(expected_targets)) # Test: Invalid arguments. # targets_of_role() expected a string rolename. self.assertRaises(securesystemslib.exceptions.FormatError, self.repository_updater.targets_of_role, 8) self.assertRaises(tuf.exceptions.UnknownRoleError, self.repository_updater.targets_of_role, 'unknown_rolename') def test_6_get_one_valid_targetinfo(self): # Setup # Unlike some of the other tests, start up a fresh server here. # The SimpleHTTPServer started in the setupclass has a tendency to # timeout in Windows after a few tests. SERVER_PORT = random.randint(30000, 45000) command = ['python', '-m', 'tuf.scripts.simple_server', str(SERVER_PORT)] server_process = subprocess.Popen(command, stderr=subprocess.PIPE) # NOTE: Following error is raised if a delay is not long enough: # # or, on Windows: # Failed to establish a new connection: [Errno 111] Connection refused' # While 0.3s has consistently worked on Travis and local builds, it led to # occasional failures in AppVeyor builds, so increasing this to 2s, sadly. time.sleep(2) # 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'. repository_basepath = self.repository_directory[len(os.getcwd()):] url_prefix = \ 'http://localhost:' + str(SERVER_PORT) + repository_basepath self.repository_mirrors = {'mirror1': {'url_prefix': url_prefix, 'metadata_path': 'metadata', 'targets_path': 'targets', 'confined_target_dirs': ['']}} # Creating a repository instance. The test cases will use this client # updater to refresh metadata, fetch target files, etc. self.repository_updater = updater.Updater(self.repository_name, self.repository_mirrors) # Extract the file information of the targets specified in 'targets.json'. self.repository_updater.refresh() targets_metadata = self.repository_updater.metadata['current']['targets'] target_files = targets_metadata['targets'] # Extract random target from 'target_files', which will be compared to what # is returned by get_one_valid_targetinfo(). Restore the popped target # (dict value stored in the metadata store) so that it can be found later. filepath, fileinfo = target_files.popitem() target_files[filepath] = fileinfo target_targetinfo = self.repository_updater.get_one_valid_targetinfo(filepath) self.assertTrue(tuf.formats.TARGETINFO_SCHEMA.matches(target_targetinfo)) self.assertEqual(target_targetinfo['filepath'], filepath) self.assertEqual(target_targetinfo['fileinfo'], fileinfo) # Test: invalid target path. self.assertRaises(tuf.exceptions.UnknownTargetError, self.repository_updater.get_one_valid_targetinfo, self.random_path().lstrip(os.sep).lstrip('/')) # Test updater.get_one_valid_targetinfo() backtracking behavior (enabled by # default.) targets_directory = os.path.join(self.repository_directory, 'targets') foo_directory = os.path.join(targets_directory, 'foo') foo_pattern = 'foo/foo*.tar.gz' os.makedirs(foo_directory) foo_package = os.path.join(foo_directory, 'foo1.1.tar.gz') with open(foo_package, 'wb') as file_object: file_object.write(b'new release') # Modify delegations on the remote repository to test backtracking behavior. repository = repo_tool.load_repository(self.repository_directory) repository.targets.delegate('role3', [self.role_keys['targets']['public']], [foo_pattern]) repository.targets.delegate('role4', [self.role_keys['targets']['public']], [foo_pattern], list_of_targets=[foo_package]) repository.targets('role4').add_target(foo_package) repository.targets.load_signing_key(self.role_keys['targets']['private']) repository.targets('role3').load_signing_key(self.role_keys['targets']['private']) repository.targets('role4').load_signing_key(self.role_keys['targets']['private']) repository.snapshot.load_signing_key(self.role_keys['snapshot']['private']) repository.timestamp.load_signing_key(self.role_keys['timestamp']['private']) repository.writeall() # Move the staged metadata to the "live" metadata. shutil.rmtree(os.path.join(self.repository_directory, 'metadata')) shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'), os.path.join(self.repository_directory, 'metadata')) # updater.get_one_valid_targetinfo() should find 'foo1.1.tar.gz' by # backtracking to 'role3'. 'role2' allows backtracking. self.repository_updater.refresh() self.repository_updater.get_one_valid_targetinfo('foo/foo1.1.tar.gz') # A leading path separator is disallowed. self.assertRaises(tuf.exceptions.FormatError, self.repository_updater.get_one_valid_targetinfo, '/foo/foo1.1.tar.gz') # Test when 'role2' does *not* allow backtracking. If 'foo/foo1.1.tar.gz' # is not provided by the authoritative 'role2', # updater.get_one_valid_targetinfo() should return a # 'tuf.exceptions.UnknownTargetError' exception. repository = repo_tool.load_repository(self.repository_directory) repository.targets.revoke('role3') repository.targets.revoke('role4') # Ensure we delegate in trusted order (i.e., 'role2' has higher priority.) repository.targets.delegate('role3', [self.role_keys['targets']['public']], [foo_pattern], terminating=True, list_of_targets=[]) repository.targets.delegate('role4', [self.role_keys['targets']['public']], [foo_pattern], list_of_targets=[foo_package]) repository.targets('role3').load_signing_key(self.role_keys['targets']['private']) repository.targets('role4').load_signing_key(self.role_keys['targets']['private']) repository.targets.load_signing_key(self.role_keys['targets']['private']) repository.snapshot.load_signing_key(self.role_keys['snapshot']['private']) repository.timestamp.load_signing_key(self.role_keys['timestamp']['private']) repository.writeall() # Move the staged metadata to the "live" metadata. shutil.rmtree(os.path.join(self.repository_directory, 'metadata')) shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'), os.path.join(self.repository_directory, 'metadata')) # Verify that 'tuf.exceptions.UnknownTargetError' is raised by # updater.get_one_valid_targetinfo(). self.repository_updater.refresh() self.assertRaises(tuf.exceptions.UnknownTargetError, self.repository_updater.get_one_valid_targetinfo, 'foo/foo1.1.tar.gz') # Verify that a 'tuf.exceptions.FormatError' is raised for delegated paths # that contain a leading path separator. self.assertRaises(tuf.exceptions.FormatError, self.repository_updater.get_one_valid_targetinfo, '/foo/foo1.1.tar.gz') server_process.kill() def test_6_download_target(self): # Create temporary directory (destination directory of downloaded targets) # that will be passed as an argument to 'download_target()'. destination_directory = self.make_temp_directory() target_filepaths = \ list(self.repository_updater.metadata['current']['targets']['targets'].keys()) # Test: normal case. # Get the target info, which is an argument to 'download_target()'. # 'target_filepaths' is expected to have at least two targets. The first # target will be used to test against download_target(). The second # will be used to test against download_target() and a repository with # 'consistent_snapshot' set to True. target_filepath1 = target_filepaths.pop() targetinfo = self.repository_updater.get_one_valid_targetinfo(target_filepath1) self.repository_updater.download_target(targetinfo, destination_directory) download_filepath = \ os.path.join(destination_directory, target_filepath1.lstrip('/')) self.assertTrue(os.path.exists(download_filepath)) length, hashes = \ securesystemslib.util.get_file_details(download_filepath, securesystemslib.settings.HASH_ALGORITHMS) download_targetfileinfo = tuf.formats.make_fileinfo(length, hashes) # Add any 'custom' data from the repository's target fileinfo to the # 'download_targetfileinfo' object being tested. if 'custom' in targetinfo['fileinfo']: download_targetfileinfo['custom'] = targetinfo['fileinfo']['custom'] self.assertEqual(targetinfo['fileinfo'], download_targetfileinfo) # Test when consistent snapshots is set. First, create a valid # repository with consistent snapshots set (root.json contains a # "consistent_snapshot" entry that the updater uses to correctly fetch # snapshots. The updater expects the existence of # '.filename' files if root.json sets 'consistent_snapshot # = True'. # The repository must be rewritten with 'consistent_snapshot' set. repository = repo_tool.load_repository(self.repository_directory) # Write metadata for all the top-level roles , since consistent snapshot # is now being set to true (i.e., the pre-generated repository isn't set # to support consistent snapshots. A new version of targets.json is needed # to ensure .filename target files are written to disk. repository.targets.load_signing_key(self.role_keys['targets']['private']) repository.root.load_signing_key(self.role_keys['root']['private']) repository.snapshot.load_signing_key(self.role_keys['snapshot']['private']) repository.timestamp.load_signing_key(self.role_keys['timestamp']['private']) repository.writeall(consistent_snapshot=True) # Move the staged metadata to the "live" metadata. shutil.rmtree(os.path.join(self.repository_directory, 'metadata')) shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'), os.path.join(self.repository_directory, 'metadata')) # And ensure the client has the latest top-level metadata. self.repository_updater.refresh() target_filepath2 = target_filepaths.pop() targetinfo2 = self.repository_updater.get_one_valid_targetinfo(target_filepath2) self.repository_updater.download_target(targetinfo2, destination_directory) # Test for a destination that cannot be written to (apart from a target # file that already exists at the destination) and which raises an # exception. bad_destination_directory = 'bad' * 2000 try: self.repository_updater.download_target(targetinfo, bad_destination_directory) except OSError as e: self.assertTrue(e.errno == errno.ENAMETOOLONG or e.errno == errno.ENOENT) # Test: Invalid arguments. self.assertRaises(securesystemslib.exceptions.FormatError, self.repository_updater.download_target, 8, destination_directory) self.assertRaises(securesystemslib.exceptions.FormatError, self.repository_updater.download_target, targetinfo, 8) # Test: # Attempt a file download of a valid target, however, a download exception # occurs because the target is not within the mirror's confined target # directories. Adjust mirrors dictionary, so that 'confined_target_dirs' # field contains at least one confined target and excludes needed target # file. mirrors = self.repository_updater.mirrors for mirror_name, mirror_info in six.iteritems(mirrors): mirrors[mirror_name]['confined_target_dirs'] = [self.random_path()] try: self.repository_updater.download_target(targetinfo, destination_directory) except tuf.exceptions.NoWorkingMirrorError as exception: # Ensure that no mirrors were found due to mismatch in confined target # directories. get_list_of_mirrors() returns an empty list in this case, # which does not generate specific exception errors. self.assertEqual(len(exception.mirror_errors), 0) def test_7_updated_targets(self): # Verify that the list of targets returned by updated_targets() contains # all the files that need to be updated, these files include modified and # new target files. Also, confirm that files that need not to be updated # are absent from the list. # Setup # Unlike some of the other tests, start up a fresh server here. # The SimpleHTTPServer started in the setupclass has a tendency to # timeout in Windows after a few tests. SERVER_PORT = random.randint(30000, 45000) command = ['python', '-m', 'tuf.scripts.simple_server', str(SERVER_PORT)] server_process = subprocess.Popen(command, stderr=subprocess.PIPE) # NOTE: Following error is raised if a delay is not long enough to allow # the server process to set up and start listening: # # or, on Windows: # Failed to establish a new connection: [Errno 111] Connection refused' # While 0.3s has consistently worked on Travis and local builds, it led to # occasional failures in AppVeyor builds, so increasing this to 2s, sadly. time.sleep(2) # 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'. repository_basepath = self.repository_directory[len(os.getcwd()):] url_prefix = \ 'http://localhost:' + str(SERVER_PORT) + repository_basepath # Setting 'tuf.settings.repository_directory' with the temporary client # directory copied from the original repository files. tuf.settings.repositories_directory = self.client_directory self.repository_mirrors = {'mirror1': {'url_prefix': url_prefix, 'metadata_path': 'metadata', 'targets_path': 'targets', 'confined_target_dirs': ['']}} # Creating a repository instance. The test cases will use this client # updater to refresh metadata, fetch target files, etc. self.repository_updater = updater.Updater(self.repository_name, self.repository_mirrors) # Create temporary directory which will hold client's target files. destination_directory = self.make_temp_directory() # Get the list of target files. It will be used as an argument to the # 'updated_targets()' function. all_targets = self.repository_updater.all_targets() # Test for duplicates and targets in the root directory of the repository. additional_target = all_targets[0].copy() all_targets.append(additional_target) additional_target_in_root_directory = additional_target.copy() additional_target_in_root_directory['filepath'] = 'file1.txt' all_targets.append(additional_target_in_root_directory) # At this point client needs to update and download all targets. # Test: normal cases. updated_targets = \ self.repository_updater.updated_targets(all_targets, destination_directory) all_targets = self.repository_updater.all_targets() # Assumed the pre-generated repository specifies two target files in # 'targets.json' and one delegated target file in 'role1.json'. self.assertEqual(len(updated_targets), 3) # Test: download one of the targets. download_target = copy.deepcopy(updated_targets).pop() self.repository_updater.download_target(download_target, destination_directory) updated_targets = \ self.repository_updater.updated_targets(all_targets, destination_directory) self.assertEqual(len(updated_targets), 2) # Test: download all the targets. for download_target in all_targets: self.repository_updater.download_target(download_target, destination_directory) updated_targets = \ self.repository_updater.updated_targets(all_targets, destination_directory) self.assertEqual(len(updated_targets), 0) # Test: Invalid arguments. self.assertRaises(securesystemslib.exceptions.FormatError, self.repository_updater.updated_targets, 8, destination_directory) self.assertRaises(securesystemslib.exceptions.FormatError, self.repository_updater.updated_targets, all_targets, 8) # Modify one target file on the remote repository. repository = repo_tool.load_repository(self.repository_directory) target1 = os.path.join(self.repository_directory, 'targets', 'file1.txt') repository.targets.remove_target(os.path.basename(target1)) length, hashes = securesystemslib.util.get_file_details(target1) repository.targets.add_target(target1) repository.targets.load_signing_key(self.role_keys['targets']['private']) repository.snapshot.load_signing_key(self.role_keys['snapshot']['private']) with open(target1, 'ab') as file_object: file_object.write(b'append extra text') length, hashes = securesystemslib.util.get_file_details(target1) repository.targets.add_target(target1) repository.targets.load_signing_key(self.role_keys['targets']['private']) repository.snapshot.load_signing_key(self.role_keys['snapshot']['private']) repository.timestamp.load_signing_key(self.role_keys['timestamp']['private']) repository.writeall() # Move the staged metadata to the "live" metadata. shutil.rmtree(os.path.join(self.repository_directory, 'metadata')) shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'), os.path.join(self.repository_directory, 'metadata')) # Ensure the client has up-to-date metadata. self.repository_updater.refresh() # Verify that the new target file is considered updated. all_targets = self.repository_updater.all_targets() updated_targets = \ self.repository_updater.updated_targets(all_targets, destination_directory) self.assertEqual(len(updated_targets), 1) server_process.kill() def test_8_remove_obsolete_targets(self): # Setup. # Unlike some of the other tests, start up a fresh server here. # The SimpleHTTPServer started in the setupclass has a tendency to # timeout in Windows after a few tests. SERVER_PORT = random.randint(30000, 45000) command = ['python', '-m', 'tuf.scripts.simple_server', str(SERVER_PORT)] server_process = subprocess.Popen(command, stderr=subprocess.PIPE) # NOTE: Following error is raised if a delay is not long enough to allow # the server process to set up and start listening: # # or, on Windows: # Failed to establish a new connection: [Errno 111] Connection refused' # While 0.3s has consistently worked on Travis and local builds, it led to # occasional failures in AppVeyor builds, so increasing this to 2s, sadly. time.sleep(2) # 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'. repository_basepath = self.repository_directory[len(os.getcwd()):] url_prefix = \ 'http://localhost:' + str(SERVER_PORT) + repository_basepath # Setting 'tuf.settings.repository_directory' with the temporary client # directory copied from the original repository files. tuf.settings.repositories_directory = self.client_directory self.repository_mirrors = {'mirror1': {'url_prefix': url_prefix, 'metadata_path': 'metadata', 'targets_path': 'targets', 'confined_target_dirs': ['']}} # Creating a repository instance. The test cases will use this client # updater to refresh metadata, fetch target files, etc. self.repository_updater = updater.Updater(self.repository_name, self.repository_mirrors) # Create temporary directory that will hold the client's target files. destination_directory = self.make_temp_directory() # Populate 'destination_direction' with all target files. all_targets = self.repository_updater.all_targets() self.assertEqual(len(os.listdir(destination_directory)), 0) for target in all_targets: self.repository_updater.download_target(target, destination_directory) self.assertEqual(len(os.listdir(destination_directory)), 3) # Remove two target files from the server's repository. repository = repo_tool.load_repository(self.repository_directory) target1 = os.path.join(self.repository_directory, 'targets', 'file1.txt') repository.targets.remove_target(os.path.basename(target1)) repository.targets.load_signing_key(self.role_keys['targets']['private']) repository.snapshot.load_signing_key(self.role_keys['snapshot']['private']) repository.timestamp.load_signing_key(self.role_keys['timestamp']['private']) repository.writeall() # Move the staged metadata to the "live" metadata. shutil.rmtree(os.path.join(self.repository_directory, 'metadata')) shutil.copytree(os.path.join(self.repository_directory, 'metadata.staged'), os.path.join(self.repository_directory, 'metadata')) # Update client's metadata. self.repository_updater.refresh() # Test: normal case. # Verify number of target files in 'destination_directory' (should be 1 # after the update made to the remote repository), and call # 'remove_obsolete_targets()'. all_targets = self.repository_updater.all_targets() updated_targets = \ self.repository_updater.updated_targets(all_targets, destination_directory) for updated_target in updated_targets: self.repository_updater.download_target(updated_target, destination_directory) self.assertEqual(len(os.listdir(destination_directory)), 3) self.repository_updater.remove_obsolete_targets(destination_directory) self.assertEqual(len(os.listdir(destination_directory)), 2) # Verify that, if there are no obsolete files, the number of files # in 'destination_directory' remains the same. self.repository_updater.remove_obsolete_targets(destination_directory) self.assertEqual(len(os.listdir(destination_directory)), 2) # Test coverage for a destination path that causes an exception not due # to an already removed target. bad_destination_directory = 'bad' * 2000 self.repository_updater.remove_obsolete_targets(bad_destination_directory) # Test coverage for a target that is not specified in current metadata. del self.repository_updater.metadata['current']['targets']['targets']['file2.txt'] self.repository_updater.remove_obsolete_targets(destination_directory) # Test coverage for a role that doesn't exist in the previously trusted set # of metadata. del self.repository_updater.metadata['previous']['targets'] self.repository_updater.remove_obsolete_targets(destination_directory) server_process.kill() def test_9__get_target_hash(self): # Test normal case. # Test target filepaths with ascii and non-ascii characters. expected_target_hashes = { '/file1.txt': 'e3a3d89eb3b70ce3fbce6017d7b8c12d4abd5635427a0e8a238f53157df85b3d', '/Jalape\xc3\xb1o': '78bfd5c314680545eb48ecad508aceb861f8d6e680f4fe1b791da45c298cda88' } for filepath, target_hash in six.iteritems(expected_target_hashes): self.assertTrue(securesystemslib.formats.RELPATH_SCHEMA.matches(filepath)) self.assertTrue(securesystemslib.formats.HASH_SCHEMA.matches(target_hash)) self.assertEqual(self.repository_updater._get_target_hash(filepath), target_hash) # Test for improperly formatted argument. #self.assertRaises(securesystemslib.exceptions.FormatError, self.repository_updater._get_target_hash, 8) def test_10__hard_check_file_length(self): # Test for exception if file object is not equal to trusted file length. temp_file_object = securesystemslib.util.TempFile() temp_file_object.write(b'X') temp_file_object.seek(0) self.assertRaises(tuf.exceptions.DownloadLengthMismatchError, self.repository_updater._hard_check_file_length, temp_file_object, 10) def test_10__soft_check_file_length(self): # Test for exception if file object is not equal to trusted file length. temp_file_object = securesystemslib.util.TempFile() temp_file_object.write(b'XXX') temp_file_object.seek(0) self.assertRaises(tuf.exceptions.DownloadLengthMismatchError, self.repository_updater._soft_check_file_length, temp_file_object, 1) # Verify that an exception is not raised if the file length <= the observed # file length. temp_file_object.seek(0) self.repository_updater._soft_check_file_length(temp_file_object, 3) temp_file_object.seek(0) self.repository_updater._soft_check_file_length(temp_file_object, 4) def test_10__targets_of_role(self): # Test for non-existent role. self.assertRaises(tuf.exceptions.UnknownRoleError, self.repository_updater._targets_of_role, 'non-existent-role') # Test for role that hasn't been loaded yet. del self.repository_updater.metadata['current']['targets'] self.assertEqual(len(self.repository_updater._targets_of_role('targets', skip_refresh=True)), 0) # 'targets.json' tracks two targets. self.assertEqual(len(self.repository_updater._targets_of_role('targets')), 2) def test_10__preorder_depth_first_walk(self): # Test that infinit loop is prevented if the target file is not found and # the max number of delegations is reached. valid_max_number_of_delegations = tuf.settings.MAX_NUMBER_OF_DELEGATIONS tuf.settings.MAX_NUMBER_OF_DELEGATIONS = 0 self.assertEqual(None, self.repository_updater._preorder_depth_first_walk('unknown.txt')) # Reset the setting for max number of delegations so that subsequent unit # tests reference the expected setting. tuf.settings.MAX_NUMBER_OF_DELEGATIONS = valid_max_number_of_delegations # Attempt to create a circular delegation, where role1 performs a # delegation to the top-level Targets role. The updater should ignore the # delegation and not raise an exception. targets_path = os.path.join(self.client_metadata_current, 'targets.json') targets_metadata = securesystemslib.util.load_json_file(targets_path) targets_metadata['signed']['delegations']['roles'][0]['paths'] = ['/file8.txt'] with open(targets_path, 'wb') as file_object: file_object.write(repo_lib._get_written_metadata(targets_metadata)) role1_path = os.path.join(self.client_metadata_current, 'role1.json') role1_metadata = securesystemslib.util.load_json_file(role1_path) role1_metadata['signed']['delegations']['roles'][0]['name'] = 'targets' role1_metadata['signed']['delegations']['roles'][0]['paths'] = ['/file8.txt'] with open(role1_path, 'wb') as file_object: file_object.write(repo_lib._get_written_metadata(role1_metadata)) role2_path = os.path.join(self.client_metadata_current, 'role2.json') role2_metadata = securesystemslib.util.load_json_file(role2_path) role2_metadata['signed']['delegations']['roles'] = role1_metadata['signed']['delegations']['roles'] role2_metadata['signed']['delegations']['roles'][0]['paths'] = ['/file8.txt'] with open(role2_path, 'wb') as file_object: file_object.write(repo_lib._get_written_metadata(role2_metadata)) logger.debug('attempting circular delegation') self.assertEqual(None, self.repository_updater._preorder_depth_first_walk('/file8.txt')) def test_10__visit_child_role(self): # Call _visit_child_role and test the dict keys: 'paths', # 'path_hash_prefixes', and if both are missing. targets_role = self.repository_updater.metadata['current']['targets'] targets_role['delegations']['roles'][0]['paths'] = ['/*.txt', '/target.exe'] child_role = targets_role['delegations']['roles'][0] role1_path = os.path.join(self.client_metadata_current, 'role1.json') role1_metadata = securesystemslib.util.load_json_file(role1_path) role1_metadata['signed']['delegations']['roles'][0]['paths'] = ['/*.exe'] with open(role1_path, 'wb') as file_object: file_object.write(repo_lib._get_written_metadata(role1_metadata)) self.assertEqual(self.repository_updater._visit_child_role(child_role, '/target.exe'), child_role['name']) # Test for a valid path hash prefix... child_role['path_hash_prefixes'] = ['8baf'] self.assertEqual(self.repository_updater._visit_child_role(child_role, '/file3.txt'), child_role['name']) # ... and an invalid one, as well. child_role['path_hash_prefixes'] = ['badd'] self.assertEqual(self.repository_updater._visit_child_role(child_role, '/file3.txt'), None) # Test for a forbidden target. del child_role['path_hash_prefixes'] self.repository_updater._visit_child_role(child_role, '/forbidden.tgz') # Verify that unequal path_hash_prefixes are skipped. child_role['path_hash_prefixes'] = ['bad', 'bad'] self.assertEqual(None, self.repository_updater._visit_child_role(child_role, '/unknown.exe')) # Test if both 'path' and 'path_hash_prefixes' are missing. del child_role['paths'] del child_role['path_hash_prefixes'] self.assertRaises(securesystemslib.exceptions.FormatError, self.repository_updater._visit_child_role, child_role, child_role['name']) def test_11__verify_uncompressed_metadata_file(self): # Test for invalid metadata content. metadata_file_object = securesystemslib.util.TempFile() metadata_file_object.write(b'X') metadata_file_object.seek(0) self.assertRaises(tuf.exceptions.InvalidMetadataJSONError, self.repository_updater._verify_uncompressed_metadata_file, metadata_file_object, 'root') def test_12__verify_root_chain_link(self): # Test for an invalid signature in the chain link. # current = (i.e., 1.root.json) # next = signable for the next metadata in the chain (i.e., 2.root.json) rolename = 'root' current_root = self.repository_updater.metadata['current']['root'] targets_path = os.path.join(self.repository_directory, 'metadata', 'targets.json') # 'next_invalid_root' is a Targets signable, as written to disk. # We use the Targets metadata here to ensure the signatures are invalid. next_invalid_root = securesystemslib.util.load_json_file(targets_path) self.assertRaises(securesystemslib.exceptions.BadSignatureError, self.repository_updater._verify_root_chain_link, rolename, current_root, next_invalid_root) def test_13__get_file(self): # Test for an "unsafe" download, where the file is downloaded up to # a required length (and no more). The "safe" download approach # downloads an exact required length. targets_path = os.path.join(self.repository_directory, 'metadata', 'targets.json') file_size, file_hashes = securesystemslib.util.get_file_details(targets_path) file_type = 'meta' def verify_target_file(targets_path): # Every target file must have its length and hashes inspected. self.repository_updater._hard_check_file_length(targets_path, file_size) self.repository_updater._check_hashes(targets_path, file_hashes) self.repository_updater._get_file('targets.json', verify_target_file, file_type, file_size, download_safely=True) self.repository_updater._get_file('targets.json', verify_target_file, file_type, file_size, download_safely=False) def test_14__targets_of_role(self): # Test case where a list of targets is given. By default, the 'targets' # parameter is None. targets = [{'filepath': 'file1.txt', 'fileinfo': {'length': 1, 'hashes': {'sha256': 'abc'}}}] self.repository_updater._targets_of_role('targets', targets=targets, skip_refresh=False) class TestMultiRepoUpdater(unittest_toolbox.Modified_TestCase): def setUp(self): # We are inheriting from custom class. unittest_toolbox.Modified_TestCase.setUp(self) self.temporary_directory = tempfile.mkdtemp(dir=os.getcwd()) # Copy the original repository files provided in the test folder so that # any modifications made to repository files are restricted to the copies. # The 'repository_data' directory is expected to exist in 'tuf/tests/'. original_repository_files = os.path.join(os.getcwd(), 'repository_data') self.temporary_repository_root = self.make_temp_directory(directory= self.temporary_directory) # The original repository, keystore, and client directories will be copied # for each test case. original_repository = os.path.join(original_repository_files, 'repository') original_client = os.path.join(original_repository_files, 'client', 'test_repository1') original_keystore = os.path.join(original_repository_files, 'keystore') original_map_file = os.path.join(original_repository_files, 'map.json') # Save references to the often-needed client repository directories. # Test cases need these references to access metadata and target files. self.repository_directory = os.path.join(self.temporary_repository_root, 'repository_server1') self.repository_directory2 = os.path.join(self.temporary_repository_root, 'repository_server2') # Setting 'tuf.settings.repositories_directory' with the temporary client # directory copied from the original repository files. tuf.settings.repositories_directory = self.temporary_repository_root repository_name = 'test_repository1' repository_name2 = 'test_repository2' self.client_directory = os.path.join(self.temporary_repository_root, repository_name) self.client_directory2 = os.path.join(self.temporary_repository_root, repository_name2) self.keystore_directory = os.path.join(self.temporary_repository_root, 'keystore') self.map_file = os.path.join(self.client_directory, 'map.json') self.map_file2 = os.path.join(self.client_directory2, 'map.json') # Copy the original 'repository', 'client', and 'keystore' directories # to the temporary repository the test cases can use. shutil.copytree(original_repository, self.repository_directory) shutil.copytree(original_repository, self.repository_directory2) shutil.copytree(original_client, self.client_directory) shutil.copytree(original_client, self.client_directory2) shutil.copyfile(original_map_file, self.map_file) shutil.copyfile(original_map_file, self.map_file2) shutil.copytree(original_keystore, self.keystore_directory) # Launch a SimpleHTTPServer (serves files in the current directory). # Test cases will request metadata and target files that have been # pre-generated in 'tuf/tests/repository_data', which will be served by the # SimpleHTTPServer launched here. The test cases of this unit test assume # the pre-generated metadata files have a specific structure, such # as a delegated role 'targets/role1', three target files, five key files, # etc. self.SERVER_PORT = 30001 self.SERVER_PORT2 = 30002 command = ['python', '-m', 'tuf.scripts.simple_server', str(self.SERVER_PORT)] command2 = ['python', '-m', 'tuf.scripts.simple_server', str(self.SERVER_PORT2)] self.server_process = subprocess.Popen(command, stderr=subprocess.PIPE, cwd=self.repository_directory) logger.debug('Server process started.') logger.debug('Server process id: ' + str(self.server_process.pid)) logger.debug('Serving on port: ' + str(self.SERVER_PORT)) self.server_process2 = subprocess.Popen(command2, stderr=subprocess.PIPE, cwd=self.repository_directory2) logger.debug('Server process 2 started.') logger.debug('Server 2 process id: ' + str(self.server_process2.pid)) logger.debug('Serving 2 on port: ' + str(self.SERVER_PORT2)) self.url = 'http://localhost:' + str(self.SERVER_PORT) + os.path.sep self.url2 = 'http://localhost:' + str(self.SERVER_PORT2) + os.path.sep # NOTE: Following error is raised if a delay is not long enough to allow # the server process to set up and start listening: # # or, on Windows: # Failed to establish a new connection: [Errno 111] Connection refused' # While 0.3s has consistently worked on Travis and local builds, it led to # occasional failures in AppVeyor builds, so increasing this to 2s, sadly. time.sleep(2) url_prefix = 'http://localhost:' + str(self.SERVER_PORT) url_prefix2 = 'http://localhost:' + str(self.SERVER_PORT2) self.repository_mirrors = {'mirror1': {'url_prefix': url_prefix, 'metadata_path': 'metadata', 'targets_path': 'targets', 'confined_target_dirs': ['']}} self.repository_mirrors2 = {'mirror1': {'url_prefix': url_prefix2, 'metadata_path': 'metadata', 'targets_path': 'targets', 'confined_target_dirs': ['']}} # Create the repository instances. The test cases will use these client # updaters to refresh metadata, fetch target files, etc. self.repository_updater = updater.Updater(repository_name, self.repository_mirrors) self.repository_updater2 = updater.Updater(repository_name2, self.repository_mirrors2) # Creating a repository instance. The test cases will use this client # updater to refresh metadata, fetch target files, etc. self.multi_repo_updater = updater.MultiRepoUpdater(self.map_file) # Metadata role keys are needed by the test cases to make changes to the # repository (e.g., adding a new target file to 'targets.json' and then # requesting a refresh()). self.role_keys = _load_role_keys(self.keystore_directory) def tearDown(self): # Modified_TestCase.tearDown() automatically deletes temporary files and # directories that may have been created during each test case. unittest_toolbox.Modified_TestCase.tearDown(self) # Kill the SimpleHTTPServer process. if self.server_process.returncode is None: logger.info('Server process ' + str(self.server_process.pid) + ' terminated.') self.server_process.kill() if self.server_process2.returncode is None: logger.info('Server 2 process ' + str(self.server_process2.pid) + ' terminated.') self.server_process2.kill() # updater.Updater() populates the roledb with the name "test_repository1" tuf.roledb.clear_roledb(clear_all=True) tuf.keydb.clear_keydb(clear_all=True) # Remove the temporary repository directory, which should contain all the # metadata, targets, and key files generated of all the test cases. sleep # for a bit to allow the kill'd server processes to terminate. time.sleep(.3) shutil.rmtree(self.temporary_directory) # UNIT TESTS. def test__init__(self): # The client's repository requires a metadata directory (and the 'current' # and 'previous' sub-directories), and at least the 'root.json' file. # setUp(), called before each test case, instantiates the required updater # objects and keys. The needed objects/data is available in # 'self.repository_updater', 'self.client_directory', etc. # Test: Invalid arguments. # Invalid 'updater_name' argument. String expected. self.assertRaises(securesystemslib.exceptions.FormatError, updater.MultiRepoUpdater, 8) # Restore 'tuf.settings.repositories_directory' to the original client # directory. tuf.settings.repositories_directory = self.client_directory # Test for a non-existent map file. self.assertRaises(tuf.exceptions.Error, updater.MultiRepoUpdater, 'non-existent.json') # Test for a map file that doesn't contain the required fields. root_filepath = os.path.join( self.repository_directory, 'metadata', 'root.json') self.assertRaises(securesystemslib.exceptions.FormatError, updater.MultiRepoUpdater, root_filepath) # Test for a valid instantiation. map_file = os.path.join(self.client_directory, 'map.json') multi_repo_updater = updater.MultiRepoUpdater(map_file) def test__target_matches_path_pattern(self): map_file = os.path.join(self.client_directory, 'map.json') multi_repo_updater = updater.MultiRepoUpdater(map_file) paths = ['foo*.tgz', 'bar*.tgz', 'file1.txt'] self.assertTrue( multi_repo_updater._target_matches_path_pattern('bar-1.0.tgz', paths)) self.assertTrue( multi_repo_updater._target_matches_path_pattern('file1.txt', paths)) self.assertFalse( multi_repo_updater._target_matches_path_pattern('baz-1.0.tgz', paths)) def test_get_valid_targetinfo(self): map_file = os.path.join(self.client_directory, 'map.json') multi_repo_updater = updater.MultiRepoUpdater(map_file) # Verify the multi repo updater refuses to save targetinfo if # required local repositories are missing. repo_dir = os.path.join(tuf.settings.repositories_directory, 'test_repository1') backup_repo_dir = os.path.join(tuf.settings.repositories_directory, 'test_repository1.backup') shutil.move(repo_dir, backup_repo_dir) self.assertRaises(tuf.exceptions.Error, multi_repo_updater.get_valid_targetinfo, 'file3.txt') # Restore the client's repository directory. shutil.move(backup_repo_dir, repo_dir) # Verify that the Root file must exist. root_filepath = os.path.join(repo_dir, 'metadata', 'current', 'root.json') backup_root_filepath = os.path.join(root_filepath, root_filepath + '.backup') shutil.move(root_filepath, backup_root_filepath) self.assertRaises(tuf.exceptions.Error, multi_repo_updater.get_valid_targetinfo, 'file3.txt') # Restore the Root file. shutil.move(backup_root_filepath, root_filepath) # Test that the first mapping is skipped if it's irrelevant to the target # file. self.assertRaises(tuf.exceptions.UnknownTargetError, multi_repo_updater.get_valid_targetinfo, 'non-existent.txt') # Verify that a targetinfo is not returned for a non-existent target. multi_repo_updater.map_file['mapping'][1]['terminating'] = False self.assertRaises(tuf.exceptions.UnknownTargetError, multi_repo_updater.get_valid_targetinfo, 'non-existent.txt') multi_repo_updater.map_file['mapping'][1]['terminating'] = True # Test for a mapping that sets terminating = True, and that appears before # the final mapping. multi_repo_updater.map_file['mapping'][0]['terminating'] = True self.assertRaises(tuf.exceptions.UnknownTargetError, multi_repo_updater.get_valid_targetinfo, 'bad3.txt') multi_repo_updater.map_file['mapping'][0]['terminating'] = False # Test for the case where multiple repos sign for the same target. valid_targetinfo = multi_repo_updater.get_valid_targetinfo('file1.txt') multi_repo_updater.map_file['mapping'][0]['threshold'] = 2 valid_targetinfo = multi_repo_updater.get_valid_targetinfo('file1.txt') # Verify that valid targetinfo is matched for two repositories that provide # different custom field. Make sure to set the 'match_custom_field' # argument to 'False' when calling get_valid_targetinfo(). repository = repo_tool.load_repository(self.repository_directory2) target1 = os.path.join(self.repository_directory2, 'targets', 'file1.txt') repository.targets.remove_target(os.path.basename(target1)) custom_field = {"custom": "my_custom_data"} repository.targets.add_target(target1, custom_field) repository.targets.load_signing_key(self.role_keys['targets']['private']) repository.snapshot.load_signing_key(self.role_keys['snapshot']['private']) repository.timestamp.load_signing_key(self.role_keys['timestamp']['private']) repository.writeall() # Move the staged metadata to the "live" metadata. shutil.rmtree(os.path.join(self.repository_directory2, 'metadata')) shutil.copytree(os.path.join(self.repository_directory2, 'metadata.staged'), os.path.join(self.repository_directory2, 'metadata')) # Do we get the expected match for the two targetinfo that only differ # by the custom field? valid_targetinfo = multi_repo_updater.get_valid_targetinfo( 'file1.txt', match_custom_field=False) # Verify the case where two repositories provide different targetinfo. # Modify file1.txt so that different length and hashes are reported by the # two repositories. repository = repo_tool.load_repository(self.repository_directory2) target1 = os.path.join(self.repository_directory2, 'targets', 'file1.txt') with open(target1, 'ab') as file_object: file_object.write(b'append extra text') repository.targets.remove_target(os.path.basename(target1)) repository.targets.add_target(target1) repository.targets.load_signing_key(self.role_keys['targets']['private']) repository.snapshot.load_signing_key(self.role_keys['snapshot']['private']) repository.timestamp.load_signing_key(self.role_keys['timestamp']['private']) repository.writeall() # Move the staged metadata to the "live" metadata. shutil.rmtree(os.path.join(self.repository_directory2, 'metadata')) shutil.copytree(os.path.join(self.repository_directory2, 'metadata.staged'), os.path.join(self.repository_directory2, 'metadata')) # Ensure the threshold is modified to 2 (assumed to be 1, by default) and # verify that get_valid_targetinfo() raises an UnknownTargetError # despite both repos signing for file1.txt. multi_repo_updater.map_file['mapping'][0]['threshold'] = 2 self.assertRaises(tuf.exceptions.UnknownTargetError, multi_repo_updater.get_valid_targetinfo, 'file1.txt') def test_get_updater(self): map_file = os.path.join(self.client_directory, 'map.json') multi_repo_updater = updater.MultiRepoUpdater(map_file) # Test for a non-existent repository name. self.assertEqual(None, multi_repo_updater.get_updater('bad_repo_name')) # Test get_updater indirectly via the "private" _update_from_repository(). self.assertRaises(tuf.exceptions.Error, multi_repo_updater._update_from_repository, 'bad_repo_name', 'file3.txt') # Test for a repository that doesn't exist. multi_repo_updater.map_file['repositories']['bad_repo_name'] = ['https://bogus:30002'] self.assertEqual(None, multi_repo_updater.get_updater('bad_repo_name')) def _load_role_keys(keystore_directory): # Populating 'self.role_keys' by importing the required public and private # keys of 'tuf/tests/repository_data/'. The role keys are needed when # modifying the remote repository used by the test cases in this unit test. # The pre-generated key files in 'repository_data/keystore' are all encrypted with # a 'password' passphrase. EXPECTED_KEYFILE_PASSWORD = 'password' # Store and return the cryptography keys of the top-level roles, including 1 # delegated role. role_keys = {} root_key_file = os.path.join(keystore_directory, 'root_key') targets_key_file = os.path.join(keystore_directory, 'targets_key') snapshot_key_file = os.path.join(keystore_directory, 'snapshot_key') timestamp_key_file = os.path.join(keystore_directory, 'timestamp_key') delegation_key_file = os.path.join(keystore_directory, 'delegation_key') role_keys = {'root': {}, 'targets': {}, 'snapshot': {}, 'timestamp': {}, 'role1': {}} # Import the top-level and delegated role public keys. role_keys['root']['public'] = \ repo_tool.import_rsa_publickey_from_file(root_key_file+'.pub') role_keys['targets']['public'] = \ repo_tool.import_ed25519_publickey_from_file(targets_key_file+'.pub') role_keys['snapshot']['public'] = \ repo_tool.import_ed25519_publickey_from_file(snapshot_key_file+'.pub') role_keys['timestamp']['public'] = \ repo_tool.import_ed25519_publickey_from_file(timestamp_key_file+'.pub') role_keys['role1']['public'] = \ repo_tool.import_ed25519_publickey_from_file(delegation_key_file+'.pub') # Import the private keys of the top-level and delegated roles. role_keys['root']['private'] = \ repo_tool.import_rsa_privatekey_from_file(root_key_file, EXPECTED_KEYFILE_PASSWORD) role_keys['targets']['private'] = \ repo_tool.import_ed25519_privatekey_from_file(targets_key_file, EXPECTED_KEYFILE_PASSWORD) role_keys['snapshot']['private'] = \ repo_tool.import_ed25519_privatekey_from_file(snapshot_key_file, EXPECTED_KEYFILE_PASSWORD) role_keys['timestamp']['private'] = \ repo_tool.import_ed25519_privatekey_from_file(timestamp_key_file, EXPECTED_KEYFILE_PASSWORD) role_keys['role1']['private'] = \ repo_tool.import_ed25519_privatekey_from_file(delegation_key_file, EXPECTED_KEYFILE_PASSWORD) return role_keys if __name__ == '__main__': unittest.main()