mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Adopt serialization sub-package in metadata API
- Rename Metadata methods: - to_json_file -> to_file - from_json_file -> from_file - Remove Metadata.from_json/to_json - Remove Signed.to_canonical_bytes - Accept optional de/serializer arguments: - from_file (default: JSONDeserializer) - to_file (default: JSONSerializer) - sign, verify (default: CanonicalJSONSerializer) - inline disable pylint cyclic-import checks Signed-off-by: Lukas Puehringer <lukas.puehringer@nyu.edu>
This commit is contained in:
parent
4a22b4a578
commit
499f1c858e
3 changed files with 91 additions and 75 deletions
|
|
@ -28,6 +28,12 @@
|
|||
Targets
|
||||
)
|
||||
|
||||
from tuf.api.serialization.json import (
|
||||
JSONSerializer,
|
||||
JSONDeserializer,
|
||||
CanonicalJSONSerializer
|
||||
)
|
||||
|
||||
from securesystemslib.interface import (
|
||||
import_ed25519_publickey_from_file,
|
||||
import_ed25519_privatekey_from_file
|
||||
|
|
@ -89,10 +95,10 @@ def test_generic_read(self):
|
|||
# Load JSON-formatted metdata of each supported type from file
|
||||
# and from out-of-band read JSON string
|
||||
path = os.path.join(self.repo_dir, 'metadata', metadata + '.json')
|
||||
metadata_obj = Metadata.from_json_file(path)
|
||||
metadata_obj = Metadata.from_file(path)
|
||||
with open(path, 'rb') as f:
|
||||
metadata_str = f.read()
|
||||
metadata_obj2 = Metadata.from_json(metadata_str)
|
||||
metadata_obj2 = JSONDeserializer().deserialize(metadata_str)
|
||||
|
||||
# Assert that both methods instantiate the right inner class for
|
||||
# each metadata type and ...
|
||||
|
|
@ -113,27 +119,27 @@ def test_generic_read(self):
|
|||
f.write(json.dumps(bad_metadata).encode('utf-8'))
|
||||
|
||||
with self.assertRaises(ValueError):
|
||||
Metadata.from_json_file(bad_metadata_path)
|
||||
Metadata.from_file(bad_metadata_path)
|
||||
|
||||
os.remove(bad_metadata_path)
|
||||
|
||||
|
||||
def test_compact_json(self):
|
||||
path = os.path.join(self.repo_dir, 'metadata', 'targets.json')
|
||||
metadata_obj = Metadata.from_json_file(path)
|
||||
metadata_obj = Metadata.from_file(path)
|
||||
self.assertTrue(
|
||||
len(metadata_obj.to_json(compact=True)) <
|
||||
len(metadata_obj.to_json()))
|
||||
len(JSONSerializer(compact=True).serialize(metadata_obj)) <
|
||||
len(JSONSerializer().serialize(metadata_obj)))
|
||||
|
||||
|
||||
def test_read_write_read_compare(self):
|
||||
for metadata in ['snapshot', 'timestamp', 'targets']:
|
||||
path = os.path.join(self.repo_dir, 'metadata', metadata + '.json')
|
||||
metadata_obj = Metadata.from_json_file(path)
|
||||
metadata_obj = Metadata.from_file(path)
|
||||
|
||||
path_2 = path + '.tmp'
|
||||
metadata_obj.to_json_file(path_2)
|
||||
metadata_obj_2 = Metadata.from_json_file(path_2)
|
||||
metadata_obj.to_file(path_2)
|
||||
metadata_obj_2 = Metadata.from_file(path_2)
|
||||
|
||||
self.assertDictEqual(
|
||||
metadata_obj.to_dict(),
|
||||
|
|
@ -145,7 +151,7 @@ def test_read_write_read_compare(self):
|
|||
def test_sign_verify(self):
|
||||
# Load sample metadata (targets) and assert ...
|
||||
path = os.path.join(self.repo_dir, 'metadata', 'targets.json')
|
||||
metadata_obj = Metadata.from_json_file(path)
|
||||
metadata_obj = Metadata.from_file(path)
|
||||
|
||||
# ... it has a single existing signature,
|
||||
self.assertTrue(len(metadata_obj.signatures) == 1)
|
||||
|
|
@ -192,7 +198,7 @@ def test_metadata_base(self):
|
|||
# with real data
|
||||
snapshot_path = os.path.join(
|
||||
self.repo_dir, 'metadata', 'snapshot.json')
|
||||
md = Metadata.from_json_file(snapshot_path)
|
||||
md = Metadata.from_file(snapshot_path)
|
||||
|
||||
self.assertEqual(md.signed.version, 1)
|
||||
md.signed.bump_version()
|
||||
|
|
@ -207,7 +213,7 @@ def test_metadata_base(self):
|
|||
def test_metadata_snapshot(self):
|
||||
snapshot_path = os.path.join(
|
||||
self.repo_dir, 'metadata', 'snapshot.json')
|
||||
snapshot = Metadata.from_json_file(snapshot_path)
|
||||
snapshot = Metadata.from_file(snapshot_path)
|
||||
|
||||
# Create a dict representing what we expect the updated data to be
|
||||
fileinfo = copy.deepcopy(snapshot.signed.meta)
|
||||
|
|
@ -225,7 +231,7 @@ def test_metadata_snapshot(self):
|
|||
def test_metadata_timestamp(self):
|
||||
timestamp_path = os.path.join(
|
||||
self.repo_dir, 'metadata', 'timestamp.json')
|
||||
timestamp = Metadata.from_json_file(timestamp_path)
|
||||
timestamp = Metadata.from_file(timestamp_path)
|
||||
|
||||
self.assertEqual(timestamp.signed.version, 1)
|
||||
timestamp.signed.bump_version()
|
||||
|
|
@ -260,7 +266,7 @@ def test_metadata_timestamp(self):
|
|||
def test_metadata_root(self):
|
||||
root_path = os.path.join(
|
||||
self.repo_dir, 'metadata', 'root.json')
|
||||
root = Metadata.from_json_file(root_path)
|
||||
root = Metadata.from_file(root_path)
|
||||
|
||||
# Add a second key to root role
|
||||
root_key2 = import_ed25519_publickey_from_file(
|
||||
|
|
@ -293,7 +299,7 @@ def test_metadata_root(self):
|
|||
def test_metadata_targets(self):
|
||||
targets_path = os.path.join(
|
||||
self.repo_dir, 'metadata', 'targets.json')
|
||||
targets = Metadata.from_json_file(targets_path)
|
||||
targets = Metadata.from_file(targets_path)
|
||||
|
||||
# Create a fileinfo dict representing what we expect the updated data to be
|
||||
filename = 'file2.txt'
|
||||
|
|
|
|||
|
|
@ -1,7 +1,7 @@
|
|||
"""TUF role metadata model.
|
||||
|
||||
This module provides container classes for TUF role metadata, including methods
|
||||
to read/serialize/write from and to JSON, perform TUF-compliant metadata
|
||||
to read/serialize/write from and to file, perform TUF-compliant metadata
|
||||
updates, and create and verify signatures.
|
||||
|
||||
"""
|
||||
|
|
@ -9,22 +9,21 @@
|
|||
from datetime import datetime, timedelta
|
||||
from typing import Any, Dict, Optional
|
||||
|
||||
import json
|
||||
import tempfile
|
||||
|
||||
from securesystemslib.formats import encode_canonical
|
||||
from securesystemslib.util import (
|
||||
load_json_file,
|
||||
load_json_string,
|
||||
persist_temp_file
|
||||
)
|
||||
from securesystemslib.storage import StorageBackendInterface
|
||||
from securesystemslib.util import persist_temp_file
|
||||
from securesystemslib.storage import (StorageBackendInterface,
|
||||
FilesystemBackend)
|
||||
from securesystemslib.keys import create_signature, verify_signature
|
||||
|
||||
from tuf.api.serialization import (MetadataSerializer, MetadataDeserializer,
|
||||
SignedSerializer)
|
||||
|
||||
import tuf.formats
|
||||
import tuf.exceptions
|
||||
|
||||
|
||||
|
||||
# Types
|
||||
JsonDict = Dict[str, Any]
|
||||
|
||||
|
|
@ -101,32 +100,17 @@ class also that has a 'from_dict' factory method. (Currently this is
|
|||
|
||||
|
||||
@classmethod
|
||||
def from_json(cls, metadata_json: str) -> 'Metadata':
|
||||
"""Loads JSON-formatted TUF metadata from a string.
|
||||
|
||||
Arguments:
|
||||
metadata_json: TUF metadata in JSON-string representation.
|
||||
|
||||
Raises:
|
||||
securesystemslib.exceptions.Error, ValueError, KeyError: The
|
||||
metadata cannot be parsed.
|
||||
|
||||
Returns:
|
||||
A TUF Metadata object.
|
||||
|
||||
"""
|
||||
return cls.from_dict(load_json_string(metadata_json))
|
||||
|
||||
|
||||
@classmethod
|
||||
def from_json_file(
|
||||
cls, filename: str,
|
||||
storage_backend: Optional[StorageBackendInterface] = None
|
||||
) -> 'Metadata':
|
||||
"""Loads JSON-formatted TUF metadata from file storage.
|
||||
def from_file(
|
||||
cls, filename: str, deserializer: MetadataDeserializer = None,
|
||||
storage_backend: Optional[StorageBackendInterface] = None
|
||||
) -> 'Metadata':
|
||||
"""Loads TUF metadata from file storage.
|
||||
|
||||
Arguments:
|
||||
filename: The path to read the file from.
|
||||
deserializer: A MetadataDeserializer subclass instance that
|
||||
implements the desired wireline format deserialization. Per
|
||||
default a JSONDeserializer is used.
|
||||
storage_backend: An object that implements
|
||||
securesystemslib.storage.StorageBackendInterface. Per default
|
||||
a (local) FilesystemBackend is used.
|
||||
|
|
@ -140,7 +124,19 @@ def from_json_file(
|
|||
A TUF Metadata object.
|
||||
|
||||
"""
|
||||
return cls.from_dict(load_json_file(filename, storage_backend))
|
||||
if deserializer is None:
|
||||
# Function-scope import to avoid circular dependency. Yucky!!!
|
||||
# TODO: At least move to _get_default_metadata_deserializer helper.
|
||||
from tuf.api.serialization.json import JSONDeserializer # pylint: disable=import-outside-toplevel
|
||||
deserializer = JSONDeserializer()
|
||||
|
||||
if storage_backend is None:
|
||||
storage_backend = FilesystemBackend()
|
||||
|
||||
with storage_backend.get(filename) as file_obj:
|
||||
raw_data = file_obj.read()
|
||||
|
||||
return deserializer.deserialize(raw_data)
|
||||
|
||||
|
||||
# Serialization.
|
||||
|
|
@ -151,40 +147,38 @@ def to_dict(self) -> JsonDict:
|
|||
'signed': self.signed.to_dict()
|
||||
}
|
||||
|
||||
|
||||
def to_json(self, compact: bool = False) -> None:
|
||||
"""Returns the optionally compacted JSON representation of self. """
|
||||
return json.dumps(
|
||||
self.to_dict(),
|
||||
indent=(None if compact else 1),
|
||||
separators=((',', ':') if compact else (',', ': ')),
|
||||
sort_keys=True)
|
||||
|
||||
|
||||
def to_json_file(
|
||||
self, filename: str, compact: bool = False,
|
||||
storage_backend: StorageBackendInterface = None) -> None:
|
||||
"""Writes the JSON representation of self to file storage.
|
||||
def to_file(self, filename: str, serializer: MetadataSerializer = None,
|
||||
storage_backend: StorageBackendInterface = None) -> None:
|
||||
"""Writes TUF metadata to file storage.
|
||||
|
||||
Arguments:
|
||||
filename: The path to write the file to.
|
||||
compact: A boolean indicating if the JSON string should be compact
|
||||
by excluding whitespace.
|
||||
serializer: A MetadataSerializer subclass instance that implements
|
||||
the desired wireline format serialization. Per default a
|
||||
JSONSerializer is used.
|
||||
storage_backend: An object that implements
|
||||
securesystemslib.storage.StorageBackendInterface. Per default
|
||||
a (local) FilesystemBackend is used.
|
||||
|
||||
Raises:
|
||||
securesystemslib.exceptions.StorageError:
|
||||
The file cannot be written.
|
||||
|
||||
"""
|
||||
if serializer is None:
|
||||
# Function-scope import to avoid circular dependency. Yucky!!!
|
||||
# TODO: At least move to a _get_default_metadata_serializer helper.
|
||||
from tuf.api.serialization.json import JSONSerializer # pylint: disable=import-outside-toplevel
|
||||
serializer = JSONSerializer(True) # Pass True to compact JSON
|
||||
|
||||
with tempfile.TemporaryFile() as temp_file:
|
||||
temp_file.write(self.to_json(compact).encode('utf-8'))
|
||||
temp_file.write(serializer.serialize(self))
|
||||
persist_temp_file(temp_file, filename, storage_backend)
|
||||
|
||||
|
||||
# Signatures.
|
||||
def sign(self, key: JsonDict, append: bool = False) -> JsonDict:
|
||||
def sign(self, key: JsonDict, append: bool = False,
|
||||
serializer: SignedSerializer = None) -> JsonDict:
|
||||
"""Creates signature over 'signed' and assigns it to 'signatures'.
|
||||
|
||||
Arguments:
|
||||
|
|
@ -192,6 +186,9 @@ def sign(self, key: JsonDict, append: bool = False) -> JsonDict:
|
|||
append: A boolean indicating if the signature should be appended to
|
||||
the list of signatures or replace any existing signatures. The
|
||||
default behavior is to replace signatures.
|
||||
serializer: A SignedSerializer subclass instance that implements
|
||||
the desired canonicalization format. Per default a
|
||||
CanonicalJSONSerializer is used.
|
||||
|
||||
Raises:
|
||||
securesystemslib.exceptions.FormatError: Key argument is malformed.
|
||||
|
|
@ -203,7 +200,13 @@ def sign(self, key: JsonDict, append: bool = False) -> JsonDict:
|
|||
A securesystemslib-style signature object.
|
||||
|
||||
"""
|
||||
signature = create_signature(key, self.signed.to_canonical_bytes())
|
||||
if serializer is None:
|
||||
# Function-scope import to avoid circular dependency. Yucky!!!
|
||||
# TODO: At least move to a _get_default_signed_serializer helper.
|
||||
from tuf.api.serialization.json import CanonicalJSONSerializer # pylint: disable=import-outside-toplevel
|
||||
serializer = CanonicalJSONSerializer()
|
||||
|
||||
signature = create_signature(key, serializer.serialize(self.signed))
|
||||
|
||||
if append:
|
||||
self.signatures.append(signature)
|
||||
|
|
@ -213,11 +216,15 @@ def sign(self, key: JsonDict, append: bool = False) -> JsonDict:
|
|||
return signature
|
||||
|
||||
|
||||
def verify(self, key: JsonDict) -> bool:
|
||||
def verify(self, key: JsonDict,
|
||||
serializer: SignedSerializer = None) -> bool:
|
||||
"""Verifies 'signatures' over 'signed' that match the passed key by id.
|
||||
|
||||
Arguments:
|
||||
key: A securesystemslib-style public key object.
|
||||
serializer: A SignedSerializer subclass instance that implements
|
||||
the desired canonicalization format. Per default a
|
||||
CanonicalJSONSerializer is used.
|
||||
|
||||
Raises:
|
||||
# TODO: Revise exception taxonomy
|
||||
|
|
@ -243,9 +250,15 @@ def verify(self, key: JsonDict) -> bool:
|
|||
f'{len(signatures_for_keyid)} signatures for key '
|
||||
f'{key["keyid"]}, not sure which one to verify.')
|
||||
|
||||
if serializer is None:
|
||||
# Function-scope import to avoid circular dependency. Yucky!!!
|
||||
# TODO: At least move to a _get_default_signed_serializer helper.
|
||||
from tuf.api.serialization.json import CanonicalJSONSerializer # pylint: disable=import-outside-toplevel
|
||||
serializer = CanonicalJSONSerializer()
|
||||
|
||||
return verify_signature(
|
||||
key, signatures_for_keyid[0],
|
||||
self.signed.to_canonical_bytes())
|
||||
serializer.serialize(self.signed))
|
||||
|
||||
|
||||
|
||||
|
|
@ -307,12 +320,6 @@ def from_dict(cls, signed_dict: JsonDict) -> 'Signed':
|
|||
return cls(**signed_dict)
|
||||
|
||||
|
||||
# Serialization.
|
||||
def to_canonical_bytes(self) -> bytes:
|
||||
"""Returns the UTF-8 encoded canonical JSON representation of self. """
|
||||
return encode_canonical(self.to_dict()).encode('UTF-8')
|
||||
|
||||
|
||||
def to_dict(self) -> JsonDict:
|
||||
"""Returns the JSON-serializable dictionary representation of self. """
|
||||
return {
|
||||
|
|
|
|||
|
|
@ -10,6 +10,9 @@
|
|||
|
||||
from securesystemslib.formats import encode_canonical
|
||||
|
||||
# pylint: disable=cyclic-import
|
||||
# ... to allow de/serializing the correct metadata class here, while also
|
||||
# creating default de/serializers there (see metadata function scope imports).
|
||||
from tuf.api.metadata import Metadata, Signed
|
||||
from tuf.api.serialization import (MetadataSerializer,
|
||||
MetadataDeserializer,
|
||||
|
|
|
|||
Loading…
Reference in a new issue