diff --git a/tests/test_api.py b/tests/test_api.py index ec7d182b..c3f443e0 100755 --- a/tests/test_api.py +++ b/tests/test_api.py @@ -28,6 +28,12 @@ Targets ) +from tuf.api.serialization.json import ( + JSONSerializer, + JSONDeserializer, + CanonicalJSONSerializer +) + from securesystemslib.interface import ( import_ed25519_publickey_from_file, import_ed25519_privatekey_from_file @@ -89,10 +95,10 @@ def test_generic_read(self): # Load JSON-formatted metdata of each supported type from file # and from out-of-band read JSON string path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') - metadata_obj = Metadata.from_json_file(path) + metadata_obj = Metadata.from_file(path) with open(path, 'rb') as f: metadata_str = f.read() - metadata_obj2 = Metadata.from_json(metadata_str) + metadata_obj2 = JSONDeserializer().deserialize(metadata_str) # Assert that both methods instantiate the right inner class for # each metadata type and ... @@ -113,27 +119,27 @@ def test_generic_read(self): f.write(json.dumps(bad_metadata).encode('utf-8')) with self.assertRaises(ValueError): - Metadata.from_json_file(bad_metadata_path) + Metadata.from_file(bad_metadata_path) os.remove(bad_metadata_path) def test_compact_json(self): path = os.path.join(self.repo_dir, 'metadata', 'targets.json') - metadata_obj = Metadata.from_json_file(path) + metadata_obj = Metadata.from_file(path) self.assertTrue( - len(metadata_obj.to_json(compact=True)) < - len(metadata_obj.to_json())) + len(JSONSerializer(compact=True).serialize(metadata_obj)) < + len(JSONSerializer().serialize(metadata_obj))) def test_read_write_read_compare(self): for metadata in ['snapshot', 'timestamp', 'targets']: path = os.path.join(self.repo_dir, 'metadata', metadata + '.json') - metadata_obj = Metadata.from_json_file(path) + metadata_obj = Metadata.from_file(path) path_2 = path + '.tmp' - metadata_obj.to_json_file(path_2) - metadata_obj_2 = Metadata.from_json_file(path_2) + metadata_obj.to_file(path_2) + metadata_obj_2 = Metadata.from_file(path_2) self.assertDictEqual( metadata_obj.to_dict(), @@ -145,7 +151,7 @@ def test_read_write_read_compare(self): def test_sign_verify(self): # Load sample metadata (targets) and assert ... path = os.path.join(self.repo_dir, 'metadata', 'targets.json') - metadata_obj = Metadata.from_json_file(path) + metadata_obj = Metadata.from_file(path) # ... it has a single existing signature, self.assertTrue(len(metadata_obj.signatures) == 1) @@ -192,7 +198,7 @@ def test_metadata_base(self): # with real data snapshot_path = os.path.join( self.repo_dir, 'metadata', 'snapshot.json') - md = Metadata.from_json_file(snapshot_path) + md = Metadata.from_file(snapshot_path) self.assertEqual(md.signed.version, 1) md.signed.bump_version() @@ -207,7 +213,7 @@ def test_metadata_base(self): def test_metadata_snapshot(self): snapshot_path = os.path.join( self.repo_dir, 'metadata', 'snapshot.json') - snapshot = Metadata.from_json_file(snapshot_path) + snapshot = Metadata.from_file(snapshot_path) # Create a dict representing what we expect the updated data to be fileinfo = copy.deepcopy(snapshot.signed.meta) @@ -225,7 +231,7 @@ def test_metadata_snapshot(self): def test_metadata_timestamp(self): timestamp_path = os.path.join( self.repo_dir, 'metadata', 'timestamp.json') - timestamp = Metadata.from_json_file(timestamp_path) + timestamp = Metadata.from_file(timestamp_path) self.assertEqual(timestamp.signed.version, 1) timestamp.signed.bump_version() @@ -260,7 +266,7 @@ def test_metadata_timestamp(self): def test_metadata_root(self): root_path = os.path.join( self.repo_dir, 'metadata', 'root.json') - root = Metadata.from_json_file(root_path) + root = Metadata.from_file(root_path) # Add a second key to root role root_key2 = import_ed25519_publickey_from_file( @@ -293,7 +299,7 @@ def test_metadata_root(self): def test_metadata_targets(self): targets_path = os.path.join( self.repo_dir, 'metadata', 'targets.json') - targets = Metadata.from_json_file(targets_path) + targets = Metadata.from_file(targets_path) # Create a fileinfo dict representing what we expect the updated data to be filename = 'file2.txt' diff --git a/tuf/api/metadata.py b/tuf/api/metadata.py index a747be6d..393a2a79 100644 --- a/tuf/api/metadata.py +++ b/tuf/api/metadata.py @@ -1,7 +1,7 @@ """TUF role metadata model. This module provides container classes for TUF role metadata, including methods -to read/serialize/write from and to JSON, perform TUF-compliant metadata +to read/serialize/write from and to file, perform TUF-compliant metadata updates, and create and verify signatures. """ @@ -9,22 +9,21 @@ from datetime import datetime, timedelta from typing import Any, Dict, Optional -import json import tempfile -from securesystemslib.formats import encode_canonical -from securesystemslib.util import ( - load_json_file, - load_json_string, - persist_temp_file -) -from securesystemslib.storage import StorageBackendInterface +from securesystemslib.util import persist_temp_file +from securesystemslib.storage import (StorageBackendInterface, + FilesystemBackend) from securesystemslib.keys import create_signature, verify_signature +from tuf.api.serialization import (MetadataSerializer, MetadataDeserializer, + SignedSerializer) + import tuf.formats import tuf.exceptions + # Types JsonDict = Dict[str, Any] @@ -101,32 +100,17 @@ class also that has a 'from_dict' factory method. (Currently this is @classmethod - def from_json(cls, metadata_json: str) -> 'Metadata': - """Loads JSON-formatted TUF metadata from a string. - - Arguments: - metadata_json: TUF metadata in JSON-string representation. - - Raises: - securesystemslib.exceptions.Error, ValueError, KeyError: The - metadata cannot be parsed. - - Returns: - A TUF Metadata object. - - """ - return cls.from_dict(load_json_string(metadata_json)) - - - @classmethod - def from_json_file( - cls, filename: str, - storage_backend: Optional[StorageBackendInterface] = None - ) -> 'Metadata': - """Loads JSON-formatted TUF metadata from file storage. + def from_file( + cls, filename: str, deserializer: MetadataDeserializer = None, + storage_backend: Optional[StorageBackendInterface] = None + ) -> 'Metadata': + """Loads TUF metadata from file storage. Arguments: filename: The path to read the file from. + deserializer: A MetadataDeserializer subclass instance that + implements the desired wireline format deserialization. Per + default a JSONDeserializer is used. storage_backend: An object that implements securesystemslib.storage.StorageBackendInterface. Per default a (local) FilesystemBackend is used. @@ -140,7 +124,19 @@ def from_json_file( A TUF Metadata object. """ - return cls.from_dict(load_json_file(filename, storage_backend)) + if deserializer is None: + # Function-scope import to avoid circular dependency. Yucky!!! + # TODO: At least move to _get_default_metadata_deserializer helper. + from tuf.api.serialization.json import JSONDeserializer # pylint: disable=import-outside-toplevel + deserializer = JSONDeserializer() + + if storage_backend is None: + storage_backend = FilesystemBackend() + + with storage_backend.get(filename) as file_obj: + raw_data = file_obj.read() + + return deserializer.deserialize(raw_data) # Serialization. @@ -151,40 +147,38 @@ def to_dict(self) -> JsonDict: 'signed': self.signed.to_dict() } - - def to_json(self, compact: bool = False) -> None: - """Returns the optionally compacted JSON representation of self. """ - return json.dumps( - self.to_dict(), - indent=(None if compact else 1), - separators=((',', ':') if compact else (',', ': ')), - sort_keys=True) - - - def to_json_file( - self, filename: str, compact: bool = False, - storage_backend: StorageBackendInterface = None) -> None: - """Writes the JSON representation of self to file storage. + def to_file(self, filename: str, serializer: MetadataSerializer = None, + storage_backend: StorageBackendInterface = None) -> None: + """Writes TUF metadata to file storage. Arguments: filename: The path to write the file to. - compact: A boolean indicating if the JSON string should be compact - by excluding whitespace. + serializer: A MetadataSerializer subclass instance that implements + the desired wireline format serialization. Per default a + JSONSerializer is used. storage_backend: An object that implements securesystemslib.storage.StorageBackendInterface. Per default a (local) FilesystemBackend is used. + Raises: securesystemslib.exceptions.StorageError: The file cannot be written. """ + if serializer is None: + # Function-scope import to avoid circular dependency. Yucky!!! + # TODO: At least move to a _get_default_metadata_serializer helper. + from tuf.api.serialization.json import JSONSerializer # pylint: disable=import-outside-toplevel + serializer = JSONSerializer(True) # Pass True to compact JSON + with tempfile.TemporaryFile() as temp_file: - temp_file.write(self.to_json(compact).encode('utf-8')) + temp_file.write(serializer.serialize(self)) persist_temp_file(temp_file, filename, storage_backend) # Signatures. - def sign(self, key: JsonDict, append: bool = False) -> JsonDict: + def sign(self, key: JsonDict, append: bool = False, + serializer: SignedSerializer = None) -> JsonDict: """Creates signature over 'signed' and assigns it to 'signatures'. Arguments: @@ -192,6 +186,9 @@ def sign(self, key: JsonDict, append: bool = False) -> JsonDict: append: A boolean indicating if the signature should be appended to the list of signatures or replace any existing signatures. The default behavior is to replace signatures. + serializer: A SignedSerializer subclass instance that implements + the desired canonicalization format. Per default a + CanonicalJSONSerializer is used. Raises: securesystemslib.exceptions.FormatError: Key argument is malformed. @@ -203,7 +200,13 @@ def sign(self, key: JsonDict, append: bool = False) -> JsonDict: A securesystemslib-style signature object. """ - signature = create_signature(key, self.signed.to_canonical_bytes()) + if serializer is None: + # Function-scope import to avoid circular dependency. Yucky!!! + # TODO: At least move to a _get_default_signed_serializer helper. + from tuf.api.serialization.json import CanonicalJSONSerializer # pylint: disable=import-outside-toplevel + serializer = CanonicalJSONSerializer() + + signature = create_signature(key, serializer.serialize(self.signed)) if append: self.signatures.append(signature) @@ -213,11 +216,15 @@ def sign(self, key: JsonDict, append: bool = False) -> JsonDict: return signature - def verify(self, key: JsonDict) -> bool: + def verify(self, key: JsonDict, + serializer: SignedSerializer = None) -> bool: """Verifies 'signatures' over 'signed' that match the passed key by id. Arguments: key: A securesystemslib-style public key object. + serializer: A SignedSerializer subclass instance that implements + the desired canonicalization format. Per default a + CanonicalJSONSerializer is used. Raises: # TODO: Revise exception taxonomy @@ -243,9 +250,15 @@ def verify(self, key: JsonDict) -> bool: f'{len(signatures_for_keyid)} signatures for key ' f'{key["keyid"]}, not sure which one to verify.') + if serializer is None: + # Function-scope import to avoid circular dependency. Yucky!!! + # TODO: At least move to a _get_default_signed_serializer helper. + from tuf.api.serialization.json import CanonicalJSONSerializer # pylint: disable=import-outside-toplevel + serializer = CanonicalJSONSerializer() + return verify_signature( key, signatures_for_keyid[0], - self.signed.to_canonical_bytes()) + serializer.serialize(self.signed)) @@ -307,12 +320,6 @@ def from_dict(cls, signed_dict: JsonDict) -> 'Signed': return cls(**signed_dict) - # Serialization. - def to_canonical_bytes(self) -> bytes: - """Returns the UTF-8 encoded canonical JSON representation of self. """ - return encode_canonical(self.to_dict()).encode('UTF-8') - - def to_dict(self) -> JsonDict: """Returns the JSON-serializable dictionary representation of self. """ return { diff --git a/tuf/api/serialization/json.py b/tuf/api/serialization/json.py index 28152e18..3f25085d 100644 --- a/tuf/api/serialization/json.py +++ b/tuf/api/serialization/json.py @@ -10,6 +10,9 @@ from securesystemslib.formats import encode_canonical +# pylint: disable=cyclic-import +# ... to allow de/serializing the correct metadata class here, while also +# creating default de/serializers there (see metadata function scope imports). from tuf.api.metadata import Metadata, Signed from tuf.api.serialization import (MetadataSerializer, MetadataDeserializer,