mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Currently TestTimestamp creates custom databases but only clears the default ones. This means next create_*db() call will fail meaning every test after this one will fail (currently TestTimestamp happens to be last but the effect can be seen by renaming it to TestATimestamp). Also remove the clear_*db() calls from TestRepository::Setup(): they are likely to be a workaround for a similar problem earlier (earlier test failed to cleanup). Signed-off-by: Jussi Kukkonen <jkukkonen@vmware.com>
2167 lines
84 KiB
Python
Executable file
2167 lines
84 KiB
Python
Executable file
#!/usr/bin/env python
|
|
|
|
# Copyright 2014 - 2017, New York University and the TUF contributors
|
|
# SPDX-License-Identifier: MIT OR Apache-2.0
|
|
|
|
"""
|
|
<Program Name>
|
|
test_repository_tool.py
|
|
|
|
<Author>
|
|
Vladimir Diaz <vladimir.v.diaz@gmail.com>
|
|
|
|
<Started>
|
|
April 7, 2014.
|
|
|
|
<Copyright>
|
|
See LICENSE-MIT OR LICENSE for licensing information.
|
|
|
|
<Purpose>
|
|
Unit test for 'repository_tool.py'.
|
|
"""
|
|
|
|
# Help with Python 3 compatibility, where the print statement is a function, an
|
|
# implicit relative import is invalid, and the '/' operator performs true
|
|
# division. Example: print 'hello world' raises a 'SyntaxError' exception.
|
|
from __future__ import print_function
|
|
from __future__ import absolute_import
|
|
from __future__ import division
|
|
from __future__ import unicode_literals
|
|
|
|
import os
|
|
import time
|
|
import datetime
|
|
import unittest
|
|
import logging
|
|
import tempfile
|
|
import shutil
|
|
import sys
|
|
import errno
|
|
|
|
import tuf
|
|
import tuf.log
|
|
import tuf.formats
|
|
import tuf.roledb
|
|
import tuf.keydb
|
|
|
|
import tuf.repository_tool as repo_tool
|
|
import securesystemslib.exceptions
|
|
|
|
import securesystemslib
|
|
import securesystemslib.storage
|
|
import six
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
repo_tool.disable_console_log_messages()
|
|
|
|
|
|
class TestRepository(unittest.TestCase):
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
|
|
# setUpClass() is called before tests in an individual class are executed.
|
|
|
|
# Create a temporary directory to store the repository, metadata, and target
|
|
# files. 'temporary_directory' must be deleted in TearDownClass() so that
|
|
# temporary files are always removed, even when exceptions occur.
|
|
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
|
|
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
|
|
# tearDownModule() is called after all the tests have run.
|
|
# http://docs.python.org/2/library/unittest.html#class-and-module-fixtures
|
|
|
|
# Remove the temporary repository directory, which should contain all the
|
|
# metadata, targets, and key files generated for the test cases.
|
|
shutil.rmtree(cls.temporary_directory)
|
|
|
|
|
|
|
|
def setUp(self):
|
|
tuf.roledb.create_roledb('test_repository')
|
|
tuf.keydb.create_keydb('test_repository')
|
|
|
|
|
|
|
|
def tearDown(self):
|
|
tuf.roledb.clear_roledb(clear_all=True)
|
|
tuf.keydb.clear_keydb(clear_all=True)
|
|
|
|
|
|
def test_init(self):
|
|
# Test normal case.
|
|
repository_name = 'test_repository'
|
|
storage_backend = securesystemslib.storage.FilesystemBackend()
|
|
repository = repo_tool.Repository('repository_directory/',
|
|
'metadata_directory/', 'targets_directory/', storage_backend,
|
|
repository_name)
|
|
self.assertTrue(isinstance(repository.root, repo_tool.Root))
|
|
self.assertTrue(isinstance(repository.snapshot, repo_tool.Snapshot))
|
|
self.assertTrue(isinstance(repository.timestamp, repo_tool.Timestamp))
|
|
self.assertTrue(isinstance(repository.targets, repo_tool.Targets))
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, repo_tool.Repository,
|
|
storage_backend, 3, 'metadata_directory/', 'targets_directory')
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, repo_tool.Repository,
|
|
'repository_directory', storage_backend, 3, 'targets_directory')
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, repo_tool.Repository,
|
|
'repository_directory', 'metadata_directory', 3, storage_backend)
|
|
|
|
|
|
|
|
def create_repository_directory(self):
|
|
# Create a repository directory and copy in test targets data
|
|
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
|
|
targets_directory = os.path.join(temporary_directory, 'repository',
|
|
repo_tool.TARGETS_DIRECTORY_NAME)
|
|
original_targets_directory = os.path.join('repository_data',
|
|
'repository', 'targets')
|
|
shutil.copytree(original_targets_directory, targets_directory)
|
|
|
|
# In this case, create_new_repository() creates the 'repository/'
|
|
# sub-directory in 'temporary_directory' if it does not exist.
|
|
return os.path.join(temporary_directory, 'repository')
|
|
|
|
|
|
|
|
|
|
def test_writeall(self):
|
|
# Test creation of a TUF repository.
|
|
#
|
|
# 1. Import public and private keys.
|
|
# 2. Add verification keys.
|
|
# 3. Load signing keys.
|
|
# 4. Add target files.
|
|
# 5. Perform delegation.
|
|
# 6. writeall()
|
|
#
|
|
# Copy the target files from 'tuf/tests/repository_data' so that writeall()
|
|
# has target fileinfo to include in metadata.
|
|
repository_name = 'test_repository'
|
|
repository_directory = self.create_repository_directory()
|
|
metadata_directory = os.path.join(repository_directory,
|
|
repo_tool.METADATA_STAGED_DIRECTORY_NAME)
|
|
|
|
repository = repo_tool.create_new_repository(repository_directory, repository_name)
|
|
|
|
# (1) Load the public and private keys of the top-level roles, and one
|
|
# delegated role.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
|
|
# Load the public keys.
|
|
root_pubkey_path = os.path.join(keystore_directory, 'root_key.pub')
|
|
targets_pubkey_path = os.path.join(keystore_directory, 'targets_key.pub')
|
|
snapshot_pubkey_path = os.path.join(keystore_directory, 'snapshot_key.pub')
|
|
timestamp_pubkey_path = os.path.join(keystore_directory, 'timestamp_key.pub')
|
|
role1_pubkey_path = os.path.join(keystore_directory, 'delegation_key.pub')
|
|
|
|
root_pubkey = repo_tool.import_rsa_publickey_from_file(root_pubkey_path)
|
|
targets_pubkey = \
|
|
repo_tool.import_ed25519_publickey_from_file(targets_pubkey_path)
|
|
snapshot_pubkey = \
|
|
repo_tool.import_ed25519_publickey_from_file(snapshot_pubkey_path)
|
|
timestamp_pubkey = \
|
|
repo_tool.import_ed25519_publickey_from_file(timestamp_pubkey_path)
|
|
role1_pubkey = repo_tool.import_ed25519_publickey_from_file(role1_pubkey_path)
|
|
|
|
# Load the private keys.
|
|
root_privkey_path = os.path.join(keystore_directory, 'root_key')
|
|
targets_privkey_path = os.path.join(keystore_directory, 'targets_key')
|
|
snapshot_privkey_path = os.path.join(keystore_directory, 'snapshot_key')
|
|
timestamp_privkey_path = os.path.join(keystore_directory, 'timestamp_key')
|
|
role1_privkey_path = os.path.join(keystore_directory, 'delegation_key')
|
|
|
|
root_privkey = \
|
|
repo_tool.import_rsa_privatekey_from_file(root_privkey_path, 'password')
|
|
targets_privkey = \
|
|
repo_tool.import_ed25519_privatekey_from_file(targets_privkey_path,
|
|
'password')
|
|
snapshot_privkey = \
|
|
repo_tool.import_ed25519_privatekey_from_file(snapshot_privkey_path,
|
|
'password')
|
|
timestamp_privkey = \
|
|
repo_tool.import_ed25519_privatekey_from_file(timestamp_privkey_path,
|
|
'password')
|
|
role1_privkey = \
|
|
repo_tool.import_ed25519_privatekey_from_file(role1_privkey_path,
|
|
'password')
|
|
|
|
|
|
# (2) Add top-level verification keys.
|
|
repository.root.add_verification_key(root_pubkey)
|
|
repository.targets.add_verification_key(targets_pubkey)
|
|
repository.snapshot.add_verification_key(snapshot_pubkey)
|
|
|
|
# Verify that repository.writeall() fails for insufficient threshold
|
|
# of signatures (default threshold = 1).
|
|
self.assertRaises(tuf.exceptions.UnsignedMetadataError, repository.writeall)
|
|
|
|
repository.timestamp.add_verification_key(timestamp_pubkey)
|
|
|
|
|
|
# (3) Load top-level signing keys.
|
|
repository.status()
|
|
repository.root.load_signing_key(root_privkey)
|
|
repository.status()
|
|
repository.targets.load_signing_key(targets_privkey)
|
|
repository.status()
|
|
repository.snapshot.load_signing_key(snapshot_privkey)
|
|
repository.status()
|
|
|
|
# Verify that repository.writeall() fails for insufficient threshold
|
|
# of signatures (default threshold = 1).
|
|
self.assertRaises(tuf.exceptions.UnsignedMetadataError, repository.writeall)
|
|
|
|
repository.timestamp.load_signing_key(timestamp_privkey)
|
|
|
|
|
|
# (4) Add target files.
|
|
target1 = 'file1.txt'
|
|
target2 = 'file2.txt'
|
|
target3 = 'file3.txt'
|
|
repository.targets.add_target(target1)
|
|
repository.targets.add_target(target2)
|
|
|
|
# (5) Perform delegation.
|
|
repository.targets.delegate('role1', [role1_pubkey], [target3])
|
|
repository.targets('role1').load_signing_key(role1_privkey)
|
|
|
|
# (6) Write repository.
|
|
repository.writeall()
|
|
|
|
# Verify that the expected metadata is written.
|
|
for role in ['root.json', 'targets.json', 'snapshot.json', 'timestamp.json']:
|
|
role_filepath = os.path.join(metadata_directory, role)
|
|
role_signable = securesystemslib.util.load_json_file(role_filepath)
|
|
|
|
# Raise 'securesystemslib.exceptions.FormatError' if 'role_signable' is
|
|
# an invalid signable.
|
|
tuf.formats.check_signable_object_format(role_signable)
|
|
|
|
self.assertTrue(os.path.exists(role_filepath))
|
|
|
|
# Verify the 'role1.json' delegation is also written.
|
|
role1_filepath = os.path.join(metadata_directory, 'role1.json')
|
|
role1_signable = securesystemslib.util.load_json_file(role1_filepath)
|
|
tuf.formats.check_signable_object_format(role1_signable)
|
|
|
|
# Verify that an exception is *not* raised for multiple
|
|
# repository.writeall().
|
|
repository.writeall()
|
|
|
|
# Verify that status() does not raise an exception.
|
|
repository.status()
|
|
|
|
# Verify that status() does not raise
|
|
# 'tuf.exceptions.InsufficientKeysError' if a top-level role
|
|
# does not contain a threshold of keys.
|
|
targets_roleinfo = tuf.roledb.get_roleinfo('targets', repository_name)
|
|
old_threshold = targets_roleinfo['threshold']
|
|
targets_roleinfo['threshold'] = 10
|
|
tuf.roledb.update_roleinfo('targets', targets_roleinfo,
|
|
repository_name=repository_name)
|
|
repository.status()
|
|
|
|
# Restore the original threshold values.
|
|
targets_roleinfo = tuf.roledb.get_roleinfo('targets', repository_name)
|
|
targets_roleinfo['threshold'] = old_threshold
|
|
tuf.roledb.update_roleinfo('targets', targets_roleinfo,
|
|
repository_name=repository_name)
|
|
|
|
# Verify that status() does not raise
|
|
# 'tuf.exceptions.InsufficientKeysError' if a delegated role
|
|
# does not contain a threshold of keys.
|
|
role1_roleinfo = tuf.roledb.get_roleinfo('role1', repository_name)
|
|
old_role1_threshold = role1_roleinfo['threshold']
|
|
role1_roleinfo['threshold'] = 10
|
|
tuf.roledb.update_roleinfo('role1', role1_roleinfo,
|
|
repository_name=repository_name)
|
|
repository.status()
|
|
|
|
# Restore role1's threshold.
|
|
role1_roleinfo = tuf.roledb.get_roleinfo('role1', repository_name)
|
|
role1_roleinfo['threshold'] = old_role1_threshold
|
|
tuf.roledb.update_roleinfo('role1', role1_roleinfo,
|
|
repository_name=repository_name)
|
|
|
|
# Verify status() does not raise 'tuf.exceptions.UnsignedMetadataError' if any of the
|
|
# the top-level roles. Test that 'root' is improperly signed.
|
|
repository.root.unload_signing_key(root_privkey)
|
|
repository.root.load_signing_key(targets_privkey)
|
|
repository.status()
|
|
|
|
repository.targets('role1').unload_signing_key(role1_privkey)
|
|
repository.targets('role1').load_signing_key(targets_privkey)
|
|
repository.status()
|
|
|
|
# Reset Root and 'role1', and verify Targets.
|
|
repository.root.unload_signing_key(targets_privkey)
|
|
repository.root.load_signing_key(root_privkey)
|
|
repository.targets('role1').unload_signing_key(targets_privkey)
|
|
repository.targets('role1').load_signing_key(role1_privkey)
|
|
repository.targets.unload_signing_key(targets_privkey)
|
|
repository.targets.load_signing_key(snapshot_privkey)
|
|
repository.status()
|
|
|
|
# Reset Targets and verify Snapshot.
|
|
repository.targets.unload_signing_key(snapshot_privkey)
|
|
repository.targets.load_signing_key(targets_privkey)
|
|
repository.snapshot.unload_signing_key(snapshot_privkey)
|
|
repository.snapshot.load_signing_key(timestamp_privkey)
|
|
repository.status()
|
|
|
|
# Reset Snapshot and verify timestamp.
|
|
repository.snapshot.unload_signing_key(timestamp_privkey)
|
|
repository.snapshot.load_signing_key(snapshot_privkey)
|
|
repository.timestamp.unload_signing_key(timestamp_privkey)
|
|
repository.timestamp.load_signing_key(root_privkey)
|
|
repository.status()
|
|
|
|
# Reset Timestamp
|
|
repository.timestamp.unload_signing_key(root_privkey)
|
|
repository.timestamp.load_signing_key(timestamp_privkey)
|
|
|
|
# Verify that a writeall() fails if a repository is loaded and a change
|
|
# is made to a role.
|
|
repo_tool.load_repository(repository_directory, repository_name)
|
|
|
|
repository.timestamp.expiration = datetime.datetime(2030, 1, 1, 12, 0)
|
|
self.assertRaises(tuf.exceptions.UnsignedMetadataError, repository.writeall)
|
|
|
|
# Load the required Timestamp key so that a valid repository can be written.
|
|
repository.timestamp.load_signing_key(timestamp_privkey)
|
|
repository.writeall()
|
|
|
|
# Test creation of a consistent snapshot repository. Writing a consistent
|
|
# snapshot modifies the Root metadata, which specifies whether a repository
|
|
# supports consistent snapshot. Verify that an exception is raised due to
|
|
# the missing signature of Root.
|
|
self.assertRaises(tuf.exceptions.UnsignedMetadataError, repository.writeall, True)
|
|
|
|
# Make sure the private keys of Root (new version required since Root will
|
|
# change to enable consistent snapshot), Snapshot, role1, and timestamp
|
|
# loaded before writing consistent snapshot.
|
|
repository.root.load_signing_key(root_privkey)
|
|
repository.snapshot.load_signing_key(snapshot_privkey)
|
|
# Must also load targets signing key, because targets is re-signed when
|
|
# updating 'role1'.
|
|
repository.targets.load_signing_key(targets_privkey)
|
|
repository.targets('role1').load_signing_key(role1_privkey)
|
|
|
|
# Verify that a consistent snapshot can be written and loaded. The roles
|
|
# above must be marked as dirty, otherwise writeall() will not create a
|
|
# consistent snapshot for them.
|
|
repository.mark_dirty(['role1', 'targets', 'root', 'snapshot', 'timestamp'])
|
|
repository.writeall(consistent_snapshot=True)
|
|
|
|
# Verify that the newly written consistent snapshot can be loaded
|
|
# successfully.
|
|
repo_tool.load_repository(repository_directory, repository_name)
|
|
|
|
# Verify the behavior of marking and unmarking roles as dirty.
|
|
# We begin by ensuring that writeall() cleared the list of dirty roles..
|
|
self.assertEqual([], tuf.roledb.get_dirty_roles(repository_name))
|
|
|
|
repository.mark_dirty(['root', 'timestamp'])
|
|
self.assertEqual(['root', 'timestamp'], tuf.roledb.get_dirty_roles(repository_name))
|
|
repository.unmark_dirty(['root'])
|
|
self.assertEqual(['timestamp'], tuf.roledb.get_dirty_roles(repository_name))
|
|
|
|
# Ensure status() does not leave behind any dirty roles.
|
|
repository.status()
|
|
self.assertEqual(['timestamp'], tuf.roledb.get_dirty_roles(repository_name))
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, repository.writeall, 3)
|
|
|
|
|
|
def test_writeall_no_files(self):
|
|
# Test writeall() when using pre-supplied fileinfo
|
|
|
|
repository_name = 'test_repository'
|
|
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
|
|
repository_directory = os.path.join(temporary_directory, 'repository')
|
|
targets_directory = os.path.join(repository_directory,
|
|
repo_tool.TARGETS_DIRECTORY_NAME)
|
|
|
|
repository = repo_tool.create_new_repository(repository_directory, repository_name)
|
|
|
|
# (1) Load the public and private keys of the top-level roles, and one
|
|
# delegated role.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
|
|
# Load the public keys.
|
|
root_pubkey_path = os.path.join(keystore_directory, 'root_key.pub')
|
|
targets_pubkey_path = os.path.join(keystore_directory, 'targets_key.pub')
|
|
snapshot_pubkey_path = os.path.join(keystore_directory, 'snapshot_key.pub')
|
|
timestamp_pubkey_path = os.path.join(keystore_directory, 'timestamp_key.pub')
|
|
|
|
root_pubkey = repo_tool.import_rsa_publickey_from_file(root_pubkey_path)
|
|
targets_pubkey = \
|
|
repo_tool.import_ed25519_publickey_from_file(targets_pubkey_path)
|
|
snapshot_pubkey = \
|
|
repo_tool.import_ed25519_publickey_from_file(snapshot_pubkey_path)
|
|
timestamp_pubkey = \
|
|
repo_tool.import_ed25519_publickey_from_file(timestamp_pubkey_path)
|
|
|
|
# Load the private keys.
|
|
root_privkey_path = os.path.join(keystore_directory, 'root_key')
|
|
targets_privkey_path = os.path.join(keystore_directory, 'targets_key')
|
|
snapshot_privkey_path = os.path.join(keystore_directory, 'snapshot_key')
|
|
timestamp_privkey_path = os.path.join(keystore_directory, 'timestamp_key')
|
|
|
|
root_privkey = \
|
|
repo_tool.import_rsa_privatekey_from_file(root_privkey_path, 'password')
|
|
targets_privkey = \
|
|
repo_tool.import_ed25519_privatekey_from_file(targets_privkey_path,
|
|
'password')
|
|
snapshot_privkey = \
|
|
repo_tool.import_ed25519_privatekey_from_file(snapshot_privkey_path,
|
|
'password')
|
|
timestamp_privkey = \
|
|
repo_tool.import_ed25519_privatekey_from_file(timestamp_privkey_path,
|
|
'password')
|
|
|
|
|
|
# (2) Add top-level verification keys.
|
|
repository.root.add_verification_key(root_pubkey)
|
|
repository.targets.add_verification_key(targets_pubkey)
|
|
repository.snapshot.add_verification_key(snapshot_pubkey)
|
|
|
|
# Verify that repository.writeall() fails for insufficient threshold
|
|
# of signatures (default threshold = 1).
|
|
self.assertRaises(tuf.exceptions.UnsignedMetadataError, repository.writeall)
|
|
|
|
repository.timestamp.add_verification_key(timestamp_pubkey)
|
|
|
|
|
|
# (3) Load top-level signing keys.
|
|
repository.status()
|
|
repository.root.load_signing_key(root_privkey)
|
|
repository.status()
|
|
repository.targets.load_signing_key(targets_privkey)
|
|
repository.status()
|
|
repository.snapshot.load_signing_key(snapshot_privkey)
|
|
repository.status()
|
|
|
|
# Verify that repository.writeall() fails for insufficient threshold
|
|
# of signatures (default threshold = 1).
|
|
self.assertRaises(tuf.exceptions.UnsignedMetadataError, repository.writeall)
|
|
|
|
repository.timestamp.load_signing_key(timestamp_privkey)
|
|
|
|
# Add target fileinfo
|
|
target1_hashes = {'sha256': 'c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095'}
|
|
target2_hashes = {'sha256': '517c0ce943e7274a2431fa5751e17cfd5225accd23e479bfaad13007751e87ef'}
|
|
target1_fileinfo = tuf.formats.make_fileinfo(555, target1_hashes)
|
|
target2_fileinfo = tuf.formats.make_fileinfo(37, target2_hashes)
|
|
target1 = 'file1.txt'
|
|
target2 = 'file2.txt'
|
|
repository.targets.add_target(target1, fileinfo=target1_fileinfo)
|
|
repository.targets.add_target(target2, fileinfo=target2_fileinfo)
|
|
|
|
repository.writeall(use_existing_fileinfo=True)
|
|
|
|
# Verify that the expected metadata is written.
|
|
metadata_directory = os.path.join(repository_directory,
|
|
repo_tool.METADATA_STAGED_DIRECTORY_NAME)
|
|
|
|
for role in ['root.json', 'targets.json', 'snapshot.json', 'timestamp.json']:
|
|
role_filepath = os.path.join(metadata_directory, role)
|
|
role_signable = securesystemslib.util.load_json_file(role_filepath)
|
|
|
|
# Raise 'securesystemslib.exceptions.FormatError' if 'role_signable' is
|
|
# an invalid signable.
|
|
tuf.formats.check_signable_object_format(role_signable)
|
|
|
|
self.assertTrue(os.path.exists(role_filepath))
|
|
|
|
|
|
|
|
def test_get_filepaths_in_directory(self):
|
|
# Test normal case.
|
|
# Use the pre-generated metadata directory for testing.
|
|
# Set 'repo' reference to improve readability.
|
|
repo = repo_tool.Repository
|
|
metadata_directory = os.path.join('repository_data',
|
|
'repository', 'metadata')
|
|
|
|
# Verify the expected filenames. get_filepaths_in_directory() returns
|
|
# a list of absolute paths.
|
|
metadata_files = repo.get_filepaths_in_directory(metadata_directory)
|
|
|
|
# Construct list of file paths expected, determining absolute paths.
|
|
expected_files = []
|
|
for filepath in ['1.root.json', 'root.json', 'targets.json',
|
|
'snapshot.json', 'timestamp.json', 'role1.json', 'role2.json']:
|
|
expected_files.append(os.path.abspath(os.path.join(
|
|
'repository_data', 'repository', 'metadata', filepath)))
|
|
|
|
self.assertEqual(sorted(expected_files), sorted(metadata_files))
|
|
|
|
|
|
# Test when the 'recursive_walk' argument is True.
|
|
# In this case, recursive walk should yield the same results as the
|
|
# previous, non-recursive call.
|
|
metadata_files = repo.get_filepaths_in_directory(metadata_directory,
|
|
recursive_walk=True)
|
|
self.assertEqual(sorted(expected_files), sorted(metadata_files))
|
|
|
|
# And this recursive call from the directory above should yield the same
|
|
# results as well, plus extra files.
|
|
metadata_files = repo.get_filepaths_in_directory(
|
|
os.path.join('repository_data', 'repository'), recursive_walk=True)
|
|
for expected_file in expected_files:
|
|
self.assertIn(expected_file, metadata_files)
|
|
# self.assertEqual(sorted(expected_files), sorted(metadata_files))
|
|
|
|
# Now let's check it against the full list of expected files for the parent
|
|
# directory.... We'll add to the existing list. Expect the same files in
|
|
# metadata.staged/ as in metadata/, and a few target files in targets/
|
|
# This is somewhat redundant with the previous test, but together they're
|
|
# probably more future-proof.
|
|
for filepath in ['file1.txt', 'file2.txt', 'file3.txt']:
|
|
expected_files.append(os.path.abspath(os.path.join(
|
|
'repository_data', 'repository', 'targets', filepath)))
|
|
for filepath in [ '1.root.json', 'root.json', 'targets.json',
|
|
'snapshot.json', 'timestamp.json', 'role1.json', 'role2.json']:
|
|
expected_files.append(os.path.abspath(os.path.join(
|
|
'repository_data', 'repository', 'metadata.staged', filepath)))
|
|
|
|
self.assertEqual(sorted(expected_files), sorted(metadata_files))
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, repo.get_filepaths_in_directory,
|
|
3, recursive_walk=False, followlinks=False)
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, repo.get_filepaths_in_directory,
|
|
metadata_directory, 3, followlinks=False)
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, repo.get_filepaths_in_directory,
|
|
metadata_directory, recursive_walk=False, followlinks=3)
|
|
|
|
# Test invalid directory argument.
|
|
# A non-directory.
|
|
self.assertRaises(securesystemslib.exceptions.Error, repo.get_filepaths_in_directory,
|
|
os.path.join(metadata_directory, 'root.json'))
|
|
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
|
|
nonexistent_directory = os.path.join(temporary_directory, 'nonexistent/')
|
|
self.assertRaises(securesystemslib.exceptions.Error, repo.get_filepaths_in_directory,
|
|
nonexistent_directory, recursive_walk=False,
|
|
followlinks=False)
|
|
|
|
|
|
|
|
def test_writeall_abstract_storage(self):
|
|
# Test creation of a TUF repository with a custom storage backend to ensure
|
|
# that functions relying on a storage backend being supplied operate
|
|
# correctly
|
|
|
|
|
|
class TestStorageBackend(securesystemslib.storage.StorageBackendInterface):
|
|
"""
|
|
An implementation of securesystemslib.storage.StorageBackendInterface
|
|
which mutates filenames on put()/get(), translating filename in memory
|
|
to filename + '.tst' on-disk, such that trying to read the
|
|
expected/canonical file paths from local storage doesn't find the TUF
|
|
metadata files.
|
|
"""
|
|
|
|
from contextlib import contextmanager
|
|
|
|
|
|
@contextmanager
|
|
def get(self, filepath):
|
|
file_object = open(filepath + '.tst', 'rb')
|
|
yield file_object
|
|
file_object.close()
|
|
|
|
|
|
def put(self, fileobj, filepath):
|
|
if not fileobj.closed:
|
|
fileobj.seek(0)
|
|
|
|
with open(filepath + '.tst', 'wb') as destination_file:
|
|
shutil.copyfileobj(fileobj, destination_file)
|
|
destination_file.flush()
|
|
os.fsync(destination_file.fileno())
|
|
|
|
|
|
def remove(self, filepath):
|
|
os.remove(filepath + '.tst')
|
|
|
|
|
|
def getsize(self, filepath):
|
|
return os.path.getsize(filepath + '.tst')
|
|
|
|
|
|
def create_folder(self, filepath):
|
|
if not filepath:
|
|
return
|
|
try:
|
|
os.makedirs(filepath)
|
|
except OSError as err:
|
|
pass
|
|
|
|
|
|
def list_folder(self, filepath):
|
|
contents = []
|
|
files = os.listdir(filepath)
|
|
|
|
for fi in files:
|
|
if fi.endswith('.tst'):
|
|
contents.append(fi.split('.tst')[0])
|
|
else:
|
|
contents.append(fi)
|
|
|
|
return contents
|
|
|
|
|
|
|
|
# Set up the repository directory
|
|
repository_name = 'test_repository'
|
|
repository_directory = self.create_repository_directory()
|
|
metadata_directory = os.path.join(repository_directory,
|
|
repo_tool.METADATA_STAGED_DIRECTORY_NAME)
|
|
targets_directory = os.path.join(repository_directory,
|
|
repo_tool.TARGETS_DIRECTORY_NAME)
|
|
|
|
# TestStorageBackend expects all files on disk to have an additional '.tst'
|
|
# file extension
|
|
for target in os.listdir(targets_directory):
|
|
src = os.path.join(targets_directory, target)
|
|
dst = os.path.join(targets_directory, target + '.tst')
|
|
os.rename(src, dst)
|
|
|
|
# (0) Create a repository with TestStorageBackend()
|
|
storage_backend = TestStorageBackend()
|
|
repository = repo_tool.create_new_repository(repository_directory,
|
|
repository_name,
|
|
storage_backend)
|
|
|
|
# (1) Load the public and private keys of the top-level roles, and one
|
|
# delegated role.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
|
|
# Load the public keys.
|
|
root_pubkey_path = os.path.join(keystore_directory, 'root_key.pub')
|
|
targets_pubkey_path = os.path.join(keystore_directory, 'targets_key.pub')
|
|
snapshot_pubkey_path = os.path.join(keystore_directory, 'snapshot_key.pub')
|
|
timestamp_pubkey_path = os.path.join(keystore_directory, 'timestamp_key.pub')
|
|
|
|
root_pubkey = repo_tool.import_rsa_publickey_from_file(root_pubkey_path)
|
|
targets_pubkey = \
|
|
repo_tool.import_ed25519_publickey_from_file(targets_pubkey_path)
|
|
snapshot_pubkey = \
|
|
repo_tool.import_ed25519_publickey_from_file(snapshot_pubkey_path)
|
|
timestamp_pubkey = \
|
|
repo_tool.import_ed25519_publickey_from_file(timestamp_pubkey_path)
|
|
|
|
# Load the private keys.
|
|
root_privkey_path = os.path.join(keystore_directory, 'root_key')
|
|
targets_privkey_path = os.path.join(keystore_directory, 'targets_key')
|
|
snapshot_privkey_path = os.path.join(keystore_directory, 'snapshot_key')
|
|
timestamp_privkey_path = os.path.join(keystore_directory, 'timestamp_key')
|
|
|
|
root_privkey = \
|
|
repo_tool.import_rsa_privatekey_from_file(root_privkey_path, 'password')
|
|
targets_privkey = \
|
|
repo_tool.import_ed25519_privatekey_from_file(targets_privkey_path,
|
|
'password')
|
|
snapshot_privkey = \
|
|
repo_tool.import_ed25519_privatekey_from_file(snapshot_privkey_path,
|
|
'password')
|
|
timestamp_privkey = \
|
|
repo_tool.import_ed25519_privatekey_from_file(timestamp_privkey_path,
|
|
'password')
|
|
|
|
|
|
# (2) Add top-level verification keys.
|
|
repository.root.add_verification_key(root_pubkey)
|
|
repository.targets.add_verification_key(targets_pubkey)
|
|
repository.snapshot.add_verification_key(snapshot_pubkey)
|
|
repository.timestamp.add_verification_key(timestamp_pubkey)
|
|
|
|
|
|
# (3) Load top-level signing keys.
|
|
repository.root.load_signing_key(root_privkey)
|
|
repository.targets.load_signing_key(targets_privkey)
|
|
repository.snapshot.load_signing_key(snapshot_privkey)
|
|
repository.timestamp.load_signing_key(timestamp_privkey)
|
|
|
|
|
|
# (4) Add target files.
|
|
target1 = 'file1.txt'
|
|
target2 = 'file2.txt'
|
|
target3 = 'file3.txt'
|
|
repository.targets.add_target(target1)
|
|
repository.targets.add_target(target2)
|
|
repository.targets.add_target(target3)
|
|
|
|
# (6) Write repository.
|
|
repository.writeall()
|
|
|
|
|
|
# Ensure all of the metadata files exist at the mutated file location and
|
|
# that those files are valid metadata
|
|
for role in ['root.json.tst', 'targets.json.tst', 'snapshot.json.tst',
|
|
'timestamp.json.tst']:
|
|
role_filepath = os.path.join(metadata_directory, role)
|
|
self.assertTrue(os.path.exists(role_filepath))
|
|
|
|
role_signable = securesystemslib.util.load_json_file(role_filepath)
|
|
# Raise 'securesystemslib.exceptions.FormatError' if 'role_signable' is
|
|
# an invalid signable.
|
|
tuf.formats.check_signable_object_format(role_signable)
|
|
|
|
|
|
|
|
|
|
|
|
class TestMetadata(unittest.TestCase):
|
|
def setUp(self):
|
|
# Inherit from the repo_tool.Metadata() base class. All of the methods
|
|
# to be tested in TestMetadata require at least 1 role, so create it here
|
|
# and set its roleinfo.
|
|
|
|
tuf.roledb.create_roledb('test_repository')
|
|
tuf.keydb.create_keydb('test_repository')
|
|
|
|
class MetadataRole(repo_tool.Metadata):
|
|
def __init__(self):
|
|
super(MetadataRole, self).__init__()
|
|
|
|
self._rolename = 'metadata_role'
|
|
self._repository_name = 'test_repository'
|
|
|
|
# Expire in 86400 seconds (1 day).
|
|
expiration = \
|
|
tuf.formats.unix_timestamp_to_datetime(int(time.time() + 86400))
|
|
expiration = expiration.isoformat() + 'Z'
|
|
roleinfo = {'keyids': [], 'signing_keyids': [], 'threshold': 1,
|
|
'signatures': [], 'version': 0,
|
|
'consistent_snapshot': False,
|
|
'expires': expiration,
|
|
'partial_loaded': False}
|
|
|
|
tuf.roledb.add_role(self._rolename, roleinfo,
|
|
repository_name='test_repository')
|
|
|
|
self.metadata = MetadataRole()
|
|
|
|
|
|
|
|
def tearDown(self):
|
|
tuf.roledb.clear_roledb(clear_all=True)
|
|
tuf.keydb.clear_keydb(clear_all=True)
|
|
self.metadata = None
|
|
|
|
|
|
|
|
def test_rolename(self):
|
|
base_metadata = repo_tool.Metadata()
|
|
|
|
self.assertEqual(base_metadata.rolename, None)
|
|
|
|
# Test the sub-classed MetadataRole().
|
|
self.assertEqual(self.metadata.rolename, 'metadata_role')
|
|
|
|
|
|
|
|
def test_version(self):
|
|
# Test version getter, and the default version number.
|
|
self.assertEqual(self.metadata.version, 0)
|
|
|
|
# Test version setter, and verify updated version number.
|
|
self.metadata.version = 8
|
|
self.assertEqual(self.metadata.version, 8)
|
|
|
|
|
|
|
|
def test_threshold(self):
|
|
# Test threshold getter, and the default threshold number.
|
|
self.assertEqual(self.metadata.threshold, 1)
|
|
|
|
# Test threshold setter, and verify updated threshold number.
|
|
self.metadata.threshold = 3
|
|
self.assertEqual(self.metadata.threshold, 3)
|
|
|
|
|
|
|
|
def test_expiration(self):
|
|
# Test expiration getter.
|
|
expiration = self.metadata.expiration
|
|
self.assertTrue(isinstance(expiration, datetime.datetime))
|
|
|
|
# Test expiration setter.
|
|
self.metadata.expiration = datetime.datetime(2030, 1, 1, 12, 0)
|
|
expiration = self.metadata.expiration
|
|
self.assertTrue(isinstance(expiration, datetime.datetime))
|
|
|
|
# test a setter with microseconds, we are forcing the microseconds value
|
|
expiration = datetime.datetime.today() + datetime.timedelta(weeks = 1)
|
|
# we force the microseconds value if we are unlucky enough to get a 0
|
|
if expiration.microsecond == 0:
|
|
expiration = expiration.replace(microsecond = 1)
|
|
|
|
new_expiration = self.metadata.expiration
|
|
self.assertTrue(isinstance(new_expiration, datetime.datetime))
|
|
|
|
# check that the expiration value is truncated
|
|
self.assertTrue(new_expiration.microsecond == 0)
|
|
|
|
# Test improperly formatted datetime.
|
|
try:
|
|
self.metadata.expiration = '3'
|
|
|
|
except securesystemslib.exceptions.FormatError:
|
|
pass
|
|
|
|
else:
|
|
self.fail('Setter failed to detect improperly formatted datetime.')
|
|
|
|
|
|
# Test invalid argument (i.e., expiration has already expired.)
|
|
expired_datetime = tuf.formats.unix_timestamp_to_datetime(int(time.time() - 1))
|
|
try:
|
|
self.metadata.expiration = expired_datetime
|
|
|
|
except securesystemslib.exceptions.Error:
|
|
pass
|
|
|
|
else:
|
|
self.fail('Setter failed to detect an expired datetime.')
|
|
|
|
|
|
|
|
def test_keys(self):
|
|
# Test default case, where a verification key has not been added.
|
|
self.assertEqual(self.metadata.keys, [])
|
|
|
|
|
|
# Test keys() getter after a verification key has been loaded.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'snapshot_key.pub')
|
|
key_object = repo_tool.import_ed25519_publickey_from_file(key_path)
|
|
self.metadata.add_verification_key(key_object)
|
|
|
|
keyid = key_object['keyid']
|
|
self.assertEqual([keyid], self.metadata.keys)
|
|
|
|
|
|
|
|
def test_signing_keys(self):
|
|
# Test default case, where a signing key has not been added.
|
|
self.assertEqual(self.metadata.signing_keys, [])
|
|
|
|
|
|
# Test signing_keys() getter after a signing key has been loaded.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'root_key')
|
|
key_object = repo_tool.import_rsa_privatekey_from_file(key_path, 'password')
|
|
self.metadata.load_signing_key(key_object)
|
|
|
|
keyid = key_object['keyid']
|
|
self.assertEqual([keyid], self.metadata.signing_keys)
|
|
|
|
|
|
|
|
|
|
|
|
def test_add_verification_key(self):
|
|
# Add verification key and verify that it was added via (role).keys.
|
|
key_path = os.path.join('repository_data', 'keystore', 'snapshot_key.pub')
|
|
key_object = repo_tool.import_ed25519_publickey_from_file(key_path)
|
|
self.metadata.add_verification_key(key_object)
|
|
|
|
keyid = key_object['keyid']
|
|
self.assertEqual([keyid], self.metadata.keys)
|
|
|
|
expiration = \
|
|
tuf.formats.unix_timestamp_to_datetime(int(time.time() + 86400))
|
|
expiration = expiration.isoformat() + 'Z'
|
|
roleinfo = {'keyids': [], 'signing_keyids': [], 'threshold': 1,
|
|
'signatures': [], 'version': 0,
|
|
'consistent_snapshot': False, 'expires': expiration,
|
|
'partial_loaded': False}
|
|
|
|
tuf.roledb.add_role('Root', roleinfo, 'test_repository')
|
|
tuf.roledb.add_role('Targets', roleinfo, 'test_repository')
|
|
tuf.roledb.add_role('Snapshot', roleinfo, 'test_repository')
|
|
tuf.roledb.add_role('Timestamp', roleinfo, 'test_repository')
|
|
|
|
# Test for different top-level role names.
|
|
self.metadata._rolename = 'Targets'
|
|
self.metadata.add_verification_key(key_object)
|
|
self.metadata._rolename = 'Snapshot'
|
|
self.metadata.add_verification_key(key_object)
|
|
self.metadata._rolename = 'Timestamp'
|
|
self.metadata.add_verification_key(key_object)
|
|
|
|
# Test for a given 'expires' argument.
|
|
expires = datetime.datetime(2030, 1, 1, 12, 0)
|
|
self.metadata.add_verification_key(key_object, expires)
|
|
|
|
|
|
# Test for an expired 'expires'.
|
|
expired = datetime.datetime(1984, 1, 1, 12, 0)
|
|
self.assertRaises(securesystemslib.exceptions.Error,
|
|
self.metadata.add_verification_key, key_object, expired)
|
|
|
|
# Test improperly formatted key argument.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, self.metadata.add_verification_key, 3)
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, self.metadata.add_verification_key, key_object, 3)
|
|
|
|
|
|
|
|
def test_remove_verification_key(self):
|
|
# Add verification key so that remove_verifiation_key() can be tested.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'snapshot_key.pub')
|
|
key_object = repo_tool.import_ed25519_publickey_from_file(key_path)
|
|
self.metadata.add_verification_key(key_object)
|
|
|
|
keyid = key_object['keyid']
|
|
self.assertEqual([keyid], self.metadata.keys)
|
|
|
|
|
|
# Test successful removal of verification key added above.
|
|
self.metadata.remove_verification_key(key_object)
|
|
self.assertEqual(self.metadata.keys, [])
|
|
|
|
|
|
# Test improperly formatted argument
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, self.metadata.remove_verification_key, 3)
|
|
|
|
|
|
# Test non-existent public key argument.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'targets_key.pub')
|
|
unused_key_object = repo_tool.import_ed25519_publickey_from_file(key_path)
|
|
|
|
self.assertRaises(securesystemslib.exceptions.Error, self.metadata.remove_verification_key,
|
|
unused_key_object)
|
|
|
|
|
|
|
|
def test_load_signing_key(self):
|
|
# Test normal case.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'snapshot_key')
|
|
key_object = repo_tool.import_ed25519_privatekey_from_file(key_path, 'password')
|
|
self.metadata.load_signing_key(key_object)
|
|
|
|
keyid = key_object['keyid']
|
|
self.assertEqual([keyid], self.metadata.signing_keys)
|
|
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, self.metadata.load_signing_key, 3)
|
|
|
|
|
|
# Test non-private key.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'snapshot_key.pub')
|
|
key_object = repo_tool.import_ed25519_publickey_from_file(key_path)
|
|
self.assertRaises(securesystemslib.exceptions.Error, self.metadata.load_signing_key, key_object)
|
|
|
|
|
|
|
|
def test_unload_signing_key(self):
|
|
# Load a signing key so that unload_signing_key() can have a key to unload.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'snapshot_key')
|
|
key_object = repo_tool.import_ed25519_privatekey_from_file(key_path, 'password')
|
|
self.metadata.load_signing_key(key_object)
|
|
|
|
keyid = key_object['keyid']
|
|
self.assertEqual([keyid], self.metadata.signing_keys)
|
|
|
|
self.metadata.unload_signing_key(key_object)
|
|
|
|
self.assertEqual(self.metadata.signing_keys, [])
|
|
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, self.metadata.unload_signing_key, 3)
|
|
|
|
|
|
# Test non-existent key argument.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'targets_key')
|
|
unused_key_object = repo_tool.import_ed25519_privatekey_from_file(key_path,
|
|
'password')
|
|
|
|
self.assertRaises(securesystemslib.exceptions.Error, self.metadata.unload_signing_key,
|
|
unused_key_object)
|
|
|
|
|
|
|
|
def test_add_signature(self):
|
|
# Test normal case.
|
|
# Load signature list from any of pre-generated metadata; needed for
|
|
# testing.
|
|
metadata_directory = os.path.join('repository_data',
|
|
'repository', 'metadata')
|
|
root_filepath = os.path.join(metadata_directory, 'root.json')
|
|
root_signable = securesystemslib.util.load_json_file(root_filepath)
|
|
signatures = root_signable['signatures']
|
|
|
|
# Add the first signature from the list, as only one is needed.
|
|
self.metadata.add_signature(signatures[0])
|
|
self.assertEqual(signatures, self.metadata.signatures)
|
|
|
|
# Verify that a signature is added if a 'signatures' entry is not present.
|
|
tuf.roledb.create_roledb_from_root_metadata(root_signable['signed'], repository_name='test_repository')
|
|
del tuf.roledb._roledb_dict['test_repository']['root']['signatures']
|
|
self.metadata._rolename = 'root'
|
|
self.metadata.add_signature(signatures[0])
|
|
|
|
# Add a duplicate signature.
|
|
self.metadata.add_signature(signatures[0])
|
|
|
|
# Test improperly formatted signature argument.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, self.metadata.add_signature, 3)
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, self.metadata.add_signature, signatures[0], 3)
|
|
|
|
|
|
|
|
def test_remove_signature(self):
|
|
# Test normal case.
|
|
# Add a signature so remove_signature() has some signature to remove.
|
|
metadata_directory = os.path.join('repository_data',
|
|
'repository', 'metadata')
|
|
root_filepath = os.path.join(metadata_directory, 'root.json')
|
|
root_signable = securesystemslib.util.load_json_file(root_filepath)
|
|
signatures = root_signable['signatures']
|
|
self.metadata.add_signature(signatures[0])
|
|
|
|
self.metadata.remove_signature(signatures[0])
|
|
self.assertEqual(self.metadata.signatures, [])
|
|
|
|
|
|
# Test improperly formatted signature argument.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.metadata.remove_signature, 3)
|
|
|
|
# Test invalid signature argument (i.e., signature that has not been added.)
|
|
# Load an unused signature to be tested.
|
|
targets_filepath = os.path.join(metadata_directory, 'targets.json')
|
|
targets_signable = securesystemslib.util.load_json_file(targets_filepath)
|
|
signatures = targets_signable['signatures']
|
|
|
|
self.assertRaises(securesystemslib.exceptions.Error,
|
|
self.metadata.remove_signature, signatures[0])
|
|
|
|
|
|
|
|
def test_signatures(self):
|
|
# Test default case, where no signatures have been added yet.
|
|
self.assertEqual(self.metadata.signatures, [])
|
|
|
|
|
|
# Test getter after adding an example signature.
|
|
metadata_directory = os.path.join('repository_data',
|
|
'repository', 'metadata')
|
|
root_filepath = os.path.join(metadata_directory, 'root.json')
|
|
root_signable = securesystemslib.util.load_json_file(root_filepath)
|
|
signatures = root_signable['signatures']
|
|
|
|
# Add the first signature from the list, as only need one is needed.
|
|
self.metadata.add_signature(signatures[0])
|
|
self.assertEqual(signatures, self.metadata.signatures)
|
|
|
|
|
|
|
|
class TestRoot(unittest.TestCase):
|
|
def setUp(self):
|
|
tuf.roledb.create_roledb('test_repository')
|
|
tuf.keydb.create_keydb('test_repository')
|
|
|
|
|
|
|
|
def tearDown(self):
|
|
tuf.roledb.clear_roledb(clear_all=True)
|
|
tuf.keydb.clear_keydb(clear_all=True)
|
|
|
|
|
|
|
|
def test_init(self):
|
|
|
|
# Test normal case.
|
|
# Root() subclasses Metadata(), and creates a 'root' role in 'tuf.roledb'.
|
|
repository_name = 'test_repository'
|
|
root_object = repo_tool.Root(repository_name)
|
|
self.assertTrue(isinstance(root_object, repo_tool.Metadata))
|
|
self.assertTrue(tuf.roledb.role_exists('root', repository_name))
|
|
|
|
|
|
|
|
class TestTimestamp(unittest.TestCase):
|
|
def setUp(self):
|
|
tuf.roledb.create_roledb('test_repository')
|
|
tuf.keydb.create_keydb('test_repository')
|
|
|
|
|
|
|
|
def tearDown(self):
|
|
tuf.roledb.clear_roledb(clear_all=True)
|
|
tuf.keydb.clear_keydb(clear_all=True)
|
|
|
|
|
|
|
|
def test_init(self):
|
|
|
|
# Test normal case.
|
|
# Timestamp() subclasses Metadata(), and creates a 'timestamp' role in
|
|
# 'tuf.roledb'.
|
|
timestamp_object = repo_tool.Timestamp('test_repository')
|
|
self.assertTrue(isinstance(timestamp_object, repo_tool.Metadata))
|
|
self.assertTrue(tuf.roledb.role_exists('timestamp', 'test_repository'))
|
|
|
|
|
|
|
|
|
|
|
|
class TestSnapshot(unittest.TestCase):
|
|
def setUp(self):
|
|
tuf.roledb.create_roledb('test_repository')
|
|
tuf.keydb.create_keydb('test_repository')
|
|
|
|
|
|
|
|
def tearDown(self):
|
|
tuf.roledb.clear_roledb(clear_all=True)
|
|
tuf.keydb.clear_keydb(clear_all=True)
|
|
|
|
|
|
|
|
def test_init(self):
|
|
|
|
# Test normal case.
|
|
# Snapshot() subclasses Metadata(), and creates a 'snapshot' role in
|
|
# 'tuf.roledb'.
|
|
snapshot_object = repo_tool.Snapshot('test_repository')
|
|
self.assertTrue(isinstance(snapshot_object, repo_tool.Metadata))
|
|
self.assertTrue(tuf.roledb.role_exists('snapshot', 'test_repository'))
|
|
|
|
|
|
|
|
|
|
|
|
class TestTargets(unittest.TestCase):
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
|
|
# setUpClass() is called before tests in an individual class are executed.
|
|
|
|
# Create a temporary directory to store the repository, metadata, and target
|
|
# files. 'temporary_directory' must be deleted in TearDownClass() so that
|
|
# temporary files are always removed, even when exceptions occur.
|
|
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
|
|
|
|
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
|
|
# tearDownModule() is called after all the tests have run.
|
|
# http://docs.python.org/2/library/unittest.html#class-and-module-fixtures
|
|
|
|
# Remove the temporary repository directory, which should contain all the
|
|
# metadata, targets, and key files generated for the test cases.
|
|
shutil.rmtree(cls.temporary_directory)
|
|
|
|
|
|
|
|
def setUp(self):
|
|
tuf.roledb.create_roledb('test_repository')
|
|
tuf.keydb.create_keydb('test_repository')
|
|
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
|
|
self.targets_directory = os.path.join(temporary_directory, 'repository',
|
|
'targets')
|
|
original_targets_directory = os.path.join('repository_data',
|
|
'repository', 'targets')
|
|
shutil.copytree(original_targets_directory, self.targets_directory)
|
|
self.targets_object = repo_tool.Targets(self.targets_directory,
|
|
repository_name='test_repository')
|
|
|
|
|
|
|
|
def tearDown(self):
|
|
tuf.roledb.clear_roledb(clear_all=True)
|
|
tuf.keydb.clear_keydb(clear_all=True)
|
|
self.targets_object = None
|
|
|
|
|
|
|
|
def test_init(self):
|
|
|
|
# Test normal case.
|
|
# Snapshot() subclasses Metadata(), and creates a 'snapshot' role in
|
|
# 'tuf.roledb'.
|
|
targets_object = repo_tool.Targets('targets_directory/')
|
|
self.assertTrue(isinstance(targets_object, repo_tool.Metadata))
|
|
self.assertTrue(tuf.roledb.role_exists('targets'))
|
|
|
|
# Custom Targets object rolename.
|
|
targets_object = repo_tool.Targets('targets_directory/', 'project')
|
|
self.assertTrue(isinstance(targets_object, repo_tool.Metadata))
|
|
self.assertTrue(tuf.roledb.role_exists('project'))
|
|
|
|
# Custom roleinfo object (i.e., tuf.formats.ROLEDB_SCHEMA). 'keyids' and
|
|
# 'threshold' are required, the rest are optional.
|
|
roleinfo = {'keyids':
|
|
['66c4cb5fef5e4d62b7013ef1cab4b8a827a36c14056d5603c3a970e21eb30e6f'],
|
|
'threshold': 8}
|
|
self.assertTrue(tuf.formats.ROLEDB_SCHEMA.matches(roleinfo))
|
|
|
|
targets_object = repo_tool.Targets('targets_directory/', 'package', roleinfo)
|
|
self.assertTrue(isinstance(targets_object, repo_tool.Metadata))
|
|
self.assertTrue(tuf.roledb.role_exists('package'))
|
|
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, repo_tool.Targets, 3)
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, repo_tool.Targets, 'targets_directory/', 3)
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, repo_tool.Targets, 'targets_directory/',
|
|
'targets', 3)
|
|
|
|
|
|
|
|
def test_call(self):
|
|
# Test normal case.
|
|
# Perform a delegation so that a delegated role can be accessed and tested
|
|
# through __call__(). Example: {targets_object}('role1').
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'snapshot_key.pub')
|
|
public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath)
|
|
|
|
# Create Targets() object to be tested.
|
|
targets_object = repo_tool.Targets(self.targets_directory)
|
|
targets_object.delegate('role1', [public_key], ['file1.txt'])
|
|
|
|
self.assertTrue(isinstance(targets_object('role1'), repo_tool.Targets))
|
|
|
|
# Test invalid (i.e., non-delegated) rolename argument.
|
|
self.assertRaises(tuf.exceptions.UnknownRoleError, targets_object, 'unknown_role')
|
|
|
|
# Test improperly formatted argument.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, targets_object, 1)
|
|
|
|
|
|
|
|
def test_get_delegated_rolenames(self):
|
|
# Test normal case.
|
|
# Perform two delegations so that get_delegated_rolenames() has roles to
|
|
# return.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'snapshot_key.pub')
|
|
public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath)
|
|
|
|
# Set needed arguments by delegate().
|
|
public_keys = [public_key]
|
|
threshold = 1
|
|
|
|
self.targets_object.delegate('tuf', public_keys, [], threshold, False,
|
|
['file1.txt'], path_hash_prefixes=None)
|
|
|
|
self.targets_object.delegate('warehouse', public_keys, [], threshold, False,
|
|
['file2.txt'], path_hash_prefixes=None)
|
|
|
|
# Test that get_delegated_rolenames returns the expected delegations.
|
|
expected_delegated_rolenames = ['targets/tuf/', 'targets/warehouse']
|
|
for delegated_rolename in self.targets_object.get_delegated_rolenames():
|
|
delegated_rolename in expected_delegated_rolenames
|
|
|
|
|
|
|
|
def test_target_files(self):
|
|
# Test normal case.
|
|
# Verify the targets object initially contains zero target files.
|
|
self.assertEqual(self.targets_object.target_files, {})
|
|
|
|
target_filepath = 'file1.txt'
|
|
self.targets_object.add_target(target_filepath)
|
|
|
|
self.assertEqual(len(self.targets_object.target_files), 1)
|
|
self.assertTrue(target_filepath in self.targets_object.target_files)
|
|
|
|
|
|
|
|
def test_delegations(self):
|
|
# Test normal case.
|
|
# Perform a delegation so that delegations() has a Targets() object to
|
|
# return.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'snapshot_key.pub')
|
|
public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath)
|
|
|
|
# Set needed arguments by delegate().
|
|
public_keys = [public_key]
|
|
rolename = 'tuf'
|
|
paths = ['file1.txt']
|
|
threshold = 1
|
|
|
|
self.targets_object.delegate(rolename, public_keys, paths, threshold,
|
|
terminating=False, list_of_targets=None, path_hash_prefixes=None)
|
|
|
|
# Test that a valid Targets() object is returned by delegations().
|
|
for delegated_object in self.targets_object.delegations:
|
|
self.assertTrue(isinstance(delegated_object, repo_tool.Targets))
|
|
|
|
# For testing / coverage purposes, try to remove a delegated role with the
|
|
# remove_delegated_role() method.
|
|
self.targets_object.remove_delegated_role(rolename)
|
|
|
|
|
|
|
|
def test_add_delegated_role(self):
|
|
# Test for invalid targets object.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.add_delegated_role, 'targets', 'bad_object')
|
|
|
|
|
|
|
|
def test_add_target(self):
|
|
# Test normal case.
|
|
# Verify the targets object initially contains zero target files.
|
|
self.assertEqual(self.targets_object.target_files, {})
|
|
|
|
target_filepath = 'file1.txt'
|
|
self.targets_object.add_target(target_filepath)
|
|
|
|
self.assertEqual(len(self.targets_object.target_files), 1)
|
|
self.assertTrue(target_filepath in self.targets_object.target_files)
|
|
|
|
# Test the 'custom' parameter of add_target(), where additional information
|
|
# may be specified for the target.
|
|
target2_filepath = 'file2.txt'
|
|
target2_fullpath = os.path.join(self.targets_directory, target2_filepath)
|
|
|
|
# The file permission of the target (octal number specifying file access
|
|
# for owner, group, others (e.g., 0755).
|
|
octal_file_permissions = oct(os.stat(target2_fullpath).st_mode)[4:]
|
|
custom_file_permissions = {'file_permissions': octal_file_permissions}
|
|
self.targets_object.add_target(target2_filepath, custom_file_permissions)
|
|
|
|
self.assertEqual(len(self.targets_object.target_files), 2)
|
|
self.assertTrue(target2_filepath in self.targets_object.target_files)
|
|
self.assertEqual(self.targets_object.target_files['file2.txt']['custom'],
|
|
custom_file_permissions)
|
|
|
|
# Attempt to replace target that has already been added.
|
|
octal_file_permissions2 = oct(os.stat(target2_fullpath).st_mode)[4:]
|
|
custom_file_permissions2 = {'file_permissions': octal_file_permissions}
|
|
self.targets_object.add_target(target2_filepath, custom_file_permissions2)
|
|
self.assertEqual(self.targets_object.target_files[target2_filepath]['custom'],
|
|
custom_file_permissions2)
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.add_target, 3)
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.add_target, 3, custom_file_permissions)
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.add_target, target_filepath, 3)
|
|
|
|
# A target path starting with a directory separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.add_target, '/file1.txt')
|
|
|
|
# A target path using a backward slash as a separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.add_target, 'subdir\\file1.txt')
|
|
|
|
# Should not access the file system to check for non-existent files
|
|
self.targets_object.add_target('non-existent')
|
|
|
|
|
|
|
|
def test_add_targets(self):
|
|
# Test normal case.
|
|
# Verify the targets object initially contains zero target files.
|
|
self.assertEqual(self.targets_object.target_files, {})
|
|
|
|
target1_filepath = 'file1.txt'
|
|
target2_filepath = 'file2.txt'
|
|
target3_filepath = 'file3.txt'
|
|
|
|
# Add a 'target1_filepath' duplicate for testing purposes
|
|
# ('target1_filepath' should not be added twice.)
|
|
target_files = \
|
|
[target1_filepath, target2_filepath, 'file3.txt', target1_filepath]
|
|
self.targets_object.add_targets(target_files)
|
|
|
|
self.assertEqual(len(self.targets_object.target_files), 3)
|
|
self.assertEqual(self.targets_object.target_files,
|
|
{target1_filepath: {}, target2_filepath: {}, target3_filepath: {}})
|
|
|
|
# Attempt to replace targets that has already been added.
|
|
self.targets_object.add_targets(target_files)
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.add_targets, 3)
|
|
|
|
# A target path starting with a directory separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.add_targets, ['/file1.txt'])
|
|
|
|
# A target path using a backward slash as a separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.add_targets, ['subdir\\file1.txt'])
|
|
|
|
# Check if the addition of the whole list is rolled back in case of
|
|
# wrong target path
|
|
target_files = self.targets_object.target_files
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.add_targets, ['file4.txt', '/file5.txt'])
|
|
self.assertEqual(self.targets_object.target_files, target_files)
|
|
|
|
# Should not access the file system to check for non-existent files
|
|
self.targets_object.add_targets(['non-existent'])
|
|
|
|
|
|
def test_remove_target(self):
|
|
# Test normal case.
|
|
# Verify the targets object initially contains zero target files.
|
|
self.assertEqual(self.targets_object.target_files, {})
|
|
|
|
# Add a target so that remove_target() has something to remove.
|
|
target_filepath = 'file1.txt'
|
|
self.targets_object.add_target(target_filepath)
|
|
|
|
# Test remove_target()'s behavior.
|
|
self.targets_object.remove_target(target_filepath)
|
|
self.assertEqual(self.targets_object.target_files, {})
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.remove_target, 3)
|
|
|
|
# Test for filepath that hasn't been added yet.
|
|
target5_filepath = 'file5.txt'
|
|
self.assertRaises(securesystemslib.exceptions.Error,
|
|
self.targets_object.remove_target,
|
|
target5_filepath)
|
|
|
|
|
|
|
|
def test_clear_targets(self):
|
|
# Test normal case.
|
|
# Verify the targets object initially contains zero target files.
|
|
self.assertEqual(self.targets_object.target_files, {})
|
|
|
|
# Add targets, to be tested by clear_targets().
|
|
target1_filepath = 'file1.txt'
|
|
target2_filepath = 'file2.txt'
|
|
self.targets_object.add_targets([target1_filepath, target2_filepath])
|
|
|
|
self.targets_object.clear_targets()
|
|
self.assertEqual(self.targets_object.target_files, {})
|
|
|
|
|
|
|
|
def test_delegate(self):
|
|
# Test normal case.
|
|
# Need at least one public key and valid target paths required by
|
|
# delegate().
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'snapshot_key.pub')
|
|
public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath)
|
|
|
|
# Set needed arguments by delegate().
|
|
public_keys = [public_key]
|
|
rolename = 'tuf'
|
|
list_of_targets = ['file1.txt', 'file2.txt']
|
|
threshold = 1
|
|
paths = ['*']
|
|
path_hash_prefixes = ['e3a3', '8fae', 'd543']
|
|
|
|
self.targets_object.delegate(rolename, public_keys, paths,
|
|
threshold, terminating=False, list_of_targets=list_of_targets,
|
|
path_hash_prefixes=path_hash_prefixes)
|
|
|
|
self.assertEqual(self.targets_object.get_delegated_rolenames(),
|
|
['tuf'])
|
|
|
|
# Test for delegated paths that do not exist.
|
|
# An exception should not be raised for non-existent delegated paths, since
|
|
# these paths may not necessarily exist when the delegation is done,
|
|
# and also because the delegated paths can be glob patterns.
|
|
self.targets_object.delegate(rolename, public_keys, ['non-existent'],
|
|
threshold, terminating=False, list_of_targets=list_of_targets,
|
|
path_hash_prefixes=path_hash_prefixes)
|
|
|
|
# Test for delegated targets that do not exist.
|
|
# An exception should not be raised for non-existent delegated targets,
|
|
# since at this point the file system should not be accessed yet
|
|
self.targets_object.delegate(rolename, public_keys, [], threshold,
|
|
terminating=False, list_of_targets=['non-existent.txt'],
|
|
path_hash_prefixes=path_hash_prefixes)
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.delegate, 3, public_keys, paths, threshold,
|
|
list_of_targets, path_hash_prefixes)
|
|
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.delegate, rolename, 3, paths, threshold,
|
|
list_of_targets, path_hash_prefixes)
|
|
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.delegate, rolename, public_keys, 3, threshold,
|
|
list_of_targets, path_hash_prefixes)
|
|
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.delegate, rolename, public_keys, paths, '3',
|
|
list_of_targets, path_hash_prefixes)
|
|
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.delegate, rolename, public_keys, paths, threshold,
|
|
3, path_hash_prefixes)
|
|
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.delegate, rolename, public_keys, paths, threshold,
|
|
list_of_targets, 3)
|
|
|
|
# Test invalid arguments (e.g., already delegated 'rolename', non-existent
|
|
# files, etc.).
|
|
# Test duplicate 'rolename' delegation, which should have been delegated
|
|
# in the normal case above.
|
|
self.assertRaises(securesystemslib.exceptions.Error,
|
|
self.targets_object.delegate, rolename, public_keys, paths, threshold,
|
|
list_of_targets, path_hash_prefixes)
|
|
|
|
# A path or target starting with a directory separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.delegate, rolename, public_keys, ['/*'])
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.delegate, rolename, public_keys, [],
|
|
list_of_targets=['/file1.txt'])
|
|
|
|
# A path or target using '\' as a directory separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.delegate, rolename, public_keys, ['subpath\\*'])
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.delegate, rolename, public_keys, [],
|
|
list_of_targets=['subpath\\file1.txt'])
|
|
|
|
|
|
|
|
|
|
def test_delegate_hashed_bins(self):
|
|
# Test normal case.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'snapshot_key.pub')
|
|
public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath)
|
|
|
|
# Set needed arguments by delegate_hashed_bins().
|
|
public_keys = [public_key]
|
|
list_of_targets = ['file1.txt']
|
|
|
|
|
|
# A helper function to check that the range of prefixes the role is
|
|
# delegated for, specified in path_hash_prefixes, matches the range
|
|
# implied by the bin, or delegation role, name.
|
|
def check_prefixes_match_range():
|
|
roleinfo = tuf.roledb.get_roleinfo(self.targets_object.rolename,
|
|
'test_repository')
|
|
have_prefixes = False
|
|
|
|
for delegated_role in roleinfo['delegations']['roles']:
|
|
if len(delegated_role['path_hash_prefixes']) > 0:
|
|
rolename = delegated_role['name']
|
|
prefixes = delegated_role['path_hash_prefixes']
|
|
have_prefixes = True
|
|
|
|
if len(prefixes) > 1:
|
|
prefix_range = "{}-{}".format(prefixes[0], prefixes[-1])
|
|
else:
|
|
prefix_range = prefixes[0]
|
|
|
|
self.assertEqual(rolename, prefix_range)
|
|
|
|
# We expect at least one delegation with some path_hash_prefixes
|
|
self.assertTrue(have_prefixes)
|
|
|
|
|
|
# Test delegate_hashed_bins() and verify that 16 hashed bins have
|
|
# been delegated in the parent's roleinfo.
|
|
self.targets_object.delegate_hashed_bins(list_of_targets, public_keys,
|
|
number_of_bins=16)
|
|
|
|
# The expected child rolenames, since 'number_of_bins' = 16
|
|
delegated_rolenames = ['0', '1', '2', '3', '4', '5', '6', '7',
|
|
'8', '9', 'a', 'b', 'c', 'd', 'e', 'f']
|
|
|
|
self.assertEqual(sorted(self.targets_object.get_delegated_rolenames()),
|
|
sorted(delegated_rolenames))
|
|
check_prefixes_match_range()
|
|
|
|
# For testing / coverage purposes, try to create delegated bins that
|
|
# hold a range of hash prefixes (e.g., bin name: 000-003).
|
|
self.targets_object.delegate_hashed_bins(list_of_targets, public_keys,
|
|
number_of_bins=512)
|
|
check_prefixes_match_range()
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.delegate_hashed_bins, 3, public_keys,
|
|
number_of_bins=1)
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.delegate_hashed_bins,
|
|
list_of_targets, 3, number_of_bins=1)
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.delegate_hashed_bins,
|
|
list_of_targets, public_keys, '1')
|
|
|
|
# Test invalid arguments.
|
|
# Invalid number of bins, which must be a power of 2.
|
|
self.assertRaises(securesystemslib.exceptions.Error,
|
|
self.targets_object.delegate_hashed_bins,
|
|
list_of_targets, public_keys, number_of_bins=3)
|
|
|
|
# Invalid 'list_of_targets'.
|
|
# A path or target starting with a directory separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.delegate_hashed_bins,
|
|
['/file1.txt'], public_keys,
|
|
number_of_bins=2)
|
|
|
|
# A path or target using '\' as a directory separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.delegate_hashed_bins,
|
|
['subpath\\file1.txt'], public_keys,
|
|
number_of_bins=2)
|
|
|
|
|
|
def test_add_target_to_bin(self):
|
|
# Test normal case.
|
|
# Delegate the hashed bins so that add_target_to_bin() can be tested.
|
|
repository_name = 'test_repository'
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'targets_key.pub')
|
|
public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath)
|
|
target1_filepath = 'file1.txt'
|
|
|
|
# Set needed arguments by delegate_hashed_bins().
|
|
public_keys = [public_key]
|
|
|
|
# Delegate to hashed bins. The target filepath to be tested is expected
|
|
# to contain a hash prefix of 'e', and should be available at:
|
|
# repository.targets('e').
|
|
self.targets_object.delegate_hashed_bins([], public_keys,
|
|
number_of_bins=16)
|
|
|
|
# Ensure each hashed bin initially contains zero targets.
|
|
for delegation in self.targets_object.delegations:
|
|
self.assertEqual(delegation.target_files, {})
|
|
|
|
# Add 'target1_filepath' and verify that the relative path of
|
|
# 'target1_filepath' is added to the correct bin.
|
|
rolename = self.targets_object.add_target_to_bin(target1_filepath, 16)
|
|
|
|
for delegation in self.targets_object.delegations:
|
|
if delegation.rolename == rolename:
|
|
self.assertTrue('file1.txt' in delegation.target_files)
|
|
|
|
else:
|
|
self.assertFalse('file1.txt' in delegation.target_files)
|
|
|
|
# Test for non-existent delegations and hashed bins.
|
|
empty_targets_role = repo_tool.Targets(self.targets_directory, 'empty',
|
|
repository_name=repository_name)
|
|
|
|
self.assertRaises(securesystemslib.exceptions.Error,
|
|
empty_targets_role.add_target_to_bin,
|
|
target1_filepath, 16)
|
|
|
|
# Test for a required hashed bin that does not exist.
|
|
self.targets_object.revoke(rolename)
|
|
self.assertRaises(securesystemslib.exceptions.Error,
|
|
self.targets_object.add_target_to_bin,
|
|
target1_filepath, 16)
|
|
|
|
# Test adding a target with fileinfo
|
|
target2_hashes = {'sha256': '517c0ce943e7274a2431fa5751e17cfd5225accd23e479bfaad13007751e87ef'}
|
|
target2_fileinfo = tuf.formats.make_fileinfo(37, target2_hashes)
|
|
target2_filepath = 'file2.txt'
|
|
|
|
rolename = self.targets_object.add_target_to_bin(target2_filepath, 16,
|
|
fileinfo=target2_fileinfo)
|
|
|
|
for delegation in self.targets_object.delegations:
|
|
if delegation.rolename == rolename:
|
|
self.assertTrue(target2_filepath in delegation.target_files)
|
|
|
|
else:
|
|
self.assertFalse(target2_filepath in delegation.target_files)
|
|
|
|
# Test improperly formatted argument.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.add_target_to_bin, 3, 'foo')
|
|
|
|
|
|
|
|
def test_remove_target_from_bin(self):
|
|
# Test normal case.
|
|
# Delegate the hashed bins so that add_target_to_bin() can be tested.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'targets_key.pub')
|
|
public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath)
|
|
target1_filepath = 'file1.txt'
|
|
|
|
# Set needed arguments by delegate_hashed_bins().
|
|
public_keys = [public_key]
|
|
|
|
# Delegate to hashed bins. The target filepath to be tested is expected
|
|
# to contain a hash prefix of 'e', and can be accessed as:
|
|
# repository.targets('e').
|
|
self.targets_object.delegate_hashed_bins([], public_keys,
|
|
number_of_bins=16)
|
|
|
|
# Ensure each hashed bin initially contains zero targets.
|
|
for delegation in self.targets_object.delegations:
|
|
self.assertEqual(delegation.target_files, {})
|
|
|
|
# Add 'target1_filepath' and verify that the relative path of
|
|
# 'target1_filepath' is added to the correct bin.
|
|
added_rolename = self.targets_object.add_target_to_bin(target1_filepath, 16)
|
|
|
|
for delegation in self.targets_object.delegations:
|
|
if delegation.rolename == added_rolename:
|
|
self.assertTrue('file1.txt' in delegation.target_files)
|
|
self.assertTrue(len(delegation.target_files) == 1)
|
|
else:
|
|
self.assertTrue('file1.txt' not in delegation.target_files)
|
|
|
|
# Test the remove_target_from_bin() method. Verify that 'target1_filepath'
|
|
# has been removed.
|
|
removed_rolename = self.targets_object.remove_target_from_bin(target1_filepath, 16)
|
|
self.assertEqual(added_rolename, removed_rolename)
|
|
|
|
for delegation in self.targets_object.delegations:
|
|
self.assertTrue(target1_filepath not in delegation.target_files)
|
|
|
|
|
|
# Test improperly formatted argument.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.remove_target_from_bin, 3, 'foo')
|
|
|
|
# Invalid target file path argument.
|
|
self.assertRaises(securesystemslib.exceptions.Error,
|
|
self.targets_object.remove_target_from_bin, 'non-existent', 16)
|
|
|
|
|
|
|
|
def test_default_bin_num(self):
|
|
# Test creating, adding to and removing from hashed bins with the default
|
|
# number of bins
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'snapshot_key.pub')
|
|
public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath)
|
|
target1_filepath = os.path.join(self.targets_directory, 'file1.txt')
|
|
|
|
# Set needed arguments by delegate_hashed_bins().
|
|
public_keys = [public_key]
|
|
|
|
# Test default parameters for number_of_bins
|
|
self.targets_object.delegate_hashed_bins([], public_keys)
|
|
|
|
# Ensure each hashed bin initially contains zero targets.
|
|
for delegation in self.targets_object.delegations:
|
|
self.assertEqual(delegation.target_files, {})
|
|
|
|
# Add 'target1_filepath' and verify that the relative path of
|
|
# 'target1_filepath' is added to the correct bin.
|
|
added_rolename = self.targets_object.add_target_to_bin(os.path.basename(target1_filepath))
|
|
|
|
for delegation in self.targets_object.delegations:
|
|
if delegation.rolename == added_rolename:
|
|
self.assertTrue('file1.txt' in delegation.target_files)
|
|
|
|
else:
|
|
self.assertFalse('file1.txt' in delegation.target_files)
|
|
|
|
# Remove target1_filepath and verify that all bins are now empty
|
|
removed_rolename = self.targets_object.remove_target_from_bin(
|
|
os.path.basename(target1_filepath))
|
|
self.assertEqual(added_rolename, removed_rolename)
|
|
|
|
for delegation in self.targets_object.delegations:
|
|
self.assertEqual(delegation.target_files, {})
|
|
|
|
|
|
def test_add_paths(self):
|
|
# Test normal case.
|
|
# Perform a delegation so that add_paths() has a child role to delegate a
|
|
# path to.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'snapshot_key.pub')
|
|
public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath)
|
|
|
|
# Set needed arguments by delegate().
|
|
public_keys = [public_key]
|
|
rolename = 'tuf'
|
|
threshold = 1
|
|
|
|
self.targets_object.delegate(rolename, public_keys, [], threshold,
|
|
list_of_targets=None, path_hash_prefixes=None)
|
|
|
|
# Delegate an extra role for test coverage (i.e., to later verify that
|
|
# delegated paths are not added to a child role that was not requested).
|
|
self.targets_object.delegate('junk_role', public_keys, [])
|
|
|
|
paths = ['tuf_files/*']
|
|
self.targets_object.add_paths(paths, 'tuf')
|
|
|
|
# Retrieve 'targets_object' roleinfo, and verify the roleinfo contains the
|
|
# expected delegated paths of the delegated role.
|
|
targets_object_roleinfo = tuf.roledb.get_roleinfo(self.targets_object.rolename,
|
|
'test_repository')
|
|
|
|
delegated_role = targets_object_roleinfo['delegations']['roles'][0]
|
|
self.assertEqual(['tuf_files/*'], delegated_role['paths'])
|
|
|
|
# Try to add a delegated path that has already been set.
|
|
# add_paths() should simply log a message in this case.
|
|
self.targets_object.add_paths(paths, 'tuf')
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.add_paths, 3, 'tuf')
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.add_paths, paths, 3)
|
|
|
|
|
|
# Test invalid arguments.
|
|
# A non-delegated child role.
|
|
self.assertRaises(securesystemslib.exceptions.Error,
|
|
self.targets_object.add_paths, paths, 'non_delegated_rolename')
|
|
|
|
# A path starting with a directory separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.add_paths, ['/tuf_files/*'], 'tuf')
|
|
|
|
# A path using a backward slash as a separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.add_paths, ['tuf_files\\*'], 'tuf')
|
|
|
|
# add_paths() should not raise an exception for non-existent
|
|
# paths, which it previously did.
|
|
self.targets_object.add_paths(['non-existent'], 'tuf')
|
|
|
|
|
|
|
|
|
|
def test_revoke(self):
|
|
# Test normal case.
|
|
# Perform a delegation so that revoke() has a delegation to revoke.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'snapshot_key.pub')
|
|
public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath)
|
|
|
|
# Set needed arguments by delegate().
|
|
public_keys = [public_key]
|
|
rolename = 'tuf'
|
|
paths = ['file1.txt']
|
|
threshold = 1
|
|
|
|
self.targets_object.delegate(rolename, public_keys, [], threshold, False,
|
|
paths, path_hash_prefixes=None)
|
|
|
|
# Test revoke()
|
|
self.targets_object.revoke('tuf')
|
|
self.assertEqual(self.targets_object.get_delegated_rolenames(), [])
|
|
|
|
|
|
# Test improperly formatted rolename argument.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, self.targets_object.revoke, 3)
|
|
|
|
|
|
|
|
def test_check_path(self):
|
|
# Test that correct path does not raise exception: using '/' as a separator
|
|
# and does not start with a directory separator
|
|
self.targets_object._check_path('file1.txt')
|
|
|
|
# Test that non-existent path does not raise exception (_check_path
|
|
# checks only the path string for compliance)
|
|
self.targets_object._check_path('non-existent.txt')
|
|
self.targets_object._check_path('subdir/non-existent')
|
|
|
|
# Test improperly formatted pathname argument.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object._check_path, 3)
|
|
|
|
# Test invalid pathname
|
|
# Starting with os separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object._check_path, '/file1.txt')
|
|
|
|
# Starting with Windows-style separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object._check_path, '\\file1.txt')
|
|
|
|
# Using Windows-style separator ('\')
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object._check_path, 'subdir\\non-existent')
|
|
|
|
|
|
|
|
class TestRepositoryToolFunctions(unittest.TestCase):
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
|
|
# setUpClass() is called before tests in an individual class are executed.
|
|
|
|
# Create a temporary directory to store the repository, metadata, and target
|
|
# files. 'temporary_directory' must be deleted in TearDownClass() so that
|
|
# temporary files are always removed, even when exceptions occur.
|
|
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
|
|
|
|
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
|
|
# tearDownModule() is called after all the tests have run.
|
|
# http://docs.python.org/2/library/unittest.html#class-and-module-fixtures
|
|
|
|
# Remove the temporary repository directory, which should contain all the
|
|
# metadata, targets, and key files generated for the test cases.
|
|
shutil.rmtree(cls.temporary_directory)
|
|
|
|
|
|
|
|
def setUp(self):
|
|
tuf.roledb.create_roledb('test_repository')
|
|
tuf.keydb.create_keydb('test_repository')
|
|
|
|
|
|
def tearDown(self):
|
|
tuf.roledb.clear_roledb(clear_all=True)
|
|
tuf.keydb.clear_keydb(clear_all=True)
|
|
|
|
|
|
|
|
def test_create_new_repository(self):
|
|
# Test normal case.
|
|
# Setup the temporary repository directories needed by
|
|
# create_new_repository().
|
|
repository_name = 'test_repository'
|
|
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
|
|
repository_directory = os.path.join(temporary_directory, 'repository')
|
|
metadata_directory = os.path.join(repository_directory,
|
|
repo_tool.METADATA_STAGED_DIRECTORY_NAME)
|
|
targets_directory = os.path.join(repository_directory,
|
|
repo_tool.TARGETS_DIRECTORY_NAME)
|
|
|
|
repository = repo_tool.create_new_repository(repository_directory,
|
|
repository_name)
|
|
self.assertTrue(isinstance(repository, repo_tool.Repository))
|
|
|
|
# Verify that the 'repository/', 'repository/metadata', and
|
|
# 'repository/targets' directories were created.
|
|
self.assertTrue(os.path.exists(repository_directory))
|
|
self.assertTrue(os.path.exists(metadata_directory))
|
|
self.assertTrue(os.path.exists(targets_directory))
|
|
|
|
# Test that the 'repository' directory is created (along with the other
|
|
# sub-directories) when it does not exist yet. The repository tool creates
|
|
# the non-existent directory.
|
|
shutil.rmtree(repository_directory)
|
|
|
|
repository = repo_tool.create_new_repository(repository_directory,
|
|
repository_name)
|
|
self.assertTrue(isinstance(repository, repo_tool.Repository))
|
|
|
|
# Verify that the 'repository/', 'repository/metadata', and
|
|
# 'repository/targets' directories were created.
|
|
self.assertTrue(os.path.exists(repository_directory))
|
|
self.assertTrue(os.path.exists(metadata_directory))
|
|
self.assertTrue(os.path.exists(targets_directory))
|
|
|
|
# Test for a repository name that doesn't exist yet. Note:
|
|
# The 'test_repository' repository name is created in setup() before this
|
|
# test case is run.
|
|
repository = repo_tool.create_new_repository(repository_directory, 'my-repo')
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
repo_tool.create_new_repository, 3, repository_name)
|
|
|
|
# For testing purposes, try to create a repository directory that
|
|
# fails due to a non-errno.EEXIST exception raised.
|
|
self.assertRaises(securesystemslib.exceptions.StorageError,
|
|
repo_tool.create_new_repository, 'bad' * 2000, repository_name)
|
|
|
|
# Reset the 'repository_directory' so that the metadata and targets
|
|
# directories can be tested likewise.
|
|
repository_directory = os.path.join(temporary_directory, 'repository')
|
|
|
|
# The same test as before, but for the metadata and targets directories.
|
|
original_metadata_staged_directory = \
|
|
tuf.repository_tool.METADATA_STAGED_DIRECTORY_NAME
|
|
tuf.repository_tool.METADATA_STAGED_DIRECTORY_NAME = 'bad' * 2000
|
|
|
|
self.assertRaises(securesystemslib.exceptions.StorageError,
|
|
repo_tool.create_new_repository, repository_directory, repository_name)
|
|
|
|
# Reset metadata staged directory so that the targets directory can be
|
|
# tested...
|
|
tuf.repository_tool.METADATA_STAGED_DIRECTORY_NAME = \
|
|
original_metadata_staged_directory
|
|
|
|
original_targets_directory = tuf.repository_tool.TARGETS_DIRECTORY_NAME
|
|
tuf.repository_tool.TARGETS_DIRECTORY_NAME = 'bad' * 2000
|
|
|
|
self.assertRaises(securesystemslib.exceptions.StorageError,
|
|
repo_tool.create_new_repository, repository_directory, repository_name)
|
|
|
|
tuf.repository_tool.TARGETS_DIRECTORY_NAME = \
|
|
original_targets_directory
|
|
|
|
|
|
|
|
def test_load_repository(self):
|
|
# Test normal case.
|
|
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
|
|
original_repository_directory = os.path.join('repository_data',
|
|
'repository')
|
|
|
|
repository_directory = os.path.join(temporary_directory, 'repository')
|
|
metadata_directory = os.path.join(repository_directory, 'metadata.staged')
|
|
shutil.copytree(original_repository_directory, repository_directory)
|
|
|
|
# For testing purposes, add a metadata file with an extension that is
|
|
# not supported, and another with invalid JSON content.
|
|
invalid_metadata_file = os.path.join(metadata_directory, 'root.xml')
|
|
root_file = os.path.join(metadata_directory, 'root.json')
|
|
shutil.copyfile(root_file, invalid_metadata_file)
|
|
bad_root_content = os.path.join(metadata_directory, 'root_bad.json')
|
|
|
|
with open(bad_root_content, 'wb') as file_object:
|
|
file_object.write(b'bad')
|
|
|
|
repository = repo_tool.load_repository(repository_directory)
|
|
self.assertTrue(isinstance(repository, repo_tool.Repository))
|
|
self.assertTrue(isinstance(repository.targets('role1'),
|
|
repo_tool.Targets))
|
|
self.assertTrue(isinstance(repository.targets('role1')('role2'),
|
|
repo_tool.Targets))
|
|
|
|
# Verify the expected roles have been loaded. See
|
|
# 'tuf/tests/repository_data/repository/'.
|
|
expected_roles = \
|
|
['root', 'targets', 'snapshot', 'timestamp', 'role1', 'role2']
|
|
for role in tuf.roledb.get_rolenames():
|
|
self.assertTrue(role in expected_roles)
|
|
|
|
self.assertTrue(len(repository.root.keys))
|
|
self.assertTrue(len(repository.targets.keys))
|
|
self.assertTrue(len(repository.snapshot.keys))
|
|
self.assertTrue(len(repository.timestamp.keys))
|
|
self.assertEqual(1, repository.targets('role1').version)
|
|
|
|
# It is assumed that the targets (tuf/tests/repository_data/) role contains
|
|
# 'file1.txt' and 'file2.txt'.
|
|
self.assertTrue('file1.txt' in repository.targets.target_files)
|
|
self.assertTrue('file2.txt' in repository.targets.target_files)
|
|
self.assertTrue('file3.txt' in repository.targets('role1').target_files)
|
|
|
|
# Test if targets file info is loaded correctly: read the JSON metadata
|
|
# files separately and then compare with the loaded repository data.
|
|
targets_path = os.path.join(metadata_directory, 'targets.json')
|
|
role1_path = os.path.join(metadata_directory, 'role1.json')
|
|
|
|
targets_object = securesystemslib.util.load_json_file(targets_path)
|
|
role1_object = securesystemslib.util.load_json_file(role1_path)
|
|
|
|
targets_fileinfo = targets_object['signed']['targets']
|
|
role1_fileinfo = role1_object['signed']['targets']
|
|
|
|
repository = repo_tool.load_repository(repository_directory)
|
|
|
|
self.assertEqual(targets_fileinfo, repository.targets.target_files)
|
|
self.assertEqual(role1_fileinfo, repository.targets('role1').target_files)
|
|
|
|
# Test for a non-default repository name.
|
|
repository = repo_tool.load_repository(repository_directory, 'my-repo')
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
repo_tool.load_repository, 3)
|
|
|
|
|
|
# Test for invalid 'repository_directory' (i.e., does not contain the
|
|
# minimum required metadata.
|
|
root_filepath = os.path.join(repository_directory,
|
|
repo_tool.METADATA_STAGED_DIRECTORY_NAME, 'root.json')
|
|
os.remove(root_filepath)
|
|
self.assertRaises(tuf.exceptions.RepositoryError,
|
|
repo_tool.load_repository, repository_directory)
|
|
|
|
|
|
|
|
def test_dirty_roles(self):
|
|
repository_name = 'test_repository'
|
|
original_repository_directory = os.path.join('repository_data',
|
|
'repository')
|
|
repository = repo_tool.load_repository(original_repository_directory,
|
|
repository_name)
|
|
|
|
# dirty_roles() only logs the list of dirty roles.
|
|
repository.dirty_roles()
|
|
|
|
|
|
|
|
def test_dump_signable_metadata(self):
|
|
metadata_directory = os.path.join('repository_data',
|
|
'repository', 'metadata')
|
|
targets_metadata_file = os.path.join(metadata_directory, 'targets.json')
|
|
|
|
metadata_content = repo_tool.dump_signable_metadata(targets_metadata_file)
|
|
|
|
# Test for an invalid targets metadata file..
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
repo_tool.dump_signable_metadata, 1)
|
|
self.assertRaises(securesystemslib.exceptions.StorageError,
|
|
repo_tool.dump_signable_metadata, 'bad file path')
|
|
|
|
|
|
|
|
def test_append_signature(self):
|
|
metadata_directory = os.path.join('repository_data',
|
|
'repository', 'metadata')
|
|
targets_metadata_path = os.path.join(metadata_directory, 'targets.json')
|
|
|
|
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
|
|
tmp_targets_metadata_path = os.path.join(temporary_directory, 'targets.json')
|
|
shutil.copyfile(targets_metadata_path, tmp_targets_metadata_path)
|
|
|
|
# Test for normal case.
|
|
targets_metadata = securesystemslib.util.load_json_file(tmp_targets_metadata_path)
|
|
num_signatures = len(targets_metadata['signatures'])
|
|
signature = targets_metadata['signatures'][0]
|
|
|
|
repo_tool.append_signature(signature, tmp_targets_metadata_path)
|
|
|
|
targets_metadata = securesystemslib.util.load_json_file(tmp_targets_metadata_path)
|
|
self.assertTrue(num_signatures, len(targets_metadata['signatures']))
|
|
|
|
# Test for invalid arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
repo_tool.append_signature, 1, tmp_targets_metadata_path)
|
|
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
repo_tool.append_signature, signature, 1)
|
|
|
|
|
|
# Run the test cases.
|
|
if __name__ == '__main__':
|
|
unittest.main()
|