ngclient TrustedMetadataSet: improve unit testing

The current situation with the TrustedMetadataSet testing is that
we don't have a mnimimal amount of unit tests testing the different
branches in the various API functionality in the class.

This commit proposes simple unit tests covering almost all of the
branches in the API functions and increasing the unit test coverage
(as reported from the "coverage" tool) from 74 % to 97 %.

The code could be complicated at places, because the different
branches in the update_* functions depend on other metadata classes
as well.
Still, I hope we can find a way and simplify the code.

Signed-off-by: Martin Vrachev <mvrachev@vmware.com>
This commit is contained in:
Martin Vrachev 2021-07-01 20:14:13 +03:00
parent 726af73256
commit 6a942889ff

View file

@ -1,14 +1,22 @@
import copy
import json
import logging
import os
import shutil
import sys
import tempfile
import unittest
from typing import Dict, Any
from datetime import datetime
from tuf import exceptions
from tuf.api.metadata import Metadata
from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet
from tuf.ngclient._internal.trusted_metadata_set import(
TrustedMetadataSet
)
from securesystemslib.signer import SSlibSigner
from securesystemslib.interface import(
import_ed25519_privatekey_from_file,
import_rsa_privatekey_from_file
)
from tests import utils
@ -26,63 +34,91 @@ def setUpClass(cls):
with open(os.path.join(cls.repo_dir, f"{md}.json"), "rb") as f:
cls.metadata[md] = f.read()
keystore_dir = os.path.join(os.getcwd(), 'repository_data', 'keystore')
cls.keystore = {}
root_key_dict = import_rsa_privatekey_from_file(
os.path.join(keystore_dir, "root" + '_key'),
password="password"
)
cls.keystore["root"] = SSlibSigner(root_key_dict)
for role in ["delegation", "snapshot", "targets", "timestamp"]:
key_dict = import_ed25519_privatekey_from_file(
os.path.join(keystore_dir, role + '_key'),
password="password"
)
cls.keystore[role] = SSlibSigner(key_dict)
def setUp(self) -> None:
self.trusted_set = TrustedMetadataSet(self.metadata["root"])
def _root_update_finished_and_update_timestamp(self):
self.trusted_set.root_update_finished()
self.trusted_set.update_timestamp(self.metadata["timestamp"])
def _update_all_besides_targets(self):
self.trusted_set.root_update_finished()
self.trusted_set.update_timestamp(self.metadata["timestamp"])
self.trusted_set.update_snapshot(self.metadata["snapshot"])
def test_update(self):
trusted_set = TrustedMetadataSet(self.metadata["root"])
trusted_set.root_update_finished()
trusted_set.update_timestamp(self.metadata["timestamp"])
trusted_set.update_snapshot(self.metadata["snapshot"])
trusted_set.update_targets(self.metadata["targets"])
trusted_set.update_delegated_targets(
self.trusted_set.root_update_finished()
self.trusted_set.update_timestamp(self.metadata["timestamp"])
self.trusted_set.update_snapshot(self.metadata["snapshot"])
self.trusted_set.update_targets(self.metadata["targets"])
self.trusted_set.update_delegated_targets(
self.metadata["role1"], "role1", "targets"
)
trusted_set.update_delegated_targets(
self.trusted_set.update_delegated_targets(
self.metadata["role2"], "role2", "role1"
)
# the 4 top level metadata objects + 2 additional delegated targets
self.assertTrue(len(self.trusted_set), 6)
def test_out_of_order_ops(self):
trusted_set = TrustedMetadataSet(self.metadata["root"])
# Update timestamp before root is finished
with self.assertRaises(RuntimeError):
trusted_set.update_timestamp(self.metadata["timestamp"])
self.trusted_set.update_timestamp(self.metadata["timestamp"])
trusted_set.root_update_finished()
self.trusted_set.root_update_finished()
with self.assertRaises(RuntimeError):
trusted_set.root_update_finished()
self.trusted_set.root_update_finished()
# Update root after a previous successful root update
with self.assertRaises(RuntimeError):
self.trusted_set.update_root(self.metadata["root"])
# Update snapshot before timestamp
with self.assertRaises(RuntimeError):
trusted_set.update_snapshot(self.metadata["snapshot"])
self.trusted_set.update_snapshot(self.metadata["snapshot"])
trusted_set.update_timestamp(self.metadata["timestamp"])
self.trusted_set.update_timestamp(self.metadata["timestamp"])
# Update targets before snapshot
with self.assertRaises(RuntimeError):
trusted_set.update_targets(self.metadata["targets"])
self.trusted_set.update_targets(self.metadata["targets"])
trusted_set.update_snapshot(self.metadata["snapshot"])
self.trusted_set.update_snapshot(self.metadata["snapshot"])
# update timestamp after snapshot
with self.assertRaises(RuntimeError):
trusted_set.update_timestamp(self.metadata["timestamp"])
self.trusted_set.update_timestamp(self.metadata["timestamp"])
# Update delegated targets before targets
with self.assertRaises(RuntimeError):
trusted_set.update_delegated_targets(
self.trusted_set.update_delegated_targets(
self.metadata["role1"], "role1", "targets"
)
trusted_set.update_targets(self.metadata["targets"])
trusted_set.update_delegated_targets(
self.trusted_set.update_targets(self.metadata["targets"])
# Update snapshot after sucessful targets update
with self.assertRaises(RuntimeError):
self.trusted_set.update_snapshot(self.metadata["snapshot"])
self.trusted_set.update_delegated_targets(
self.metadata["role1"], "role1", "targets"
)
trusted_set.update_targets(self.metadata["targets"])
trusted_set.update_delegated_targets(
self.metadata["role1"], "role1", "targets"
)
def test_update_with_invalid_json(self):
# root.json not a json file at all
@ -94,20 +130,27 @@ def test_update_with_invalid_json(self):
with self.assertRaises(exceptions.RepositoryError):
TrustedMetadataSet(json.dumps(root.to_dict()).encode())
trusted_set = TrustedMetadataSet(self.metadata["root"])
trusted_set.root_update_finished()
# update_root called with the wrong metadata type
with self.assertRaises(exceptions.RepositoryError):
self.trusted_set.update_root(self.metadata["snapshot"])
self.trusted_set.root_update_finished()
top_level_md = [
(self.metadata["timestamp"], trusted_set.update_timestamp),
(self.metadata["snapshot"], trusted_set.update_snapshot),
(self.metadata["targets"], trusted_set.update_targets),
(self.metadata["timestamp"], self.trusted_set.update_timestamp),
(self.metadata["snapshot"], self.trusted_set.update_snapshot),
(self.metadata["targets"], self.trusted_set.update_targets),
]
for metadata, update_func in top_level_md:
md = Metadata.from_bytes(metadata)
if md.signed.type == "snapshot":
# timestamp hashes and length intervene when testing snapshot
self.trusted_set.timestamp.signed.meta["snapshot.json"].hashes = None
self.trusted_set.timestamp.signed.meta["snapshot.json"].length = None
# metadata is not json
with self.assertRaises(exceptions.RepositoryError):
update_func(b"")
# metadata is invalid
md = Metadata.from_bytes(metadata)
md.signed.version += 1
with self.assertRaises(exceptions.RepositoryError):
update_func(json.dumps(md.to_dict()).encode())
@ -119,8 +162,139 @@ def test_update_with_invalid_json(self):
update_func(metadata)
def test_update_root_new_root_cannot_be_verified_with_threshold(self):
# new_root data with threshold which cannot be verified.
modified_threshold_data = copy.deepcopy(
json.loads(self.metadata["root"])
)
# change something in root so signature doesn't match the content.
modified_threshold_data["signed"]["roles"]["root"]["version"] = 2
modified_threshold_data = json.dumps(modified_threshold_data).encode()
with self.assertRaises(exceptions.UnsignedMetadataError):
self.trusted_set.update_root(modified_threshold_data)
def test_update_root_new_root_ver_same_as_trusted_root_ver(self):
with self.assertRaises(exceptions.ReplayedMetadataError):
self.trusted_set.update_root(self.metadata["root"])
def test_root_update_finished_expired(self):
root = Metadata.from_bytes(self.metadata["root"])
root.signed.expires = datetime(1970, 1, 1)
root.sign(self.keystore["root"])
modified_root_data = json.dumps(root.to_dict()).encode()
tmp_trusted_set = TrustedMetadataSet(modified_root_data)
# call root_update_finished when trusted root has expired
with self.assertRaises(exceptions.ExpiredMetadataError):
tmp_trusted_set.root_update_finished()
def test_update_timestamp_new_timestamp_ver_below_trusted_ver(self):
self._root_update_finished_and_update_timestamp()
# new_timestamp.version < trusted_timestamp.version
self.trusted_set.timestamp.signed.version = 2
with self.assertRaises(exceptions.ReplayedMetadataError):
self.trusted_set.update_timestamp(self.metadata["timestamp"])
def test_update_timestamp_snapshot_ver_below_trusted_snapshot_ver(self):
self._root_update_finished_and_update_timestamp()
# new_timestamp.snapshot.version < trusted_timestamp.snapshot.version
self.trusted_set.timestamp.signed.meta["snapshot.json"].version = 2
with self.assertRaises(exceptions.ReplayedMetadataError):
self.trusted_set.update_timestamp(self.metadata["timestamp"])
def test_update_timestamp_expired(self):
self._root_update_finished_and_update_timestamp()
# new_timestamp has expired
timestamp = Metadata.from_bytes(self.metadata["timestamp"])
timestamp.signed.expires = datetime(1970, 1, 1)
timestamp.sign(self.keystore["timestamp"])
new_timestamp_byte_data = json.dumps(timestamp.to_dict()).encode()
with self.assertRaises(exceptions.ExpiredMetadataError):
self.trusted_set.update_timestamp(new_timestamp_byte_data)
def test_update_snapshot_cannot_verify_snapshot_with_threshold(self):
self._root_update_finished_and_update_timestamp()
# remove keyids representing snapshot signatures from root data
self.trusted_set.root.signed.roles["snapshot"].keyids = []
with self.assertRaises(exceptions.UnsignedMetadataError):
self.trusted_set.update_snapshot(self.metadata["snapshot"])
def test_update_snapshot_version_different_timestamp_snapshot_version(self):
self._root_update_finished_and_update_timestamp()
# new_snapshot.version != trusted timestamp.meta["snapshot"].version
self.trusted_set.timestamp.signed.meta["snapshot.json"].version = 2
with self.assertRaises(exceptions.BadVersionNumberError):
self.trusted_set.update_snapshot(self.metadata["snapshot"])
def test_update_snapshot_after_successful_update_new_snapshot_no_meta(self):
self._update_all_besides_targets()
# Test removing a meta_file in new_snapshot compared to the old snapshot
snapshot = Metadata.from_bytes(self.metadata["snapshot"])
snapshot.signed.meta = {}
snapshot.sign(self.keystore["snapshot"])
self.trusted_set.timestamp.signed.meta["snapshot.json"].hashes = None
self.trusted_set.timestamp.signed.meta["snapshot.json"].length = None
modified_snapshot_data = json.dumps(snapshot.to_dict()).encode()
with self.assertRaises(exceptions.RepositoryError):
self.trusted_set.update_snapshot(modified_snapshot_data)
def test_update_snapshot_after_succesfull_update_new_snapshot_meta_version_different(self):
self._update_all_besides_targets()
# snapshot.meta["project1"].version != new_snapshot.meta["project1"].version
for metafile in self.trusted_set.snapshot.signed.meta.values():
metafile.version += 1
with self.assertRaises(exceptions.BadVersionNumberError):
self.trusted_set.update_snapshot(self.metadata["snapshot"])
def test_update_snapshot_after_succesfull_expired_new_snapshot(self):
self._update_all_besides_targets()
# new_snapshot has expired
snapshot = Metadata.from_bytes(self.metadata["snapshot"])
snapshot.signed.expires = datetime(1970, 1, 1)
snapshot.sign(self.keystore["snapshot"])
self.trusted_set.timestamp.signed.meta["snapshot.json"].hashes = None
self.trusted_set.timestamp.signed.meta["snapshot.json"].length = None
modified_snapshot_data = json.dumps(snapshot.to_dict()).encode()
with self.assertRaises(exceptions.ExpiredMetadataError):
self.trusted_set.update_snapshot(modified_snapshot_data)
def test_update_targets_no_meta_in_snapshot(self):
self._update_all_besides_targets()
# remove meta information with information about targets from snapshot
self.trusted_set.snapshot.signed.meta = {}
with self.assertRaises(exceptions.RepositoryError):
self.trusted_set.update_targets(self.metadata["targets"])
def test_update_targets_hash_different_than_snapshot_meta_hash(self):
self._update_all_besides_targets()
# observed_hash != stored hash in snapshot meta for targets
for target_path in self.trusted_set.snapshot.signed.meta.keys():
self.trusted_set.snapshot.signed.meta[target_path].hashes = {"sha256": "b"}
with self.assertRaises(exceptions.RepositoryError):
self.trusted_set.update_targets(self.metadata["targets"])
def test_update_targets_version_different_snapshot_meta_version(self):
self._update_all_besides_targets()
# new_delegate.signed.version != meta.version stored in snapshot
for target_path in self.trusted_set.snapshot.signed.meta.keys():
self.trusted_set.snapshot.signed.meta[target_path].version = 2
with self.assertRaises(exceptions.BadVersionNumberError):
self.trusted_set.update_targets(self.metadata["targets"])
def test_update_targets_expired_new_target(self):
self._update_all_besides_targets()
# new_delegated_target has expired
targets = Metadata.from_bytes(self.metadata["targets"])
targets.signed.expires = datetime(1970, 1, 1)
targets.sign(self.keystore["targets"])
modified_targets_data = json.dumps(targets.to_dict()).encode()
with self.assertRaises(exceptions.ExpiredMetadataError):
self.trusted_set.update_targets(modified_targets_data)
# TODO test updating over initial metadata (new keys, newer timestamp, etc)
# TODO test the actual specification checks
if __name__ == '__main__':