mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
This commit performs restructuring on the recently added metadata class model architecture, which shall be part of a new simple TUF API. The key change is that the Metadata class is now used as container for inner TUF metadata (Root, Timestamp, Snapshot, Targets) instead of serving as base class for these, that means we use 'composition' instead of 'inheritance'. Still, in order to aggregate common attributes of the inner Metadata (expires, version, spec_version), we use a new baseclass 'Signed', which also corresponds to the signed field of the outer metadata container. Based on prior observations in TUF's sister project in-toto, this architecture seems to more closely represent the metadata model as it is defined in the specification (see in-toto/in-toto#98 and in-toto/in-toto#142 for related discussions). Note that the proposed changes require us to now access some attributes/methods via the signed attribute of a Metadata object and not directly on the Metadata object, but it would be possible to add short-cuts. (see todo notes in doc header). Further changes include: - Add minimal doc header with TODO notes - Make attributes that correspond to fields in TUF JSON metadata public again. There doesn't seem to be a good reason to protect them with leading underscores and use setters/getters instead, it just adds more code. - Generally try to reduce code. - Remove keyring and consistent_snapshot attributes from metadata class. As discussed in #1060 they are a better fit for extra management code (also see #660) - Remove sslib schema checks (see TODO notes about validation in doc header) - Drop usage of build_dict_conforming_to_schema, it seems a lot simpler and more explicit to just code this here. - ... same goes for make_metadata_fileinfo - Adapt tests accordingly TODO: Document!!! Signed-off-by: Lukas Puehringer <lukas.puehringer@nyu.edu>
218 lines
6.9 KiB
Python
218 lines
6.9 KiB
Python
#!/usr/bin/env python
|
|
|
|
# Copyright 2020, New York University and the TUF contributors
|
|
# SPDX-License-Identifier: MIT OR Apache-2.0
|
|
|
|
"""
|
|
<Program Name>
|
|
test_tuf_api.py
|
|
|
|
<Author>
|
|
Joshua Lock <jlock@vmware.com>
|
|
|
|
<Started>
|
|
June 30, 2020.
|
|
|
|
<Copyright>
|
|
See LICENSE-MIT OR LICENSE for licensing information.
|
|
|
|
<Purpose>
|
|
Unit tests for tuf.api
|
|
"""
|
|
|
|
# Help with Python 3 compatibility, where the print statement is a function, an
|
|
# implicit relative import is invalid, and the '/' operator performs true
|
|
# division. Example: print 'hello world' raises a 'SyntaxError' exception.
|
|
from __future__ import print_function
|
|
from __future__ import absolute_import
|
|
from __future__ import division
|
|
from __future__ import unicode_literals
|
|
|
|
import unittest
|
|
import logging
|
|
import tempfile
|
|
import shutil
|
|
import sys
|
|
import errno
|
|
import os
|
|
from datetime import timedelta
|
|
from dateutil.relativedelta import relativedelta
|
|
|
|
from tuf.api import metadata
|
|
from tuf.api import keys
|
|
|
|
import iso8601
|
|
import six
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class TestTufApi(unittest.TestCase):
|
|
@classmethod
|
|
def setUpClass(cls):
|
|
|
|
# Create a temporary directory to store the repository, metadata, and target
|
|
# files. 'temporary_directory' must be deleted in TearDownClass() so that
|
|
# temporary files are always removed, even when exceptions occur.
|
|
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
|
|
|
|
test_repo_data = os.path.join(
|
|
os.path.dirname(os.path.realpath(__file__)), 'repository_data')
|
|
|
|
cls.repo_dir = os.path.join(cls.temporary_directory, 'repository')
|
|
shutil.copytree(os.path.join(test_repo_data, 'repository'), cls.repo_dir)
|
|
|
|
cls.keystore_dir = os.path.join(cls.temporary_directory, 'keystore')
|
|
shutil.copytree(os.path.join(test_repo_data, 'keystore'), cls.keystore_dir)
|
|
|
|
|
|
|
|
@classmethod
|
|
def tearDownClass(cls):
|
|
|
|
# Remove the temporary repository directory, which should contain all the
|
|
# metadata, targets, and key files generated for the test cases.
|
|
shutil.rmtree(cls.temporary_directory)
|
|
|
|
|
|
|
|
def _load_key_ring(self):
|
|
key_list = []
|
|
root_key = keys.RAMKey.read_from_file(os.path.join(self.keystore_dir, 'root_key'),
|
|
'RSA', 'password')
|
|
key_list.append(root_key)
|
|
|
|
for key_file in os.listdir(self.keystore_dir):
|
|
if key_file.endswith('.pub'):
|
|
# ignore public keys
|
|
continue
|
|
|
|
if key_file.startswith('root_key'):
|
|
# root key is loaded
|
|
continue
|
|
|
|
key = keys.RAMKey.read_from_file(os.path.join(self.keystore_dir, key_file),
|
|
'ED25519', 'password')
|
|
key_list.append(key)
|
|
threshold = keys.Threshold(1, 1)
|
|
return keys.KeyRing(threshold=threshold, keys=key_list)
|
|
|
|
def test_metadata_base(self):
|
|
# Use of Snapshot is arbitrary, we're just testing the base class features
|
|
# with real data
|
|
snapshot_path = os.path.join(self.repo_dir, 'metadata', 'snapshot.json')
|
|
md = metadata.Snapshot.read_from_json(snapshot_path)
|
|
|
|
self.assertEqual(md.signed.version, 1)
|
|
md.signed.bump_version()
|
|
self.assertEqual(md.signed.version, 2)
|
|
self.assertEqual(md.signed.expires, '2030-01-01T00:00:00Z')
|
|
md.signed.bump_expiration()
|
|
self.assertEqual(md.signed.expires, '2030-01-02T00:00:00Z')
|
|
md.signed.bump_expiration(timedelta(days=365))
|
|
self.assertEqual(md.signed.expires, '2031-01-02T00:00:00Z')
|
|
|
|
|
|
def test_metadata_snapshot(self):
|
|
snapshot_path = os.path.join(self.repo_dir, 'metadata', 'snapshot.json')
|
|
snapshot = metadata.Snapshot.read_from_json(snapshot_path)
|
|
|
|
key_ring = self._load_key_ring()
|
|
snapshot.verify(key_ring)
|
|
|
|
# Create a dict representing what we expect the updated data to be
|
|
fileinfo = snapshot.signed.meta
|
|
hashes = {'sha256': 'c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095'}
|
|
fileinfo['role1.json']['version'] = 2
|
|
fileinfo['role1.json']['hashes'] = hashes
|
|
fileinfo['role1.json']['length'] = 123
|
|
|
|
snapshot.signed.update('role1', 2, 123, hashes)
|
|
self.assertEqual(snapshot.signed.meta, fileinfo)
|
|
|
|
# snapshot.signable()
|
|
|
|
# snapshot.sign()
|
|
|
|
# snapshot.verify()
|
|
|
|
# snapshot.write_to_json(os.path.join(cls.temporary_directory, 'api_snapshot.json'))
|
|
|
|
|
|
def test_metadata_timestamp(self):
|
|
timestamp_path = os.path.join(self.repo_dir, 'metadata', 'timestamp.json')
|
|
timestamp = metadata.Timestamp.read_from_json(timestamp_path)
|
|
|
|
key_ring = self._load_key_ring()
|
|
timestamp.verify(key_ring)
|
|
|
|
self.assertEqual(timestamp.signed.version, 1)
|
|
timestamp.signed.bump_version()
|
|
self.assertEqual(timestamp.signed.version, 2)
|
|
|
|
self.assertEqual(timestamp.signed.expires, '2030-01-01T00:00:00Z')
|
|
timestamp.signed.bump_expiration()
|
|
self.assertEqual(timestamp.signed.expires, '2030-01-02T00:00:00Z')
|
|
timestamp.signed.bump_expiration(timedelta(days=365))
|
|
self.assertEqual(timestamp.signed.expires, '2031-01-02T00:00:00Z')
|
|
|
|
# Test whether dateutil.relativedelta works, this provides a much easier to
|
|
# use interface for callers
|
|
delta = relativedelta(days=1)
|
|
timestamp.signed.bump_expiration(delta)
|
|
self.assertEqual(timestamp.signed.expires, '2031-01-03T00:00:00Z')
|
|
delta = relativedelta(years=5)
|
|
timestamp.signed.bump_expiration(delta)
|
|
self.assertEqual(timestamp.signed.expires, '2036-01-03T00:00:00Z')
|
|
|
|
hashes = {'sha256': '0ae9664468150a9aa1e7f11feecb32341658eb84292851367fea2da88e8a58dc'}
|
|
fileinfo = timestamp.signed.meta['snapshot.json']
|
|
fileinfo['hashes'] = hashes
|
|
fileinfo['version'] = 2
|
|
fileinfo['length'] = 520
|
|
timestamp.signed.update(2, 520, hashes)
|
|
self.assertEqual(timestamp.signed.meta['snapshot.json'], fileinfo)
|
|
|
|
# timestamp.sign()
|
|
|
|
# timestamp.write_to_json()
|
|
|
|
def test_Threshold(self):
|
|
# test default values
|
|
keys.Threshold()
|
|
# test correct arguments
|
|
keys.Threshold(least=4, most=5)
|
|
|
|
# test incorrect input
|
|
self.assertRaises(ValueError, keys.Threshold, 5, 4)
|
|
self.assertRaises(ValueError, keys.Threshold, 0, 5)
|
|
self.assertRaises(ValueError, keys.Threshold, 5, 0)
|
|
|
|
|
|
def test_KeyRing(self):
|
|
key_list = []
|
|
root_key = keys.RAMKey.read_from_file(os.path.join(self.keystore_dir, 'root_key'),
|
|
'RSA', 'password')
|
|
root_key2 = keys.RAMKey.read_from_file(os.path.join(self.keystore_dir, 'root_key2'),
|
|
'ED25519', 'password')
|
|
key_list.append(root_key)
|
|
key_list.append(root_key2)
|
|
threshold = keys.Threshold()
|
|
keyring = keys.KeyRing(threshold, key_list)
|
|
self.assertEqual(keyring.threshold, threshold)
|
|
self.assertEqual(keyring.keys, key_list)
|
|
|
|
|
|
def test_RAMKey_read_from_file(self):
|
|
filename = os.path.join(self.keystore_dir, 'root_key')
|
|
algorithm = 'RSA'
|
|
passphrase = 'password'
|
|
|
|
self.assertTrue(isinstance(keys.RAMKey.read_from_file(filename, algorithm, passphrase), keys.RAMKey))
|
|
|
|
# TODO:
|
|
# def test_RAMKey(self):
|
|
|
|
# Run unit test.
|
|
if __name__ == '__main__':
|
|
unittest.main()
|