mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Extend test_load_repository to check if targets file info is loaded correctly. Signed-off-by: Teodora Sechkova <tsechkova@vmware.com>
2166 lines
83 KiB
Python
Executable file
2166 lines
83 KiB
Python
Executable file
#!/usr/bin/env python
|
|
|
|
# Copyright 2014 - 2017, New York University and the TUF contributors
|
|
# SPDX-License-Identifier: MIT OR Apache-2.0
|
|
|
|
"""
|
|
<Program Name>
|
|
test_repository_tool.py
|
|
|
|
<Author>
|
|
Vladimir Diaz <vladimir.v.diaz@gmail.com>
|
|
|
|
<Started>
|
|
April 7, 2014.
|
|
|
|
<Copyright>
|
|
See LICENSE-MIT OR LICENSE for licensing information.
|
|
|
|
<Purpose>
|
|
Unit test for 'repository_tool.py'.
|
|
"""
|
|
|
|
# Help with Python 3 compatibility, where the print statement is a function, an
|
|
# implicit relative import is invalid, and the '/' operator performs true
|
|
# division. Example: print 'hello world' raises a 'SyntaxError' exception.
|
|
from __future__ import print_function
|
|
from __future__ import absolute_import
|
|
from __future__ import division
|
|
from __future__ import unicode_literals
|
|
|
|
import os
|
|
import time
|
|
import datetime
|
|
import unittest
|
|
import logging
|
|
import tempfile
|
|
import shutil
|
|
import sys
|
|
import errno
|
|
|
|
import tuf
|
|
import tuf.log
|
|
import tuf.formats
|
|
import tuf.roledb
|
|
import tuf.keydb
|
|
|
|
import tuf.repository_tool as repo_tool
|
|
import securesystemslib.exceptions
|
|
|
|
import securesystemslib
|
|
import securesystemslib.storage
|
|
import six
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
repo_tool.disable_console_log_messages()
|
|
|
|
|
|
class TestRepository(unittest.TestCase):
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
|
|
# setUpClass() is called before tests in an individual class are executed.
|
|
|
|
# Create a temporary directory to store the repository, metadata, and target
|
|
# files. 'temporary_directory' must be deleted in TearDownClass() so that
|
|
# temporary files are always removed, even when exceptions occur.
|
|
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
|
|
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
|
|
# tearDownModule() is called after all the tests have run.
|
|
# http://docs.python.org/2/library/unittest.html#class-and-module-fixtures
|
|
|
|
# Remove the temporary repository directory, which should contain all the
|
|
# metadata, targets, and key files generated for the test cases.
|
|
shutil.rmtree(cls.temporary_directory)
|
|
|
|
|
|
|
|
def setUp(self):
|
|
tuf.roledb.clear_roledb(clear_all=True)
|
|
tuf.keydb.clear_keydb(clear_all=True)
|
|
|
|
tuf.roledb.create_roledb('test_repository')
|
|
tuf.keydb.create_keydb('test_repository')
|
|
|
|
|
|
|
|
def tearDown(self):
|
|
tuf.roledb.clear_roledb(clear_all=True)
|
|
tuf.keydb.clear_keydb(clear_all=True)
|
|
|
|
|
|
def test_init(self):
|
|
# Test normal case.
|
|
repository_name = 'test_repository'
|
|
storage_backend = securesystemslib.storage.FilesystemBackend()
|
|
repository = repo_tool.Repository('repository_directory/',
|
|
'metadata_directory/', 'targets_directory/', storage_backend,
|
|
repository_name)
|
|
self.assertTrue(isinstance(repository.root, repo_tool.Root))
|
|
self.assertTrue(isinstance(repository.snapshot, repo_tool.Snapshot))
|
|
self.assertTrue(isinstance(repository.timestamp, repo_tool.Timestamp))
|
|
self.assertTrue(isinstance(repository.targets, repo_tool.Targets))
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, repo_tool.Repository,
|
|
storage_backend, 3, 'metadata_directory/', 'targets_directory')
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, repo_tool.Repository,
|
|
'repository_directory', storage_backend, 3, 'targets_directory')
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, repo_tool.Repository,
|
|
'repository_directory', 'metadata_directory', 3, storage_backend)
|
|
|
|
|
|
|
|
def create_repository_directory(self):
|
|
# Create a repository directory and copy in test targets data
|
|
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
|
|
targets_directory = os.path.join(temporary_directory, 'repository',
|
|
repo_tool.TARGETS_DIRECTORY_NAME)
|
|
original_targets_directory = os.path.join('repository_data',
|
|
'repository', 'targets')
|
|
shutil.copytree(original_targets_directory, targets_directory)
|
|
|
|
# In this case, create_new_repository() creates the 'repository/'
|
|
# sub-directory in 'temporary_directory' if it does not exist.
|
|
return os.path.join(temporary_directory, 'repository')
|
|
|
|
|
|
|
|
|
|
def test_writeall(self):
|
|
# Test creation of a TUF repository.
|
|
#
|
|
# 1. Import public and private keys.
|
|
# 2. Add verification keys.
|
|
# 3. Load signing keys.
|
|
# 4. Add target files.
|
|
# 5. Perform delegation.
|
|
# 6. writeall()
|
|
#
|
|
# Copy the target files from 'tuf/tests/repository_data' so that writeall()
|
|
# has target fileinfo to include in metadata.
|
|
repository_name = 'test_repository'
|
|
repository_directory = self.create_repository_directory()
|
|
metadata_directory = os.path.join(repository_directory,
|
|
repo_tool.METADATA_STAGED_DIRECTORY_NAME)
|
|
|
|
repository = repo_tool.create_new_repository(repository_directory, repository_name)
|
|
|
|
# (1) Load the public and private keys of the top-level roles, and one
|
|
# delegated role.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
|
|
# Load the public keys.
|
|
root_pubkey_path = os.path.join(keystore_directory, 'root_key.pub')
|
|
targets_pubkey_path = os.path.join(keystore_directory, 'targets_key.pub')
|
|
snapshot_pubkey_path = os.path.join(keystore_directory, 'snapshot_key.pub')
|
|
timestamp_pubkey_path = os.path.join(keystore_directory, 'timestamp_key.pub')
|
|
role1_pubkey_path = os.path.join(keystore_directory, 'delegation_key.pub')
|
|
|
|
root_pubkey = repo_tool.import_rsa_publickey_from_file(root_pubkey_path)
|
|
targets_pubkey = \
|
|
repo_tool.import_ed25519_publickey_from_file(targets_pubkey_path)
|
|
snapshot_pubkey = \
|
|
repo_tool.import_ed25519_publickey_from_file(snapshot_pubkey_path)
|
|
timestamp_pubkey = \
|
|
repo_tool.import_ed25519_publickey_from_file(timestamp_pubkey_path)
|
|
role1_pubkey = repo_tool.import_ed25519_publickey_from_file(role1_pubkey_path)
|
|
|
|
# Load the private keys.
|
|
root_privkey_path = os.path.join(keystore_directory, 'root_key')
|
|
targets_privkey_path = os.path.join(keystore_directory, 'targets_key')
|
|
snapshot_privkey_path = os.path.join(keystore_directory, 'snapshot_key')
|
|
timestamp_privkey_path = os.path.join(keystore_directory, 'timestamp_key')
|
|
role1_privkey_path = os.path.join(keystore_directory, 'delegation_key')
|
|
|
|
root_privkey = \
|
|
repo_tool.import_rsa_privatekey_from_file(root_privkey_path, 'password')
|
|
targets_privkey = \
|
|
repo_tool.import_ed25519_privatekey_from_file(targets_privkey_path,
|
|
'password')
|
|
snapshot_privkey = \
|
|
repo_tool.import_ed25519_privatekey_from_file(snapshot_privkey_path,
|
|
'password')
|
|
timestamp_privkey = \
|
|
repo_tool.import_ed25519_privatekey_from_file(timestamp_privkey_path,
|
|
'password')
|
|
role1_privkey = \
|
|
repo_tool.import_ed25519_privatekey_from_file(role1_privkey_path,
|
|
'password')
|
|
|
|
|
|
# (2) Add top-level verification keys.
|
|
repository.root.add_verification_key(root_pubkey)
|
|
repository.targets.add_verification_key(targets_pubkey)
|
|
repository.snapshot.add_verification_key(snapshot_pubkey)
|
|
|
|
# Verify that repository.writeall() fails for insufficient threshold
|
|
# of signatures (default threshold = 1).
|
|
self.assertRaises(tuf.exceptions.UnsignedMetadataError, repository.writeall)
|
|
|
|
repository.timestamp.add_verification_key(timestamp_pubkey)
|
|
|
|
|
|
# (3) Load top-level signing keys.
|
|
repository.status()
|
|
repository.root.load_signing_key(root_privkey)
|
|
repository.status()
|
|
repository.targets.load_signing_key(targets_privkey)
|
|
repository.status()
|
|
repository.snapshot.load_signing_key(snapshot_privkey)
|
|
repository.status()
|
|
|
|
# Verify that repository.writeall() fails for insufficient threshold
|
|
# of signatures (default threshold = 1).
|
|
self.assertRaises(tuf.exceptions.UnsignedMetadataError, repository.writeall)
|
|
|
|
repository.timestamp.load_signing_key(timestamp_privkey)
|
|
|
|
|
|
# (4) Add target files.
|
|
target1 = 'file1.txt'
|
|
target2 = 'file2.txt'
|
|
target3 = 'file3.txt'
|
|
repository.targets.add_target(target1)
|
|
repository.targets.add_target(target2)
|
|
|
|
# (5) Perform delegation.
|
|
repository.targets.delegate('role1', [role1_pubkey], [target3])
|
|
repository.targets('role1').load_signing_key(role1_privkey)
|
|
|
|
# (6) Write repository.
|
|
repository.writeall()
|
|
|
|
# Verify that the expected metadata is written.
|
|
for role in ['root.json', 'targets.json', 'snapshot.json', 'timestamp.json']:
|
|
role_filepath = os.path.join(metadata_directory, role)
|
|
role_signable = securesystemslib.util.load_json_file(role_filepath)
|
|
|
|
# Raise 'securesystemslib.exceptions.FormatError' if 'role_signable' is
|
|
# an invalid signable.
|
|
tuf.formats.check_signable_object_format(role_signable)
|
|
|
|
self.assertTrue(os.path.exists(role_filepath))
|
|
|
|
# Verify the 'role1.json' delegation is also written.
|
|
role1_filepath = os.path.join(metadata_directory, 'role1.json')
|
|
role1_signable = securesystemslib.util.load_json_file(role1_filepath)
|
|
tuf.formats.check_signable_object_format(role1_signable)
|
|
|
|
# Verify that an exception is *not* raised for multiple
|
|
# repository.writeall().
|
|
repository.writeall()
|
|
|
|
# Verify that status() does not raise an exception.
|
|
repository.status()
|
|
|
|
# Verify that status() does not raise
|
|
# 'tuf.exceptions.InsufficientKeysError' if a top-level role
|
|
# does not contain a threshold of keys.
|
|
targets_roleinfo = tuf.roledb.get_roleinfo('targets', repository_name)
|
|
old_threshold = targets_roleinfo['threshold']
|
|
targets_roleinfo['threshold'] = 10
|
|
tuf.roledb.update_roleinfo('targets', targets_roleinfo,
|
|
repository_name=repository_name)
|
|
repository.status()
|
|
|
|
# Restore the original threshold values.
|
|
targets_roleinfo = tuf.roledb.get_roleinfo('targets', repository_name)
|
|
targets_roleinfo['threshold'] = old_threshold
|
|
tuf.roledb.update_roleinfo('targets', targets_roleinfo,
|
|
repository_name=repository_name)
|
|
|
|
# Verify that status() does not raise
|
|
# 'tuf.exceptions.InsufficientKeysError' if a delegated role
|
|
# does not contain a threshold of keys.
|
|
role1_roleinfo = tuf.roledb.get_roleinfo('role1', repository_name)
|
|
old_role1_threshold = role1_roleinfo['threshold']
|
|
role1_roleinfo['threshold'] = 10
|
|
tuf.roledb.update_roleinfo('role1', role1_roleinfo,
|
|
repository_name=repository_name)
|
|
repository.status()
|
|
|
|
# Restore role1's threshold.
|
|
role1_roleinfo = tuf.roledb.get_roleinfo('role1', repository_name)
|
|
role1_roleinfo['threshold'] = old_role1_threshold
|
|
tuf.roledb.update_roleinfo('role1', role1_roleinfo,
|
|
repository_name=repository_name)
|
|
|
|
# Verify status() does not raise 'tuf.exceptions.UnsignedMetadataError' if any of the
|
|
# the top-level roles. Test that 'root' is improperly signed.
|
|
repository.root.unload_signing_key(root_privkey)
|
|
repository.root.load_signing_key(targets_privkey)
|
|
repository.status()
|
|
|
|
repository.targets('role1').unload_signing_key(role1_privkey)
|
|
repository.targets('role1').load_signing_key(targets_privkey)
|
|
repository.status()
|
|
|
|
# Reset Root and 'role1', and verify Targets.
|
|
repository.root.unload_signing_key(targets_privkey)
|
|
repository.root.load_signing_key(root_privkey)
|
|
repository.targets('role1').unload_signing_key(targets_privkey)
|
|
repository.targets('role1').load_signing_key(role1_privkey)
|
|
repository.targets.unload_signing_key(targets_privkey)
|
|
repository.targets.load_signing_key(snapshot_privkey)
|
|
repository.status()
|
|
|
|
# Reset Targets and verify Snapshot.
|
|
repository.targets.unload_signing_key(snapshot_privkey)
|
|
repository.targets.load_signing_key(targets_privkey)
|
|
repository.snapshot.unload_signing_key(snapshot_privkey)
|
|
repository.snapshot.load_signing_key(timestamp_privkey)
|
|
repository.status()
|
|
|
|
# Reset Snapshot and verify timestamp.
|
|
repository.snapshot.unload_signing_key(timestamp_privkey)
|
|
repository.snapshot.load_signing_key(snapshot_privkey)
|
|
repository.timestamp.unload_signing_key(timestamp_privkey)
|
|
repository.timestamp.load_signing_key(root_privkey)
|
|
repository.status()
|
|
|
|
# Reset Timestamp
|
|
repository.timestamp.unload_signing_key(root_privkey)
|
|
repository.timestamp.load_signing_key(timestamp_privkey)
|
|
|
|
# Verify that a writeall() fails if a repository is loaded and a change
|
|
# is made to a role.
|
|
repo_tool.load_repository(repository_directory, repository_name)
|
|
|
|
repository.timestamp.expiration = datetime.datetime(2030, 1, 1, 12, 0)
|
|
self.assertRaises(tuf.exceptions.UnsignedMetadataError, repository.writeall)
|
|
|
|
# Load the required Timestamp key so that a valid repository can be written.
|
|
repository.timestamp.load_signing_key(timestamp_privkey)
|
|
repository.writeall()
|
|
|
|
# Test creation of a consistent snapshot repository. Writing a consistent
|
|
# snapshot modifies the Root metadata, which specifies whether a repository
|
|
# supports consistent snapshot. Verify that an exception is raised due to
|
|
# the missing signature of Root.
|
|
self.assertRaises(tuf.exceptions.UnsignedMetadataError, repository.writeall, True)
|
|
|
|
# Make sure the private keys of Root (new version required since Root will
|
|
# change to enable consistent snapshot), Snapshot, role1, and timestamp
|
|
# loaded before writing consistent snapshot.
|
|
repository.root.load_signing_key(root_privkey)
|
|
repository.snapshot.load_signing_key(snapshot_privkey)
|
|
# Must also load targets signing key, because targets is re-signed when
|
|
# updating 'role1'.
|
|
repository.targets.load_signing_key(targets_privkey)
|
|
repository.targets('role1').load_signing_key(role1_privkey)
|
|
|
|
# Verify that a consistent snapshot can be written and loaded. The roles
|
|
# above must be marked as dirty, otherwise writeall() will not create a
|
|
# consistent snapshot for them.
|
|
repository.mark_dirty(['role1', 'targets', 'root', 'snapshot', 'timestamp'])
|
|
repository.writeall(consistent_snapshot=True)
|
|
|
|
# Verify that the newly written consistent snapshot can be loaded
|
|
# successfully.
|
|
repo_tool.load_repository(repository_directory, repository_name)
|
|
|
|
# Verify the behavior of marking and unmarking roles as dirty.
|
|
# We begin by ensuring that writeall() cleared the list of dirty roles..
|
|
self.assertEqual([], tuf.roledb.get_dirty_roles(repository_name))
|
|
|
|
repository.mark_dirty(['root', 'timestamp'])
|
|
self.assertEqual(['root', 'timestamp'], tuf.roledb.get_dirty_roles(repository_name))
|
|
repository.unmark_dirty(['root'])
|
|
self.assertEqual(['timestamp'], tuf.roledb.get_dirty_roles(repository_name))
|
|
|
|
# Ensure status() does not leave behind any dirty roles.
|
|
repository.status()
|
|
self.assertEqual(['timestamp'], tuf.roledb.get_dirty_roles(repository_name))
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, repository.writeall, 3)
|
|
|
|
|
|
def test_writeall_no_files(self):
|
|
# Test writeall() when using pre-supplied fileinfo
|
|
|
|
repository_name = 'test_repository'
|
|
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
|
|
repository_directory = os.path.join(temporary_directory, 'repository')
|
|
targets_directory = os.path.join(repository_directory,
|
|
repo_tool.TARGETS_DIRECTORY_NAME)
|
|
|
|
repository = repo_tool.create_new_repository(repository_directory, repository_name)
|
|
|
|
# (1) Load the public and private keys of the top-level roles, and one
|
|
# delegated role.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
|
|
# Load the public keys.
|
|
root_pubkey_path = os.path.join(keystore_directory, 'root_key.pub')
|
|
targets_pubkey_path = os.path.join(keystore_directory, 'targets_key.pub')
|
|
snapshot_pubkey_path = os.path.join(keystore_directory, 'snapshot_key.pub')
|
|
timestamp_pubkey_path = os.path.join(keystore_directory, 'timestamp_key.pub')
|
|
|
|
root_pubkey = repo_tool.import_rsa_publickey_from_file(root_pubkey_path)
|
|
targets_pubkey = \
|
|
repo_tool.import_ed25519_publickey_from_file(targets_pubkey_path)
|
|
snapshot_pubkey = \
|
|
repo_tool.import_ed25519_publickey_from_file(snapshot_pubkey_path)
|
|
timestamp_pubkey = \
|
|
repo_tool.import_ed25519_publickey_from_file(timestamp_pubkey_path)
|
|
|
|
# Load the private keys.
|
|
root_privkey_path = os.path.join(keystore_directory, 'root_key')
|
|
targets_privkey_path = os.path.join(keystore_directory, 'targets_key')
|
|
snapshot_privkey_path = os.path.join(keystore_directory, 'snapshot_key')
|
|
timestamp_privkey_path = os.path.join(keystore_directory, 'timestamp_key')
|
|
|
|
root_privkey = \
|
|
repo_tool.import_rsa_privatekey_from_file(root_privkey_path, 'password')
|
|
targets_privkey = \
|
|
repo_tool.import_ed25519_privatekey_from_file(targets_privkey_path,
|
|
'password')
|
|
snapshot_privkey = \
|
|
repo_tool.import_ed25519_privatekey_from_file(snapshot_privkey_path,
|
|
'password')
|
|
timestamp_privkey = \
|
|
repo_tool.import_ed25519_privatekey_from_file(timestamp_privkey_path,
|
|
'password')
|
|
|
|
|
|
# (2) Add top-level verification keys.
|
|
repository.root.add_verification_key(root_pubkey)
|
|
repository.targets.add_verification_key(targets_pubkey)
|
|
repository.snapshot.add_verification_key(snapshot_pubkey)
|
|
|
|
# Verify that repository.writeall() fails for insufficient threshold
|
|
# of signatures (default threshold = 1).
|
|
self.assertRaises(tuf.exceptions.UnsignedMetadataError, repository.writeall)
|
|
|
|
repository.timestamp.add_verification_key(timestamp_pubkey)
|
|
|
|
|
|
# (3) Load top-level signing keys.
|
|
repository.status()
|
|
repository.root.load_signing_key(root_privkey)
|
|
repository.status()
|
|
repository.targets.load_signing_key(targets_privkey)
|
|
repository.status()
|
|
repository.snapshot.load_signing_key(snapshot_privkey)
|
|
repository.status()
|
|
|
|
# Verify that repository.writeall() fails for insufficient threshold
|
|
# of signatures (default threshold = 1).
|
|
self.assertRaises(tuf.exceptions.UnsignedMetadataError, repository.writeall)
|
|
|
|
repository.timestamp.load_signing_key(timestamp_privkey)
|
|
|
|
# Add target fileinfo
|
|
target1_hashes = {'sha256': 'c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095'}
|
|
target2_hashes = {'sha256': '517c0ce943e7274a2431fa5751e17cfd5225accd23e479bfaad13007751e87ef'}
|
|
target1_fileinfo = tuf.formats.make_fileinfo(555, target1_hashes)
|
|
target2_fileinfo = tuf.formats.make_fileinfo(37, target2_hashes)
|
|
target1 = 'file1.txt'
|
|
target2 = 'file2.txt'
|
|
repository.targets.add_target(target1, fileinfo=target1_fileinfo)
|
|
repository.targets.add_target(target2, fileinfo=target2_fileinfo)
|
|
|
|
repository.writeall(use_existing_fileinfo=True)
|
|
|
|
# Verify that the expected metadata is written.
|
|
metadata_directory = os.path.join(repository_directory,
|
|
repo_tool.METADATA_STAGED_DIRECTORY_NAME)
|
|
|
|
for role in ['root.json', 'targets.json', 'snapshot.json', 'timestamp.json']:
|
|
role_filepath = os.path.join(metadata_directory, role)
|
|
role_signable = securesystemslib.util.load_json_file(role_filepath)
|
|
|
|
# Raise 'securesystemslib.exceptions.FormatError' if 'role_signable' is
|
|
# an invalid signable.
|
|
tuf.formats.check_signable_object_format(role_signable)
|
|
|
|
self.assertTrue(os.path.exists(role_filepath))
|
|
|
|
|
|
|
|
def test_get_filepaths_in_directory(self):
|
|
# Test normal case.
|
|
# Use the pre-generated metadata directory for testing.
|
|
# Set 'repo' reference to improve readability.
|
|
repo = repo_tool.Repository
|
|
metadata_directory = os.path.join('repository_data',
|
|
'repository', 'metadata')
|
|
|
|
# Verify the expected filenames. get_filepaths_in_directory() returns
|
|
# a list of absolute paths.
|
|
metadata_files = repo.get_filepaths_in_directory(metadata_directory)
|
|
|
|
# Construct list of file paths expected, determining absolute paths.
|
|
expected_files = []
|
|
for filepath in ['1.root.json', 'root.json', 'targets.json',
|
|
'snapshot.json', 'timestamp.json', 'role1.json', 'role2.json']:
|
|
expected_files.append(os.path.abspath(os.path.join(
|
|
'repository_data', 'repository', 'metadata', filepath)))
|
|
|
|
self.assertEqual(sorted(expected_files), sorted(metadata_files))
|
|
|
|
|
|
# Test when the 'recursive_walk' argument is True.
|
|
# In this case, recursive walk should yield the same results as the
|
|
# previous, non-recursive call.
|
|
metadata_files = repo.get_filepaths_in_directory(metadata_directory,
|
|
recursive_walk=True)
|
|
self.assertEqual(sorted(expected_files), sorted(metadata_files))
|
|
|
|
# And this recursive call from the directory above should yield the same
|
|
# results as well, plus extra files.
|
|
metadata_files = repo.get_filepaths_in_directory(
|
|
os.path.join('repository_data', 'repository'), recursive_walk=True)
|
|
for expected_file in expected_files:
|
|
self.assertIn(expected_file, metadata_files)
|
|
# self.assertEqual(sorted(expected_files), sorted(metadata_files))
|
|
|
|
# Now let's check it against the full list of expected files for the parent
|
|
# directory.... We'll add to the existing list. Expect the same files in
|
|
# metadata.staged/ as in metadata/, and a few target files in targets/
|
|
# This is somewhat redundant with the previous test, but together they're
|
|
# probably more future-proof.
|
|
for filepath in ['file1.txt', 'file2.txt', 'file3.txt']:
|
|
expected_files.append(os.path.abspath(os.path.join(
|
|
'repository_data', 'repository', 'targets', filepath)))
|
|
for filepath in [ '1.root.json', 'root.json', 'targets.json',
|
|
'snapshot.json', 'timestamp.json', 'role1.json', 'role2.json']:
|
|
expected_files.append(os.path.abspath(os.path.join(
|
|
'repository_data', 'repository', 'metadata.staged', filepath)))
|
|
|
|
self.assertEqual(sorted(expected_files), sorted(metadata_files))
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, repo.get_filepaths_in_directory,
|
|
3, recursive_walk=False, followlinks=False)
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, repo.get_filepaths_in_directory,
|
|
metadata_directory, 3, followlinks=False)
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, repo.get_filepaths_in_directory,
|
|
metadata_directory, recursive_walk=False, followlinks=3)
|
|
|
|
# Test invalid directory argument.
|
|
# A non-directory.
|
|
self.assertRaises(securesystemslib.exceptions.Error, repo.get_filepaths_in_directory,
|
|
os.path.join(metadata_directory, 'root.json'))
|
|
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
|
|
nonexistent_directory = os.path.join(temporary_directory, 'nonexistent/')
|
|
self.assertRaises(securesystemslib.exceptions.Error, repo.get_filepaths_in_directory,
|
|
nonexistent_directory, recursive_walk=False,
|
|
followlinks=False)
|
|
|
|
|
|
|
|
def test_writeall_abstract_storage(self):
|
|
# Test creation of a TUF repository with a custom storage backend to ensure
|
|
# that functions relying on a storage backend being supplied operate
|
|
# correctly
|
|
|
|
|
|
class TestStorageBackend(securesystemslib.storage.StorageBackendInterface):
|
|
"""
|
|
An implementation of securesystemslib.storage.StorageBackendInterface
|
|
which mutates filenames on put()/get(), translating filename in memory
|
|
to filename + '.tst' on-disk, such that trying to read the
|
|
expected/canonical file paths from local storage doesn't find the TUF
|
|
metadata files.
|
|
"""
|
|
|
|
from contextlib import contextmanager
|
|
|
|
|
|
@contextmanager
|
|
def get(self, filepath):
|
|
file_object = open(filepath + '.tst', 'rb')
|
|
yield file_object
|
|
file_object.close()
|
|
|
|
|
|
def put(self, fileobj, filepath):
|
|
if not fileobj.closed:
|
|
fileobj.seek(0)
|
|
|
|
with open(filepath + '.tst', 'wb') as destination_file:
|
|
shutil.copyfileobj(fileobj, destination_file)
|
|
destination_file.flush()
|
|
os.fsync(destination_file.fileno())
|
|
|
|
|
|
def remove(self, filepath):
|
|
os.remove(filepath + '.tst')
|
|
|
|
|
|
def getsize(self, filepath):
|
|
return os.path.getsize(filepath + '.tst')
|
|
|
|
|
|
def create_folder(self, filepath):
|
|
if not filepath:
|
|
return
|
|
try:
|
|
os.makedirs(filepath)
|
|
except OSError as err:
|
|
pass
|
|
|
|
|
|
def list_folder(self, filepath):
|
|
contents = []
|
|
files = os.listdir(filepath)
|
|
|
|
for fi in files:
|
|
if fi.endswith('.tst'):
|
|
contents.append(fi.split('.tst')[0])
|
|
else:
|
|
contents.append(fi)
|
|
|
|
return contents
|
|
|
|
|
|
|
|
# Set up the repository directory
|
|
repository_name = 'test_repository'
|
|
repository_directory = self.create_repository_directory()
|
|
metadata_directory = os.path.join(repository_directory,
|
|
repo_tool.METADATA_STAGED_DIRECTORY_NAME)
|
|
targets_directory = os.path.join(repository_directory,
|
|
repo_tool.TARGETS_DIRECTORY_NAME)
|
|
|
|
# TestStorageBackend expects all files on disk to have an additional '.tst'
|
|
# file extension
|
|
for target in os.listdir(targets_directory):
|
|
src = os.path.join(targets_directory, target)
|
|
dst = os.path.join(targets_directory, target + '.tst')
|
|
os.rename(src, dst)
|
|
|
|
# (0) Create a repository with TestStorageBackend()
|
|
storage_backend = TestStorageBackend()
|
|
repository = repo_tool.create_new_repository(repository_directory,
|
|
repository_name,
|
|
storage_backend)
|
|
|
|
# (1) Load the public and private keys of the top-level roles, and one
|
|
# delegated role.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
|
|
# Load the public keys.
|
|
root_pubkey_path = os.path.join(keystore_directory, 'root_key.pub')
|
|
targets_pubkey_path = os.path.join(keystore_directory, 'targets_key.pub')
|
|
snapshot_pubkey_path = os.path.join(keystore_directory, 'snapshot_key.pub')
|
|
timestamp_pubkey_path = os.path.join(keystore_directory, 'timestamp_key.pub')
|
|
|
|
root_pubkey = repo_tool.import_rsa_publickey_from_file(root_pubkey_path)
|
|
targets_pubkey = \
|
|
repo_tool.import_ed25519_publickey_from_file(targets_pubkey_path)
|
|
snapshot_pubkey = \
|
|
repo_tool.import_ed25519_publickey_from_file(snapshot_pubkey_path)
|
|
timestamp_pubkey = \
|
|
repo_tool.import_ed25519_publickey_from_file(timestamp_pubkey_path)
|
|
|
|
# Load the private keys.
|
|
root_privkey_path = os.path.join(keystore_directory, 'root_key')
|
|
targets_privkey_path = os.path.join(keystore_directory, 'targets_key')
|
|
snapshot_privkey_path = os.path.join(keystore_directory, 'snapshot_key')
|
|
timestamp_privkey_path = os.path.join(keystore_directory, 'timestamp_key')
|
|
|
|
root_privkey = \
|
|
repo_tool.import_rsa_privatekey_from_file(root_privkey_path, 'password')
|
|
targets_privkey = \
|
|
repo_tool.import_ed25519_privatekey_from_file(targets_privkey_path,
|
|
'password')
|
|
snapshot_privkey = \
|
|
repo_tool.import_ed25519_privatekey_from_file(snapshot_privkey_path,
|
|
'password')
|
|
timestamp_privkey = \
|
|
repo_tool.import_ed25519_privatekey_from_file(timestamp_privkey_path,
|
|
'password')
|
|
|
|
|
|
# (2) Add top-level verification keys.
|
|
repository.root.add_verification_key(root_pubkey)
|
|
repository.targets.add_verification_key(targets_pubkey)
|
|
repository.snapshot.add_verification_key(snapshot_pubkey)
|
|
repository.timestamp.add_verification_key(timestamp_pubkey)
|
|
|
|
|
|
# (3) Load top-level signing keys.
|
|
repository.root.load_signing_key(root_privkey)
|
|
repository.targets.load_signing_key(targets_privkey)
|
|
repository.snapshot.load_signing_key(snapshot_privkey)
|
|
repository.timestamp.load_signing_key(timestamp_privkey)
|
|
|
|
|
|
# (4) Add target files.
|
|
target1 = 'file1.txt'
|
|
target2 = 'file2.txt'
|
|
target3 = 'file3.txt'
|
|
repository.targets.add_target(target1)
|
|
repository.targets.add_target(target2)
|
|
repository.targets.add_target(target3)
|
|
|
|
# (6) Write repository.
|
|
repository.writeall()
|
|
|
|
|
|
# Ensure all of the metadata files exist at the mutated file location and
|
|
# that those files are valid metadata
|
|
for role in ['root.json.tst', 'targets.json.tst', 'snapshot.json.tst',
|
|
'timestamp.json.tst']:
|
|
role_filepath = os.path.join(metadata_directory, role)
|
|
self.assertTrue(os.path.exists(role_filepath))
|
|
|
|
role_signable = securesystemslib.util.load_json_file(role_filepath)
|
|
# Raise 'securesystemslib.exceptions.FormatError' if 'role_signable' is
|
|
# an invalid signable.
|
|
tuf.formats.check_signable_object_format(role_signable)
|
|
|
|
|
|
|
|
|
|
|
|
class TestMetadata(unittest.TestCase):
|
|
def setUp(self):
|
|
# Inherit from the repo_tool.Metadata() base class. All of the methods
|
|
# to be tested in TestMetadata require at least 1 role, so create it here
|
|
# and set its roleinfo.
|
|
|
|
tuf.roledb.create_roledb('test_repository')
|
|
tuf.keydb.create_keydb('test_repository')
|
|
|
|
class MetadataRole(repo_tool.Metadata):
|
|
def __init__(self):
|
|
super(MetadataRole, self).__init__()
|
|
|
|
self._rolename = 'metadata_role'
|
|
self._repository_name = 'test_repository'
|
|
|
|
# Expire in 86400 seconds (1 day).
|
|
expiration = \
|
|
tuf.formats.unix_timestamp_to_datetime(int(time.time() + 86400))
|
|
expiration = expiration.isoformat() + 'Z'
|
|
roleinfo = {'keyids': [], 'signing_keyids': [], 'threshold': 1,
|
|
'signatures': [], 'version': 0,
|
|
'consistent_snapshot': False,
|
|
'expires': expiration,
|
|
'partial_loaded': False}
|
|
|
|
tuf.roledb.add_role(self._rolename, roleinfo,
|
|
repository_name='test_repository')
|
|
|
|
self.metadata = MetadataRole()
|
|
|
|
|
|
|
|
def tearDown(self):
|
|
tuf.roledb.clear_roledb(clear_all=True)
|
|
tuf.keydb.clear_keydb(clear_all=True)
|
|
self.metadata = None
|
|
|
|
|
|
|
|
def test_rolename(self):
|
|
base_metadata = repo_tool.Metadata()
|
|
|
|
self.assertEqual(base_metadata.rolename, None)
|
|
|
|
# Test the sub-classed MetadataRole().
|
|
self.assertEqual(self.metadata.rolename, 'metadata_role')
|
|
|
|
|
|
|
|
def test_version(self):
|
|
# Test version getter, and the default version number.
|
|
self.assertEqual(self.metadata.version, 0)
|
|
|
|
# Test version setter, and verify updated version number.
|
|
self.metadata.version = 8
|
|
self.assertEqual(self.metadata.version, 8)
|
|
|
|
|
|
|
|
def test_threshold(self):
|
|
# Test threshold getter, and the default threshold number.
|
|
self.assertEqual(self.metadata.threshold, 1)
|
|
|
|
# Test threshold setter, and verify updated threshold number.
|
|
self.metadata.threshold = 3
|
|
self.assertEqual(self.metadata.threshold, 3)
|
|
|
|
|
|
|
|
def test_expiration(self):
|
|
# Test expiration getter.
|
|
expiration = self.metadata.expiration
|
|
self.assertTrue(isinstance(expiration, datetime.datetime))
|
|
|
|
# Test expiration setter.
|
|
self.metadata.expiration = datetime.datetime(2030, 1, 1, 12, 0)
|
|
expiration = self.metadata.expiration
|
|
self.assertTrue(isinstance(expiration, datetime.datetime))
|
|
|
|
# test a setter with microseconds, we are forcing the microseconds value
|
|
expiration = datetime.datetime.today() + datetime.timedelta(weeks = 1)
|
|
# we force the microseconds value if we are unlucky enough to get a 0
|
|
if expiration.microsecond == 0:
|
|
expiration = expiration.replace(microsecond = 1)
|
|
|
|
new_expiration = self.metadata.expiration
|
|
self.assertTrue(isinstance(new_expiration, datetime.datetime))
|
|
|
|
# check that the expiration value is truncated
|
|
self.assertTrue(new_expiration.microsecond == 0)
|
|
|
|
# Test improperly formatted datetime.
|
|
try:
|
|
self.metadata.expiration = '3'
|
|
|
|
except securesystemslib.exceptions.FormatError:
|
|
pass
|
|
|
|
else:
|
|
self.fail('Setter failed to detect improperly formatted datetime.')
|
|
|
|
|
|
# Test invalid argument (i.e., expiration has already expired.)
|
|
expired_datetime = tuf.formats.unix_timestamp_to_datetime(int(time.time() - 1))
|
|
try:
|
|
self.metadata.expiration = expired_datetime
|
|
|
|
except securesystemslib.exceptions.Error:
|
|
pass
|
|
|
|
else:
|
|
self.fail('Setter failed to detect an expired datetime.')
|
|
|
|
|
|
|
|
def test_keys(self):
|
|
# Test default case, where a verification key has not been added.
|
|
self.assertEqual(self.metadata.keys, [])
|
|
|
|
|
|
# Test keys() getter after a verification key has been loaded.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'snapshot_key.pub')
|
|
key_object = repo_tool.import_ed25519_publickey_from_file(key_path)
|
|
self.metadata.add_verification_key(key_object)
|
|
|
|
keyid = key_object['keyid']
|
|
self.assertEqual([keyid], self.metadata.keys)
|
|
|
|
|
|
|
|
def test_signing_keys(self):
|
|
# Test default case, where a signing key has not been added.
|
|
self.assertEqual(self.metadata.signing_keys, [])
|
|
|
|
|
|
# Test signing_keys() getter after a signing key has been loaded.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'root_key')
|
|
key_object = repo_tool.import_rsa_privatekey_from_file(key_path, 'password')
|
|
self.metadata.load_signing_key(key_object)
|
|
|
|
keyid = key_object['keyid']
|
|
self.assertEqual([keyid], self.metadata.signing_keys)
|
|
|
|
|
|
|
|
|
|
|
|
def test_add_verification_key(self):
|
|
# Add verification key and verify that it was added via (role).keys.
|
|
key_path = os.path.join('repository_data', 'keystore', 'snapshot_key.pub')
|
|
key_object = repo_tool.import_ed25519_publickey_from_file(key_path)
|
|
self.metadata.add_verification_key(key_object)
|
|
|
|
keyid = key_object['keyid']
|
|
self.assertEqual([keyid], self.metadata.keys)
|
|
|
|
expiration = \
|
|
tuf.formats.unix_timestamp_to_datetime(int(time.time() + 86400))
|
|
expiration = expiration.isoformat() + 'Z'
|
|
roleinfo = {'keyids': [], 'signing_keyids': [], 'threshold': 1,
|
|
'signatures': [], 'version': 0,
|
|
'consistent_snapshot': False, 'expires': expiration,
|
|
'partial_loaded': False}
|
|
|
|
tuf.roledb.add_role('Root', roleinfo, 'test_repository')
|
|
tuf.roledb.add_role('Targets', roleinfo, 'test_repository')
|
|
tuf.roledb.add_role('Snapshot', roleinfo, 'test_repository')
|
|
tuf.roledb.add_role('Timestamp', roleinfo, 'test_repository')
|
|
|
|
# Test for different top-level role names.
|
|
self.metadata._rolename = 'Targets'
|
|
self.metadata.add_verification_key(key_object)
|
|
self.metadata._rolename = 'Snapshot'
|
|
self.metadata.add_verification_key(key_object)
|
|
self.metadata._rolename = 'Timestamp'
|
|
self.metadata.add_verification_key(key_object)
|
|
|
|
# Test for a given 'expires' argument.
|
|
expires = datetime.datetime(2030, 1, 1, 12, 0)
|
|
self.metadata.add_verification_key(key_object, expires)
|
|
|
|
|
|
# Test for an expired 'expires'.
|
|
expired = datetime.datetime(1984, 1, 1, 12, 0)
|
|
self.assertRaises(securesystemslib.exceptions.Error,
|
|
self.metadata.add_verification_key, key_object, expired)
|
|
|
|
# Test improperly formatted key argument.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, self.metadata.add_verification_key, 3)
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, self.metadata.add_verification_key, key_object, 3)
|
|
|
|
|
|
|
|
def test_remove_verification_key(self):
|
|
# Add verification key so that remove_verifiation_key() can be tested.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'snapshot_key.pub')
|
|
key_object = repo_tool.import_ed25519_publickey_from_file(key_path)
|
|
self.metadata.add_verification_key(key_object)
|
|
|
|
keyid = key_object['keyid']
|
|
self.assertEqual([keyid], self.metadata.keys)
|
|
|
|
|
|
# Test successful removal of verification key added above.
|
|
self.metadata.remove_verification_key(key_object)
|
|
self.assertEqual(self.metadata.keys, [])
|
|
|
|
|
|
# Test improperly formatted argument
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, self.metadata.remove_verification_key, 3)
|
|
|
|
|
|
# Test non-existent public key argument.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'targets_key.pub')
|
|
unused_key_object = repo_tool.import_ed25519_publickey_from_file(key_path)
|
|
|
|
self.assertRaises(securesystemslib.exceptions.Error, self.metadata.remove_verification_key,
|
|
unused_key_object)
|
|
|
|
|
|
|
|
def test_load_signing_key(self):
|
|
# Test normal case.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'snapshot_key')
|
|
key_object = repo_tool.import_ed25519_privatekey_from_file(key_path, 'password')
|
|
self.metadata.load_signing_key(key_object)
|
|
|
|
keyid = key_object['keyid']
|
|
self.assertEqual([keyid], self.metadata.signing_keys)
|
|
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, self.metadata.load_signing_key, 3)
|
|
|
|
|
|
# Test non-private key.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'snapshot_key.pub')
|
|
key_object = repo_tool.import_ed25519_publickey_from_file(key_path)
|
|
self.assertRaises(securesystemslib.exceptions.Error, self.metadata.load_signing_key, key_object)
|
|
|
|
|
|
|
|
def test_unload_signing_key(self):
|
|
# Load a signing key so that unload_signing_key() can have a key to unload.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'snapshot_key')
|
|
key_object = repo_tool.import_ed25519_privatekey_from_file(key_path, 'password')
|
|
self.metadata.load_signing_key(key_object)
|
|
|
|
keyid = key_object['keyid']
|
|
self.assertEqual([keyid], self.metadata.signing_keys)
|
|
|
|
self.metadata.unload_signing_key(key_object)
|
|
|
|
self.assertEqual(self.metadata.signing_keys, [])
|
|
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, self.metadata.unload_signing_key, 3)
|
|
|
|
|
|
# Test non-existent key argument.
|
|
key_path = os.path.join('repository_data',
|
|
'keystore', 'targets_key')
|
|
unused_key_object = repo_tool.import_ed25519_privatekey_from_file(key_path,
|
|
'password')
|
|
|
|
self.assertRaises(securesystemslib.exceptions.Error, self.metadata.unload_signing_key,
|
|
unused_key_object)
|
|
|
|
|
|
|
|
def test_add_signature(self):
|
|
# Test normal case.
|
|
# Load signature list from any of pre-generated metadata; needed for
|
|
# testing.
|
|
metadata_directory = os.path.join('repository_data',
|
|
'repository', 'metadata')
|
|
root_filepath = os.path.join(metadata_directory, 'root.json')
|
|
root_signable = securesystemslib.util.load_json_file(root_filepath)
|
|
signatures = root_signable['signatures']
|
|
|
|
# Add the first signature from the list, as only one is needed.
|
|
self.metadata.add_signature(signatures[0])
|
|
self.assertEqual(signatures, self.metadata.signatures)
|
|
|
|
# Verify that a signature is added if a 'signatures' entry is not present.
|
|
tuf.roledb.create_roledb_from_root_metadata(root_signable['signed'], repository_name='test_repository')
|
|
del tuf.roledb._roledb_dict['test_repository']['root']['signatures']
|
|
self.metadata._rolename = 'root'
|
|
self.metadata.add_signature(signatures[0])
|
|
|
|
# Add a duplicate signature.
|
|
self.metadata.add_signature(signatures[0])
|
|
|
|
# Test improperly formatted signature argument.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, self.metadata.add_signature, 3)
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, self.metadata.add_signature, signatures[0], 3)
|
|
|
|
|
|
|
|
def test_remove_signature(self):
|
|
# Test normal case.
|
|
# Add a signature so remove_signature() has some signature to remove.
|
|
metadata_directory = os.path.join('repository_data',
|
|
'repository', 'metadata')
|
|
root_filepath = os.path.join(metadata_directory, 'root.json')
|
|
root_signable = securesystemslib.util.load_json_file(root_filepath)
|
|
signatures = root_signable['signatures']
|
|
self.metadata.add_signature(signatures[0])
|
|
|
|
self.metadata.remove_signature(signatures[0])
|
|
self.assertEqual(self.metadata.signatures, [])
|
|
|
|
|
|
# Test improperly formatted signature argument.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.metadata.remove_signature, 3)
|
|
|
|
# Test invalid signature argument (i.e., signature that has not been added.)
|
|
# Load an unused signature to be tested.
|
|
targets_filepath = os.path.join(metadata_directory, 'targets.json')
|
|
targets_signable = securesystemslib.util.load_json_file(targets_filepath)
|
|
signatures = targets_signable['signatures']
|
|
|
|
self.assertRaises(securesystemslib.exceptions.Error,
|
|
self.metadata.remove_signature, signatures[0])
|
|
|
|
|
|
|
|
def test_signatures(self):
|
|
# Test default case, where no signatures have been added yet.
|
|
self.assertEqual(self.metadata.signatures, [])
|
|
|
|
|
|
# Test getter after adding an example signature.
|
|
metadata_directory = os.path.join('repository_data',
|
|
'repository', 'metadata')
|
|
root_filepath = os.path.join(metadata_directory, 'root.json')
|
|
root_signable = securesystemslib.util.load_json_file(root_filepath)
|
|
signatures = root_signable['signatures']
|
|
|
|
# Add the first signature from the list, as only need one is needed.
|
|
self.metadata.add_signature(signatures[0])
|
|
self.assertEqual(signatures, self.metadata.signatures)
|
|
|
|
|
|
|
|
class TestRoot(unittest.TestCase):
|
|
def setUp(self):
|
|
tuf.roledb.create_roledb('test_repository')
|
|
tuf.keydb.create_keydb('test_repository')
|
|
|
|
|
|
|
|
def tearDown(self):
|
|
tuf.roledb.clear_roledb(clear_all=True)
|
|
tuf.keydb.clear_keydb(clear_all=True)
|
|
|
|
|
|
|
|
def test_init(self):
|
|
|
|
# Test normal case.
|
|
# Root() subclasses Metadata(), and creates a 'root' role in 'tuf.roledb'.
|
|
repository_name = 'test_repository'
|
|
root_object = repo_tool.Root(repository_name)
|
|
self.assertTrue(isinstance(root_object, repo_tool.Metadata))
|
|
self.assertTrue(tuf.roledb.role_exists('root', repository_name))
|
|
|
|
|
|
|
|
class TestTimestamp(unittest.TestCase):
|
|
def setUp(self):
|
|
tuf.roledb.create_roledb('test_repository')
|
|
tuf.keydb.create_keydb('test_repository')
|
|
|
|
|
|
|
|
def tearDown(self):
|
|
tuf.roledb.clear_roledb()
|
|
tuf.keydb.clear_keydb()
|
|
|
|
|
|
|
|
def test_init(self):
|
|
|
|
# Test normal case.
|
|
# Timestamp() subclasses Metadata(), and creates a 'timestamp' role in
|
|
# 'tuf.roledb'.
|
|
timestamp_object = repo_tool.Timestamp('test_repository')
|
|
self.assertTrue(isinstance(timestamp_object, repo_tool.Metadata))
|
|
self.assertTrue(tuf.roledb.role_exists('timestamp', 'test_repository'))
|
|
|
|
|
|
|
|
|
|
|
|
class TestSnapshot(unittest.TestCase):
|
|
def setUp(self):
|
|
tuf.roledb.create_roledb('test_repository')
|
|
tuf.keydb.create_keydb('test_repository')
|
|
|
|
|
|
|
|
def tearDown(self):
|
|
tuf.roledb.clear_roledb(clear_all=True)
|
|
tuf.keydb.clear_keydb(clear_all=True)
|
|
|
|
|
|
|
|
def test_init(self):
|
|
|
|
# Test normal case.
|
|
# Snapshot() subclasses Metadata(), and creates a 'snapshot' role in
|
|
# 'tuf.roledb'.
|
|
snapshot_object = repo_tool.Snapshot('test_repository')
|
|
self.assertTrue(isinstance(snapshot_object, repo_tool.Metadata))
|
|
self.assertTrue(tuf.roledb.role_exists('snapshot', 'test_repository'))
|
|
|
|
|
|
|
|
|
|
|
|
class TestTargets(unittest.TestCase):
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
|
|
# setUpClass() is called before tests in an individual class are executed.
|
|
|
|
# Create a temporary directory to store the repository, metadata, and target
|
|
# files. 'temporary_directory' must be deleted in TearDownClass() so that
|
|
# temporary files are always removed, even when exceptions occur.
|
|
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
|
|
|
|
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
|
|
# tearDownModule() is called after all the tests have run.
|
|
# http://docs.python.org/2/library/unittest.html#class-and-module-fixtures
|
|
|
|
# Remove the temporary repository directory, which should contain all the
|
|
# metadata, targets, and key files generated for the test cases.
|
|
shutil.rmtree(cls.temporary_directory)
|
|
|
|
|
|
|
|
def setUp(self):
|
|
tuf.roledb.create_roledb('test_repository')
|
|
tuf.keydb.create_keydb('test_repository')
|
|
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
|
|
self.targets_directory = os.path.join(temporary_directory, 'repository',
|
|
'targets')
|
|
original_targets_directory = os.path.join('repository_data',
|
|
'repository', 'targets')
|
|
shutil.copytree(original_targets_directory, self.targets_directory)
|
|
self.targets_object = repo_tool.Targets(self.targets_directory,
|
|
repository_name='test_repository')
|
|
|
|
|
|
|
|
def tearDown(self):
|
|
tuf.roledb.clear_roledb(clear_all=True)
|
|
tuf.keydb.clear_keydb(clear_all=True)
|
|
self.targets_object = None
|
|
|
|
|
|
|
|
def test_init(self):
|
|
|
|
# Test normal case.
|
|
# Snapshot() subclasses Metadata(), and creates a 'snapshot' role in
|
|
# 'tuf.roledb'.
|
|
targets_object = repo_tool.Targets('targets_directory/')
|
|
self.assertTrue(isinstance(targets_object, repo_tool.Metadata))
|
|
self.assertTrue(tuf.roledb.role_exists('targets'))
|
|
|
|
# Custom Targets object rolename.
|
|
targets_object = repo_tool.Targets('targets_directory/', 'project')
|
|
self.assertTrue(isinstance(targets_object, repo_tool.Metadata))
|
|
self.assertTrue(tuf.roledb.role_exists('project'))
|
|
|
|
# Custom roleinfo object (i.e., tuf.formats.ROLEDB_SCHEMA). 'keyids' and
|
|
# 'threshold' are required, the rest are optional.
|
|
roleinfo = {'keyids':
|
|
['66c4cb5fef5e4d62b7013ef1cab4b8a827a36c14056d5603c3a970e21eb30e6f'],
|
|
'threshold': 8}
|
|
self.assertTrue(tuf.formats.ROLEDB_SCHEMA.matches(roleinfo))
|
|
|
|
targets_object = repo_tool.Targets('targets_directory/', 'package', roleinfo)
|
|
self.assertTrue(isinstance(targets_object, repo_tool.Metadata))
|
|
self.assertTrue(tuf.roledb.role_exists('package'))
|
|
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, repo_tool.Targets, 3)
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, repo_tool.Targets, 'targets_directory/', 3)
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, repo_tool.Targets, 'targets_directory/',
|
|
'targets', 3)
|
|
|
|
|
|
|
|
def test_call(self):
|
|
# Test normal case.
|
|
# Perform a delegation so that a delegated role can be accessed and tested
|
|
# through __call__(). Example: {targets_object}('role1').
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'snapshot_key.pub')
|
|
public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath)
|
|
|
|
# Create Targets() object to be tested.
|
|
targets_object = repo_tool.Targets(self.targets_directory)
|
|
targets_object.delegate('role1', [public_key], ['file1.txt'])
|
|
|
|
self.assertTrue(isinstance(targets_object('role1'), repo_tool.Targets))
|
|
|
|
# Test invalid (i.e., non-delegated) rolename argument.
|
|
self.assertRaises(tuf.exceptions.UnknownRoleError, targets_object, 'unknown_role')
|
|
|
|
# Test improperly formatted argument.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, targets_object, 1)
|
|
|
|
|
|
|
|
def test_get_delegated_rolenames(self):
|
|
# Test normal case.
|
|
# Perform two delegations so that get_delegated_rolenames() has roles to
|
|
# return.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'snapshot_key.pub')
|
|
public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath)
|
|
|
|
# Set needed arguments by delegate().
|
|
public_keys = [public_key]
|
|
threshold = 1
|
|
|
|
self.targets_object.delegate('tuf', public_keys, [], threshold, False,
|
|
['file1.txt'], path_hash_prefixes=None)
|
|
|
|
self.targets_object.delegate('warehouse', public_keys, [], threshold, False,
|
|
['file2.txt'], path_hash_prefixes=None)
|
|
|
|
# Test that get_delegated_rolenames returns the expected delegations.
|
|
expected_delegated_rolenames = ['targets/tuf/', 'targets/warehouse']
|
|
for delegated_rolename in self.targets_object.get_delegated_rolenames():
|
|
delegated_rolename in expected_delegated_rolenames
|
|
|
|
|
|
|
|
def test_target_files(self):
|
|
# Test normal case.
|
|
# Verify the targets object initially contains zero target files.
|
|
self.assertEqual(self.targets_object.target_files, {})
|
|
|
|
target_filepath = 'file1.txt'
|
|
self.targets_object.add_target(target_filepath)
|
|
|
|
self.assertEqual(len(self.targets_object.target_files), 1)
|
|
self.assertTrue(target_filepath in self.targets_object.target_files)
|
|
|
|
|
|
|
|
def test_delegations(self):
|
|
# Test normal case.
|
|
# Perform a delegation so that delegations() has a Targets() object to
|
|
# return.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'snapshot_key.pub')
|
|
public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath)
|
|
|
|
# Set needed arguments by delegate().
|
|
public_keys = [public_key]
|
|
rolename = 'tuf'
|
|
paths = ['file1.txt']
|
|
threshold = 1
|
|
|
|
self.targets_object.delegate(rolename, public_keys, paths, threshold,
|
|
terminating=False, list_of_targets=None, path_hash_prefixes=None)
|
|
|
|
# Test that a valid Targets() object is returned by delegations().
|
|
for delegated_object in self.targets_object.delegations:
|
|
self.assertTrue(isinstance(delegated_object, repo_tool.Targets))
|
|
|
|
# For testing / coverage purposes, try to remove a delegated role with the
|
|
# remove_delegated_role() method.
|
|
self.targets_object.remove_delegated_role(rolename)
|
|
|
|
|
|
|
|
def test_add_delegated_role(self):
|
|
# Test for invalid targets object.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.add_delegated_role, 'targets', 'bad_object')
|
|
|
|
|
|
|
|
def test_add_target(self):
|
|
# Test normal case.
|
|
# Verify the targets object initially contains zero target files.
|
|
self.assertEqual(self.targets_object.target_files, {})
|
|
|
|
target_filepath = 'file1.txt'
|
|
self.targets_object.add_target(target_filepath)
|
|
|
|
self.assertEqual(len(self.targets_object.target_files), 1)
|
|
self.assertTrue(target_filepath in self.targets_object.target_files)
|
|
|
|
# Test the 'custom' parameter of add_target(), where additional information
|
|
# may be specified for the target.
|
|
target2_filepath = 'file2.txt'
|
|
target2_fullpath = os.path.join(self.targets_directory, target2_filepath)
|
|
|
|
# The file permission of the target (octal number specifying file access
|
|
# for owner, group, others (e.g., 0755).
|
|
octal_file_permissions = oct(os.stat(target2_fullpath).st_mode)[4:]
|
|
custom_file_permissions = {'file_permissions': octal_file_permissions}
|
|
self.targets_object.add_target(target2_filepath, custom_file_permissions)
|
|
|
|
self.assertEqual(len(self.targets_object.target_files), 2)
|
|
self.assertTrue(target2_filepath in self.targets_object.target_files)
|
|
self.assertEqual(self.targets_object.target_files['file2.txt']['custom'],
|
|
custom_file_permissions)
|
|
|
|
# Attempt to replace target that has already been added.
|
|
octal_file_permissions2 = oct(os.stat(target2_fullpath).st_mode)[4:]
|
|
custom_file_permissions2 = {'file_permissions': octal_file_permissions}
|
|
self.targets_object.add_target(target2_filepath, custom_file_permissions2)
|
|
self.assertEqual(self.targets_object.target_files[target2_filepath]['custom'],
|
|
custom_file_permissions2)
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.add_target, 3)
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.add_target, 3, custom_file_permissions)
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.add_target, target_filepath, 3)
|
|
|
|
# A target path starting with a directory separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.add_target, '/file1.txt')
|
|
|
|
# A target path using a backward slash as a separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.add_target, 'subdir\\file1.txt')
|
|
|
|
# Should not access the file system to check for non-existent files
|
|
self.targets_object.add_target('non-existent')
|
|
|
|
|
|
|
|
def test_add_targets(self):
|
|
# Test normal case.
|
|
# Verify the targets object initially contains zero target files.
|
|
self.assertEqual(self.targets_object.target_files, {})
|
|
|
|
target1_filepath = 'file1.txt'
|
|
target2_filepath = 'file2.txt'
|
|
target3_filepath = 'file3.txt'
|
|
|
|
# Add a 'target1_filepath' duplicate for testing purposes
|
|
# ('target1_filepath' should not be added twice.)
|
|
target_files = \
|
|
[target1_filepath, target2_filepath, 'file3.txt', target1_filepath]
|
|
self.targets_object.add_targets(target_files)
|
|
|
|
self.assertEqual(len(self.targets_object.target_files), 3)
|
|
self.assertEqual(self.targets_object.target_files,
|
|
{target1_filepath: {}, target2_filepath: {}, target3_filepath: {}})
|
|
|
|
# Attempt to replace targets that has already been added.
|
|
self.targets_object.add_targets(target_files)
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.add_targets, 3)
|
|
|
|
# A target path starting with a directory separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.add_targets, ['/file1.txt'])
|
|
|
|
# A target path using a backward slash as a separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.add_targets, ['subdir\\file1.txt'])
|
|
|
|
# Check if the addition of the whole list is rolled back in case of
|
|
# wrong target path
|
|
target_files = self.targets_object.target_files
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.add_targets, ['file4.txt', '/file5.txt'])
|
|
self.assertEqual(self.targets_object.target_files, target_files)
|
|
|
|
# Should not access the file system to check for non-existent files
|
|
self.targets_object.add_targets(['non-existent'])
|
|
|
|
|
|
def test_remove_target(self):
|
|
# Test normal case.
|
|
# Verify the targets object initially contains zero target files.
|
|
self.assertEqual(self.targets_object.target_files, {})
|
|
|
|
# Add a target so that remove_target() has something to remove.
|
|
target_filepath = 'file1.txt'
|
|
self.targets_object.add_target(target_filepath)
|
|
|
|
# Test remove_target()'s behavior.
|
|
self.targets_object.remove_target(target_filepath)
|
|
self.assertEqual(self.targets_object.target_files, {})
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.remove_target, 3)
|
|
|
|
# Test for filepath that hasn't been added yet.
|
|
target5_filepath = 'file5.txt'
|
|
self.assertRaises(securesystemslib.exceptions.Error,
|
|
self.targets_object.remove_target,
|
|
target5_filepath)
|
|
|
|
|
|
|
|
def test_clear_targets(self):
|
|
# Test normal case.
|
|
# Verify the targets object initially contains zero target files.
|
|
self.assertEqual(self.targets_object.target_files, {})
|
|
|
|
# Add targets, to be tested by clear_targets().
|
|
target1_filepath = 'file1.txt'
|
|
target2_filepath = 'file2.txt'
|
|
self.targets_object.add_targets([target1_filepath, target2_filepath])
|
|
|
|
self.targets_object.clear_targets()
|
|
self.assertEqual(self.targets_object.target_files, {})
|
|
|
|
|
|
|
|
def test_delegate(self):
|
|
# Test normal case.
|
|
# Need at least one public key and valid target paths required by
|
|
# delegate().
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'snapshot_key.pub')
|
|
public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath)
|
|
|
|
# Set needed arguments by delegate().
|
|
public_keys = [public_key]
|
|
rolename = 'tuf'
|
|
list_of_targets = ['file1.txt', 'file2.txt']
|
|
threshold = 1
|
|
paths = ['*']
|
|
path_hash_prefixes = ['e3a3', '8fae', 'd543']
|
|
|
|
self.targets_object.delegate(rolename, public_keys, paths,
|
|
threshold, terminating=False, list_of_targets=list_of_targets,
|
|
path_hash_prefixes=path_hash_prefixes)
|
|
|
|
self.assertEqual(self.targets_object.get_delegated_rolenames(),
|
|
['tuf'])
|
|
|
|
# Test for delegated paths that do not exist.
|
|
# An exception should not be raised for non-existent delegated paths, since
|
|
# these paths may not necessarily exist when the delegation is done,
|
|
# and also because the delegated paths can be glob patterns.
|
|
self.targets_object.delegate(rolename, public_keys, ['non-existent'],
|
|
threshold, terminating=False, list_of_targets=list_of_targets,
|
|
path_hash_prefixes=path_hash_prefixes)
|
|
|
|
# Test for delegated targets that do not exist.
|
|
# An exception should not be raised for non-existent delegated targets,
|
|
# since at this point the file system should not be accessed yet
|
|
self.targets_object.delegate(rolename, public_keys, [], threshold,
|
|
terminating=False, list_of_targets=['non-existent.txt'],
|
|
path_hash_prefixes=path_hash_prefixes)
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.delegate, 3, public_keys, paths, threshold,
|
|
list_of_targets, path_hash_prefixes)
|
|
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.delegate, rolename, 3, paths, threshold,
|
|
list_of_targets, path_hash_prefixes)
|
|
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.delegate, rolename, public_keys, 3, threshold,
|
|
list_of_targets, path_hash_prefixes)
|
|
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.delegate, rolename, public_keys, paths, '3',
|
|
list_of_targets, path_hash_prefixes)
|
|
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.delegate, rolename, public_keys, paths, threshold,
|
|
3, path_hash_prefixes)
|
|
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.delegate, rolename, public_keys, paths, threshold,
|
|
list_of_targets, 3)
|
|
|
|
# Test invalid arguments (e.g., already delegated 'rolename', non-existent
|
|
# files, etc.).
|
|
# Test duplicate 'rolename' delegation, which should have been delegated
|
|
# in the normal case above.
|
|
self.assertRaises(securesystemslib.exceptions.Error,
|
|
self.targets_object.delegate, rolename, public_keys, paths, threshold,
|
|
list_of_targets, path_hash_prefixes)
|
|
|
|
# A path or target starting with a directory separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.delegate, rolename, public_keys, ['/*'])
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.delegate, rolename, public_keys, [],
|
|
list_of_targets=['/file1.txt'])
|
|
|
|
# A path or target using '\' as a directory separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.delegate, rolename, public_keys, ['subpath\\*'])
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.delegate, rolename, public_keys, [],
|
|
list_of_targets=['subpath\\file1.txt'])
|
|
|
|
|
|
|
|
|
|
def test_delegate_hashed_bins(self):
|
|
# Test normal case.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'snapshot_key.pub')
|
|
public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath)
|
|
|
|
# Set needed arguments by delegate_hashed_bins().
|
|
public_keys = [public_key]
|
|
list_of_targets = ['file1.txt']
|
|
|
|
|
|
# A helper function to check that the range of prefixes the role is
|
|
# delegated for, specified in path_hash_prefixes, matches the range
|
|
# implied by the bin, or delegation role, name.
|
|
def check_prefixes_match_range():
|
|
roleinfo = tuf.roledb.get_roleinfo(self.targets_object.rolename,
|
|
'test_repository')
|
|
have_prefixes = False
|
|
|
|
for delegated_role in roleinfo['delegations']['roles']:
|
|
if len(delegated_role['path_hash_prefixes']) > 0:
|
|
rolename = delegated_role['name']
|
|
prefixes = delegated_role['path_hash_prefixes']
|
|
have_prefixes = True
|
|
|
|
if len(prefixes) > 1:
|
|
prefix_range = "{}-{}".format(prefixes[0], prefixes[-1])
|
|
else:
|
|
prefix_range = prefixes[0]
|
|
|
|
self.assertEqual(rolename, prefix_range)
|
|
|
|
# We expect at least one delegation with some path_hash_prefixes
|
|
self.assertTrue(have_prefixes)
|
|
|
|
|
|
# Test delegate_hashed_bins() and verify that 16 hashed bins have
|
|
# been delegated in the parent's roleinfo.
|
|
self.targets_object.delegate_hashed_bins(list_of_targets, public_keys,
|
|
number_of_bins=16)
|
|
|
|
# The expected child rolenames, since 'number_of_bins' = 16
|
|
delegated_rolenames = ['0', '1', '2', '3', '4', '5', '6', '7',
|
|
'8', '9', 'a', 'b', 'c', 'd', 'e', 'f']
|
|
|
|
self.assertEqual(sorted(self.targets_object.get_delegated_rolenames()),
|
|
sorted(delegated_rolenames))
|
|
check_prefixes_match_range()
|
|
|
|
# For testing / coverage purposes, try to create delegated bins that
|
|
# hold a range of hash prefixes (e.g., bin name: 000-003).
|
|
self.targets_object.delegate_hashed_bins(list_of_targets, public_keys,
|
|
number_of_bins=512)
|
|
check_prefixes_match_range()
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.delegate_hashed_bins, 3, public_keys,
|
|
number_of_bins=1)
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.delegate_hashed_bins,
|
|
list_of_targets, 3, number_of_bins=1)
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.delegate_hashed_bins,
|
|
list_of_targets, public_keys, '1')
|
|
|
|
# Test invalid arguments.
|
|
# Invalid number of bins, which must be a power of 2.
|
|
self.assertRaises(securesystemslib.exceptions.Error,
|
|
self.targets_object.delegate_hashed_bins,
|
|
list_of_targets, public_keys, number_of_bins=3)
|
|
|
|
# Invalid 'list_of_targets'.
|
|
# A path or target starting with a directory separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.delegate_hashed_bins,
|
|
['/file1.txt'], public_keys,
|
|
number_of_bins=2)
|
|
|
|
# A path or target using '\' as a directory separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.delegate_hashed_bins,
|
|
['subpath\\file1.txt'], public_keys,
|
|
number_of_bins=2)
|
|
|
|
|
|
def test_add_target_to_bin(self):
|
|
# Test normal case.
|
|
# Delegate the hashed bins so that add_target_to_bin() can be tested.
|
|
repository_name = 'test_repository'
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'targets_key.pub')
|
|
public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath)
|
|
target1_filepath = 'file1.txt'
|
|
|
|
# Set needed arguments by delegate_hashed_bins().
|
|
public_keys = [public_key]
|
|
|
|
# Delegate to hashed bins. The target filepath to be tested is expected
|
|
# to contain a hash prefix of 'e', and should be available at:
|
|
# repository.targets('e').
|
|
self.targets_object.delegate_hashed_bins([], public_keys,
|
|
number_of_bins=16)
|
|
|
|
# Ensure each hashed bin initially contains zero targets.
|
|
for delegation in self.targets_object.delegations:
|
|
self.assertEqual(delegation.target_files, {})
|
|
|
|
# Add 'target1_filepath' and verify that the relative path of
|
|
# 'target1_filepath' is added to the correct bin.
|
|
rolename = self.targets_object.add_target_to_bin(target1_filepath, 16)
|
|
|
|
for delegation in self.targets_object.delegations:
|
|
if delegation.rolename == rolename:
|
|
self.assertTrue('file1.txt' in delegation.target_files)
|
|
|
|
else:
|
|
self.assertFalse('file1.txt' in delegation.target_files)
|
|
|
|
# Test for non-existent delegations and hashed bins.
|
|
empty_targets_role = repo_tool.Targets(self.targets_directory, 'empty',
|
|
repository_name=repository_name)
|
|
|
|
self.assertRaises(securesystemslib.exceptions.Error,
|
|
empty_targets_role.add_target_to_bin,
|
|
target1_filepath, 16)
|
|
|
|
# Test for a required hashed bin that does not exist.
|
|
self.targets_object.revoke(rolename)
|
|
self.assertRaises(securesystemslib.exceptions.Error,
|
|
self.targets_object.add_target_to_bin,
|
|
target1_filepath, 16)
|
|
|
|
# Test adding a target with fileinfo
|
|
target2_hashes = {'sha256': '517c0ce943e7274a2431fa5751e17cfd5225accd23e479bfaad13007751e87ef'}
|
|
target2_fileinfo = tuf.formats.make_fileinfo(37, target2_hashes)
|
|
target2_filepath = 'file2.txt'
|
|
|
|
rolename = self.targets_object.add_target_to_bin(target2_filepath, 16,
|
|
fileinfo=target2_fileinfo)
|
|
|
|
for delegation in self.targets_object.delegations:
|
|
if delegation.rolename == rolename:
|
|
self.assertTrue(target2_filepath in delegation.target_files)
|
|
|
|
else:
|
|
self.assertFalse(target2_filepath in delegation.target_files)
|
|
|
|
# Test improperly formatted argument.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.add_target_to_bin, 3, 'foo')
|
|
|
|
|
|
|
|
def test_remove_target_from_bin(self):
|
|
# Test normal case.
|
|
# Delegate the hashed bins so that add_target_to_bin() can be tested.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'targets_key.pub')
|
|
public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath)
|
|
target1_filepath = 'file1.txt'
|
|
|
|
# Set needed arguments by delegate_hashed_bins().
|
|
public_keys = [public_key]
|
|
|
|
# Delegate to hashed bins. The target filepath to be tested is expected
|
|
# to contain a hash prefix of 'e', and can be accessed as:
|
|
# repository.targets('e').
|
|
self.targets_object.delegate_hashed_bins([], public_keys,
|
|
number_of_bins=16)
|
|
|
|
# Ensure each hashed bin initially contains zero targets.
|
|
for delegation in self.targets_object.delegations:
|
|
self.assertEqual(delegation.target_files, {})
|
|
|
|
# Add 'target1_filepath' and verify that the relative path of
|
|
# 'target1_filepath' is added to the correct bin.
|
|
added_rolename = self.targets_object.add_target_to_bin(target1_filepath, 16)
|
|
|
|
for delegation in self.targets_object.delegations:
|
|
if delegation.rolename == added_rolename:
|
|
self.assertTrue('file1.txt' in delegation.target_files)
|
|
self.assertTrue(len(delegation.target_files) == 1)
|
|
else:
|
|
self.assertTrue('file1.txt' not in delegation.target_files)
|
|
|
|
# Test the remove_target_from_bin() method. Verify that 'target1_filepath'
|
|
# has been removed.
|
|
removed_rolename = self.targets_object.remove_target_from_bin(target1_filepath, 16)
|
|
self.assertEqual(added_rolename, removed_rolename)
|
|
|
|
for delegation in self.targets_object.delegations:
|
|
self.assertTrue(target1_filepath not in delegation.target_files)
|
|
|
|
|
|
# Test improperly formatted argument.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.remove_target_from_bin, 3, 'foo')
|
|
|
|
# Invalid target file path argument.
|
|
self.assertRaises(securesystemslib.exceptions.Error,
|
|
self.targets_object.remove_target_from_bin, 'non-existent', 16)
|
|
|
|
|
|
|
|
def test_default_bin_num(self):
|
|
# Test creating, adding to and removing from hashed bins with the default
|
|
# number of bins
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'snapshot_key.pub')
|
|
public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath)
|
|
target1_filepath = os.path.join(self.targets_directory, 'file1.txt')
|
|
|
|
# Set needed arguments by delegate_hashed_bins().
|
|
public_keys = [public_key]
|
|
|
|
# Test default parameters for number_of_bins
|
|
self.targets_object.delegate_hashed_bins([], public_keys)
|
|
|
|
# Ensure each hashed bin initially contains zero targets.
|
|
for delegation in self.targets_object.delegations:
|
|
self.assertEqual(delegation.target_files, {})
|
|
|
|
# Add 'target1_filepath' and verify that the relative path of
|
|
# 'target1_filepath' is added to the correct bin.
|
|
added_rolename = self.targets_object.add_target_to_bin(os.path.basename(target1_filepath))
|
|
|
|
for delegation in self.targets_object.delegations:
|
|
if delegation.rolename == added_rolename:
|
|
self.assertTrue('file1.txt' in delegation.target_files)
|
|
|
|
else:
|
|
self.assertFalse('file1.txt' in delegation.target_files)
|
|
|
|
# Remove target1_filepath and verify that all bins are now empty
|
|
removed_rolename = self.targets_object.remove_target_from_bin(
|
|
os.path.basename(target1_filepath))
|
|
self.assertEqual(added_rolename, removed_rolename)
|
|
|
|
for delegation in self.targets_object.delegations:
|
|
self.assertEqual(delegation.target_files, {})
|
|
|
|
|
|
def test_add_paths(self):
|
|
# Test normal case.
|
|
# Perform a delegation so that add_paths() has a child role to delegate a
|
|
# path to.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'snapshot_key.pub')
|
|
public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath)
|
|
|
|
# Set needed arguments by delegate().
|
|
public_keys = [public_key]
|
|
rolename = 'tuf'
|
|
threshold = 1
|
|
|
|
self.targets_object.delegate(rolename, public_keys, [], threshold,
|
|
list_of_targets=None, path_hash_prefixes=None)
|
|
|
|
# Delegate an extra role for test coverage (i.e., to later verify that
|
|
# delegated paths are not added to a child role that was not requested).
|
|
self.targets_object.delegate('junk_role', public_keys, [])
|
|
|
|
paths = ['tuf_files/*']
|
|
self.targets_object.add_paths(paths, 'tuf')
|
|
|
|
# Retrieve 'targets_object' roleinfo, and verify the roleinfo contains the
|
|
# expected delegated paths of the delegated role.
|
|
targets_object_roleinfo = tuf.roledb.get_roleinfo(self.targets_object.rolename,
|
|
'test_repository')
|
|
|
|
delegated_role = targets_object_roleinfo['delegations']['roles'][0]
|
|
self.assertEqual(['tuf_files/*'], delegated_role['paths'])
|
|
|
|
# Try to add a delegated path that has already been set.
|
|
# add_paths() should simply log a message in this case.
|
|
self.targets_object.add_paths(paths, 'tuf')
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.add_paths, 3, 'tuf')
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object.add_paths, paths, 3)
|
|
|
|
|
|
# Test invalid arguments.
|
|
# A non-delegated child role.
|
|
self.assertRaises(securesystemslib.exceptions.Error,
|
|
self.targets_object.add_paths, paths, 'non_delegated_rolename')
|
|
|
|
# A path starting with a directory separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.add_paths, ['/tuf_files/*'], 'tuf')
|
|
|
|
# A path using a backward slash as a separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object.add_paths, ['tuf_files\\*'], 'tuf')
|
|
|
|
# add_paths() should not raise an exception for non-existent
|
|
# paths, which it previously did.
|
|
self.targets_object.add_paths(['non-existent'], 'tuf')
|
|
|
|
|
|
|
|
|
|
def test_revoke(self):
|
|
# Test normal case.
|
|
# Perform a delegation so that revoke() has a delegation to revoke.
|
|
keystore_directory = os.path.join('repository_data', 'keystore')
|
|
public_keypath = os.path.join(keystore_directory, 'snapshot_key.pub')
|
|
public_key = repo_tool.import_ed25519_publickey_from_file(public_keypath)
|
|
|
|
# Set needed arguments by delegate().
|
|
public_keys = [public_key]
|
|
rolename = 'tuf'
|
|
paths = ['file1.txt']
|
|
threshold = 1
|
|
|
|
self.targets_object.delegate(rolename, public_keys, [], threshold, False,
|
|
paths, path_hash_prefixes=None)
|
|
|
|
# Test revoke()
|
|
self.targets_object.revoke('tuf')
|
|
self.assertEqual(self.targets_object.get_delegated_rolenames(), [])
|
|
|
|
|
|
# Test improperly formatted rolename argument.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError, self.targets_object.revoke, 3)
|
|
|
|
|
|
|
|
def test_check_path(self):
|
|
# Test that correct path does not raise exception: using '/' as a separator
|
|
# and does not start with a directory separator
|
|
self.targets_object._check_path('file1.txt')
|
|
|
|
# Test that non-existent path does not raise exception (_check_path
|
|
# checks only the path string for compliance)
|
|
self.targets_object._check_path('non-existent.txt')
|
|
self.targets_object._check_path('subdir/non-existent')
|
|
|
|
# Test improperly formatted pathname argument.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
self.targets_object._check_path, 3)
|
|
|
|
# Test invalid pathname
|
|
# Starting with os separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object._check_path, '/file1.txt')
|
|
|
|
# Starting with Windows-style separator
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object._check_path, '\\file1.txt')
|
|
|
|
# Using Windows-style separator ('\')
|
|
self.assertRaises(tuf.exceptions.InvalidNameError,
|
|
self.targets_object._check_path, 'subdir\\non-existent')
|
|
|
|
|
|
|
|
class TestRepositoryToolFunctions(unittest.TestCase):
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
|
|
# setUpClass() is called before tests in an individual class are executed.
|
|
|
|
# Create a temporary directory to store the repository, metadata, and target
|
|
# files. 'temporary_directory' must be deleted in TearDownClass() so that
|
|
# temporary files are always removed, even when exceptions occur.
|
|
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
|
|
|
|
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
|
|
# tearDownModule() is called after all the tests have run.
|
|
# http://docs.python.org/2/library/unittest.html#class-and-module-fixtures
|
|
|
|
# Remove the temporary repository directory, which should contain all the
|
|
# metadata, targets, and key files generated for the test cases.
|
|
shutil.rmtree(cls.temporary_directory)
|
|
|
|
|
|
|
|
def setUp(self):
|
|
tuf.roledb.create_roledb('test_repository')
|
|
tuf.keydb.create_keydb('test_repository')
|
|
|
|
|
|
def tearDown(self):
|
|
tuf.roledb.clear_roledb(clear_all=True)
|
|
tuf.keydb.clear_keydb(clear_all=True)
|
|
|
|
|
|
|
|
def test_create_new_repository(self):
|
|
# Test normal case.
|
|
# Setup the temporary repository directories needed by
|
|
# create_new_repository().
|
|
repository_name = 'test_repository'
|
|
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
|
|
repository_directory = os.path.join(temporary_directory, 'repository')
|
|
metadata_directory = os.path.join(repository_directory,
|
|
repo_tool.METADATA_STAGED_DIRECTORY_NAME)
|
|
targets_directory = os.path.join(repository_directory,
|
|
repo_tool.TARGETS_DIRECTORY_NAME)
|
|
|
|
repository = repo_tool.create_new_repository(repository_directory,
|
|
repository_name)
|
|
self.assertTrue(isinstance(repository, repo_tool.Repository))
|
|
|
|
# Verify that the 'repository/', 'repository/metadata', and
|
|
# 'repository/targets' directories were created.
|
|
self.assertTrue(os.path.exists(repository_directory))
|
|
self.assertTrue(os.path.exists(metadata_directory))
|
|
self.assertTrue(os.path.exists(targets_directory))
|
|
|
|
# Test that the 'repository' directory is created (along with the other
|
|
# sub-directories) when it does not exist yet. The repository tool creates
|
|
# the non-existent directory.
|
|
shutil.rmtree(repository_directory)
|
|
|
|
repository = repo_tool.create_new_repository(repository_directory,
|
|
repository_name)
|
|
self.assertTrue(isinstance(repository, repo_tool.Repository))
|
|
|
|
# Verify that the 'repository/', 'repository/metadata', and
|
|
# 'repository/targets' directories were created.
|
|
self.assertTrue(os.path.exists(repository_directory))
|
|
self.assertTrue(os.path.exists(metadata_directory))
|
|
self.assertTrue(os.path.exists(targets_directory))
|
|
|
|
# Test for a repository name that doesn't exist yet. Note:
|
|
# The 'test_repository' repository name is created in setup() before this
|
|
# test case is run.
|
|
repository = repo_tool.create_new_repository(repository_directory, 'my-repo')
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
repo_tool.create_new_repository, 3, repository_name)
|
|
|
|
# For testing purposes, try to create a repository directory that
|
|
# fails due to a non-errno.EEXIST exception raised.
|
|
self.assertRaises(securesystemslib.exceptions.StorageError,
|
|
repo_tool.create_new_repository, 'bad' * 2000, repository_name)
|
|
|
|
# Reset the 'repository_directory' so that the metadata and targets
|
|
# directories can be tested likewise.
|
|
repository_directory = os.path.join(temporary_directory, 'repository')
|
|
|
|
# The same test as before, but for the metadata and targets directories.
|
|
original_metadata_staged_directory = \
|
|
tuf.repository_tool.METADATA_STAGED_DIRECTORY_NAME
|
|
tuf.repository_tool.METADATA_STAGED_DIRECTORY_NAME = 'bad' * 2000
|
|
|
|
self.assertRaises(securesystemslib.exceptions.StorageError,
|
|
repo_tool.create_new_repository, repository_directory, repository_name)
|
|
|
|
# Reset metadata staged directory so that the targets directory can be
|
|
# tested...
|
|
tuf.repository_tool.METADATA_STAGED_DIRECTORY_NAME = \
|
|
original_metadata_staged_directory
|
|
|
|
original_targets_directory = tuf.repository_tool.TARGETS_DIRECTORY_NAME
|
|
tuf.repository_tool.TARGETS_DIRECTORY_NAME = 'bad' * 2000
|
|
|
|
self.assertRaises(securesystemslib.exceptions.StorageError,
|
|
repo_tool.create_new_repository, repository_directory, repository_name)
|
|
|
|
tuf.repository_tool.TARGETS_DIRECTORY_NAME = \
|
|
original_targets_directory
|
|
|
|
|
|
|
|
def test_load_repository(self):
|
|
# Test normal case.
|
|
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
|
|
original_repository_directory = os.path.join('repository_data',
|
|
'repository')
|
|
|
|
repository_directory = os.path.join(temporary_directory, 'repository')
|
|
metadata_directory = os.path.join(repository_directory, 'metadata.staged')
|
|
shutil.copytree(original_repository_directory, repository_directory)
|
|
|
|
# For testing purposes, add a metadata file with an extension that is
|
|
# not supported, and another with invalid JSON content.
|
|
invalid_metadata_file = os.path.join(metadata_directory, 'root.xml')
|
|
root_file = os.path.join(metadata_directory, 'root.json')
|
|
shutil.copyfile(root_file, invalid_metadata_file)
|
|
bad_root_content = os.path.join(metadata_directory, 'root_bad.json')
|
|
|
|
with open(bad_root_content, 'wb') as file_object:
|
|
file_object.write(b'bad')
|
|
|
|
repository = repo_tool.load_repository(repository_directory)
|
|
self.assertTrue(isinstance(repository, repo_tool.Repository))
|
|
|
|
# Verify the expected roles have been loaded. See
|
|
# 'tuf/tests/repository_data/repository/'.
|
|
expected_roles = \
|
|
['root', 'targets', 'snapshot', 'timestamp', 'role1', 'role2']
|
|
for role in tuf.roledb.get_rolenames():
|
|
self.assertTrue(role in expected_roles)
|
|
|
|
self.assertTrue(len(repository.root.keys))
|
|
self.assertTrue(len(repository.targets.keys))
|
|
self.assertTrue(len(repository.snapshot.keys))
|
|
self.assertTrue(len(repository.timestamp.keys))
|
|
self.assertEqual(1, repository.targets('role1').version)
|
|
|
|
# It is assumed that the targets (tuf/tests/repository_data/) role contains
|
|
# 'file1.txt' and 'file2.txt'.
|
|
self.assertTrue('file1.txt' in repository.targets.target_files)
|
|
self.assertTrue('file2.txt' in repository.targets.target_files)
|
|
self.assertTrue('file3.txt' in repository.targets('role1').target_files)
|
|
|
|
# Test if targets file info is loaded correctly: read the JSON metadata
|
|
# files separately and then compare with the loaded repository data.
|
|
targets_path = os.path.join(metadata_directory, 'targets.json')
|
|
role1_path = os.path.join(metadata_directory, 'role1.json')
|
|
|
|
targets_object = securesystemslib.util.load_json_file(targets_path)
|
|
role1_object = securesystemslib.util.load_json_file(role1_path)
|
|
|
|
targets_fileinfo = targets_object['signed']['targets']
|
|
role1_fileinfo = role1_object['signed']['targets']
|
|
|
|
repository = repo_tool.load_repository(repository_directory)
|
|
|
|
self.assertEqual(targets_fileinfo, repository.targets.target_files)
|
|
self.assertEqual(role1_fileinfo, repository.targets('role1').target_files)
|
|
|
|
# Test for a non-default repository name.
|
|
repository = repo_tool.load_repository(repository_directory, 'my-repo')
|
|
|
|
# Test improperly formatted arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
repo_tool.load_repository, 3)
|
|
|
|
|
|
# Test for invalid 'repository_directory' (i.e., does not contain the
|
|
# minimum required metadata.
|
|
root_filepath = os.path.join(repository_directory,
|
|
repo_tool.METADATA_STAGED_DIRECTORY_NAME, 'root.json')
|
|
os.remove(root_filepath)
|
|
self.assertRaises(tuf.exceptions.RepositoryError,
|
|
repo_tool.load_repository, repository_directory)
|
|
|
|
|
|
|
|
def test_dirty_roles(self):
|
|
repository_name = 'test_repository'
|
|
original_repository_directory = os.path.join('repository_data',
|
|
'repository')
|
|
repository = repo_tool.load_repository(original_repository_directory,
|
|
repository_name)
|
|
|
|
# dirty_roles() only logs the list of dirty roles.
|
|
repository.dirty_roles()
|
|
|
|
|
|
|
|
def test_dump_signable_metadata(self):
|
|
metadata_directory = os.path.join('repository_data',
|
|
'repository', 'metadata')
|
|
targets_metadata_file = os.path.join(metadata_directory, 'targets.json')
|
|
|
|
metadata_content = repo_tool.dump_signable_metadata(targets_metadata_file)
|
|
|
|
# Test for an invalid targets metadata file..
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
repo_tool.dump_signable_metadata, 1)
|
|
self.assertRaises(securesystemslib.exceptions.StorageError,
|
|
repo_tool.dump_signable_metadata, 'bad file path')
|
|
|
|
|
|
|
|
def test_append_signature(self):
|
|
metadata_directory = os.path.join('repository_data',
|
|
'repository', 'metadata')
|
|
targets_metadata_path = os.path.join(metadata_directory, 'targets.json')
|
|
|
|
temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
|
|
tmp_targets_metadata_path = os.path.join(temporary_directory, 'targets.json')
|
|
shutil.copyfile(targets_metadata_path, tmp_targets_metadata_path)
|
|
|
|
# Test for normal case.
|
|
targets_metadata = securesystemslib.util.load_json_file(tmp_targets_metadata_path)
|
|
num_signatures = len(targets_metadata['signatures'])
|
|
signature = targets_metadata['signatures'][0]
|
|
|
|
repo_tool.append_signature(signature, tmp_targets_metadata_path)
|
|
|
|
targets_metadata = securesystemslib.util.load_json_file(tmp_targets_metadata_path)
|
|
self.assertTrue(num_signatures, len(targets_metadata['signatures']))
|
|
|
|
# Test for invalid arguments.
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
repo_tool.append_signature, 1, tmp_targets_metadata_path)
|
|
|
|
self.assertRaises(securesystemslib.exceptions.FormatError,
|
|
repo_tool.append_signature, signature, 1)
|
|
|
|
|
|
# Run the test cases.
|
|
if __name__ == '__main__':
|
|
unittest.main()
|