python-tuf/tests/test_repository_lib.py
Joshua Lock 5e5c598769 Support abstract storage for timestamp metadata
This was erroneously absent in PR 1024, which added support for abstract
files and directories. Resolve by adding a storage_backend argument to
generate_timestamp_metadata() and using it so that the fileinfo (hashes
and length) for the snapshot file can be generated for a snapshot
metadata file on any supported storage.

Signed-off-by: Joshua Lock <jlock@vmware.com>
2020-06-03 14:16:47 +01:00

885 lines
39 KiB
Python
Executable file

#!/usr/bin/env python
# Copyright 2014 - 2017, New York University and the TUF contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
"""
<Program Name>
test_repository_lib.py
<Author>
Vladimir Diaz <vladimir.v.diaz@gmail.com>
<Started>
June 1, 2014.
<Copyright>
See LICENSE-MIT OR LICENSE for licensing information.
<Purpose>
Unit test for 'repository_lib.py'.
"""
# Help with Python 3 compatibility, where the print statement is a function, an
# implicit relative import is invalid, and the '/' operator performs true
# division. Example: print 'hello world' raises a 'SyntaxError' exception.
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import os
import time
import datetime
import logging
import tempfile
import json
import shutil
import stat
import sys
import unittest
import platform
import tuf
import tuf.formats
import tuf.log
import tuf.formats
import tuf.roledb
import tuf.keydb
import tuf.settings
import tuf.repository_lib as repo_lib
import tuf.repository_tool as repo_tool
import securesystemslib
import securesystemslib.exceptions
import securesystemslib.rsa_keys
import securesystemslib.interface
import securesystemslib.storage
import six
logger = logging.getLogger(__name__)
repo_lib.disable_console_log_messages()
class TestRepositoryToolFunctions(unittest.TestCase):
@classmethod
def setUpClass(cls):
# setUpClass() is called before tests in an individual class are executed.
# Create a temporary directory to store the repository, metadata, and target
# files. 'temporary_directory' must be deleted in TearDownClass() so that
# temporary files are always removed, even when exceptions occur.
tuf.roledb.clear_roledb(clear_all=True)
tuf.keydb.clear_keydb(clear_all=True)
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
@classmethod
def tearDownClass(cls):
# tearDownModule() is called after all the tests have run.
# http://docs.python.org/2/library/unittest.html#class-and-module-fixtures
# Remove the temporary repository directory, which should contain all the
# metadata, targets, and key files generated for the test cases.
tuf.roledb.clear_roledb(clear_all=True)
tuf.keydb.clear_keydb(clear_all=True)
shutil.rmtree(cls.temporary_directory)
def setUp(self):
tuf.roledb.create_roledb('test_repository')
tuf.keydb.create_keydb('test_repository')
def tearDown(self):
tuf.roledb.clear_roledb(clear_all=True)
tuf.keydb.clear_keydb(clear_all=True)
def test_import_rsa_privatekey_from_file(self):
# Test normal case.
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
# Load one of the pre-generated key files from 'tuf/tests/repository_data'.
# 'password' unlocks the pre-generated key files.
key_filepath = os.path.join('repository_data', 'keystore',
'root_key')
self.assertTrue(os.path.exists(key_filepath))
imported_rsa_key = repo_lib.import_rsa_privatekey_from_file(key_filepath,
'password')
self.assertTrue(securesystemslib.formats.RSAKEY_SCHEMA.matches(imported_rsa_key))
# Test improperly formatted argument.
self.assertRaises(securesystemslib.exceptions.FormatError,
repo_lib.import_rsa_privatekey_from_file, 3, 'pw')
# Test invalid argument.
# Non-existent key file.
nonexistent_keypath = os.path.join(temporary_directory,
'nonexistent_keypath')
self.assertRaises(securesystemslib.exceptions.StorageError,
repo_lib.import_rsa_privatekey_from_file,
nonexistent_keypath, 'pw')
# Invalid key file argument.
invalid_keyfile = os.path.join(temporary_directory, 'invalid_keyfile')
with open(invalid_keyfile, 'wb') as file_object:
file_object.write(b'bad keyfile')
self.assertRaises(securesystemslib.exceptions.CryptoError, repo_lib.import_rsa_privatekey_from_file,
invalid_keyfile, 'pw')
def test_import_ed25519_privatekey_from_file(self):
# Test normal case.
# Generate ed25519 keys that can be imported.
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
ed25519_keypath = os.path.join(temporary_directory, 'ed25519_key')
securesystemslib.interface.generate_and_write_ed25519_keypair(
ed25519_keypath, password='pw')
imported_ed25519_key = \
repo_lib.import_ed25519_privatekey_from_file(ed25519_keypath, 'pw')
self.assertTrue(securesystemslib.formats.ED25519KEY_SCHEMA.matches(imported_ed25519_key))
# Test improperly formatted argument.
self.assertRaises(securesystemslib.exceptions.FormatError,
repo_lib.import_ed25519_privatekey_from_file, 3, 'pw')
# Test invalid argument.
# Non-existent key file.
nonexistent_keypath = os.path.join(temporary_directory,
'nonexistent_keypath')
self.assertRaises(securesystemslib.exceptions.StorageError,
repo_lib.import_ed25519_privatekey_from_file,
nonexistent_keypath, 'pw')
# Invalid key file argument.
invalid_keyfile = os.path.join(temporary_directory, 'invalid_keyfile')
with open(invalid_keyfile, 'wb') as file_object:
file_object.write(b'bad keyfile')
self.assertRaises(securesystemslib.exceptions.Error,
repo_lib.import_ed25519_privatekey_from_file, invalid_keyfile, 'pw')
# Invalid private key imported (contains unexpected keytype.)
imported_ed25519_key['keytype'] = 'invalid_keytype'
# Use 'rsa_keys.py' to bypass the key format validation performed by
# 'keys.py'.
salt, iterations, derived_key = \
securesystemslib.rsa_keys._generate_derived_key('pw')
# Store the derived key info in a dictionary, the object expected
# by the non-public _encrypt() routine.
derived_key_information = {'salt': salt, 'iterations': iterations,
'derived_key': derived_key}
# Convert the key object to json string format and encrypt it with the
# derived key.
encrypted_key = securesystemslib.rsa_keys._encrypt(
json.dumps(imported_ed25519_key), derived_key_information)
with open(ed25519_keypath, 'wb') as file_object:
file_object.write(encrypted_key.encode('utf-8'))
self.assertRaises(securesystemslib.exceptions.FormatError,
repo_lib.import_ed25519_privatekey_from_file, ed25519_keypath, 'pw')
def test_get_metadata_filenames(self):
# Test normal case.
metadata_directory = os.path.join('metadata/')
filenames = {'root.json': metadata_directory + 'root.json',
'targets.json': metadata_directory + 'targets.json',
'snapshot.json': metadata_directory + 'snapshot.json',
'timestamp.json': metadata_directory + 'timestamp.json'}
self.assertEqual(filenames, repo_lib.get_metadata_filenames('metadata/'))
# If a directory argument is not specified, the current working directory
# is used.
metadata_directory = os.getcwd()
filenames = {'root.json': os.path.join(metadata_directory, 'root.json'),
'targets.json': os.path.join(metadata_directory, 'targets.json'),
'snapshot.json': os.path.join(metadata_directory, 'snapshot.json'),
'timestamp.json': os.path.join(metadata_directory, 'timestamp.json')}
self.assertEqual(filenames, repo_lib.get_metadata_filenames(metadata_directory))
# Test improperly formatted argument.
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.get_metadata_filenames, 3)
def test_get_metadata_fileinfo(self):
# Test normal case.
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
test_filepath = os.path.join(temporary_directory, 'file.txt')
with open(test_filepath, 'wt') as file_object:
file_object.write('test file')
# Generate test fileinfo object. It is assumed SHA256 and SHA512 hashes
# are computed by get_metadata_fileinfo().
file_length = os.path.getsize(test_filepath)
sha256_digest_object = securesystemslib.hash.digest_filename(test_filepath)
sha512_digest_object = securesystemslib.hash.digest_filename(test_filepath, algorithm='sha512')
file_hashes = {'sha256': sha256_digest_object.hexdigest(),
'sha512': sha512_digest_object.hexdigest()}
fileinfo = {'length': file_length, 'hashes': file_hashes}
self.assertTrue(tuf.formats.FILEINFO_SCHEMA.matches(fileinfo))
storage_backend = securesystemslib.storage.FilesystemBackend()
self.assertEqual(fileinfo, repo_lib.get_metadata_fileinfo(test_filepath,
storage_backend))
# Test improperly formatted argument.
self.assertRaises(securesystemslib.exceptions.FormatError,
repo_lib.get_metadata_fileinfo, 3,
storage_backend)
# Test non-existent file.
nonexistent_filepath = os.path.join(temporary_directory, 'oops.txt')
self.assertRaises(securesystemslib.exceptions.Error,
repo_lib.get_metadata_fileinfo,
nonexistent_filepath, storage_backend)
def test_get_target_hash(self):
# Test normal case.
expected_target_hashes = {
'/file1.txt': 'e3a3d89eb3b70ce3fbce6017d7b8c12d4abd5635427a0e8a238f53157df85b3d',
'/README.txt': '8faee106f1bb69f34aaf1df1e3c2e87d763c4d878cb96b91db13495e32ceb0b0',
'/packages/file2.txt': 'c9c4a5cdd84858dd6a23d98d7e6e6b2aec45034946c16b2200bc317c75415e92'
}
for filepath, target_hash in six.iteritems(expected_target_hashes):
self.assertTrue(tuf.formats.RELPATH_SCHEMA.matches(filepath))
self.assertTrue(securesystemslib.formats.HASH_SCHEMA.matches(target_hash))
self.assertEqual(repo_lib.get_target_hash(filepath), target_hash)
# Test for improperly formatted argument.
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.get_target_hash, 8)
def test_generate_root_metadata(self):
# Test normal case.
# Load the root metadata provided in 'tuf/tests/repository_data/'.
root_filepath = os.path.join('repository_data', 'repository',
'metadata', 'root.json')
root_signable = securesystemslib.util.load_json_file(root_filepath)
# generate_root_metadata() expects the top-level roles and keys to be
# available in 'tuf.keydb' and 'tuf.roledb'.
tuf.roledb.create_roledb_from_root_metadata(root_signable['signed'])
tuf.keydb.create_keydb_from_root_metadata(root_signable['signed'])
expires = '1985-10-21T01:22:00Z'
root_metadata = repo_lib.generate_root_metadata(1, expires,
consistent_snapshot=False)
self.assertTrue(tuf.formats.ROOT_SCHEMA.matches(root_metadata))
root_keyids = tuf.roledb.get_role_keyids('root')
tuf.keydb._keydb_dict['default'][root_keyids[0]]['keytype'] = 'bad_keytype'
self.assertRaises(securesystemslib.exceptions.Error, repo_lib.generate_root_metadata, 1,
expires, consistent_snapshot=False)
# Reset the root key's keytype, so that we can next verify that a different
# securesystemslib.exceptions.Error exception is raised for duplicate keyids.
tuf.keydb._keydb_dict['default'][root_keyids[0]]['keytype'] = 'rsa'
# Add duplicate keyid to root's roleinfo.
tuf.roledb._roledb_dict['default']['root']['keyids'].append(root_keyids[0])
self.assertRaises(securesystemslib.exceptions.Error, repo_lib.generate_root_metadata, 1,
expires, consistent_snapshot=False)
# Test improperly formatted arguments.
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_root_metadata,
'3', expires, False)
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_root_metadata,
1, '3', False)
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_root_metadata,
1, expires, 3)
# Test for missing required roles and keys.
tuf.roledb.clear_roledb()
tuf.keydb.clear_keydb()
self.assertRaises(securesystemslib.exceptions.Error, repo_lib.generate_root_metadata,
1, expires, False)
def test_generate_targets_metadata(self):
# Test normal case.
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
targets_directory = os.path.join(temporary_directory, 'targets')
file1_path = os.path.join(targets_directory, 'file.txt')
securesystemslib.util.ensure_parent_dir(file1_path)
with open(file1_path, 'wt') as file_object:
file_object.write('test file.')
# Set valid generate_targets_metadata() arguments. Add a custom field for
# the 'target_files' target set below.
version = 1
datetime_object = datetime.datetime(2030, 1, 1, 12, 0)
expiration_date = datetime_object.isoformat() + 'Z'
file_permissions = oct(os.stat(file1_path).st_mode)[4:]
target_files = {'file.txt': {'custom': {'file_permission': file_permissions}}}
delegations = {"keys": {
"a394c28384648328b16731f81440d72243c77bb44c07c040be99347f0df7d7bf": {
"keytype": "ed25519",
"keyval": {
"public": "3eb81026ded5af2c61fb3d4b272ac53cd1049a810ee88f4df1fc35cdaf918157"
}
}
},
"roles": [
{
"keyids": [
"a394c28384648328b16731f81440d72243c77bb44c07c040be99347f0df7d7bf"
],
"name": "targets/warehouse",
"paths": [
"/file1.txt", "/README.txt", '/warehouse/'
],
"threshold": 1
}
]
}
targets_metadata = \
repo_lib.generate_targets_metadata(targets_directory, target_files,
version, expiration_date, delegations,
False)
self.assertTrue(tuf.formats.TARGETS_SCHEMA.matches(targets_metadata))
# Valid arguments with 'delegations' set to None.
targets_metadata = \
repo_lib.generate_targets_metadata(targets_directory, target_files,
version, expiration_date, None,
False)
self.assertTrue(tuf.formats.TARGETS_SCHEMA.matches(targets_metadata))
# Verify that 'digest.filename' file is saved to 'targets_directory' if
# the 'write_consistent_targets' argument is True.
list_targets_directory = os.listdir(targets_directory)
targets_metadata = \
repo_lib.generate_targets_metadata(targets_directory, target_files,
version, expiration_date, delegations,
write_consistent_targets=True)
new_list_targets_directory = os.listdir(targets_directory)
# Verify that 'targets_directory' contains only one extra item.
self.assertTrue(len(list_targets_directory) + 1,
len(new_list_targets_directory))
# Verify that an exception is not raised if the target files already exist.
repo_lib.generate_targets_metadata(targets_directory, target_files,
version, expiration_date, delegations,
write_consistent_targets=True)
# Verify that 'targets_metadata' contains a 'custom' entry (optional)
# for 'file.txt'.
self.assertTrue('custom' in targets_metadata['targets']['file.txt'])
# Test improperly formatted arguments.
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_targets_metadata,
3, target_files, version, expiration_date)
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_targets_metadata,
targets_directory, 3, version, expiration_date)
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_targets_metadata,
targets_directory, target_files, '3', expiration_date)
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_targets_metadata,
targets_directory, target_files, version, '3')
# Improperly formatted 'delegations' and 'write_consistent_targets'
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_targets_metadata,
targets_directory, target_files, version, expiration_date,
3, False)
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_targets_metadata,
targets_directory, target_files, version, expiration_date,
delegations, 3)
# Test non-existent target file.
bad_target_file = \
{'non-existent.txt': {'file_permission': file_permissions}}
self.assertRaises(securesystemslib.exceptions.Error, repo_lib.generate_targets_metadata,
targets_directory, bad_target_file, version,
expiration_date)
def test_generate_snapshot_metadata(self):
# Test normal case.
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
original_repository_path = os.path.join('repository_data',
'repository')
repository_directory = os.path.join(temporary_directory, 'repository')
shutil.copytree(original_repository_path, repository_directory)
metadata_directory = os.path.join(repository_directory,
repo_lib.METADATA_STAGED_DIRECTORY_NAME)
targets_directory = os.path.join(repository_directory, repo_lib.TARGETS_DIRECTORY_NAME)
targets_filename = os.path.join(metadata_directory,
repo_lib.TARGETS_FILENAME)
version = 1
expiration_date = '1985-10-21T13:20:00Z'
# Load a valid repository so that top-level roles exist in roledb and
# generate_snapshot_metadata() has roles to specify in snapshot metadata.
storage_backend = securesystemslib.storage.FilesystemBackend()
repository = repo_tool.Repository(repository_directory, metadata_directory,
targets_directory, storage_backend)
repository_junk = repo_tool.load_repository(repository_directory)
# For testing purposes, store an invalid metadata file in the metadata directory
# to verify that it isn't loaded by generate_snapshot_metadata(). Unknown
# metadata file extensions should be ignored.
invalid_metadata_file = os.path.join(metadata_directory, 'role_file.xml')
with open(invalid_metadata_file, 'w') as file_object:
file_object.write('bad extension on metadata file')
targets_filename = 'targets'
snapshot_metadata = \
repo_lib.generate_snapshot_metadata(metadata_directory, version,
expiration_date,
targets_filename,
storage_backend,
consistent_snapshot=False)
self.assertTrue(tuf.formats.SNAPSHOT_SCHEMA.matches(snapshot_metadata))
# Test improperly formatted arguments.
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_snapshot_metadata,
3, version, expiration_date,
targets_filename, consistent_snapshot=False, storage_backend=storage_backend)
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_snapshot_metadata,
metadata_directory, '3', expiration_date,
targets_filename, storage_backend, consistent_snapshot=False)
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_snapshot_metadata,
metadata_directory, version, '3',
targets_filename, storage_backend, consistent_snapshot=False)
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_snapshot_metadata,
metadata_directory, version, expiration_date,
3, storage_backend, consistent_snapshot=False)
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_snapshot_metadata,
metadata_directory, version, expiration_date,
targets_filename, 3, storage_backend)
def test_generate_timestamp_metadata(self):
# Test normal case.
repository_name = 'test_repository'
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
original_repository_path = os.path.join('repository_data',
'repository')
repository_directory = os.path.join(temporary_directory, 'repository')
shutil.copytree(original_repository_path, repository_directory)
metadata_directory = os.path.join(repository_directory,
repo_lib.METADATA_STAGED_DIRECTORY_NAME)
targets_directory = os.path.join(repository_directory, repo_lib.TARGETS_DIRECTORY_NAME)
snapshot_filename = os.path.join(metadata_directory,
repo_lib.SNAPSHOT_FILENAME)
# Set valid generate_timestamp_metadata() arguments.
version = 1
expiration_date = '1985-10-21T13:20:00Z'
storage_backend = securesystemslib.storage.FilesystemBackend()
# Load a valid repository so that top-level roles exist in roledb and
# generate_snapshot_metadata() has roles to specify in snapshot metadata.
repository = repo_tool.Repository(repository_directory, metadata_directory,
targets_directory, repository_name)
repository_junk = repo_tool.load_repository(repository_directory,
repository_name)
timestamp_metadata = repo_lib.generate_timestamp_metadata(snapshot_filename,
version, expiration_date, storage_backend, repository_name)
self.assertTrue(tuf.formats.TIMESTAMP_SCHEMA.matches(timestamp_metadata))
# Test improperly formatted arguments.
self.assertRaises(securesystemslib.exceptions.FormatError,
repo_lib.generate_timestamp_metadata, 3, version, expiration_date,
storage_backend, repository_name)
self.assertRaises(securesystemslib.exceptions.FormatError,
repo_lib.generate_timestamp_metadata, snapshot_filename, '3',
expiration_date, storage_backend, repository_name)
self.assertRaises(securesystemslib.exceptions.FormatError,
repo_lib.generate_timestamp_metadata, snapshot_filename, version, '3',
storage_backend, repository_name)
def test_sign_metadata(self):
# Test normal case.
repository_name = 'test_repository'
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
metadata_path = os.path.join('repository_data', 'repository', 'metadata')
keystore_path = os.path.join('repository_data', 'keystore')
root_filename = os.path.join(metadata_path, 'root.json')
root_metadata = securesystemslib.util.load_json_file(root_filename)['signed']
targets_filename = os.path.join(metadata_path, 'targets.json')
targets_metadata = securesystemslib.util.load_json_file(targets_filename)['signed']
tuf.keydb.create_keydb_from_root_metadata(root_metadata, repository_name)
tuf.roledb.create_roledb_from_root_metadata(root_metadata, repository_name)
root_keyids = tuf.roledb.get_role_keyids('root', repository_name)
targets_keyids = tuf.roledb.get_role_keyids('targets', repository_name)
root_private_keypath = os.path.join(keystore_path, 'root_key')
root_private_key = repo_lib.import_rsa_privatekey_from_file(root_private_keypath,
'password')
# Sign with a valid, but not a threshold, key.
targets_public_keypath = os.path.join(keystore_path, 'targets_key.pub')
targets_public_key = securesystemslib.interface.\
import_ed25519_publickey_from_file(targets_public_keypath)
# sign_metadata() expects the private key 'root_metadata' to be in
# 'tuf.keydb'. Remove any public keys that may be loaded before
# adding private key, otherwise a 'tuf.KeyAlreadyExists' exception is
# raised.
tuf.keydb.remove_key(root_private_key['keyid'],
repository_name=repository_name)
tuf.keydb.add_key(root_private_key, repository_name=repository_name)
tuf.keydb.remove_key(targets_public_key['keyid'], repository_name=repository_name)
tuf.keydb.add_key(targets_public_key, repository_name=repository_name)
# Verify that a valid root signable is generated.
root_signable = repo_lib.sign_metadata(root_metadata, root_keyids,
root_filename, repository_name)
self.assertTrue(tuf.formats.SIGNABLE_SCHEMA.matches(root_signable))
# Test for an unset private key (in this case, target's).
repo_lib.sign_metadata(targets_metadata, targets_keyids, targets_filename,
repository_name)
# Add an invalid keytype to one of the root keys.
root_keyid = root_keyids[0]
tuf.keydb._keydb_dict[repository_name][root_keyid]['keytype'] = 'bad_keytype'
self.assertRaises(securesystemslib.exceptions.Error, repo_lib.sign_metadata,
root_metadata, root_keyids, root_filename, repository_name)
# Test improperly formatted arguments.
self.assertRaises(securesystemslib.exceptions.FormatError,
repo_lib.sign_metadata, 3, root_keyids, 'root.json', repository_name)
self.assertRaises(securesystemslib.exceptions.FormatError,
repo_lib.sign_metadata, root_metadata, 3, 'root.json', repository_name)
self.assertRaises(securesystemslib.exceptions.FormatError,
repo_lib.sign_metadata, root_metadata, root_keyids, 3, repository_name)
def test_write_metadata_file(self):
# Test normal case.
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
metadata_directory = os.path.join('repository_data', 'repository', 'metadata')
root_filename = os.path.join(metadata_directory, 'root.json')
root_signable = securesystemslib.util.load_json_file(root_filename)
output_filename = os.path.join(temporary_directory, 'root.json')
version_number = root_signable['signed']['version'] + 1
self.assertFalse(os.path.exists(output_filename))
storage_backend = securesystemslib.storage.FilesystemBackend()
repo_lib.write_metadata_file(root_signable, output_filename, version_number,
consistent_snapshot=False, storage_backend=storage_backend)
self.assertTrue(os.path.exists(output_filename))
# Attempt to over-write the previously written metadata file. An exception
# is not raised in this case, only a debug message is logged.
repo_lib.write_metadata_file(root_signable, output_filename, version_number,
consistent_snapshot=False, storage_backend=storage_backend)
# Test improperly formatted arguments.
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.write_metadata_file,
3, output_filename, version_number, False, storage_backend)
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.write_metadata_file,
root_signable, 3, version_number, False, storage_backend)
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.write_metadata_file,
root_signable, output_filename, '3', False, storage_backend)
self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.write_metadata_file,
root_signable, output_filename, storage_backend, version_number, 3)
def test_create_tuf_client_directory(self):
# Test normal case.
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
repository_directory = os.path.join('repository_data', 'repository')
client_directory = os.path.join(temporary_directory, 'client')
repo_lib.create_tuf_client_directory(repository_directory, client_directory)
self.assertTrue(os.path.exists(client_directory))
metadata_directory = os.path.join(client_directory, 'metadata')
current_directory = os.path.join(metadata_directory, 'current')
previous_directory = os.path.join(metadata_directory, 'previous')
self.assertTrue(os.path.exists(client_directory))
self.assertTrue(os.path.exists(metadata_directory))
self.assertTrue(os.path.exists(current_directory))
self.assertTrue(os.path.exists(previous_directory))
# Test improperly formatted arguments.
self.assertRaises(securesystemslib.exceptions.FormatError,
repo_lib.create_tuf_client_directory, 3, client_directory)
self.assertRaises(securesystemslib.exceptions.FormatError,
repo_lib.create_tuf_client_directory, repository_directory, 3)
# Test invalid argument (i.e., client directory already exists.)
self.assertRaises(tuf.exceptions.RepositoryError,
repo_lib.create_tuf_client_directory, repository_directory,
client_directory)
# Test invalid client metadata directory (i.e., non-errno.EEXIST exceptions
# should be re-raised.)
shutil.rmtree(metadata_directory)
# Save the original metadata directory name so that it can be restored
# after testing.
metadata_directory_name = repo_lib.METADATA_DIRECTORY_NAME
repo_lib.METADATA_DIRECTORY_NAME = '/'
# Creation of the '/' directory is forbidden on all supported OSs. The '/'
# argument to create_tuf_client_directory should cause it to re-raise a
# non-errno.EEXIST exception.
self.assertRaises((OSError, tuf.exceptions.RepositoryError),
repo_lib.create_tuf_client_directory, repository_directory, '/')
# Restore the metadata directory name in repo_lib.
repo_lib.METADATA_DIRECTORY_NAME = metadata_directory_name
def test__generate_and_write_metadata(self):
# Test for invalid, or unsupported, rolename.
# Load the root metadata provided in 'tuf/tests/repository_data/'.
repository_name = 'repository_name'
root_filepath = os.path.join('repository_data', 'repository',
'metadata', 'root.json')
root_signable = securesystemslib.util.load_json_file(root_filepath)
# _generate_and_write_metadata() expects the top-level roles
# (specifically 'snapshot') and keys to be available in 'tuf.roledb'.
tuf.roledb.create_roledb_from_root_metadata(root_signable['signed'],
repository_name)
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
targets_directory = os.path.join(temporary_directory, 'targets')
os.mkdir(targets_directory)
repository_directory = os.path.join(temporary_directory, 'repository')
metadata_directory = os.path.join(repository_directory,
repo_lib.METADATA_STAGED_DIRECTORY_NAME)
targets_metadata = os.path.join('repository_data', 'repository', 'metadata',
'targets.json')
obsolete_metadata = os.path.join(metadata_directory, 'obsolete_role.json')
securesystemslib.util.ensure_parent_dir(obsolete_metadata)
shutil.copyfile(targets_metadata, obsolete_metadata)
# Verify that obsolete metadata (a metadata file exists on disk, but the
# role is unavailable in 'tuf.roledb'). First add the obsolete
# role to 'tuf.roledb' so that its metadata file can be written to disk.
targets_roleinfo = tuf.roledb.get_roleinfo('targets', repository_name)
targets_roleinfo['version'] = 1
expiration = \
tuf.formats.unix_timestamp_to_datetime(int(time.time() + 86400))
expiration = expiration.isoformat() + 'Z'
targets_roleinfo['expires'] = expiration
tuf.roledb.add_role('obsolete_role', targets_roleinfo,
repository_name=repository_name)
storage_backend = securesystemslib.storage.FilesystemBackend()
repo_lib._generate_and_write_metadata('obsolete_role', obsolete_metadata,
targets_directory, metadata_directory, storage_backend,
consistent_snapshot=False, filenames=None,
repository_name=repository_name)
snapshot_filepath = os.path.join('repository_data', 'repository',
'metadata', 'snapshot.json')
snapshot_signable = securesystemslib.util.load_json_file(snapshot_filepath)
tuf.roledb.remove_role('obsolete_role', repository_name)
self.assertTrue(os.path.exists(os.path.join(metadata_directory,
'obsolete_role.json')))
tuf.repository_lib._delete_obsolete_metadata(metadata_directory,
snapshot_signable['signed'], False, repository_name,
storage_backend)
self.assertFalse(os.path.exists(metadata_directory + 'obsolete_role.json'))
shutil.copyfile(targets_metadata, obsolete_metadata)
def test__delete_obsolete_metadata(self):
repository_name = 'test_repository'
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
repository_directory = os.path.join(temporary_directory, 'repository')
metadata_directory = os.path.join(repository_directory,
repo_lib.METADATA_STAGED_DIRECTORY_NAME)
os.makedirs(metadata_directory)
snapshot_filepath = os.path.join('repository_data', 'repository',
'metadata', 'snapshot.json')
snapshot_signable = securesystemslib.util.load_json_file(snapshot_filepath)
storage_backend = securesystemslib.storage.FilesystemBackend()
# Create role metadata that should not exist in snapshot.json.
role1_filepath = os.path.join('repository_data', 'repository', 'metadata',
'role1.json')
shutil.copyfile(role1_filepath, os.path.join(metadata_directory, 'role2.json'))
repo_lib._delete_obsolete_metadata(metadata_directory,
snapshot_signable['signed'], True, repository_name, storage_backend)
# _delete_obsolete_metadata should never delete root.json.
root_filepath = os.path.join('repository_data', 'repository', 'metadata',
'root.json')
shutil.copyfile(root_filepath, os.path.join(metadata_directory, 'root.json'))
repo_lib._delete_obsolete_metadata(metadata_directory,
snapshot_signable['signed'], True, repository_name, storage_backend)
self.assertTrue(os.path.exists(os.path.join(metadata_directory, 'root.json')))
# Verify what happens for a non-existent metadata directory (a debug
# message is logged).
self.assertRaises(securesystemslib.exceptions.StorageError,
repo_lib._delete_obsolete_metadata, 'non-existent',
snapshot_signable['signed'], True, repository_name, storage_backend)
def test__load_top_level_metadata(self):
repository_name = 'test_repository'
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
repository_directory = os.path.join(temporary_directory, 'repository')
metadata_directory = os.path.join(repository_directory,
repo_lib.METADATA_STAGED_DIRECTORY_NAME)
targets_directory = os.path.join(repository_directory,
repo_lib.TARGETS_DIRECTORY_NAME)
shutil.copytree(os.path.join('repository_data', 'repository', 'metadata'),
metadata_directory)
shutil.copytree(os.path.join('repository_data', 'repository', 'targets'),
targets_directory)
# Add a duplicate signature to the Root file for testing purposes).
root_file = os.path.join(metadata_directory, 'root.json')
signable = securesystemslib.util.load_json_file(os.path.join(metadata_directory, 'root.json'))
signable['signatures'].append(signable['signatures'][0])
storage_backend = securesystemslib.storage.FilesystemBackend()
repo_lib.write_metadata_file(signable, root_file, 8, False, storage_backend)
filenames = repo_lib.get_metadata_filenames(metadata_directory)
repository = repo_tool.create_new_repository(repository_directory, repository_name)
repo_lib._load_top_level_metadata(repository, filenames, repository_name)
# Partially write all top-level roles (we increase the threshold of each
# top-level role so that they are flagged as partially written.
repository.root.threshold = repository.root.threshold + 1
repository.snapshot.threshold = repository.snapshot.threshold + 1
repository.targets.threshold = repository.targets.threshold + 1
repository.timestamp.threshold = repository.timestamp.threshold + 1
repository.write('root', )
repository.write('snapshot')
repository.write('targets')
repository.write('timestamp')
repo_lib._load_top_level_metadata(repository, filenames, repository_name)
# Attempt to load a repository with missing top-level metadata.
for role_file in os.listdir(metadata_directory):
if role_file.endswith('.json') and not role_file.startswith('root'):
role_filename = os.path.join(metadata_directory, role_file)
os.remove(role_filename)
self.assertRaises(tuf.exceptions.RepositoryError,
repo_lib._load_top_level_metadata, repository, filenames,
repository_name)
# Remove the required Root file and verify that an exception is raised.
os.remove(os.path.join(metadata_directory, 'root.json'))
self.assertRaises(tuf.exceptions.RepositoryError,
repo_lib._load_top_level_metadata, repository, filenames,
repository_name)
def test__remove_invalid_and_duplicate_signatures(self):
# Remove duplicate PSS signatures (same key generates valid, but different
# signatures). First load a valid signable (in this case, the root role).
repository_name = 'test_repository'
root_filepath = os.path.join('repository_data', 'repository',
'metadata', 'root.json')
root_signable = securesystemslib.util.load_json_file(root_filepath)
key_filepath = os.path.join('repository_data', 'keystore', 'root_key')
root_rsa_key = repo_lib.import_rsa_privatekey_from_file(key_filepath,
'password')
# Add 'root_rsa_key' to tuf.keydb, since
# _remove_invalid_and_duplicate_signatures() checks for unknown keys in
# tuf.keydb.
tuf.keydb.add_key(root_rsa_key, repository_name=repository_name)
# Append the new valid, but duplicate PSS signature, and test that
# duplicates are removed. create_signature() generates a key for the
# key type of the first argument (i.e., root_rsa_key).
data = securesystemslib.formats.encode_canonical(root_signable['signed']).encode('utf-8')
new_pss_signature = securesystemslib.keys.create_signature(root_rsa_key,
data)
root_signable['signatures'].append(new_pss_signature)
expected_number_of_signatures = len(root_signable['signatures'])
tuf.repository_lib._remove_invalid_and_duplicate_signatures(root_signable,
repository_name)
self.assertEqual(len(root_signable), expected_number_of_signatures)
# Test for an invalid keyid.
root_signable['signatures'][0]['keyid'] = '404'
tuf.repository_lib._remove_invalid_and_duplicate_signatures(root_signable,
repository_name)
# Re-add a valid signature for the following test condition.
root_signable['signatures'].append(new_pss_signature)
# Test that an exception is not raised if an invalid sig is present,
# and that the duplicate key is removed 'root_signable'.
root_signable['signatures'][0]['sig'] = '4040'
invalid_keyid = root_signable['signatures'][0]['keyid']
tuf.repository_lib._remove_invalid_and_duplicate_signatures(root_signable,
repository_name)
for signature in root_signable['signatures']:
self.assertFalse(invalid_keyid == signature['keyid'])
# Run the test cases.
if __name__ == '__main__':
unittest.main()