diff --git a/examples/client/client b/examples/client/client index 9c509816..e1f08672 100755 --- a/examples/client/client +++ b/examples/client/client @@ -15,6 +15,7 @@ from urllib import request from tuf.api.exceptions import DownloadError, RepositoryError from tuf.ngclient import Updater, UpdaterConfig +from tuf.ngclient.config import EnvelopeType # constants DOWNLOAD_DIR = "./downloads" @@ -74,7 +75,7 @@ def download(base_url: str, target: str, use_dsse: bool) -> bool: os.mkdir(DOWNLOAD_DIR) config = UpdaterConfig() - config.use_dsse = use_dsse + config.envelope_type = EnvelopeType.SIMPLE try: updater = Updater( diff --git a/tests/test_trusted_metadata_set.py b/tests/test_trusted_metadata_set.py index 510fae2b..71d9bf16 100644 --- a/tests/test_trusted_metadata_set.py +++ b/tests/test_trusted_metadata_set.py @@ -26,6 +26,7 @@ ) from tuf.api.serialization.json import JSONSerializer from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet +from tuf.ngclient.config import EnvelopeType logger = logging.getLogger(__name__) @@ -94,7 +95,9 @@ def hashes_length_modifier(timestamp: Timestamp) -> None: ) def setUp(self) -> None: - self.trusted_set = TrustedMetadataSet(self.metadata[Root.type]) + self.trusted_set = TrustedMetadataSet( + self.metadata[Root.type], EnvelopeType.METADATA + ) def _update_all_besides_targets( self, @@ -193,22 +196,37 @@ def test_out_of_order_ops(self) -> None: self.metadata["role1"], "role1", Targets.type ) - def test_root_with_invalid_json(self) -> None: - # Test loading initial root and root update - for test_func in [TrustedMetadataSet, self.trusted_set.update_root]: - # root is not json - with self.assertRaises(exceptions.RepositoryError): - test_func(b"") # type: ignore[operator] + def test_bad_initial_root(self) -> None: + # root is not json + with self.assertRaises(exceptions.RepositoryError): + TrustedMetadataSet(b"", EnvelopeType.METADATA) - # root is invalid - root = Metadata.from_bytes(self.metadata[Root.type]) - root.signed.version += 1 - with self.assertRaises(exceptions.UnsignedMetadataError): - test_func(root.to_bytes()) # type: ignore[operator] + # root is invalid + root = Metadata.from_bytes(self.metadata[Root.type]) + root.signed.version += 1 + with self.assertRaises(exceptions.UnsignedMetadataError): + TrustedMetadataSet(root.to_bytes(), EnvelopeType.METADATA) - # metadata is of wrong type - with self.assertRaises(exceptions.RepositoryError): - test_func(self.metadata[Snapshot.type]) # type: ignore[operator] + # metadata is of wrong type + with self.assertRaises(exceptions.RepositoryError): + TrustedMetadataSet( + self.metadata[Snapshot.type], EnvelopeType.METADATA + ) + + def test_bad_root_update(self) -> None: + # root is not json + with self.assertRaises(exceptions.RepositoryError): + self.trusted_set.update_root(b"") + + # root is invalid + root = Metadata.from_bytes(self.metadata[Root.type]) + root.signed.version += 1 + with self.assertRaises(exceptions.UnsignedMetadataError): + self.trusted_set.update_root(root.to_bytes()) + + # metadata is of wrong type + with self.assertRaises(exceptions.RepositoryError): + self.trusted_set.update_root(self.metadata[Snapshot.type]) def test_top_level_md_with_invalid_json(self) -> None: top_level_md: List[Tuple[bytes, Callable[[bytes], Signed]]] = [ @@ -261,7 +279,7 @@ def root_expired_modifier(root: Root) -> None: # intermediate root can be expired root = self.modify_metadata(Root.type, root_expired_modifier) - tmp_trusted_set = TrustedMetadataSet(root) + tmp_trusted_set = TrustedMetadataSet(root, EnvelopeType.METADATA) # update timestamp to trigger final root expiry check with self.assertRaises(exceptions.ExpiredMetadataError): tmp_trusted_set.update_timestamp(self.metadata[Timestamp.type]) diff --git a/tuf/ngclient/_internal/trusted_metadata_set.py b/tuf/ngclient/_internal/trusted_metadata_set.py index fea8992e..6d049063 100644 --- a/tuf/ngclient/_internal/trusted_metadata_set.py +++ b/tuf/ngclient/_internal/trusted_metadata_set.py @@ -34,7 +34,7 @@ >>> # Load local root (RepositoryErrors here stop the update) >>> with open(root_path, "rb") as f: ->>> trusted_set = TrustedMetadataSet(f.read()) +>>> trusted_set = TrustedMetadataSet(f.read(), EnvelopeType.METADATA) >>> >>> # update root from remote until no more are available >>> with download(Root.type, trusted_set.root.version + 1) as f: @@ -68,7 +68,12 @@ from tuf.api import exceptions from tuf.api.metadata import Root, Signed, Snapshot, Targets, Timestamp -from tuf.ngclient._internal.wrapping import MetadataUnwrapper, Unwrapper +from tuf.ngclient._internal.wrapping import ( + EnvelopeUnwrapper, + MetadataUnwrapper, + Unwrapper, +) +from tuf.ngclient.config import EnvelopeType logger = logging.getLogger(__name__) @@ -82,22 +87,26 @@ class TrustedMetadataSet(abc.Mapping): what is updated. """ - def __init__(self, root_data: bytes, unwrapper: Optional[Unwrapper] = None): + def __init__(self, root_data: bytes, envelope_type: EnvelopeType): """Initialize ``TrustedMetadataSet`` by loading trusted root metadata. Args: root_data: Trusted root metadata as bytes. Note that this metadata will only be verified by itself: it is the source of trust for all metadata in the ``TrustedMetadataSet`` - unwrapper: Used to unwrap and verify metadata. Default is - MetadataUnwrapper. + envelope_type: Configures deserialization and verification mode of + TUF metadata. Raises: RepositoryError: Metadata failed to load or verify. The actual error type and content will contain more details. """ - if unwrapper is None: + unwrapper: Unwrapper + if envelope_type is EnvelopeType.SIMPLE: + unwrapper = EnvelopeUnwrapper() + else: unwrapper = MetadataUnwrapper() + self._unwrapper = unwrapper self._trusted_set: Dict[str, Signed] = {} diff --git a/tuf/ngclient/config.py b/tuf/ngclient/config.py index 4d8ee40a..3ef29406 100644 --- a/tuf/ngclient/config.py +++ b/tuf/ngclient/config.py @@ -5,6 +5,20 @@ """ from dataclasses import dataclass +from enum import Flag, unique + + +@unique +class EnvelopeType(Flag): + """Configures deserialization and verification mode of TUF metadata. + + Args: + METADATA: Traditional canonical JSON -based TUF Metadata. + SIMPLE: Dead Simple Signing Envelope. (experimental) + """ + + METADATA = 1 + SIMPLE = 2 @dataclass @@ -23,8 +37,9 @@ class UpdaterConfig: are used, target download URLs are formed by prefixing the filename with a hash digest of file content by default. This can be overridden by setting ``prefix_targets_with_hash`` to ``False``. - use_dsse: If true, expect metadata in a DSSE Envelope. Use - traditional Metadata (canonical json) otherwise. + envelope_type: Configures deserialization and verification mode of TUF + metadata. Per default, it is treated as traditional canonical JSON + -based TUF Metadata. """ max_root_rotations: int = 32 @@ -34,4 +49,4 @@ class UpdaterConfig: snapshot_max_length: int = 2000000 # bytes targets_max_length: int = 5000000 # bytes prefix_targets_with_hash: bool = True - use_dsse: bool = False + envelope_type: EnvelopeType = EnvelopeType.METADATA diff --git a/tuf/ngclient/updater.py b/tuf/ngclient/updater.py index 5db8519b..2cfccc66 100644 --- a/tuf/ngclient/updater.py +++ b/tuf/ngclient/updater.py @@ -46,12 +46,8 @@ from tuf.api import exceptions from tuf.api.metadata import Root, Snapshot, TargetFile, Targets, Timestamp -from tuf.ngclient._internal import ( - requests_fetcher, - trusted_metadata_set, - wrapping, -) -from tuf.ngclient.config import UpdaterConfig +from tuf.ngclient._internal import requests_fetcher, trusted_metadata_set +from tuf.ngclient.config import EnvelopeType, UpdaterConfig from tuf.ngclient.fetcher import FetcherInterface logger = logging.getLogger(__name__) @@ -101,12 +97,15 @@ def __init__( self._fetcher = fetcher or requests_fetcher.RequestsFetcher() self.config = config or UpdaterConfig() - unwrapper: Optional[wrapping.Unwrapper] = None - if self.config.use_dsse: - unwrapper = wrapping.EnvelopeUnwrapper() + supported_envelopes = [EnvelopeType.METADATA, EnvelopeType.SIMPLE] + if self.config.envelope_type not in supported_envelopes: + raise ValueError( + f"config: envelope_type must be one of {supported_envelopes}, " + f"got '{self.config.envelope_type}'" + ) self._trusted_set = trusted_metadata_set.TrustedMetadataSet( - data, unwrapper + data, self.config.envelope_type ) def refresh(self) -> None: