2014-06-03 18:32:44 +00:00
|
|
|
#!/usr/bin/env python
|
|
|
|
|
|
2017-11-30 18:33:11 +00:00
|
|
|
# Copyright 2014 - 2017, New York University and the TUF contributors
|
|
|
|
|
# SPDX-License-Identifier: MIT OR Apache-2.0
|
|
|
|
|
|
2014-06-03 18:32:44 +00:00
|
|
|
"""
|
|
|
|
|
<Program Name>
|
|
|
|
|
test_repository_lib.py
|
|
|
|
|
|
2017-01-10 22:05:12 +00:00
|
|
|
<Author>
|
2014-06-03 18:32:44 +00:00
|
|
|
Vladimir Diaz <vladimir.v.diaz@gmail.com>
|
|
|
|
|
|
|
|
|
|
<Started>
|
|
|
|
|
June 1, 2014.
|
|
|
|
|
|
|
|
|
|
<Copyright>
|
2018-02-05 16:31:19 +00:00
|
|
|
See LICENSE-MIT OR LICENSE for licensing information.
|
2014-06-03 18:32:44 +00:00
|
|
|
|
|
|
|
|
<Purpose>
|
|
|
|
|
Unit test for 'repository_lib.py'.
|
|
|
|
|
"""
|
|
|
|
|
|
|
|
|
|
# Help with Python 3 compatibility, where the print statement is a function, an
|
|
|
|
|
# implicit relative import is invalid, and the '/' operator performs true
|
|
|
|
|
# division. Example: print 'hello world' raises a 'SyntaxError' exception.
|
|
|
|
|
from __future__ import print_function
|
|
|
|
|
from __future__ import absolute_import
|
|
|
|
|
from __future__ import division
|
|
|
|
|
from __future__ import unicode_literals
|
|
|
|
|
|
|
|
|
|
import os
|
|
|
|
|
import time
|
|
|
|
|
import datetime
|
|
|
|
|
import logging
|
|
|
|
|
import tempfile
|
2014-06-11 15:29:00 +00:00
|
|
|
import json
|
2014-06-03 18:32:44 +00:00
|
|
|
import shutil
|
2014-06-19 13:50:20 +00:00
|
|
|
import stat
|
2014-06-06 22:57:23 +00:00
|
|
|
import sys
|
2017-09-21 21:16:29 +00:00
|
|
|
import unittest
|
2018-04-27 19:43:55 +00:00
|
|
|
import platform
|
2014-06-03 18:32:44 +00:00
|
|
|
|
|
|
|
|
import tuf
|
2017-01-10 22:05:12 +00:00
|
|
|
import tuf.formats
|
2014-06-03 18:32:44 +00:00
|
|
|
import tuf.log
|
2016-11-09 22:10:05 +00:00
|
|
|
import tuf.formats
|
2014-06-03 18:32:44 +00:00
|
|
|
import tuf.roledb
|
2017-01-10 22:05:12 +00:00
|
|
|
import tuf.keydb
|
|
|
|
|
import tuf.settings
|
|
|
|
|
|
2014-06-03 18:32:44 +00:00
|
|
|
import tuf.repository_lib as repo_lib
|
2015-10-20 13:16:39 +00:00
|
|
|
import tuf.repository_tool as repo_tool
|
|
|
|
|
|
2017-01-10 22:05:12 +00:00
|
|
|
import securesystemslib
|
2015-06-02 14:28:02 +00:00
|
|
|
import six
|
2014-06-03 18:32:44 +00:00
|
|
|
|
|
|
|
|
# Logger dedicated to this test module.
logger = logging.getLogger('tuf.test_repository_lib')

# Turn off repository_lib's console logging for the duration of the test run
# (presumably to keep the unittest output readable -- confirm).
repo_lib.disable_console_log_messages()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
class TestRepositoryToolFunctions(unittest.TestCase):
|
|
|
|
|
@classmethod
def setUpClass(cls):
  # Runs once, before any test in this class is executed.

  # Empty every role and key database (clear_all=True) so the tests begin
  # from a clean slate.
  tuf.roledb.clear_roledb(clear_all=True)
  tuf.keydb.clear_keydb(clear_all=True)

  # Scratch directory, created beneath the current working directory, that
  # stores the repository, metadata, target, and key files written by the
  # tests.  tearDownClass() deletes it so that temporary files never
  # outlive the test run.
  working_directory = os.getcwd()
  cls.temporary_directory = tempfile.mkdtemp(dir=working_directory)
|
|
|
|
|
|
|
|
|
|
|
2017-01-10 22:05:12 +00:00
|
|
|
|
|
|
|
|
@classmethod
def tearDownClass(cls):
  # tearDownClass() runs once, after all the tests of this class have run.
  # http://docs.python.org/2/library/unittest.html#class-and-module-fixtures

  # Leave empty role and key databases behind.
  tuf.roledb.clear_roledb(clear_all=True)
  tuf.keydb.clear_keydb(clear_all=True)

  # Delete the scratch directory created by setUpClass(), together with all
  # of the metadata, target, and key files the test cases generated in it.
  scratch_directory = cls.temporary_directory
  shutil.rmtree(scratch_directory)
|
2014-06-03 18:32:44 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def setUp(self):
  # Before each test: create a role database and a key database, both named
  # 'test_repository'.
  repository_name = 'test_repository'
  tuf.roledb.create_roledb(repository_name)
  tuf.keydb.create_keydb(repository_name)
|
2014-06-03 18:32:44 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def tearDown(self):
  # After each test: wipe every role and key database (clear_all=True) so
  # that no state is carried over into the next test.
  tuf.roledb.clear_roledb(clear_all=True)
  tuf.keydb.clear_keydb(clear_all=True)
|
2014-06-03 18:32:44 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_generate_and_write_rsa_keypair(self):
  # Exercises repo_lib.generate_and_write_rsa_keypair(): key files are
  # written, can be imported again, and bad arguments are rejected.
  key_schema = securesystemslib.formats.RSAKEY_SCHEMA
  scratch_directory = tempfile.mkdtemp(dir=self.temporary_directory)
  private_path = os.path.join(scratch_directory, 'rsa_key')
  public_path = private_path + '.pub'

  # Normal case: both the private and the public key file are created.
  repo_lib.generate_and_write_rsa_keypair(private_path, password='pw')
  self.assertTrue(os.path.exists(private_path))
  self.assertTrue(os.path.exists(public_path))

  # The files just written must load back as well-formed RSA keys.
  public_key = repo_lib.import_rsa_publickey_from_file(public_path)
  self.assertTrue(key_schema.matches(public_key))

  private_key = repo_lib.import_rsa_privatekey_from_file(private_path, 'pw')
  self.assertTrue(key_schema.matches(private_key))

  # Explicit 'bits' argument.  Remove the previous pair first so that the
  # existence checks below observe freshly written files.
  os.remove(private_path)
  os.remove(public_path)
  repo_lib.generate_and_write_rsa_keypair(private_path, bits=2048,
      password='pw')
  self.assertTrue(os.path.exists(private_path))
  self.assertTrue(os.path.exists(public_path))

  # Each (filepath, bits, password) triple below must raise FormatError.
  rejected_arguments = [
      (3, 2048, 'pw'),               # improperly formatted filepath
      (private_path, 'bad', 'pw'),   # improperly formatted bits
      (private_path, 2048, 3),       # improperly formatted password
      (private_path, 1024, 'pw'),    # invalid 'bits' value
  ]
  for filepath, bits, password in rejected_arguments:
    with self.assertRaises(securesystemslib.exceptions.FormatError):
      repo_lib.generate_and_write_rsa_keypair(filepath, bits=bits,
          password=password)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_import_rsa_privatekey_from_file(self):
  # Exercises repo_lib.import_rsa_privatekey_from_file().
  import_private = repo_lib.import_rsa_privatekey_from_file
  scratch_directory = tempfile.mkdtemp(dir=self.temporary_directory)

  # Normal case: load a pre-generated key from the 'repository_data'
  # keystore.  The pre-generated key files are unlocked with 'password'.
  stored_key_path = os.path.join('repository_data', 'keystore', 'root_key')
  self.assertTrue(os.path.exists(stored_key_path))

  rsa_key = import_private(stored_key_path, 'password')
  self.assertTrue(securesystemslib.formats.RSAKEY_SCHEMA.matches(rsa_key))

  # Improperly formatted filepath argument.
  with self.assertRaises(securesystemslib.exceptions.FormatError):
    import_private(3, 'pw')

  # Path that does not exist.
  missing_path = os.path.join(scratch_directory, 'nonexistent_keypath')
  with self.assertRaises(IOError):
    import_private(missing_path, 'pw')

  # File that exists but does not contain a key.
  garbage_path = os.path.join(scratch_directory, 'invalid_keyfile')
  with open(garbage_path, 'wb') as garbage_file:
    garbage_file.write(b'bad keyfile')

  with self.assertRaises(securesystemslib.exceptions.CryptoError):
    import_private(garbage_path, 'pw')
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_import_rsa_publickey_from_file(self):
  # Exercises repo_lib.import_rsa_publickey_from_file(): a valid key file is
  # imported, and malformed arguments, missing files, and non-key files are
  # rejected.

  # Test normal case.
  temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)

  # Load one of the pre-generated key files from 'tuf/tests/repository_data'.
  key_filepath = os.path.join('repository_data', 'keystore',
                              'root_key.pub')
  self.assertTrue(os.path.exists(key_filepath))

  imported_rsa_key = repo_lib.import_rsa_publickey_from_file(key_filepath)
  self.assertTrue(securesystemslib.formats.RSAKEY_SCHEMA.matches(imported_rsa_key))

  # Test improperly formatted argument.  This must target the public-key
  # importer: the check previously called import_rsa_privatekey_from_file(),
  # so the function under test never had its argument validation exercised.
  self.assertRaises(securesystemslib.exceptions.FormatError,
                    repo_lib.import_rsa_publickey_from_file, 3)

  # Test invalid argument.
  # Non-existent key file.
  nonexistent_keypath = os.path.join(temporary_directory,
                                     'nonexistent_keypath')
  self.assertRaises(IOError, repo_lib.import_rsa_publickey_from_file,
                    nonexistent_keypath)

  # Invalid key file argument.
  invalid_keyfile = os.path.join(temporary_directory, 'invalid_keyfile')
  with open(invalid_keyfile, 'wb') as file_object:
    file_object.write(b'bad keyfile')

  self.assertRaises(securesystemslib.exceptions.Error, repo_lib.import_rsa_publickey_from_file,
                    invalid_keyfile)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_generate_and_write_ed25519_keypair(self):
  # Exercises repo_lib.generate_and_write_ed25519_keypair(): key files are
  # written, can be imported again, and bad arguments are rejected.

  # Test normal case.
  temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
  test_keypath = os.path.join(temporary_directory, 'ed25519_key')

  repo_lib.generate_and_write_ed25519_keypair(test_keypath, password='pw')
  self.assertTrue(os.path.exists(test_keypath))
  self.assertTrue(os.path.exists(test_keypath + '.pub'))

  # Ensure the generated key files are importable.
  imported_pubkey = \
    repo_lib.import_ed25519_publickey_from_file(test_keypath + '.pub')
  self.assertTrue(securesystemslib.formats.ED25519KEY_SCHEMA.matches(imported_pubkey))

  imported_privkey = \
    repo_lib.import_ed25519_privatekey_from_file(test_keypath, 'pw')
  self.assertTrue(securesystemslib.formats.ED25519KEY_SCHEMA.matches(imported_privkey))

  # Test improperly formatted arguments.
  self.assertRaises(securesystemslib.exceptions.FormatError,
                    repo_lib.generate_and_write_ed25519_keypair,
                    3, password='pw')

  # The bad-password check must also target the ed25519 generator; it
  # previously called generate_and_write_rsa_keypair(), which is covered by
  # its own test and left this function's password validation untested.
  self.assertRaises(securesystemslib.exceptions.FormatError,
                    repo_lib.generate_and_write_ed25519_keypair,
                    test_keypath, password=3)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_import_ed25519_publickey_from_file(self):
  # Exercises repo_lib.import_ed25519_publickey_from_file().
  import_public = repo_lib.import_ed25519_publickey_from_file
  scratch_directory = tempfile.mkdtemp(dir=self.temporary_directory)

  # Normal case: generate an ed25519 key pair and import its public half.
  private_path = os.path.join(scratch_directory, 'ed25519_key')
  public_path = private_path + '.pub'
  repo_lib.generate_and_write_ed25519_keypair(private_path, password='pw')

  public_key = import_public(public_path)
  self.assertTrue(
      securesystemslib.formats.ED25519KEY_SCHEMA.matches(public_key))

  # Improperly formatted filepath argument.
  with self.assertRaises(securesystemslib.exceptions.FormatError):
    import_public(3)

  # Path that does not exist.
  missing_path = os.path.join(scratch_directory, 'nonexistent_keypath')
  with self.assertRaises(IOError):
    import_public(missing_path)

  # File that exists but does not contain a key.
  garbage_path = os.path.join(scratch_directory, 'invalid_keyfile')
  with open(garbage_path, 'wb') as garbage_file:
    garbage_file.write(b'bad keyfile')

  with self.assertRaises(securesystemslib.exceptions.Error):
    import_public(garbage_path)

  # Public key file carrying an unexpected keytype: rebuild the metadata
  # form of the imported key, corrupt its 'keytype', and overwrite the
  # public key file with it.
  tampered_metadata = securesystemslib.keys.format_keyval_to_metadata(
      public_key['keytype'], public_key['scheme'], public_key['keyval'],
      private=False)
  tampered_metadata['keytype'] = 'invalid_keytype'

  serialized_metadata = json.dumps(tampered_metadata).encode('utf-8')
  with open(public_path, 'wb') as public_file:
    public_file.write(serialized_metadata)

  with self.assertRaises(securesystemslib.exceptions.FormatError):
    import_public(public_path)
|
2014-06-03 18:32:44 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_import_ed25519_privatekey_from_file(self):
  # Exercises repo_lib.import_ed25519_privatekey_from_file().
  import_private = repo_lib.import_ed25519_privatekey_from_file
  scratch_directory = tempfile.mkdtemp(dir=self.temporary_directory)

  # Normal case: generate an ed25519 key pair and import its private half.
  private_path = os.path.join(scratch_directory, 'ed25519_key')
  repo_lib.generate_and_write_ed25519_keypair(private_path, password='pw')

  private_key = import_private(private_path, 'pw')
  self.assertTrue(
      securesystemslib.formats.ED25519KEY_SCHEMA.matches(private_key))

  # Improperly formatted filepath argument.
  with self.assertRaises(securesystemslib.exceptions.FormatError):
    import_private(3, 'pw')

  # Path that does not exist.
  missing_path = os.path.join(scratch_directory, 'nonexistent_keypath')
  with self.assertRaises(IOError):
    import_private(missing_path, 'pw')

  # File that exists but does not contain a key.
  garbage_path = os.path.join(scratch_directory, 'invalid_keyfile')
  with open(garbage_path, 'wb') as garbage_file:
    garbage_file.write(b'bad keyfile')

  with self.assertRaises(securesystemslib.exceptions.Error):
    import_private(garbage_path, 'pw')

  # Private key file carrying an unexpected keytype.
  private_key['keytype'] = 'invalid_keytype'

  # Go through 'pyca_crypto_keys.py' directly to bypass the key format
  # validation performed by 'keys.py'.
  crypto = securesystemslib.pyca_crypto_keys
  salt, iterations, derived_key = crypto._generate_derived_key('pw')

  # The non-public _encrypt() routine expects the derived key information
  # as a dictionary.
  derived_key_information = {'salt': salt, 'iterations': iterations,
                             'derived_key': derived_key}

  # Serialize the tampered key to JSON, encrypt it with the derived key, and
  # overwrite the private key file with the result.
  serialized_key = json.dumps(private_key)
  encrypted_key = crypto._encrypt(serialized_key, derived_key_information)

  with open(private_path, 'wb') as private_file:
    private_file.write(encrypted_key.encode('utf-8'))

  with self.assertRaises(securesystemslib.exceptions.FormatError):
    import_private(private_path, 'pw')
|
2014-06-03 18:32:44 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_get_metadata_filenames(self):
  # Exercises repo_lib.get_metadata_filenames().
  role_files = ('root.json', 'targets.json', 'snapshot.json',
                'timestamp.json')

  # Normal case: an explicit metadata directory.
  directory_prefix = os.path.join('metadata/')
  expected = {name: directory_prefix + name for name in role_files}
  self.assertEqual(expected, repo_lib.get_metadata_filenames('metadata/'))

  # If a directory argument is not specified, the current working directory
  # is used.
  working_directory = os.getcwd()
  expected = {name: os.path.join(working_directory, name)
              for name in role_files}
  self.assertEqual(expected, repo_lib.get_metadata_filenames())

  # Improperly formatted argument.
  with self.assertRaises(securesystemslib.exceptions.FormatError):
    repo_lib.get_metadata_filenames(3)
|
2014-06-03 18:32:44 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_get_metadata_fileinfo(self):
  # Exercises repo_lib.get_metadata_fileinfo().
  scratch_directory = tempfile.mkdtemp(dir=self.temporary_directory)
  sample_path = os.path.join(scratch_directory, 'file.txt')

  with open(sample_path, 'wt') as sample_file:
    sample_file.write('test file')

  # Build the expected fileinfo object by hand.  It is assumed that
  # get_metadata_fileinfo() computes SHA256 and SHA512 hashes.
  digest_filename = securesystemslib.hash.digest_filename
  sha256_digest = digest_filename(sample_path)
  sha512_digest = digest_filename(sample_path, algorithm='sha512')
  expected_fileinfo = {
      'length': os.path.getsize(sample_path),
      'hashes': {'sha256': sha256_digest.hexdigest(),
                 'sha512': sha512_digest.hexdigest()},
  }
  self.assertTrue(tuf.formats.FILEINFO_SCHEMA.matches(expected_fileinfo))

  # Normal case.
  self.assertEqual(expected_fileinfo,
                   repo_lib.get_metadata_fileinfo(sample_path))

  # Improperly formatted argument.
  with self.assertRaises(securesystemslib.exceptions.FormatError):
    repo_lib.get_metadata_fileinfo(3)

  # Non-existent file.
  missing_path = os.path.join(scratch_directory, 'oops.txt')
  with self.assertRaises(securesystemslib.exceptions.Error):
    repo_lib.get_metadata_fileinfo(missing_path)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_get_target_hash(self):
  # Exercises repo_lib.get_target_hash() against known path -> hash pairs.
  known_hashes = {
    '/file1.txt': 'e3a3d89eb3b70ce3fbce6017d7b8c12d4abd5635427a0e8a238f53157df85b3d',
    '/README.txt': '8faee106f1bb69f34aaf1df1e3c2e87d763c4d878cb96b91db13495e32ceb0b0',
    '/packages/file2.txt': 'c9c4a5cdd84858dd6a23d98d7e6e6b2aec45034946c16b2200bc317c75415e92'
  }
  path_schema = securesystemslib.formats.RELPATH_SCHEMA
  hash_schema = securesystemslib.formats.HASH_SCHEMA

  # Normal case: the fixture entries are well formed and every path hashes
  # to its recorded digest.
  for target_path, expected_digest in six.iteritems(known_hashes):
    self.assertTrue(path_schema.matches(target_path))
    self.assertTrue(hash_schema.matches(expected_digest))
    self.assertEqual(repo_lib.get_target_hash(target_path), expected_digest)

  # Improperly formatted argument.
  with self.assertRaises(securesystemslib.exceptions.FormatError):
    repo_lib.get_target_hash(8)
|
2014-06-03 18:32:44 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_generate_root_metadata(self):
  # Exercises repo_lib.generate_root_metadata().
  generate_root = repo_lib.generate_root_metadata

  # Normal case: load the root metadata shipped in 'repository_data'.
  root_path = os.path.join('repository_data', 'repository', 'metadata',
                           'root.json')
  signed_root = securesystemslib.util.load_json_file(root_path)['signed']

  # generate_root_metadata() expects the top-level roles and keys to be
  # available in 'tuf.keydb' and 'tuf.roledb'.
  tuf.roledb.create_roledb_from_root_metadata(signed_root)
  tuf.keydb.create_keydb_from_root_metadata(signed_root)
  expires = '1985-10-21T01:22:00Z'

  root_metadata = generate_root(1, expires, consistent_snapshot=False)
  self.assertTrue(tuf.formats.ROOT_SCHEMA.matches(root_metadata))

  # Corrupt the keytype of root's first key directly in the key database;
  # generating root metadata must then fail.
  first_root_keyid = tuf.roledb.get_role_keyids('root')[0]
  first_root_key = tuf.keydb._keydb_dict['default'][first_root_keyid]
  first_root_key['keytype'] = 'bad_keytype'
  with self.assertRaises(securesystemslib.exceptions.Error):
    generate_root(1, expires, consistent_snapshot=False)

  # Restore the keytype, so that the next check verifies that a
  # securesystemslib.exceptions.Error is also raised for duplicate keyids.
  first_root_key['keytype'] = 'rsa'

  # Add a duplicate keyid to root's roleinfo.
  root_roleinfo = tuf.roledb._roledb_dict['default']['root']
  root_roleinfo['keyids'].append(first_root_keyid)
  with self.assertRaises(securesystemslib.exceptions.Error):
    generate_root(1, expires, consistent_snapshot=False)

  # Improperly formatted arguments: each position in turn.
  malformed_calls = [
      ('3', expires, False),
      (1, '3', False),
      (1, expires, 3),
  ]
  for arguments in malformed_calls:
    with self.assertRaises(securesystemslib.exceptions.FormatError):
      generate_root(*arguments)

  # Missing required roles and keys.
  tuf.roledb.clear_roledb()
  tuf.keydb.clear_keydb()
  with self.assertRaises(securesystemslib.exceptions.Error):
    generate_root(1, expires, False)
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_generate_targets_metadata(self):
  # Exercises repo_lib.generate_targets_metadata(): valid calls (with and
  # without delegations, with and without consistent targets), malformed
  # arguments, and a target file that does not exist.

  # Test normal case.
  temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
  targets_directory = os.path.join(temporary_directory, 'targets')
  file1_path = os.path.join(targets_directory, 'file.txt')
  securesystemslib.util.ensure_parent_dir(file1_path)

  with open(file1_path, 'wt') as file_object:
    file_object.write('test file.')

  # Set valid generate_targets_metadata() arguments.  Add a custom field for
  # the 'target_files' target set below.
  version = 1
  datetime_object = datetime.datetime(2030, 1, 1, 12, 0)
  expiration_date = datetime_object.isoformat() + 'Z'
  file_permissions = oct(os.stat(file1_path).st_mode)[4:]
  target_files = {'file.txt': {'file_permission': file_permissions}}

  delegations = {"keys": {
    "a394c28384648328b16731f81440d72243c77bb44c07c040be99347f0df7d7bf": {
      "keytype": "ed25519",
      "keyval": {
        "public": "3eb81026ded5af2c61fb3d4b272ac53cd1049a810ee88f4df1fc35cdaf918157"
        }
      }
    },
    "roles": [
      {
        "keyids": [
          "a394c28384648328b16731f81440d72243c77bb44c07c040be99347f0df7d7bf"
        ],
        "name": "targets/warehouse",
        "paths": [
          "/file1.txt", "/README.txt", '/warehouse/'
        ],
        "threshold": 1
      }
    ]
  }

  targets_metadata = \
    repo_lib.generate_targets_metadata(targets_directory, target_files,
                                       version, expiration_date, delegations,
                                       False)
  self.assertTrue(tuf.formats.TARGETS_SCHEMA.matches(targets_metadata))

  # Valid arguments with 'delegations' set to None.
  targets_metadata = \
    repo_lib.generate_targets_metadata(targets_directory, target_files,
                                       version, expiration_date, None,
                                       False)
  self.assertTrue(tuf.formats.TARGETS_SCHEMA.matches(targets_metadata))

  # Verify that 'digest.filename' file is saved to 'targets_directory' if
  # the 'write_consistent_targets' argument is True.
  list_targets_directory = os.listdir(targets_directory)
  targets_metadata = \
    repo_lib.generate_targets_metadata(targets_directory, target_files,
                                       version, expiration_date, delegations,
                                       write_consistent_targets=True)
  new_list_targets_directory = os.listdir(targets_directory)

  # Verify that 'targets_directory' gained at least one item.  This was
  # previously written as assertTrue(len(old) + 1, len(new)), which treats
  # the second argument as the failure message and therefore could never
  # fail.
  # NOTE(review): the exact number of added digest files presumably depends
  # on how many hash algorithms are configured -- confirm before tightening
  # this to an exact assertEqual.
  self.assertGreater(len(new_list_targets_directory),
                     len(list_targets_directory))

  # Verify that an exception is not raised if the target files already exist.
  repo_lib.generate_targets_metadata(targets_directory, target_files,
                                     version, expiration_date, delegations,
                                     write_consistent_targets=True)

  # Verify that 'targets_metadata' contains a 'custom' entry (optional)
  # for 'file.txt'.
  self.assertTrue('custom' in targets_metadata['targets']['file.txt'])

  # Test improperly formatted arguments.
  self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_targets_metadata,
                    3, target_files, version, expiration_date)
  self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_targets_metadata,
                    targets_directory, 3, version, expiration_date)
  self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_targets_metadata,
                    targets_directory, target_files, '3', expiration_date)
  self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_targets_metadata,
                    targets_directory, target_files, version, '3')

  # Improperly formatted 'delegations' and 'write_consistent_targets'
  self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_targets_metadata,
                    targets_directory, target_files, version, expiration_date,
                    3, False)
  self.assertRaises(securesystemslib.exceptions.FormatError, repo_lib.generate_targets_metadata,
                    targets_directory, target_files, version, expiration_date,
                    delegations, 3)

  # Test non-existent target file.
  bad_target_file = \
    {'non-existent.txt': {'file_permission': file_permissions}}

  self.assertRaises(securesystemslib.exceptions.Error, repo_lib.generate_targets_metadata,
                    targets_directory, bad_target_file, version,
                    expiration_date)
|
2014-06-03 18:32:44 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_generate_snapshot_metadata(self):
    # Exercise repo_lib.generate_snapshot_metadata() against a scratch copy
    # of the sample repository: first the normal case, then one improperly
    # formatted value per argument.

    # Test normal case.
    temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
    original_repository_path = os.path.join('repository_data', 'repository')
    repository_directory = os.path.join(temporary_directory, 'repository')
    shutil.copytree(original_repository_path, repository_directory)
    metadata_directory = os.path.join(repository_directory,
                                      repo_lib.METADATA_STAGED_DIRECTORY_NAME)
    targets_directory = os.path.join(repository_directory,
                                     repo_lib.TARGETS_DIRECTORY_NAME)
    version = 1
    expiration_date = '1985-10-21T13:20:00Z'

    # The root and targets arguments are passed as bare role names.  (They
    # were previously also computed as full paths, but those values were
    # overwritten before ever being used.)
    root_filename = 'root'
    targets_filename = 'targets'

    # Load a valid repository so that top-level roles exist in roledb and
    # generate_snapshot_metadata() has roles to specify in snapshot metadata.
    # The returned objects themselves are not needed by this test.
    repo_tool.Repository(repository_directory, metadata_directory,
                         targets_directory)
    repo_tool.load_repository(repository_directory)

    # For testing purposes, store an invalid metadata file in the metadata
    # directory to verify that it isn't loaded by
    # generate_snapshot_metadata().  Unknown metadata file extensions should
    # be ignored.
    invalid_metadata_file = os.path.join(metadata_directory, 'role_file.xml')
    with open(invalid_metadata_file, 'w') as file_object:
        file_object.write('bad extension on metadata file')

    snapshot_metadata = repo_lib.generate_snapshot_metadata(
        metadata_directory, version, expiration_date, root_filename,
        targets_filename, consistent_snapshot=False)
    self.assertTrue(tuf.formats.SNAPSHOT_SCHEMA.matches(snapshot_metadata))

    # Test improperly formatted arguments, one bad argument per call.
    self.assertRaises(securesystemslib.exceptions.FormatError,
                      repo_lib.generate_snapshot_metadata,
                      3, version, expiration_date,
                      root_filename, targets_filename,
                      consistent_snapshot=False)
    self.assertRaises(securesystemslib.exceptions.FormatError,
                      repo_lib.generate_snapshot_metadata,
                      metadata_directory, '3', expiration_date,
                      root_filename, targets_filename,
                      consistent_snapshot=False)
    self.assertRaises(securesystemslib.exceptions.FormatError,
                      repo_lib.generate_snapshot_metadata,
                      metadata_directory, version, '3',
                      root_filename, targets_filename,
                      consistent_snapshot=False)
    self.assertRaises(securesystemslib.exceptions.FormatError,
                      repo_lib.generate_snapshot_metadata,
                      metadata_directory, version, expiration_date,
                      3, targets_filename, consistent_snapshot=False)
    self.assertRaises(securesystemslib.exceptions.FormatError,
                      repo_lib.generate_snapshot_metadata,
                      metadata_directory, version, expiration_date,
                      root_filename, 3, consistent_snapshot=False)
    self.assertRaises(securesystemslib.exceptions.FormatError,
                      repo_lib.generate_snapshot_metadata,
                      metadata_directory, version, expiration_date,
                      root_filename, targets_filename, 3)
|
2015-10-20 13:16:39 +00:00
|
|
|
|
2014-06-03 18:32:44 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_generate_timestamp_metadata(self):
    # Checks repo_lib.generate_timestamp_metadata(): the normal case on a
    # scratch copy of the sample repository, followed by improperly
    # formatted arguments.
    repository_name = 'test_repository'

    # Work on a private copy of the sample repository.
    scratch_root = tempfile.mkdtemp(dir=self.temporary_directory)
    sample_repository = os.path.join('repository_data', 'repository')
    repository_directory = os.path.join(scratch_root, 'repository')
    shutil.copytree(sample_repository, repository_directory)

    metadata_directory = os.path.join(
        repository_directory, repo_lib.METADATA_STAGED_DIRECTORY_NAME)
    targets_directory = os.path.join(
        repository_directory, repo_lib.TARGETS_DIRECTORY_NAME)
    snapshot_filename = os.path.join(
        metadata_directory, repo_lib.SNAPSHOT_FILENAME)

    # Set valid generate_timestamp_metadata() arguments.
    version = 1
    expiration_date = '1985-10-21T13:20:00Z'

    # Load a valid repository so that top-level roles exist in roledb before
    # the timestamp metadata is generated.
    repository = repo_tool.Repository(
        repository_directory, metadata_directory, targets_directory,
        repository_name)
    loaded_repository = repo_tool.load_repository(
        repository_directory, repository_name)

    # Test normal case.
    timestamp_metadata = repo_lib.generate_timestamp_metadata(
        snapshot_filename, version, expiration_date, repository_name)
    self.assertTrue(tuf.formats.TIMESTAMP_SCHEMA.matches(timestamp_metadata))

    # Test improperly formatted arguments: each tuple corrupts exactly one
    # positional argument.
    malformed_calls = (
        (3, version, expiration_date, repository_name),
        (snapshot_filename, '3', expiration_date, repository_name),
        (snapshot_filename, version, '3', repository_name),
    )
    for arguments in malformed_calls:
        self.assertRaises(securesystemslib.exceptions.FormatError,
                          repo_lib.generate_timestamp_metadata, *arguments)
|
2015-10-27 20:11:11 +00:00
|
|
|
|
2014-06-03 18:32:44 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_sign_metadata(self):
    # Covers repo_lib.sign_metadata(): a valid root signable, signing when
    # only a public key is loaded for the role, a corrupted key type, and
    # improperly formatted arguments.
    repository_name = 'test_repository'
    scratch_directory = tempfile.mkdtemp(dir=self.temporary_directory)
    metadata_path = os.path.join('repository_data', 'repository', 'metadata')
    keystore_path = os.path.join('repository_data', 'keystore')

    # Only the 'signed' portion of each metadata file is used below.
    load_json = securesystemslib.util.load_json_file
    root_filename = os.path.join(metadata_path, 'root.json')
    root_metadata = load_json(root_filename)['signed']
    targets_filename = os.path.join(metadata_path, 'targets.json')
    targets_metadata = load_json(targets_filename)['signed']

    # Build the key and role databases for this repository name from the
    # root metadata, then read back the key IDs of the two roles under test.
    tuf.keydb.create_keydb_from_root_metadata(root_metadata, repository_name)
    tuf.roledb.create_roledb_from_root_metadata(root_metadata,
                                                repository_name)
    root_keyids = tuf.roledb.get_role_keyids('root', repository_name)
    targets_keyids = tuf.roledb.get_role_keyids('targets', repository_name)

    root_private_key = repo_lib.import_rsa_privatekey_from_file(
        os.path.join(keystore_path, 'root_key'), 'password')

    # Sign with a valid, but not a threshold, key.
    targets_public_key = repo_lib.import_ed25519_publickey_from_file(
        os.path.join(keystore_path, 'targets_key.pub'))

    # sign_metadata() expects the root private key to be in 'tuf.keydb'.
    # Remove any key that may already be loaded under the same keyid before
    # adding, otherwise a 'tuf.KeyAlreadyExists' exception is raised.
    for key in (root_private_key, targets_public_key):
        tuf.keydb.remove_key(key['keyid'], repository_name=repository_name)
        tuf.keydb.add_key(key, repository_name=repository_name)

    # Verify that a valid root signable is generated.
    root_signable = repo_lib.sign_metadata(
        root_metadata, root_keyids, root_filename, repository_name)
    self.assertTrue(tuf.formats.SIGNABLE_SCHEMA.matches(root_signable))

    # Test for an unset private key (in this case, target's).
    repo_lib.sign_metadata(
        targets_metadata, targets_keyids, targets_filename, repository_name)

    # Add an invalid keytype to one of the root keys.
    first_root_keyid = root_keyids[0]
    keydb_entries = tuf.keydb._keydb_dict[repository_name]
    keydb_entries[first_root_keyid]['keytype'] = 'bad_keytype'
    self.assertRaises(
        securesystemslib.exceptions.Error, repo_lib.sign_metadata,
        root_metadata, root_keyids, root_filename, repository_name)

    # Test improperly formatted arguments: each tuple corrupts exactly one
    # positional argument.
    malformed_calls = (
        (3, root_keyids, 'root.json', repository_name),
        (root_metadata, 3, 'root.json', repository_name),
        (root_metadata, root_keyids, 3, repository_name),
    )
    for arguments in malformed_calls:
        self.assertRaises(securesystemslib.exceptions.FormatError,
                          repo_lib.sign_metadata, *arguments)
|
2014-06-03 18:32:44 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_write_metadata_file(self):
    # Exercise repo_lib.write_metadata_file(): plain writes, consistent
    # snapshot writes under different tuf.settings.CONSISTENT_METHOD values,
    # and improperly formatted arguments.

    # Test normal case.
    temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
    metadata_directory = os.path.join('repository_data', 'repository',
                                      'metadata')
    root_filename = os.path.join(metadata_directory, 'root.json')
    root_signable = securesystemslib.util.load_json_file(root_filename)

    output_filename = os.path.join(temporary_directory, 'root.json')
    version_number = root_signable['signed']['version'] + 1

    self.assertFalse(os.path.exists(output_filename))
    repo_lib.write_metadata_file(root_signable, output_filename,
                                 version_number, consistent_snapshot=False)
    self.assertTrue(os.path.exists(output_filename))

    # Attempt to over-write the previously written metadata file.  An
    # exception is not raised in this case, only a debug message is logged.
    repo_lib.write_metadata_file(root_signable, output_filename,
                                 version_number, consistent_snapshot=False)

    # Everything below mutates the module-level tuf.settings.CONSISTENT_METHOD.
    # The try/finally guarantees the setting is reset even when one of the
    # assertions fails, so a failure here cannot leak into other tests.
    try:
        # Try to write a consistent metadata file.  An exception is not
        # raised in this case.  For testing purposes, root.json should be a
        # hard link to the consistent metadata file.  We should verify that
        # root.json points to the latest consistent files.
        tuf.settings.CONSISTENT_METHOD = 'hard_link'
        repo_lib.write_metadata_file(root_signable, output_filename,
                                     version_number,
                                     consistent_snapshot=True)

        # Test if the consistent files are properly named.
        # Filename format of a consistent file: <version number>.rolename.json
        version_and_filename = str(version_number) + '.' + 'root.json'
        first_version_output_file = os.path.join(temporary_directory,
                                                 version_and_filename)
        self.assertTrue(os.path.exists(first_version_output_file))

        # Verify that the consistent file content is equal to
        # 'output_filename'.
        self.assertEqual(
            securesystemslib.util.get_file_details(output_filename),
            securesystemslib.util.get_file_details(first_version_output_file))

        # Try to add more consistent metadata files.
        version_number += 1
        root_signable['signed']['version'] = version_number
        repo_lib.write_metadata_file(root_signable, output_filename,
                                     version_number,
                                     consistent_snapshot=True)

        # Test if the latest root.json points to the expected consistent
        # file and consistent metadata do not all point to the same
        # root.json.
        version_and_filename = str(version_number) + '.' + 'root.json'
        second_version_output_file = os.path.join(temporary_directory,
                                                  version_and_filename)
        self.assertTrue(os.path.exists(second_version_output_file))

        # Verify that the second version is equal to the second output file,
        # and that the second output filename differs from the first.
        self.assertEqual(
            securesystemslib.util.get_file_details(output_filename),
            securesystemslib.util.get_file_details(second_version_output_file))
        self.assertNotEqual(
            securesystemslib.util.get_file_details(output_filename),
            securesystemslib.util.get_file_details(first_version_output_file))

        # Test for an improper settings.CONSISTENT_METHOD string value.
        tuf.settings.CONSISTENT_METHOD = 'somebadidea'

        # Test for invalid consistent methods on systems other than Windows,
        # which always uses the copy method.
        if platform.system() != 'Windows':
            self.assertRaises(
                securesystemslib.exceptions.InvalidConfigurationError,
                repo_lib.write_metadata_file, root_signable, output_filename,
                version_number, consistent_snapshot=True)

        # Try to create a link to root.json when root.json doesn't exist
        # locally.  repository_lib should log a message if this is the case.
        tuf.settings.CONSISTENT_METHOD = 'hard_link'
        os.remove(output_filename)
        repo_lib.write_metadata_file(root_signable, output_filename,
                                     version_number,
                                     consistent_snapshot=True)
    finally:
        # Reset CONSISTENT_METHOD so that subsequent tests work as expected.
        tuf.settings.CONSISTENT_METHOD = 'copy'

    # Test improperly formatted arguments, one bad argument per call.
    self.assertRaises(securesystemslib.exceptions.FormatError,
                      repo_lib.write_metadata_file,
                      3, output_filename, version_number, False)
    self.assertRaises(securesystemslib.exceptions.FormatError,
                      repo_lib.write_metadata_file,
                      root_signable, 3, version_number, False)
    self.assertRaises(securesystemslib.exceptions.FormatError,
                      repo_lib.write_metadata_file,
                      root_signable, output_filename, '3', False)
    self.assertRaises(securesystemslib.exceptions.FormatError,
                      repo_lib.write_metadata_file,
                      root_signable, output_filename, version_number, 3)
|
|
|
|
|
|
2014-06-03 18:32:44 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_create_tuf_client_directory(self):
    # Exercise repo_lib.create_tuf_client_directory(): the normal case,
    # improperly formatted arguments, an already existing client directory,
    # and a client metadata directory that cannot be created.

    # Test normal case.
    temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
    repository_directory = os.path.join('repository_data', 'repository')
    client_directory = os.path.join(temporary_directory, 'client')

    repo_lib.create_tuf_client_directory(repository_directory,
                                         client_directory)

    metadata_directory = os.path.join(client_directory, 'metadata')
    current_directory = os.path.join(metadata_directory, 'current')
    previous_directory = os.path.join(metadata_directory, 'previous')
    self.assertTrue(os.path.exists(client_directory))
    self.assertTrue(os.path.exists(metadata_directory))
    self.assertTrue(os.path.exists(current_directory))
    self.assertTrue(os.path.exists(previous_directory))

    # Test improperly formatted arguments.
    self.assertRaises(securesystemslib.exceptions.FormatError,
                      repo_lib.create_tuf_client_directory,
                      3, client_directory)
    self.assertRaises(securesystemslib.exceptions.FormatError,
                      repo_lib.create_tuf_client_directory,
                      repository_directory, 3)

    # Test invalid argument (i.e., client directory already exists.)
    self.assertRaises(securesystemslib.exceptions.RepositoryError,
                      repo_lib.create_tuf_client_directory,
                      repository_directory, client_directory)

    # Test invalid client metadata directory (i.e., non-errno.EEXIST
    # exceptions should be re-raised.)
    shutil.rmtree(metadata_directory)

    # Save the original metadata directory name so that it can be restored
    # after testing.  The restore lives in a 'finally' clause so that the
    # module-level constant is put back even if the assertion below fails.
    metadata_directory_name = repo_lib.METADATA_DIRECTORY_NAME
    repo_lib.METADATA_DIRECTORY_NAME = '/'
    try:
        # Creation of the '/' directory is forbidden on all supported OSs.
        # The '/' argument to create_tuf_client_directory should cause it to
        # re-raise a non-errno.EEXIST exception.
        self.assertRaises(
            (OSError, securesystemslib.exceptions.RepositoryError),
            repo_lib.create_tuf_client_directory, repository_directory, '/')
    finally:
        # Restore the metadata directory name in repo_lib.
        repo_lib.METADATA_DIRECTORY_NAME = metadata_directory_name
|
2014-06-19 13:50:20 +00:00
|
|
|
|
|
|
|
|
|
2014-06-03 18:32:44 +00:00
|
|
|
|
2014-06-11 15:29:00 +00:00
|
|
|
def test__check_directory(self):
    # A path that does not exist must be rejected by
    # repo_lib._check_directory() with securesystemslib.exceptions.Error.
    with self.assertRaises(securesystemslib.exceptions.Error):
        repo_lib._check_directory('non-existent')
|
2014-06-11 15:29:00 +00:00
|
|
|
|
|
|
|
|
|
|
|
|
|
|
2015-04-02 20:37:11 +00:00
|
|
|
def test__generate_and_write_metadata(self):
    # Write metadata for a temporary 'obsolete_role', remove that role from
    # 'tuf.roledb', and check that _delete_obsolete_metadata() then deletes
    # the role's metadata file from disk.

    # Load the root metadata provided in 'tuf/tests/repository_data/'.
    repository_name = 'repository_name'
    root_filepath = os.path.join('repository_data', 'repository',
                                 'metadata', 'root.json')
    root_signable = securesystemslib.util.load_json_file(root_filepath)

    # _generate_and_write_metadata() expects the top-level roles
    # (specifically 'snapshot') and keys to be available in 'tuf.roledb'.
    tuf.roledb.create_roledb_from_root_metadata(root_signable['signed'],
                                                repository_name)
    temporary_directory = tempfile.mkdtemp(dir=self.temporary_directory)
    targets_directory = os.path.join(temporary_directory, 'targets')
    os.mkdir(targets_directory)
    repository_directory = os.path.join(temporary_directory, 'repository')
    metadata_directory = os.path.join(repository_directory,
                                      repo_lib.METADATA_STAGED_DIRECTORY_NAME)
    targets_metadata = os.path.join('repository_data', 'repository',
                                    'metadata', 'targets.json')
    obsolete_metadata = os.path.join(metadata_directory, 'obsolete_role.json')
    securesystemslib.util.ensure_parent_dir(obsolete_metadata)
    shutil.copyfile(targets_metadata, obsolete_metadata)

    # Verify that obsolete metadata (a metadata file that exists on disk,
    # but whose role is unavailable in 'tuf.roledb') is deleted.  First add
    # the obsolete role to 'tuf.roledb' so that its metadata file can be
    # written to disk.
    targets_roleinfo = tuf.roledb.get_roleinfo('targets', repository_name)
    targets_roleinfo['version'] = 1
    # Expire one day (86400 seconds) from now.
    expiration = tuf.formats.unix_timestamp_to_datetime(
        int(time.time() + 86400))
    expiration = expiration.isoformat() + 'Z'
    targets_roleinfo['expires'] = expiration
    tuf.roledb.add_role('obsolete_role', targets_roleinfo,
                        repository_name=repository_name)

    repo_lib._generate_and_write_metadata(
        'obsolete_role', obsolete_metadata, targets_directory,
        metadata_directory, consistent_snapshot=False, filenames=None,
        repository_name=repository_name)

    snapshot_filepath = os.path.join('repository_data', 'repository',
                                     'metadata', 'snapshot.json')
    snapshot_signable = securesystemslib.util.load_json_file(snapshot_filepath)
    tuf.roledb.remove_role('obsolete_role', repository_name)
    self.assertTrue(os.path.exists(obsolete_metadata))
    tuf.repository_lib._delete_obsolete_metadata(
        metadata_directory, snapshot_signable['signed'], False,
        repository_name)

    # Check the real file path.  The previous check concatenated the
    # directory and file name without a path separator, so it tested a path
    # that could never exist and passed regardless of the outcome.
    self.assertFalse(os.path.exists(obsolete_metadata))
    shutil.copyfile(targets_metadata, obsolete_metadata)
|
|
|
|
|
|
2015-04-02 20:37:11 +00:00
|
|
|
|
|
|
|
|
|
2016-08-26 20:42:21 +00:00
|
|
|
def test__delete_obsolete_metadata(self):
    # Runs repo_lib._delete_obsolete_metadata() against a staged metadata
    # directory holding an unlisted role file, then a root.json, and finally
    # against a directory that does not exist.
    repository_name = 'test_repository'
    sample_metadata = os.path.join('repository_data', 'repository',
                                   'metadata')

    scratch_root = tempfile.mkdtemp(dir=self.temporary_directory)
    repository_directory = os.path.join(scratch_root, 'repository')
    metadata_directory = os.path.join(
        repository_directory, repo_lib.METADATA_STAGED_DIRECTORY_NAME)
    os.makedirs(metadata_directory)

    snapshot_signable = securesystemslib.util.load_json_file(
        os.path.join(sample_metadata, 'snapshot.json'))
    snapshot_signed = snapshot_signable['signed']

    # Create role metadata that should not exist in snapshot.json.
    shutil.copyfile(os.path.join(sample_metadata, 'role1.json'),
                    os.path.join(metadata_directory, 'role2.json'))
    repo_lib._delete_obsolete_metadata(
        metadata_directory, snapshot_signed, True, repository_name)

    # _delete_obsolete_metadata should never delete root.json.
    staged_root = os.path.join(metadata_directory, 'root.json')
    shutil.copyfile(os.path.join(sample_metadata, 'root.json'), staged_root)
    repo_lib._delete_obsolete_metadata(
        metadata_directory, snapshot_signed, True, repository_name)
    self.assertTrue(os.path.exists(staged_root))

    # Verify what happens for a non-existent metadata directory (a debug
    # message is logged).
    repo_lib._delete_obsolete_metadata(
        'non-existent', snapshot_signed, True, repository_name)
|
2016-08-30 14:56:07 +00:00
|
|
|
|
2016-08-26 20:42:21 +00:00
|
|
|
|
2016-08-30 21:09:03 +00:00
|
|
|
def test__load_top_level_metadata(self):
    # Drives repo_lib._load_top_level_metadata() through several repository
    # states: a root file with a duplicated signature, partially written
    # top-level roles, missing non-root metadata, and finally a missing root
    # file (the only state expected to raise).
    repository_name = 'test_repository'
    top_level_roles = ('root', 'snapshot', 'targets', 'timestamp')

    scratch_root = tempfile.mkdtemp(dir=self.temporary_directory)
    repository_directory = os.path.join(scratch_root, 'repository')
    metadata_directory = os.path.join(
        repository_directory, repo_lib.METADATA_STAGED_DIRECTORY_NAME)
    targets_directory = os.path.join(
        repository_directory, repo_lib.TARGETS_DIRECTORY_NAME)
    sample_repository = os.path.join('repository_data', 'repository')
    shutil.copytree(os.path.join(sample_repository, 'metadata'),
                    metadata_directory)
    shutil.copytree(os.path.join(sample_repository, 'targets'),
                    targets_directory)

    # Add a duplicate signature to the Root file (for testing purposes).
    root_file = os.path.join(metadata_directory, 'root.json')
    signable = securesystemslib.util.load_json_file(
        os.path.join(metadata_directory, 'root.json'))
    first_signature = signable['signatures'][0]
    signable['signatures'].append(first_signature)
    repo_lib.write_metadata_file(signable, root_file, 8, False)

    # Load the repository whose Root file was just rewritten.
    repository = repo_tool.create_new_repository(repository_directory,
                                                 repository_name)
    filenames = repo_lib.get_metadata_filenames(metadata_directory)
    repo_lib._load_top_level_metadata(repository, filenames, repository_name)

    # Load it a second time into a freshly created repository object.
    filenames = repo_lib.get_metadata_filenames(metadata_directory)
    repository = repo_tool.create_new_repository(repository_directory,
                                                 repository_name)
    repo_lib._load_top_level_metadata(repository, filenames, repository_name)

    # Partially write all top-level roles (we increase the threshold of each
    # top-level role so that they are flagged as partially written).
    for rolename in top_level_roles:
        role = getattr(repository, rolename)
        role.threshold = role.threshold + 1
    for rolename in top_level_roles:
        repository.write(rolename)

    repo_lib._load_top_level_metadata(repository, filenames, repository_name)

    # Attempt to load a repository with missing top-level metadata: delete
    # every '.json' file except those whose name starts with 'root'.
    for entry in os.listdir(metadata_directory):
        if not entry.endswith('.json') or entry.startswith('root'):
            continue
        os.remove(os.path.join(metadata_directory, entry))

    repo_lib._load_top_level_metadata(repository, filenames, repository_name)

    # Remove the required Root file and verify that an exception is raised.
    os.remove(os.path.join(metadata_directory, 'root.json'))
    self.assertRaises(
        securesystemslib.exceptions.RepositoryError,
        repo_lib._load_top_level_metadata, repository, filenames,
        repository_name)
|
2016-08-30 21:09:03 +00:00
|
|
|
|
|
|
|
|
|
2015-04-02 20:37:11 +00:00
|
|
|
|
|
|
|
|
def test__remove_invalid_and_duplicate_signatures(self):
    """Exercise _remove_invalid_and_duplicate_signatures() on the root role.

    Three conditions are covered, in order:
      1. A second signature made with the same key (a duplicate) is appended
         and must not survive the clean-up.
      2. A signature whose keyid is not known to tuf.keydb is passed in; the
         call must complete without raising.
      3. A signature with a corrupted 'sig' value is passed in; afterwards no
         remaining signature may carry that signature's keyid.
    """
    # Remove duplicate PSS signatures (same key generates valid, but different
    # signatures).  First load a valid signable (in this case, the root role).
    repository_name = 'test_repository'
    root_filepath = os.path.join('repository_data', 'repository',
                                 'metadata', 'root.json')
    root_signable = securesystemslib.util.load_json_file(root_filepath)
    key_filepath = os.path.join('repository_data', 'keystore', 'root_key')
    root_rsa_key = repo_lib.import_rsa_privatekey_from_file(key_filepath,
                                                            'password')

    # Add 'root_rsa_key' to tuf.keydb, since
    # _remove_invalid_and_duplicate_signatures() checks for unknown keys in
    # tuf.keydb.
    tuf.keydb.add_key(root_rsa_key, repository_name=repository_name)

    # Record how many signatures the signable holds *before* the duplicate is
    # added: that is the number expected to remain once duplicates are dropped.
    # NOTE(review): assumes the root.json fixture carries a single signature --
    # confirm against the fixture data.
    expected_number_of_signatures = len(root_signable['signatures'])

    # Append the new valid, but duplicate PSS signature, and test that
    # duplicates are removed.  create_signature() generates a signature for
    # the key type of the first argument (i.e., root_rsa_key).
    new_pss_signature = securesystemslib.keys.create_signature(
        root_rsa_key, root_signable['signed'])
    root_signable['signatures'].append(new_pss_signature)

    tuf.repository_lib._remove_invalid_and_duplicate_signatures(
        root_signable, repository_name)

    # Compare the length of the signature list itself.  Measuring the signable
    # dict would count its top-level keys rather than its signatures.
    self.assertEqual(len(root_signable['signatures']),
                     expected_number_of_signatures)

    # Test for an invalid keyid.  No assertion follows: this step only
    # verifies that the call does not raise.
    root_signable['signatures'][0]['keyid'] = '404'
    tuf.repository_lib._remove_invalid_and_duplicate_signatures(
        root_signable, repository_name)

    # Re-add a valid signature for the following test condition.
    root_signable['signatures'].append(new_pss_signature)

    # Test that an exception is not raised if an invalid sig is present,
    # and that the signature is removed from 'root_signable'.
    root_signable['signatures'][0]['sig'] = '4040'
    invalid_keyid = root_signable['signatures'][0]['keyid']
    tuf.repository_lib._remove_invalid_and_duplicate_signatures(
        root_signable, repository_name)

    for signature in root_signable['signatures']:
        self.assertNotEqual(invalid_keyid, signature['keyid'])
|
|
|
|
|
|
2015-04-02 20:37:11 +00:00
|
|
|
|
2016-07-25 15:20:30 +00:00
|
|
|
|
2014-06-03 18:32:44 +00:00
|
|
|
# Script entry point: when this module is executed directly (rather than
# imported), hand control to unittest so that it discovers and runs every
# test case defined above.
if __name__ == '__main__':
    unittest.main()
|