From e997097d1cd51b3f596a1adee5b3bdb84bbbbb8a Mon Sep 17 00:00:00 2001 From: Lukas Puehringer Date: Tue, 18 Aug 2020 17:04:55 +0200 Subject: [PATCH] Add generic Metadata.read_from_json class method Add generic read from json class method that returns a Metadata object with a signed field that contains the appropriate Signed subclass, based on the signed._type field of the read metadata. Signed-off-by: Lukas Puehringer --- tests/test_api.py | 34 ++++++++++++++++++++++++++++++++ tuf/api/metadata.py | 48 +++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 78 insertions(+), 4 deletions(-) diff --git a/tests/test_api.py b/tests/test_api.py index 3fdac39b..97580c60 100644 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -6,6 +6,7 @@ """ +import json import sys import logging import os @@ -27,8 +28,10 @@ def setUpModule(): # Since setUpModule is called after imports we need to import conditionally. if IS_PY_VERSION_SUPPORTED: from tuf.api.metadata import ( + Metadata, Snapshot, Timestamp, + Targets ) @@ -90,6 +93,37 @@ def tearDownClass(cls): # threshold = Threshold(1, 5) # return KeyRing(threshold=threshold, keys=key_list) + def test_generic_read(self): + for metadata, inner_metadata_cls in [ + ("snapshot", Snapshot), + ("timestamp", Timestamp), + ("targets", Targets)]: + + path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') + metadata_obj = Metadata.read_from_json(path) + + # Assert that generic method ... + # ... instantiates the right inner class for each metadata type + self.assertTrue( + isinstance(metadata_obj.signed, inner_metadata_cls)) + # ... and reads the same metadata file as the corresponding method + # on the inner class would do (compare their dict representation) + self.assertDictEqual( + metadata_obj.as_dict(), + inner_metadata_cls.read_from_json(path).as_dict()) + + # Assert that it chokes correctly on an unknown metadata type + bad_metadata_path = 'bad-metadata.json' + bad_metadata = {'signed': {'_type': 'bad-metadata'}} + with open(bad_metadata_path, 'wb') as f: + f.write(json.dumps(bad_metadata).encode('utf-8')) + + with self.assertRaises(ValueError): + Metadata.read_from_json(bad_metadata_path) + + os.remove(bad_metadata_path) + + def test_metadata_base(self): # Use of Snapshot is arbitrary, we're just testing the base class features # with real data diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index dddb47af..c75e0246 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -23,10 +23,6 @@ disable check as there might be a justified reason to write WIP metadata to json. - * It might be nice to have a generic Metadata.read_from_json that - can load any TUF role metadata and instantiate the appropriate object based - on the json '_type' field. - * Add Root metadata class """ @@ -116,6 +112,50 @@ def as_dict(self) -> JsonDict: # break # return len(verified_keyids) >= key_ring.threshold.least + @classmethod + def read_from_json( + cls, filename: str, + storage_backend: Optional[StorageBackendInterface] = None + ) -> 'Metadata': + """Loads JSON-formatted TUF metadata from a file storage. + + Arguments: + filename: The path to read the file from. + storage_backend: An object that implements + securesystemslib.storage.StorageBackendInterface. Per default + a (local) FilesystemBackend is used. + + Raises: + securesystemslib.exceptions.StorageError: The file cannot be read. + securesystemslib.exceptions.Error, ValueError: The metadata cannot + be parsed. + + Returns: + A TUF Metadata object. + + """ + signable = load_json_file(filename, storage_backend) + + # TODO: Should we use constants? + # And/or maybe a dispatch table? (<-- maybe too much magic) + _type = signable['signed']['_type'] + + if _type == 'targets': + inner_cls = Targets + elif _type == 'snapshot': + inner_cls = Snapshot + elif _type == 'timestamp': + inner_cls = Timestamp + elif _type == 'root': + # TODO: implement Root class + raise NotImplementedError('Root not yet implemented') + else: + raise ValueError(f'unrecognized metadata type "{_type}"') + + return Metadata( + signed=inner_cls(**signable['signed']), + signatures=signable['signatures']) + def write_to_json( self, filename: str,