diff --git a/tests/test_metadata_serialization.py b/tests/test_metadata_serialization.py index 3e11082b..13bb5500 100644 --- a/tests/test_metadata_serialization.py +++ b/tests/test_metadata_serialization.py @@ -12,7 +12,7 @@ import unittest import copy -from typing import Dict, Callable +from typing import Dict from tests import utils @@ -31,28 +31,12 @@ logger = logging.getLogger(__name__) -# DataSet is only here so type hints can be used: -# It is a dict of name to test dict -DataSet = Dict[str, str] - -# Test runner decorator: Runs the test as a set of N SubTests, -# (where N is number of items in dataset), feeding the actual test -# function one test case at a time -def run_sub_tests_with_dataset(dataset: DataSet): - def real_decorator(function: Callable[["TestSerialization", str], None]): - def wrapper(test_cls: "TestSerialization"): - for case, data in dataset.items(): - with test_cls.subTest(case=case): - function(test_cls, data) - return wrapper - return real_decorator - class TestSerialization(unittest.TestCase): # Snapshot instances with meta = {} are valid, but for a full valid # repository it's required that meta has at least one element inside it. - invalid_signed: DataSet = { + invalid_signed: utils.DataSet = { "no _type": '{"spec_version": "1.0.0", "expires": "2030-01-01T00:00:00Z", "meta": {}}', "no spec_version": '{"_type": "signed", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}', "no version": '{"_type": "signed", "spec_version": "1.0.0", "expires": "2030-01-01T00:00:00Z", "meta": {}}', @@ -81,14 +65,14 @@ class TestSerialization(unittest.TestCase): '{"_type": "signed", "spec_version": "1.0.0", "version": 1, "expires": "abc", "meta": {}}', } - @run_sub_tests_with_dataset(invalid_signed) + @utils.run_sub_tests_with_dataset(invalid_signed) def test_invalid_signed_serialization(self, test_case_data: Dict[str, str]): case_dict = json.loads(test_case_data) with self.assertRaises((KeyError, ValueError, TypeError)): Snapshot.from_dict(copy.deepcopy(case_dict)) - valid_keys: DataSet = { + valid_keys: utils.DataSet = { "all": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", \ "keyval": {"public": "foo"}}', "unrecognized field": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", \ @@ -97,14 +81,14 @@ def test_invalid_signed_serialization(self, test_case_data: Dict[str, str]): "keyval": {"public": "foo", "foo": "bar"}}', } - @run_sub_tests_with_dataset(valid_keys) + @utils.run_sub_tests_with_dataset(valid_keys) def test_valid_key_serialization(self, test_case_data: str): case_dict = json.loads(test_case_data) key = Key.from_dict("id", copy.copy(case_dict)) self.assertDictEqual(case_dict, key.to_dict()) - invalid_keys: DataSet = { + invalid_keys: utils.DataSet = { "no keyid": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "abc"}}', "no keytype": '{"keyid": "id", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}', "no scheme": '{"keyid": "id", "keytype": "rsa", "keyval": {"public": "foo"}}', @@ -115,14 +99,14 @@ def test_valid_key_serialization(self, test_case_data: str): "keyval wrong type": '{"keyid": "id", "keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": 1}', } - @run_sub_tests_with_dataset(invalid_keys) + @utils.run_sub_tests_with_dataset(invalid_keys) def test_invalid_key_serialization(self, test_case_data: Dict[str, str]): case_dict = json.loads(test_case_data) with self.assertRaises((TypeError, KeyError)): keyid = case_dict.pop("keyid") Key.from_dict(keyid, copy.copy(case_dict)) - invalid_roles: DataSet = { + invalid_roles: utils.DataSet = { "no threshold": '{"keyids": ["keyid"]}', "no keyids": '{"threshold": 3}', "wrong threshold type": '{"keyids": ["keyid"], "threshold": "a"}', @@ -130,28 +114,28 @@ def test_invalid_key_serialization(self, test_case_data: Dict[str, str]): "duplicate keyids": '{"keyids": ["keyid", "keyid"], "threshold": 3}', } - @run_sub_tests_with_dataset(invalid_roles) + @utils.run_sub_tests_with_dataset(invalid_roles) def test_invalid_role_serialization(self, test_case_data: Dict[str, str]): case_dict = json.loads(test_case_data) with self.assertRaises((KeyError, TypeError, ValueError)): Role.from_dict(copy.deepcopy(case_dict)) - valid_roles: DataSet = { + valid_roles: utils.DataSet = { "all": '{"keyids": ["keyid"], "threshold": 3}', "many keyids": '{"keyids": ["a", "b", "c", "d", "e"], "threshold": 1}', "empty keyids": '{"keyids": [], "threshold": 1}', "unrecognized field": '{"keyids": ["keyid"], "threshold": 3, "foo": "bar"}', } - @run_sub_tests_with_dataset(valid_roles) + @utils.run_sub_tests_with_dataset(valid_roles) def test_role_serialization(self, test_case_data: str): case_dict = json.loads(test_case_data) role = Role.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, role.to_dict()) - valid_roots: DataSet = { + valid_roots: utils.DataSet = { "all": '{"_type": "root", "spec_version": "1.0.0", "version": 1, \ "expires": "2030-01-01T00:00:00Z", "consistent_snapshot": false, \ "keys": { \ @@ -178,14 +162,14 @@ def test_role_serialization(self, test_case_data: str): "foo": "bar"}', } - @run_sub_tests_with_dataset(valid_roots) + @utils.run_sub_tests_with_dataset(valid_roots) def test_root_serialization(self, test_case_data: str): case_dict = json.loads(test_case_data) root = Root.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, root.to_dict()) - invalid_metafiles: DataSet = { + invalid_metafiles: utils.DataSet = { "wrong length type": '{"version": 1, "length": "a", "hashes": {"sha256" : "abc"}}', "length 0": '{"version": 1, "length": 0, "hashes": {"sha256" : "abc"}}', "length below 0": '{"version": 1, "length": -1, "hashes": {"sha256" : "abc"}}', @@ -194,14 +178,14 @@ def test_root_serialization(self, test_case_data: str): "hashes values wrong type": '{"version": 1, "length": 1, "hashes": {"sha256": 1}}', } - @run_sub_tests_with_dataset(invalid_metafiles) + @utils.run_sub_tests_with_dataset(invalid_metafiles) def test_invalid_metafile_serialization(self, test_case_data: Dict[str, str]): case_dict = json.loads(test_case_data) with self.assertRaises((TypeError, ValueError, AttributeError)): MetaFile.from_dict(copy.deepcopy(case_dict)) - valid_metafiles: DataSet = { + valid_metafiles: utils.DataSet = { "all": '{"hashes": {"sha256" : "abc"}, "length": 12, "version": 1}', "no length": '{"hashes": {"sha256" : "abc"}, "version": 1 }', "no hashes": '{"length": 12, "version": 1}', @@ -209,38 +193,38 @@ def test_invalid_metafile_serialization(self, test_case_data: Dict[str, str]): "many hashes": '{"hashes": {"sha256" : "abc", "sha512": "cde"}, "length": 12, "version": 1}', } - @run_sub_tests_with_dataset(valid_metafiles) + @utils.run_sub_tests_with_dataset(valid_metafiles) def test_metafile_serialization(self, test_case_data: str): case_dict = json.loads(test_case_data) metafile = MetaFile.from_dict(copy.copy(case_dict)) self.assertDictEqual(case_dict, metafile.to_dict()) - invalid_timestamps: DataSet = { + invalid_timestamps: utils.DataSet = { "no metafile": '{ "_type": "timestamp", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z"}', } - @run_sub_tests_with_dataset(invalid_timestamps) + @utils.run_sub_tests_with_dataset(invalid_timestamps) def test_invalid_timestamp_serialization(self, test_case_data: Dict[str, str]): case_dict = json.loads(test_case_data) with self.assertRaises((ValueError, KeyError)): Timestamp.from_dict(copy.deepcopy(case_dict)) - valid_timestamps: DataSet = { + valid_timestamps: utils.DataSet = { "all": '{ "_type": "timestamp", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ "meta": {"snapshot.json": {"hashes": {"sha256" : "abc"}, "version": 1}}}', "unrecognized field": '{ "_type": "timestamp", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ "meta": {"snapshot.json": {"hashes": {"sha256" : "abc"}, "version": 1}}, "foo": "bar"}', } - @run_sub_tests_with_dataset(valid_timestamps) + @utils.run_sub_tests_with_dataset(valid_timestamps) def test_timestamp_serialization(self, test_case_data: str): case_dict = json.loads(test_case_data) timestamp = Timestamp.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, timestamp.to_dict()) - valid_snapshots: DataSet = { + valid_snapshots: utils.DataSet = { "all": '{ "_type": "snapshot", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ "meta": { \ "file1.txt": {"hashes": {"sha256" : "abc"}, "version": 1}, \ @@ -253,14 +237,14 @@ def test_timestamp_serialization(self, test_case_data: str): "meta": { "file.txt": { "hashes": {"sha256" : "abc"}, "version": 1 }}, "foo": "bar"}', } - @run_sub_tests_with_dataset(valid_snapshots) + @utils.run_sub_tests_with_dataset(valid_snapshots) def test_snapshot_serialization(self, test_case_data: str): case_dict = json.loads(test_case_data) snapshot = Snapshot.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, snapshot.to_dict()) - valid_delegated_roles: DataSet = { + valid_delegated_roles: utils.DataSet = { # DelegatedRole inherits Role and some use cases can be found in the valid_roles. "no hash prefix attribute": '{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], \ @@ -279,14 +263,14 @@ def test_snapshot_serialization(self, test_case_data: str): "terminating": false, "threshold": 1}', } - @run_sub_tests_with_dataset(valid_delegated_roles) + @utils.run_sub_tests_with_dataset(valid_delegated_roles) def test_delegated_role_serialization(self, test_case_data: str): case_dict = json.loads(test_case_data) deserialized_role = DelegatedRole.from_dict(copy.copy(case_dict)) self.assertDictEqual(case_dict, deserialized_role.to_dict()) - invalid_delegated_roles: DataSet = { + invalid_delegated_roles: utils.DataSet = { # DelegatedRole inherits Role and some use cases can be found in the invalid_roles. "missing hash prefixes and paths": '{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false}', @@ -295,14 +279,14 @@ def test_delegated_role_serialization(self, test_case_data: str): "paths": ["fn1", "fn2"], "path_hash_prefixes": ["h1", "h2"]}', } - @run_sub_tests_with_dataset(invalid_delegated_roles) + @utils.run_sub_tests_with_dataset(invalid_delegated_roles) def test_invalid_delegated_role_serialization(self, test_case_data: str): case_dict = json.loads(test_case_data) with self.assertRaises(ValueError): DelegatedRole.from_dict(copy.copy(case_dict)) - invalid_delegations: DataSet = { + invalid_delegations: utils.DataSet = { "empty delegations": '{}', "bad keys": '{"keys": "foo", \ "roles": [{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], "terminating": false, "threshold": 3}]}', @@ -316,14 +300,14 @@ def test_invalid_delegated_role_serialization(self, test_case_data: str): }', } - @run_sub_tests_with_dataset(invalid_delegations) + @utils.run_sub_tests_with_dataset(invalid_delegations) def test_invalid_delegation_serialization(self, test_case_data: str): case_dict = json.loads(test_case_data) with self.assertRaises((ValueError, KeyError, AttributeError)): Delegations.from_dict(copy.deepcopy(case_dict)) - valid_delegations: DataSet = { + valid_delegations: utils.DataSet = { "all": '{"keys": { \ "keyid1" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}, \ @@ -341,28 +325,28 @@ def test_invalid_delegation_serialization(self, test_case_data: str): }', } - @run_sub_tests_with_dataset(valid_delegations) + @utils.run_sub_tests_with_dataset(valid_delegations) def test_delegation_serialization(self, test_case_data: str): case_dict = json.loads(test_case_data) delegation = Delegations.from_dict(copy.deepcopy(case_dict)) self.assertDictEqual(case_dict, delegation.to_dict()) - invalid_targetfiles: DataSet = { + invalid_targetfiles: utils.DataSet = { "no hashes": '{"length": 1}', "no length": '{"hashes": {"sha256": "abc"}}' # The remaining cases are the same as for invalid_hashes and # invalid_length datasets. } - @run_sub_tests_with_dataset(invalid_targetfiles) + @utils.run_sub_tests_with_dataset(invalid_targetfiles) def test_invalid_targetfile_serialization(self, test_case_data: Dict[str, str]): case_dict = json.loads(test_case_data) with self.assertRaises(KeyError): TargetFile.from_dict(copy.deepcopy(case_dict), "file1.txt") - valid_targetfiles: DataSet = { + valid_targetfiles: utils.DataSet = { "all": '{"length": 12, "hashes": {"sha256" : "abc"}, \ "custom" : {"foo": "bar"} }', "no custom": '{"length": 12, "hashes": {"sha256" : "abc"}}', @@ -370,14 +354,14 @@ def test_invalid_targetfile_serialization(self, test_case_data: Dict[str, str]): "custom" : {"foo": "bar"}, "foo": "bar"}', } - @run_sub_tests_with_dataset(valid_targetfiles) + @utils.run_sub_tests_with_dataset(valid_targetfiles) def test_targetfile_serialization(self, test_case_data: str): case_dict = json.loads(test_case_data) target_file = TargetFile.from_dict(copy.copy(case_dict), "file1.txt") self.assertDictEqual(case_dict, target_file.to_dict()) - valid_targets: DataSet = { + valid_targets: utils.DataSet = { "all attributes": '{"_type": "targets", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \ "targets": { \ "file.txt": {"length": 12, "hashes": {"sha256" : "abc"} }, \ @@ -403,7 +387,7 @@ def test_targetfile_serialization(self, test_case_data: str): "targets": {}, "foo": "bar"}', } - @run_sub_tests_with_dataset(valid_targets) + @utils.run_sub_tests_with_dataset(valid_targets) def test_targets_serialization(self, test_case_data): case_dict = json.loads(test_case_data) targets = Targets.from_dict(copy.deepcopy(case_dict)) diff --git a/tests/utils.py b/tests/utils.py index 6a3ee66b..15f28924 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -20,8 +20,10 @@ Provide common utilities for TUF tests """ -import argparse from contextlib import contextmanager +from typing import Dict, Any, Callable +import unittest +import argparse import errno import logging import socket @@ -40,6 +42,22 @@ TEST_HOST_ADDRESS = '127.0.0.1' +# DataSet is only here so type hints can be used. +DataSet = Dict[str, Any] + +# Test runner decorator: Runs the test as a set of N SubTests, +# (where N is number of items in dataset), feeding the actual test +# function one test case at a time +def run_sub_tests_with_dataset(dataset: DataSet): + def real_decorator(function: Callable[[unittest.TestCase, Any], None]): + def wrapper(test_cls: unittest.TestCase): + for case, data in dataset.items(): + with test_cls.subTest(case=case): + function(test_cls, data) + return wrapper + return real_decorator + + class TestServerProcessError(Exception): def __init__(self, value="TestServerProcess"):