mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
ngclient: change envelope type config to flag
The flag allows adding other envelope types in the future (unlikely), or parallel support (`METADATA & SIMPLE`) without breaking the API. Internally, the flag is now just passed on to TrustedMetadataSet as mandatory parameter. (Optional parameters make less sense when we control all the invocations.) This change requires updating all invocations of TrustedMetadataSet, including the duplication of a test function. Signed-off-by: Lukas Puehringer <lukas.puehringer@nyu.edu>
This commit is contained in:
parent
8544bbd6f1
commit
1897f9a652
5 changed files with 78 additions and 36 deletions
|
|
@ -15,6 +15,7 @@ from urllib import request
|
|||
|
||||
from tuf.api.exceptions import DownloadError, RepositoryError
|
||||
from tuf.ngclient import Updater, UpdaterConfig
|
||||
from tuf.ngclient.config import EnvelopeType
|
||||
|
||||
# constants
|
||||
DOWNLOAD_DIR = "./downloads"
|
||||
|
|
@ -74,7 +75,7 @@ def download(base_url: str, target: str, use_dsse: bool) -> bool:
|
|||
os.mkdir(DOWNLOAD_DIR)
|
||||
|
||||
config = UpdaterConfig()
|
||||
config.use_dsse = use_dsse
|
||||
config.envelope_type = EnvelopeType.SIMPLE
|
||||
|
||||
try:
|
||||
updater = Updater(
|
||||
|
|
|
|||
|
|
@ -26,6 +26,7 @@
|
|||
)
|
||||
from tuf.api.serialization.json import JSONSerializer
|
||||
from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet
|
||||
from tuf.ngclient.config import EnvelopeType
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -94,7 +95,9 @@ def hashes_length_modifier(timestamp: Timestamp) -> None:
|
|||
)
|
||||
|
||||
def setUp(self) -> None:
|
||||
self.trusted_set = TrustedMetadataSet(self.metadata[Root.type])
|
||||
self.trusted_set = TrustedMetadataSet(
|
||||
self.metadata[Root.type], EnvelopeType.METADATA
|
||||
)
|
||||
|
||||
def _update_all_besides_targets(
|
||||
self,
|
||||
|
|
@ -193,22 +196,37 @@ def test_out_of_order_ops(self) -> None:
|
|||
self.metadata["role1"], "role1", Targets.type
|
||||
)
|
||||
|
||||
def test_root_with_invalid_json(self) -> None:
|
||||
# Test loading initial root and root update
|
||||
for test_func in [TrustedMetadataSet, self.trusted_set.update_root]:
|
||||
# root is not json
|
||||
with self.assertRaises(exceptions.RepositoryError):
|
||||
test_func(b"") # type: ignore[operator]
|
||||
def test_bad_initial_root(self) -> None:
|
||||
# root is not json
|
||||
with self.assertRaises(exceptions.RepositoryError):
|
||||
TrustedMetadataSet(b"", EnvelopeType.METADATA)
|
||||
|
||||
# root is invalid
|
||||
root = Metadata.from_bytes(self.metadata[Root.type])
|
||||
root.signed.version += 1
|
||||
with self.assertRaises(exceptions.UnsignedMetadataError):
|
||||
test_func(root.to_bytes()) # type: ignore[operator]
|
||||
# root is invalid
|
||||
root = Metadata.from_bytes(self.metadata[Root.type])
|
||||
root.signed.version += 1
|
||||
with self.assertRaises(exceptions.UnsignedMetadataError):
|
||||
TrustedMetadataSet(root.to_bytes(), EnvelopeType.METADATA)
|
||||
|
||||
# metadata is of wrong type
|
||||
with self.assertRaises(exceptions.RepositoryError):
|
||||
test_func(self.metadata[Snapshot.type]) # type: ignore[operator]
|
||||
# metadata is of wrong type
|
||||
with self.assertRaises(exceptions.RepositoryError):
|
||||
TrustedMetadataSet(
|
||||
self.metadata[Snapshot.type], EnvelopeType.METADATA
|
||||
)
|
||||
|
||||
def test_bad_root_update(self) -> None:
|
||||
# root is not json
|
||||
with self.assertRaises(exceptions.RepositoryError):
|
||||
self.trusted_set.update_root(b"")
|
||||
|
||||
# root is invalid
|
||||
root = Metadata.from_bytes(self.metadata[Root.type])
|
||||
root.signed.version += 1
|
||||
with self.assertRaises(exceptions.UnsignedMetadataError):
|
||||
self.trusted_set.update_root(root.to_bytes())
|
||||
|
||||
# metadata is of wrong type
|
||||
with self.assertRaises(exceptions.RepositoryError):
|
||||
self.trusted_set.update_root(self.metadata[Snapshot.type])
|
||||
|
||||
def test_top_level_md_with_invalid_json(self) -> None:
|
||||
top_level_md: List[Tuple[bytes, Callable[[bytes], Signed]]] = [
|
||||
|
|
@ -261,7 +279,7 @@ def root_expired_modifier(root: Root) -> None:
|
|||
|
||||
# intermediate root can be expired
|
||||
root = self.modify_metadata(Root.type, root_expired_modifier)
|
||||
tmp_trusted_set = TrustedMetadataSet(root)
|
||||
tmp_trusted_set = TrustedMetadataSet(root, EnvelopeType.METADATA)
|
||||
# update timestamp to trigger final root expiry check
|
||||
with self.assertRaises(exceptions.ExpiredMetadataError):
|
||||
tmp_trusted_set.update_timestamp(self.metadata[Timestamp.type])
|
||||
|
|
|
|||
|
|
@ -34,7 +34,7 @@
|
|||
|
||||
>>> # Load local root (RepositoryErrors here stop the update)
|
||||
>>> with open(root_path, "rb") as f:
|
||||
>>> trusted_set = TrustedMetadataSet(f.read())
|
||||
>>> trusted_set = TrustedMetadataSet(f.read(), EnvelopeType.METADATA)
|
||||
>>>
|
||||
>>> # update root from remote until no more are available
|
||||
>>> with download(Root.type, trusted_set.root.version + 1) as f:
|
||||
|
|
@ -68,7 +68,12 @@
|
|||
|
||||
from tuf.api import exceptions
|
||||
from tuf.api.metadata import Root, Signed, Snapshot, Targets, Timestamp
|
||||
from tuf.ngclient._internal.wrapping import MetadataUnwrapper, Unwrapper
|
||||
from tuf.ngclient._internal.wrapping import (
|
||||
EnvelopeUnwrapper,
|
||||
MetadataUnwrapper,
|
||||
Unwrapper,
|
||||
)
|
||||
from tuf.ngclient.config import EnvelopeType
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
|
@ -82,22 +87,26 @@ class TrustedMetadataSet(abc.Mapping):
|
|||
what is updated.
|
||||
"""
|
||||
|
||||
def __init__(self, root_data: bytes, unwrapper: Optional[Unwrapper] = None):
|
||||
def __init__(self, root_data: bytes, envelope_type: EnvelopeType):
|
||||
"""Initialize ``TrustedMetadataSet`` by loading trusted root metadata.
|
||||
|
||||
Args:
|
||||
root_data: Trusted root metadata as bytes. Note that this metadata
|
||||
will only be verified by itself: it is the source of trust for
|
||||
all metadata in the ``TrustedMetadataSet``
|
||||
unwrapper: Used to unwrap and verify metadata. Default is
|
||||
MetadataUnwrapper.
|
||||
envelope_type: Configures deserialization and verification mode of
|
||||
TUF metadata.
|
||||
|
||||
Raises:
|
||||
RepositoryError: Metadata failed to load or verify. The actual
|
||||
error type and content will contain more details.
|
||||
"""
|
||||
if unwrapper is None:
|
||||
unwrapper: Unwrapper
|
||||
if envelope_type is EnvelopeType.SIMPLE:
|
||||
unwrapper = EnvelopeUnwrapper()
|
||||
else:
|
||||
unwrapper = MetadataUnwrapper()
|
||||
|
||||
self._unwrapper = unwrapper
|
||||
|
||||
self._trusted_set: Dict[str, Signed] = {}
|
||||
|
|
|
|||
|
|
@ -5,6 +5,20 @@
|
|||
"""
|
||||
|
||||
from dataclasses import dataclass
|
||||
from enum import Flag, unique
|
||||
|
||||
|
||||
@unique
|
||||
class EnvelopeType(Flag):
|
||||
"""Configures deserialization and verification mode of TUF metadata.
|
||||
|
||||
Args:
|
||||
METADATA: Traditional canonical JSON -based TUF Metadata.
|
||||
SIMPLE: Dead Simple Signing Envelope. (experimental)
|
||||
"""
|
||||
|
||||
METADATA = 1
|
||||
SIMPLE = 2
|
||||
|
||||
|
||||
@dataclass
|
||||
|
|
@ -23,8 +37,9 @@ class UpdaterConfig:
|
|||
are used, target download URLs are formed by prefixing the filename
|
||||
with a hash digest of file content by default. This can be
|
||||
overridden by setting ``prefix_targets_with_hash`` to ``False``.
|
||||
use_dsse: If true, expect metadata in a DSSE Envelope. Use
|
||||
traditional Metadata (canonical json) otherwise.
|
||||
envelope_type: Configures deserialization and verification mode of TUF
|
||||
metadata. Per default, it is treated as traditional canonical JSON
|
||||
-based TUF Metadata.
|
||||
"""
|
||||
|
||||
max_root_rotations: int = 32
|
||||
|
|
@ -34,4 +49,4 @@ class UpdaterConfig:
|
|||
snapshot_max_length: int = 2000000 # bytes
|
||||
targets_max_length: int = 5000000 # bytes
|
||||
prefix_targets_with_hash: bool = True
|
||||
use_dsse: bool = False
|
||||
envelope_type: EnvelopeType = EnvelopeType.METADATA
|
||||
|
|
|
|||
|
|
@ -46,12 +46,8 @@
|
|||
|
||||
from tuf.api import exceptions
|
||||
from tuf.api.metadata import Root, Snapshot, TargetFile, Targets, Timestamp
|
||||
from tuf.ngclient._internal import (
|
||||
requests_fetcher,
|
||||
trusted_metadata_set,
|
||||
wrapping,
|
||||
)
|
||||
from tuf.ngclient.config import UpdaterConfig
|
||||
from tuf.ngclient._internal import requests_fetcher, trusted_metadata_set
|
||||
from tuf.ngclient.config import EnvelopeType, UpdaterConfig
|
||||
from tuf.ngclient.fetcher import FetcherInterface
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
|
@ -101,12 +97,15 @@ def __init__(
|
|||
self._fetcher = fetcher or requests_fetcher.RequestsFetcher()
|
||||
self.config = config or UpdaterConfig()
|
||||
|
||||
unwrapper: Optional[wrapping.Unwrapper] = None
|
||||
if self.config.use_dsse:
|
||||
unwrapper = wrapping.EnvelopeUnwrapper()
|
||||
supported_envelopes = [EnvelopeType.METADATA, EnvelopeType.SIMPLE]
|
||||
if self.config.envelope_type not in supported_envelopes:
|
||||
raise ValueError(
|
||||
f"config: envelope_type must be one of {supported_envelopes}, "
|
||||
f"got '{self.config.envelope_type}'"
|
||||
)
|
||||
|
||||
self._trusted_set = trusted_metadata_set.TrustedMetadataSet(
|
||||
data, unwrapper
|
||||
data, self.config.envelope_type
|
||||
)
|
||||
|
||||
def refresh(self) -> None:
|
||||
|
|
|
|||
Loading…
Reference in a new issue