Tests: move decorator in utils so it can be reused

Generalize the decorator used in test_metadata_serialization.py and
move it inside tests/utils.py, so it can be reused in other similar
situations.

Signed-off-by: Martin Vrachev <mvrachev@vmware.com>
This commit is contained in:
Martin Vrachev 2021-10-04 17:17:25 +03:00
parent 76cf36eb86
commit 9989d3c614
2 changed files with 56 additions and 54 deletions

View file

@ -12,7 +12,7 @@
import unittest
import copy
from typing import Dict, Callable
from typing import Dict
from tests import utils
@ -31,28 +31,12 @@
logger = logging.getLogger(__name__)
# DataSet is only here so type hints can be used:
# It is a dict of name to test dict
DataSet = Dict[str, str]
# Test runner decorator: Runs the test as a set of N SubTests,
# (where N is number of items in dataset), feeding the actual test
# function one test case at a time
def run_sub_tests_with_dataset(dataset: DataSet):
def real_decorator(function: Callable[["TestSerialization", str], None]):
def wrapper(test_cls: "TestSerialization"):
for case, data in dataset.items():
with test_cls.subTest(case=case):
function(test_cls, data)
return wrapper
return real_decorator
class TestSerialization(unittest.TestCase):
# Snapshot instances with meta = {} are valid, but for a full valid
# repository it's required that meta has at least one element inside it.
invalid_signed: DataSet = {
invalid_signed: utils.DataSet = {
"no _type": '{"spec_version": "1.0.0", "expires": "2030-01-01T00:00:00Z", "meta": {}}',
"no spec_version": '{"_type": "signed", "version": 1, "expires": "2030-01-01T00:00:00Z", "meta": {}}',
"no version": '{"_type": "signed", "spec_version": "1.0.0", "expires": "2030-01-01T00:00:00Z", "meta": {}}',
@ -81,14 +65,14 @@ class TestSerialization(unittest.TestCase):
'{"_type": "signed", "spec_version": "1.0.0", "version": 1, "expires": "abc", "meta": {}}',
}
@run_sub_tests_with_dataset(invalid_signed)
@utils.run_sub_tests_with_dataset(invalid_signed)
def test_invalid_signed_serialization(self, test_case_data: Dict[str, str]):
case_dict = json.loads(test_case_data)
with self.assertRaises((KeyError, ValueError, TypeError)):
Snapshot.from_dict(copy.deepcopy(case_dict))
valid_keys: DataSet = {
valid_keys: utils.DataSet = {
"all": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", \
"keyval": {"public": "foo"}}',
"unrecognized field": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", \
@ -97,14 +81,14 @@ def test_invalid_signed_serialization(self, test_case_data: Dict[str, str]):
"keyval": {"public": "foo", "foo": "bar"}}',
}
@run_sub_tests_with_dataset(valid_keys)
@utils.run_sub_tests_with_dataset(valid_keys)
def test_valid_key_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
key = Key.from_dict("id", copy.copy(case_dict))
self.assertDictEqual(case_dict, key.to_dict())
invalid_keys: DataSet = {
invalid_keys: utils.DataSet = {
"no keyid": '{"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "abc"}}',
"no keytype": '{"keyid": "id", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}',
"no scheme": '{"keyid": "id", "keytype": "rsa", "keyval": {"public": "foo"}}',
@ -115,14 +99,14 @@ def test_valid_key_serialization(self, test_case_data: str):
"keyval wrong type": '{"keyid": "id", "keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": 1}',
}
@run_sub_tests_with_dataset(invalid_keys)
@utils.run_sub_tests_with_dataset(invalid_keys)
def test_invalid_key_serialization(self, test_case_data: Dict[str, str]):
case_dict = json.loads(test_case_data)
with self.assertRaises((TypeError, KeyError)):
keyid = case_dict.pop("keyid")
Key.from_dict(keyid, copy.copy(case_dict))
invalid_roles: DataSet = {
invalid_roles: utils.DataSet = {
"no threshold": '{"keyids": ["keyid"]}',
"no keyids": '{"threshold": 3}',
"wrong threshold type": '{"keyids": ["keyid"], "threshold": "a"}',
@ -130,28 +114,28 @@ def test_invalid_key_serialization(self, test_case_data: Dict[str, str]):
"duplicate keyids": '{"keyids": ["keyid", "keyid"], "threshold": 3}',
}
@run_sub_tests_with_dataset(invalid_roles)
@utils.run_sub_tests_with_dataset(invalid_roles)
def test_invalid_role_serialization(self, test_case_data: Dict[str, str]):
case_dict = json.loads(test_case_data)
with self.assertRaises((KeyError, TypeError, ValueError)):
Role.from_dict(copy.deepcopy(case_dict))
valid_roles: DataSet = {
valid_roles: utils.DataSet = {
"all": '{"keyids": ["keyid"], "threshold": 3}',
"many keyids": '{"keyids": ["a", "b", "c", "d", "e"], "threshold": 1}',
"empty keyids": '{"keyids": [], "threshold": 1}',
"unrecognized field": '{"keyids": ["keyid"], "threshold": 3, "foo": "bar"}',
}
@run_sub_tests_with_dataset(valid_roles)
@utils.run_sub_tests_with_dataset(valid_roles)
def test_role_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
role = Role.from_dict(copy.deepcopy(case_dict))
self.assertDictEqual(case_dict, role.to_dict())
valid_roots: DataSet = {
valid_roots: utils.DataSet = {
"all": '{"_type": "root", "spec_version": "1.0.0", "version": 1, \
"expires": "2030-01-01T00:00:00Z", "consistent_snapshot": false, \
"keys": { \
@ -178,14 +162,14 @@ def test_role_serialization(self, test_case_data: str):
"foo": "bar"}',
}
@run_sub_tests_with_dataset(valid_roots)
@utils.run_sub_tests_with_dataset(valid_roots)
def test_root_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
root = Root.from_dict(copy.deepcopy(case_dict))
self.assertDictEqual(case_dict, root.to_dict())
invalid_metafiles: DataSet = {
invalid_metafiles: utils.DataSet = {
"wrong length type": '{"version": 1, "length": "a", "hashes": {"sha256" : "abc"}}',
"length 0": '{"version": 1, "length": 0, "hashes": {"sha256" : "abc"}}',
"length below 0": '{"version": 1, "length": -1, "hashes": {"sha256" : "abc"}}',
@ -194,14 +178,14 @@ def test_root_serialization(self, test_case_data: str):
"hashes values wrong type": '{"version": 1, "length": 1, "hashes": {"sha256": 1}}',
}
@run_sub_tests_with_dataset(invalid_metafiles)
@utils.run_sub_tests_with_dataset(invalid_metafiles)
def test_invalid_metafile_serialization(self, test_case_data: Dict[str, str]):
case_dict = json.loads(test_case_data)
with self.assertRaises((TypeError, ValueError, AttributeError)):
MetaFile.from_dict(copy.deepcopy(case_dict))
valid_metafiles: DataSet = {
valid_metafiles: utils.DataSet = {
"all": '{"hashes": {"sha256" : "abc"}, "length": 12, "version": 1}',
"no length": '{"hashes": {"sha256" : "abc"}, "version": 1 }',
"no hashes": '{"length": 12, "version": 1}',
@ -209,38 +193,38 @@ def test_invalid_metafile_serialization(self, test_case_data: Dict[str, str]):
"many hashes": '{"hashes": {"sha256" : "abc", "sha512": "cde"}, "length": 12, "version": 1}',
}
@run_sub_tests_with_dataset(valid_metafiles)
@utils.run_sub_tests_with_dataset(valid_metafiles)
def test_metafile_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
metafile = MetaFile.from_dict(copy.copy(case_dict))
self.assertDictEqual(case_dict, metafile.to_dict())
invalid_timestamps: DataSet = {
invalid_timestamps: utils.DataSet = {
"no metafile": '{ "_type": "timestamp", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z"}',
}
@run_sub_tests_with_dataset(invalid_timestamps)
@utils.run_sub_tests_with_dataset(invalid_timestamps)
def test_invalid_timestamp_serialization(self, test_case_data: Dict[str, str]):
case_dict = json.loads(test_case_data)
with self.assertRaises((ValueError, KeyError)):
Timestamp.from_dict(copy.deepcopy(case_dict))
valid_timestamps: DataSet = {
valid_timestamps: utils.DataSet = {
"all": '{ "_type": "timestamp", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \
"meta": {"snapshot.json": {"hashes": {"sha256" : "abc"}, "version": 1}}}',
"unrecognized field": '{ "_type": "timestamp", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \
"meta": {"snapshot.json": {"hashes": {"sha256" : "abc"}, "version": 1}}, "foo": "bar"}',
}
@run_sub_tests_with_dataset(valid_timestamps)
@utils.run_sub_tests_with_dataset(valid_timestamps)
def test_timestamp_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
timestamp = Timestamp.from_dict(copy.deepcopy(case_dict))
self.assertDictEqual(case_dict, timestamp.to_dict())
valid_snapshots: DataSet = {
valid_snapshots: utils.DataSet = {
"all": '{ "_type": "snapshot", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \
"meta": { \
"file1.txt": {"hashes": {"sha256" : "abc"}, "version": 1}, \
@ -253,14 +237,14 @@ def test_timestamp_serialization(self, test_case_data: str):
"meta": { "file.txt": { "hashes": {"sha256" : "abc"}, "version": 1 }}, "foo": "bar"}',
}
@run_sub_tests_with_dataset(valid_snapshots)
@utils.run_sub_tests_with_dataset(valid_snapshots)
def test_snapshot_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
snapshot = Snapshot.from_dict(copy.deepcopy(case_dict))
self.assertDictEqual(case_dict, snapshot.to_dict())
valid_delegated_roles: DataSet = {
valid_delegated_roles: utils.DataSet = {
# DelegatedRole inherits Role and some use cases can be found in the valid_roles.
"no hash prefix attribute":
'{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], \
@ -279,14 +263,14 @@ def test_snapshot_serialization(self, test_case_data: str):
"terminating": false, "threshold": 1}',
}
@run_sub_tests_with_dataset(valid_delegated_roles)
@utils.run_sub_tests_with_dataset(valid_delegated_roles)
def test_delegated_role_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
deserialized_role = DelegatedRole.from_dict(copy.copy(case_dict))
self.assertDictEqual(case_dict, deserialized_role.to_dict())
invalid_delegated_roles: DataSet = {
invalid_delegated_roles: utils.DataSet = {
# DelegatedRole inherits Role and some use cases can be found in the invalid_roles.
"missing hash prefixes and paths":
'{"name": "a", "keyids": ["keyid"], "threshold": 1, "terminating": false}',
@ -295,14 +279,14 @@ def test_delegated_role_serialization(self, test_case_data: str):
"paths": ["fn1", "fn2"], "path_hash_prefixes": ["h1", "h2"]}',
}
@run_sub_tests_with_dataset(invalid_delegated_roles)
@utils.run_sub_tests_with_dataset(invalid_delegated_roles)
def test_invalid_delegated_role_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
with self.assertRaises(ValueError):
DelegatedRole.from_dict(copy.copy(case_dict))
invalid_delegations: DataSet = {
invalid_delegations: utils.DataSet = {
"empty delegations": '{}',
"bad keys": '{"keys": "foo", \
"roles": [{"keyids": ["keyid"], "name": "a", "paths": ["fn1", "fn2"], "terminating": false, "threshold": 3}]}',
@ -316,14 +300,14 @@ def test_invalid_delegated_role_serialization(self, test_case_data: str):
}',
}
@run_sub_tests_with_dataset(invalid_delegations)
@utils.run_sub_tests_with_dataset(invalid_delegations)
def test_invalid_delegation_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
with self.assertRaises((ValueError, KeyError, AttributeError)):
Delegations.from_dict(copy.deepcopy(case_dict))
valid_delegations: DataSet = {
valid_delegations: utils.DataSet = {
"all":
'{"keys": { \
"keyid1" : {"keytype": "rsa", "scheme": "rsassa-pss-sha256", "keyval": {"public": "foo"}}, \
@ -341,28 +325,28 @@ def test_invalid_delegation_serialization(self, test_case_data: str):
}',
}
@run_sub_tests_with_dataset(valid_delegations)
@utils.run_sub_tests_with_dataset(valid_delegations)
def test_delegation_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
delegation = Delegations.from_dict(copy.deepcopy(case_dict))
self.assertDictEqual(case_dict, delegation.to_dict())
invalid_targetfiles: DataSet = {
invalid_targetfiles: utils.DataSet = {
"no hashes": '{"length": 1}',
"no length": '{"hashes": {"sha256": "abc"}}'
# The remaining cases are the same as for invalid_hashes and
# invalid_length datasets.
}
@run_sub_tests_with_dataset(invalid_targetfiles)
@utils.run_sub_tests_with_dataset(invalid_targetfiles)
def test_invalid_targetfile_serialization(self, test_case_data: Dict[str, str]):
case_dict = json.loads(test_case_data)
with self.assertRaises(KeyError):
TargetFile.from_dict(copy.deepcopy(case_dict), "file1.txt")
valid_targetfiles: DataSet = {
valid_targetfiles: utils.DataSet = {
"all": '{"length": 12, "hashes": {"sha256" : "abc"}, \
"custom" : {"foo": "bar"} }',
"no custom": '{"length": 12, "hashes": {"sha256" : "abc"}}',
@ -370,14 +354,14 @@ def test_invalid_targetfile_serialization(self, test_case_data: Dict[str, str]):
"custom" : {"foo": "bar"}, "foo": "bar"}',
}
@run_sub_tests_with_dataset(valid_targetfiles)
@utils.run_sub_tests_with_dataset(valid_targetfiles)
def test_targetfile_serialization(self, test_case_data: str):
case_dict = json.loads(test_case_data)
target_file = TargetFile.from_dict(copy.copy(case_dict), "file1.txt")
self.assertDictEqual(case_dict, target_file.to_dict())
valid_targets: DataSet = {
valid_targets: utils.DataSet = {
"all attributes": '{"_type": "targets", "spec_version": "1.0.0", "version": 1, "expires": "2030-01-01T00:00:00Z", \
"targets": { \
"file.txt": {"length": 12, "hashes": {"sha256" : "abc"} }, \
@ -403,7 +387,7 @@ def test_targetfile_serialization(self, test_case_data: str):
"targets": {}, "foo": "bar"}',
}
@run_sub_tests_with_dataset(valid_targets)
@utils.run_sub_tests_with_dataset(valid_targets)
def test_targets_serialization(self, test_case_data):
case_dict = json.loads(test_case_data)
targets = Targets.from_dict(copy.deepcopy(case_dict))

View file

@ -20,8 +20,10 @@
Provide common utilities for TUF tests
"""
import argparse
from contextlib import contextmanager
from typing import Dict, Any, Callable
import unittest
import argparse
import errno
import logging
import socket
@ -40,6 +42,22 @@
TEST_HOST_ADDRESS = '127.0.0.1'
# DataSet is only here so type hints can be used.
DataSet = Dict[str, Any]
# Test runner decorator: Runs the test as a set of N SubTests,
# (where N is number of items in dataset), feeding the actual test
# function one test case at a time
def run_sub_tests_with_dataset(dataset: DataSet):
def real_decorator(function: Callable[[unittest.TestCase, Any], None]):
def wrapper(test_cls: unittest.TestCase):
for case, data in dataset.items():
with test_cls.subTest(case=case):
function(test_cls, data)
return wrapper
return real_decorator
class TestServerProcessError(Exception):
def __init__(self, value="TestServerProcess"):