mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
New Updater: use the MetadataBundle
Use the MetadataBundle to verify metadata validity. * Updater now handles reading metadata files (from filesystem as well as network * Updater feeds bytes to MetadataBundle for verification * Updater persists data on disk after it had been verified Signed-off-by: Jussi Kukkonen <jkukkonen@vmware.com>
This commit is contained in:
parent
8bb704b166
commit
1d45c2aa2c
3 changed files with 99 additions and 521 deletions
|
|
@ -92,7 +92,7 @@ def setUp(self):
|
|||
# for each test case.
|
||||
original_repository = os.path.join(original_repository_files, 'repository')
|
||||
original_keystore = os.path.join(original_repository_files, 'keystore')
|
||||
original_client = os.path.join(original_repository_files, 'client')
|
||||
original_client = os.path.join(original_repository_files, 'client', 'test_repository1', 'metadata', 'current')
|
||||
|
||||
# Save references to the often-needed client repository directories.
|
||||
# Test cases need these references to access metadata and target files.
|
||||
|
|
@ -101,12 +101,7 @@ def setUp(self):
|
|||
self.keystore_directory = \
|
||||
os.path.join(temporary_repository_root, 'keystore')
|
||||
|
||||
self.client_directory = os.path.join(temporary_repository_root,
|
||||
'client')
|
||||
self.client_metadata = os.path.join(self.client_directory,
|
||||
self.repository_name, 'metadata')
|
||||
self.client_metadata_current = os.path.join(self.client_metadata,
|
||||
'current')
|
||||
self.client_directory = os.path.join(temporary_repository_root, 'client')
|
||||
|
||||
# Copy the original 'repository', 'client', and 'keystore' directories
|
||||
# to the temporary repository the test cases can use.
|
||||
|
|
@ -119,15 +114,11 @@ def setUp(self):
|
|||
url_prefix = 'http://' + utils.TEST_HOST_ADDRESS + ':' \
|
||||
+ str(self.server_process_handler.port) + repository_basepath
|
||||
|
||||
# Setting 'tuf.settings.repository_directory' with the temporary client
|
||||
# directory copied from the original repository files.
|
||||
tuf.settings.repositories_directory = self.client_directory
|
||||
|
||||
metadata_url = f"{url_prefix}/metadata/"
|
||||
targets_url = f"{url_prefix}/targets/"
|
||||
# Creating a repository instance. The test cases will use this client
|
||||
# updater to refresh metadata, fetch target files, etc.
|
||||
self.repository_updater = updater.Updater(self.repository_name,
|
||||
self.repository_updater = updater.Updater(self.client_directory,
|
||||
metadata_url,
|
||||
targets_url)
|
||||
|
||||
|
|
@ -154,7 +145,7 @@ def test_refresh(self):
|
|||
|
||||
for role in ['root', 'timestamp', 'snapshot', 'targets']:
|
||||
metadata_obj = metadata.Metadata.from_file(os.path.join(
|
||||
self.client_metadata_current, role + '.json'))
|
||||
self.client_directory, role + '.json'))
|
||||
|
||||
metadata_obj_2 = metadata.Metadata.from_file(os.path.join(
|
||||
self.repository_directory, 'metadata', role + '.json'))
|
||||
|
|
|
|||
|
|
@ -1,183 +0,0 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
# Copyright 2021, New York University and the TUF contributors
|
||||
# SPDX-License-Identifier: MIT OR Apache-2.0
|
||||
|
||||
"""Metadata wrapper
|
||||
"""
|
||||
import time
|
||||
|
||||
from securesystemslib.keys import format_metadata_to_key
|
||||
|
||||
from tuf import exceptions, formats
|
||||
from tuf.api import metadata
|
||||
|
||||
|
||||
class MetadataWrapper:
|
||||
"""Helper classes extending or adding missing
|
||||
functionality to metadata API
|
||||
"""
|
||||
|
||||
def __init__(self, meta):
|
||||
self._meta = meta
|
||||
|
||||
@classmethod
|
||||
def from_json_object(cls, raw_data):
|
||||
"""Loads JSON-formatted TUF metadata from a file object."""
|
||||
# Use local scope import to avoid circular import errors
|
||||
# pylint: disable=import-outside-toplevel
|
||||
from tuf.api.serialization.json import JSONDeserializer
|
||||
|
||||
deserializer = JSONDeserializer()
|
||||
meta = deserializer.deserialize(raw_data)
|
||||
return cls(meta=meta)
|
||||
|
||||
@classmethod
|
||||
def from_json_file(cls, filename):
|
||||
"""Loads JSON-formatted TUF metadata from a file."""
|
||||
meta = metadata.Metadata.from_file(filename)
|
||||
return cls(meta=meta)
|
||||
|
||||
@property
|
||||
def signed(self):
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
return self._meta.signed
|
||||
|
||||
@property
|
||||
def version(self):
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
return self._meta.signed.version
|
||||
|
||||
def verify(self, keys, threshold):
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
verified = 0
|
||||
# 1.3. Check signatures
|
||||
for key in keys:
|
||||
self._meta.verify(key)
|
||||
verified += 1
|
||||
|
||||
if verified < threshold:
|
||||
raise exceptions.InsufficientKeysError
|
||||
|
||||
def persist(self, filename):
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
self._meta.to_file(filename)
|
||||
|
||||
def expires(self, reference_time=None):
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
if reference_time is None:
|
||||
expires_timestamp = formats.datetime_to_unix_timestamp(
|
||||
self._meta.signed.expires
|
||||
)
|
||||
reference_time = int(time.time())
|
||||
|
||||
if expires_timestamp < reference_time:
|
||||
raise exceptions.ExpiredMetadataError
|
||||
|
||||
|
||||
class RootWrapper(MetadataWrapper):
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
|
||||
def keys(self, role):
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
keys = []
|
||||
for keyid in self._meta.signed.roles[role].keyids:
|
||||
key_metadata = self._meta.signed.keys[keyid].to_dict()
|
||||
key, dummy = format_metadata_to_key(key_metadata)
|
||||
keys.append(key)
|
||||
|
||||
return keys
|
||||
|
||||
def threshold(self, role):
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
return self._meta.signed.roles[role].threshold
|
||||
|
||||
|
||||
class TimestampWrapper(MetadataWrapper):
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
|
||||
@property
|
||||
def snapshot(self):
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
return self._meta.signed.meta["snapshot.json"]
|
||||
|
||||
|
||||
class SnapshotWrapper(MetadataWrapper):
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
|
||||
def role(self, name):
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
return self._meta.signed.meta[name + ".json"]
|
||||
|
||||
|
||||
class TargetsWrapper(MetadataWrapper):
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
|
||||
@property
|
||||
def targets(self):
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
return self._meta.signed.targets
|
||||
|
||||
@property
|
||||
def delegations(self):
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
return self._meta.signed.delegations
|
||||
|
||||
def keys(self, role):
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
keys = []
|
||||
if self._meta.signed.delegations is not None:
|
||||
for delegation in self._meta.signed.delegations.roles:
|
||||
if delegation.name == role:
|
||||
for keyid in delegation.keyids:
|
||||
key_metadata = self._meta.signed.delegations.keys[keyid]
|
||||
key, dummy = format_metadata_to_key(
|
||||
key_metadata.to_dict()
|
||||
)
|
||||
keys.append(key)
|
||||
return keys
|
||||
|
||||
return keys
|
||||
|
||||
def threshold(self, role):
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
if self._meta.signed.delegations is not None:
|
||||
for delegation in self._meta.signed.delegations.roles:
|
||||
if delegation.name == role:
|
||||
return delegation.threshold
|
||||
|
||||
return None
|
||||
|
|
@ -17,18 +17,18 @@
|
|||
from securesystemslib import hash as sslib_hash
|
||||
from securesystemslib import util as sslib_util
|
||||
|
||||
from tuf import exceptions, settings
|
||||
from tuf import exceptions
|
||||
from tuf.client.fetcher import FetcherInterface
|
||||
from tuf.client_rework import download, requests_fetcher
|
||||
|
||||
from .metadata_wrapper import (
|
||||
RootWrapper,
|
||||
SnapshotWrapper,
|
||||
TargetsWrapper,
|
||||
TimestampWrapper,
|
||||
)
|
||||
from tuf.client_rework import download, metadata_bundle, requests_fetcher
|
||||
|
||||
# Globals
|
||||
MAX_ROOT_ROTATIONS = 32
|
||||
MAX_DELEGATIONS = 32
|
||||
DEFAULT_ROOT_MAX_LENGTH = 512000 # bytes
|
||||
DEFAULT_TIMESTAMP_MAX_LENGTH = 16384 # bytes
|
||||
DEFAULT_SNAPSHOT_MAX_LENGTH = 2000000 # bytes
|
||||
DEFAULT_TARGETS_MAX_LENGTH = 5000000 # bytes
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
# Classes
|
||||
|
|
@ -41,29 +41,30 @@ class Updater:
|
|||
|
||||
def __init__(
|
||||
self,
|
||||
repository_name: str,
|
||||
repository_dir: str,
|
||||
metadata_base_url: str,
|
||||
target_base_url: Optional[str] = None,
|
||||
fetcher: Optional[FetcherInterface] = None,
|
||||
):
|
||||
"""
|
||||
Args:
|
||||
repository_name: directory name (within a local directory
|
||||
defined by 'tuf.settings.repositories_directory')
|
||||
repository_dir: Local metadata directory. Must contain root.json
|
||||
metadata_base_url: Base URL for all remote metadata downloads
|
||||
target_base_url: Optional; Default base URL for all remote target
|
||||
downloads. Can be individually set in download_target()
|
||||
fetcher: Optional; FetcherInterface implementation used to download
|
||||
both metadata and targets. Default is RequestsFetcher
|
||||
"""
|
||||
self._repository_name = repository_name
|
||||
self._dir = repository_dir
|
||||
self._metadata_base_url = _ensure_trailing_slash(metadata_base_url)
|
||||
if target_base_url is None:
|
||||
self._target_base_url = None
|
||||
else:
|
||||
self._target_base_url = _ensure_trailing_slash(target_base_url)
|
||||
self._consistent_snapshot = False
|
||||
self._metadata = {}
|
||||
|
||||
# Read trusted local root metadata
|
||||
data = self._load_local_metadata("root")
|
||||
self._bundle = metadata_bundle.MetadataBundle(data)
|
||||
|
||||
if fetcher is None:
|
||||
self._fetcher = requests_fetcher.RequestsFetcher()
|
||||
|
|
@ -195,318 +196,111 @@ def download_target(
|
|||
)
|
||||
sslib_util.persist_temp_file(target_file, filepath)
|
||||
|
||||
def _get_full_meta_name(
|
||||
self, role: str, extension: str = ".json", version: int = None
|
||||
) -> str:
|
||||
"""
|
||||
Helper method returning full metadata file path given the role name
|
||||
and file extension.
|
||||
"""
|
||||
def _download_metadata(
|
||||
self, rolename: str, length: int, version: Optional[int] = None
|
||||
) -> bytes:
|
||||
"""download a metadata file and return it as bytes"""
|
||||
if version is None:
|
||||
filename = role + extension
|
||||
filename = f"{rolename}.json"
|
||||
else:
|
||||
filename = str(version) + "." + role + extension
|
||||
return os.path.join(
|
||||
settings.repositories_directory,
|
||||
self._repository_name,
|
||||
"metadata",
|
||||
"current",
|
||||
filename,
|
||||
filename = f"{version}.{rolename}.json"
|
||||
url = parse.urljoin(self._metadata_base_url, filename)
|
||||
return download.download_bytes(
|
||||
url,
|
||||
length,
|
||||
self._fetcher,
|
||||
strict_required_length=False,
|
||||
)
|
||||
|
||||
def _load_local_metadata(self, rolename: str) -> bytes:
|
||||
with open(os.path.join(self._dir, f"{rolename}.json"), "rb") as f:
|
||||
return f.read()
|
||||
|
||||
def _persist_metadata(self, rolename: str, data: bytes):
|
||||
with open(os.path.join(self._dir, f"{rolename}.json"), "wb") as f:
|
||||
f.write(data)
|
||||
|
||||
def _load_root(self) -> None:
|
||||
"""
|
||||
If metadata file for 'root' role does not exist locally, download it
|
||||
over a network, verify it and store it permanently.
|
||||
"""
|
||||
"""Load remote root metadata.
|
||||
|
||||
# Load trusted root metadata
|
||||
# TODO: this should happen much earlier, on Updater.__init__
|
||||
self._metadata["root"] = RootWrapper.from_json_file(
|
||||
self._get_full_meta_name("root")
|
||||
)
|
||||
Sequentially load and persist on local disk every newer root metadata
|
||||
version available on the remote.
|
||||
"""
|
||||
|
||||
# Update the root role
|
||||
# 1.1. Let N denote the version number of the trusted
|
||||
# root metadata file.
|
||||
lower_bound = self._metadata["root"].version
|
||||
upper_bound = lower_bound + settings.MAX_NUMBER_ROOT_ROTATIONS
|
||||
intermediate_root = None
|
||||
lower_bound = self._bundle.root.signed.version + 1
|
||||
upper_bound = lower_bound + MAX_ROOT_ROTATIONS
|
||||
|
||||
for next_version in range(lower_bound, upper_bound):
|
||||
try:
|
||||
root_url = parse.urljoin(
|
||||
self._metadata_base_url, f"{next_version}.root.json"
|
||||
data = self._download_metadata(
|
||||
"root", DEFAULT_ROOT_MAX_LENGTH, next_version
|
||||
)
|
||||
# For each version of root iterate over the list of mirrors
|
||||
# until an intermediate root is successfully downloaded and
|
||||
# verified.
|
||||
data = download.download_bytes(
|
||||
root_url,
|
||||
settings.DEFAULT_ROOT_REQUIRED_LENGTH,
|
||||
self._fetcher,
|
||||
strict_required_length=False,
|
||||
)
|
||||
|
||||
intermediate_root = self._verify_root(data)
|
||||
# TODO: persist should happen here for each intermediate
|
||||
# root according to the spec
|
||||
self._bundle.update_root(data)
|
||||
self._persist_metadata("root", data)
|
||||
|
||||
except exceptions.FetcherHTTPError as exception:
|
||||
if exception.status_code not in {403, 404}:
|
||||
raise
|
||||
# Stop looking for a bigger version if "File not found"
|
||||
# error is received
|
||||
# 404/403 means current root is newest available
|
||||
break
|
||||
|
||||
if intermediate_root:
|
||||
# Check for a freeze attack. The latest known time MUST be lower
|
||||
# than the expiration timestamp in the trusted root metadata file
|
||||
# TODO define which exceptions are part of the public API
|
||||
intermediate_root.expires()
|
||||
|
||||
# 1.9. If the timestamp and / or snapshot keys have been rotated,
|
||||
# then delete the trusted timestamp and snapshot metadata files.
|
||||
if self._metadata["root"].keys(
|
||||
"timestamp"
|
||||
) != intermediate_root.keys("timestamp"):
|
||||
# FIXME: use abstract storage
|
||||
os.remove(self._get_full_meta_name("timestamp"))
|
||||
self._metadata["timestamp"] = {}
|
||||
|
||||
if self._metadata["root"].keys(
|
||||
"snapshot"
|
||||
) != intermediate_root.keys("snapshot"):
|
||||
# FIXME: use abstract storage
|
||||
os.remove(self._get_full_meta_name("snapshot"))
|
||||
self._metadata["snapshot"] = {}
|
||||
|
||||
# Set the trusted root metadata file to the new root
|
||||
# metadata file
|
||||
self._metadata["root"] = intermediate_root
|
||||
# Persist root metadata. The client MUST write the file to
|
||||
# non-volatile storage as FILENAME.EXT (e.g. root.json).
|
||||
self._metadata["root"].persist(self._get_full_meta_name("root"))
|
||||
|
||||
# 1.10. Set whether consistent snapshots are used as per
|
||||
# the trusted root metadata file
|
||||
self._consistent_snapshot = self._metadata[
|
||||
"root"
|
||||
].signed.consistent_snapshot
|
||||
# Verify final root
|
||||
self._bundle.root_update_finished()
|
||||
|
||||
def _load_timestamp(self) -> None:
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
# TODO Check if timestamp exists locally
|
||||
timestamp_url = parse.urljoin(self._metadata_base_url, "timestamp.json")
|
||||
data = download.download_bytes(
|
||||
timestamp_url,
|
||||
settings.DEFAULT_TIMESTAMP_REQUIRED_LENGTH,
|
||||
self._fetcher,
|
||||
strict_required_length=False,
|
||||
)
|
||||
self._metadata["timestamp"] = self._verify_timestamp(data)
|
||||
self._metadata["timestamp"].persist(
|
||||
self._get_full_meta_name("timestamp.json")
|
||||
"""Load local and remote timestamp metadata"""
|
||||
try:
|
||||
data = self._load_local_metadata("timestamp")
|
||||
self._bundle.update_timestamp(data)
|
||||
except (OSError, exceptions.RepositoryError) as e:
|
||||
# Local load can fail: it's not fatal
|
||||
logger.debug("Failed to load local timestamp %s", e)
|
||||
|
||||
# Load from remote (whether local load succeeded or not)
|
||||
data = self._download_metadata(
|
||||
"timestamp", DEFAULT_TIMESTAMP_MAX_LENGTH
|
||||
)
|
||||
self._bundle.update_timestamp(data)
|
||||
self._persist_metadata("timestamp", data)
|
||||
|
||||
def _load_snapshot(self) -> None:
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
"""Load local (and if needed remote) snapshot metadata"""
|
||||
try:
|
||||
length = self._metadata["timestamp"].snapshot["length"]
|
||||
except KeyError:
|
||||
length = settings.DEFAULT_SNAPSHOT_REQUIRED_LENGTH
|
||||
data = self._load_local_metadata("snapshot")
|
||||
self._bundle.update_snapshot(data)
|
||||
except (OSError, exceptions.RepositoryError) as e:
|
||||
# Local load failed: we must update from remote
|
||||
logger.debug("Failed to load local snapshot %s", e)
|
||||
|
||||
# Uncomment when implementing consistent_snapshot
|
||||
# if self._consistent_snapshot:
|
||||
# version = self._metadata["timestamp"].snapshot["version"]
|
||||
# else:
|
||||
# version = None
|
||||
metainfo = self._bundle.timestamp.signed.meta["snapshot.json"]
|
||||
length = metainfo.get("length") or DEFAULT_SNAPSHOT_MAX_LENGTH
|
||||
version = None
|
||||
if self._bundle.root.signed.consistent_snapshot:
|
||||
version = metainfo["version"]
|
||||
|
||||
# TODO: Check if exists locally
|
||||
snapshot_url = parse.urljoin(self._metadata_base_url, "snapshot.json")
|
||||
data = download.download_bytes(
|
||||
snapshot_url,
|
||||
length,
|
||||
self._fetcher,
|
||||
strict_required_length=False,
|
||||
)
|
||||
data = self._download_metadata("snapshot", length, version)
|
||||
self._bundle.update_snapshot(data)
|
||||
self._persist_metadata("snapshot", data)
|
||||
|
||||
self._metadata["snapshot"] = self._verify_snapshot(data)
|
||||
self._metadata["snapshot"].persist(
|
||||
self._get_full_meta_name("snapshot.json")
|
||||
)
|
||||
|
||||
def _load_targets(self, targets_role: str, parent_role: str) -> None:
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
def _load_targets(self, role: str, parent_role: str) -> None:
|
||||
"""Load local (and if needed remote) metadata for 'role'."""
|
||||
try:
|
||||
length = self._metadata["snapshot"].role(targets_role)["length"]
|
||||
except KeyError:
|
||||
length = settings.DEFAULT_TARGETS_REQUIRED_LENGTH
|
||||
data = self._load_local_metadata(role)
|
||||
self._bundle.update_delegated_targets(data, role, parent_role)
|
||||
except (OSError, exceptions.RepositoryError) as e:
|
||||
# Local load failed: we must update from remote
|
||||
logger.debug("Failed to load local %s: %s", role, e)
|
||||
|
||||
# Uncomment when implementing consistent_snapshot
|
||||
# if self._consistent_snapshot:
|
||||
# version = self._metadata["snapshot"].role(targets_role)["version"]
|
||||
# else:
|
||||
# version = None
|
||||
metainfo = self._bundle.snapshot.signed.meta[f"{role}.json"]
|
||||
length = metainfo.get("length") or DEFAULT_TARGETS_MAX_LENGTH
|
||||
version = None
|
||||
if self._bundle.root.signed.consistent_snapshot:
|
||||
version = metainfo["version"]
|
||||
|
||||
# TODO: Check if exists locally
|
||||
|
||||
targets_url = parse.urljoin(
|
||||
self._metadata_base_url, f"{targets_role}.json"
|
||||
)
|
||||
data = download.download_bytes(
|
||||
targets_url,
|
||||
length,
|
||||
self._fetcher,
|
||||
strict_required_length=False,
|
||||
)
|
||||
|
||||
self._metadata[targets_role] = self._verify_targets(
|
||||
data, targets_role, parent_role
|
||||
)
|
||||
self._metadata[targets_role].persist(
|
||||
self._get_full_meta_name(targets_role, extension=".json")
|
||||
)
|
||||
|
||||
def _verify_root(self, file_content: bytes) -> RootWrapper:
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
|
||||
intermediate_root = RootWrapper.from_json_object(file_content)
|
||||
|
||||
# Check for an arbitrary software attack
|
||||
trusted_root = self._metadata["root"]
|
||||
intermediate_root.verify(
|
||||
trusted_root.keys("root"), trusted_root.threshold("root")
|
||||
)
|
||||
intermediate_root.verify(
|
||||
intermediate_root.keys("root"), intermediate_root.threshold("root")
|
||||
)
|
||||
|
||||
# Check for a rollback attack.
|
||||
if intermediate_root.version < trusted_root.version:
|
||||
raise exceptions.ReplayedMetadataError(
|
||||
"root", intermediate_root.version(), trusted_root.version()
|
||||
)
|
||||
# Note that the expiration of the new (intermediate) root metadata
|
||||
# file does not matter yet, because we will check for it in step 1.8.
|
||||
|
||||
return intermediate_root
|
||||
|
||||
def _verify_timestamp(self, file_content: bytes) -> TimestampWrapper:
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
intermediate_timestamp = TimestampWrapper.from_json_object(file_content)
|
||||
|
||||
# Check for an arbitrary software attack
|
||||
trusted_root = self._metadata["root"]
|
||||
intermediate_timestamp.verify(
|
||||
trusted_root.keys("timestamp"), trusted_root.threshold("timestamp")
|
||||
)
|
||||
|
||||
# Check for a rollback attack.
|
||||
if self._metadata.get("timestamp"):
|
||||
if (
|
||||
intermediate_timestamp.signed.version
|
||||
<= self._metadata["timestamp"].version
|
||||
):
|
||||
raise exceptions.ReplayedMetadataError(
|
||||
"root",
|
||||
intermediate_timestamp.version(),
|
||||
self._metadata["timestamp"].version(),
|
||||
)
|
||||
|
||||
if self._metadata.get("snapshot"):
|
||||
if (
|
||||
intermediate_timestamp.snapshot.version
|
||||
<= self._metadata["timestamp"].snapshot["version"]
|
||||
):
|
||||
raise exceptions.ReplayedMetadataError(
|
||||
"root",
|
||||
intermediate_timestamp.snapshot.version(),
|
||||
self._metadata["snapshot"].version(),
|
||||
)
|
||||
|
||||
intermediate_timestamp.expires()
|
||||
|
||||
return intermediate_timestamp
|
||||
|
||||
def _verify_snapshot(self, file_content: bytes) -> SnapshotWrapper:
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
|
||||
# Check against timestamp metadata
|
||||
if self._metadata["timestamp"].snapshot.get("hash"):
|
||||
_check_hashes(
|
||||
file_content, self._metadata["timestamp"].snapshot.get("hash")
|
||||
)
|
||||
|
||||
intermediate_snapshot = SnapshotWrapper.from_json_object(file_content)
|
||||
|
||||
if (
|
||||
intermediate_snapshot.version
|
||||
!= self._metadata["timestamp"].snapshot["version"]
|
||||
):
|
||||
raise exceptions.BadVersionNumberError
|
||||
|
||||
# Check for an arbitrary software attack
|
||||
trusted_root = self._metadata["root"]
|
||||
intermediate_snapshot.verify(
|
||||
trusted_root.keys("snapshot"), trusted_root.threshold("snapshot")
|
||||
)
|
||||
|
||||
# Check for a rollback attack
|
||||
if self._metadata.get("snapshot"):
|
||||
for target_role in intermediate_snapshot.signed.meta:
|
||||
if (
|
||||
target_role["version"]
|
||||
!= self._metadata["snapshot"].meta[target_role]["version"]
|
||||
):
|
||||
raise exceptions.BadVersionNumberError
|
||||
|
||||
intermediate_snapshot.expires()
|
||||
|
||||
return intermediate_snapshot
|
||||
|
||||
def _verify_targets(
|
||||
self, file_content: bytes, filename: str, parent_role: str
|
||||
) -> TargetsWrapper:
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
|
||||
# Check against timestamp metadata
|
||||
if self._metadata["snapshot"].role(filename).get("hash"):
|
||||
_check_hashes(
|
||||
file_content, self._metadata["snapshot"].targets.get("hash")
|
||||
)
|
||||
|
||||
intermediate_targets = TargetsWrapper.from_json_object(file_content)
|
||||
if (
|
||||
intermediate_targets.version
|
||||
!= self._metadata["snapshot"].role(filename)["version"]
|
||||
):
|
||||
raise exceptions.BadVersionNumberError
|
||||
|
||||
# Check for an arbitrary software attack
|
||||
parent_role = self._metadata[parent_role]
|
||||
|
||||
intermediate_targets.verify(
|
||||
parent_role.keys(filename), parent_role.threshold(filename)
|
||||
)
|
||||
|
||||
intermediate_targets.expires()
|
||||
|
||||
return intermediate_targets
|
||||
data = self._download_metadata(role, length, version)
|
||||
self._bundle.update_delegated_targets(data, role, parent_role)
|
||||
self._persist_metadata(role, data)
|
||||
|
||||
def _preorder_depth_first_walk(self, target_filepath) -> Dict:
|
||||
"""
|
||||
|
|
@ -516,7 +310,7 @@ def _preorder_depth_first_walk(self, target_filepath) -> Dict:
|
|||
target = None
|
||||
role_names = [("targets", "root")]
|
||||
visited_role_names = set()
|
||||
number_of_delegations = settings.MAX_NUMBER_OF_DELEGATIONS
|
||||
number_of_delegations = MAX_DELEGATIONS
|
||||
|
||||
# Ensure the client has the most up-to-date version of 'targets.json'.
|
||||
# Raise 'exceptions.NoWorkingMirrorError' if the changed metadata
|
||||
|
|
@ -542,14 +336,13 @@ def _preorder_depth_first_walk(self, target_filepath) -> Dict:
|
|||
|
||||
# The metadata for 'role_name' must be downloaded/updated before
|
||||
# its targets, delegations, and child roles can be inspected.
|
||||
# self._metadata['current'][role_name] is currently missing.
|
||||
# _refresh_targets_metadata() does not refresh 'targets.json', it
|
||||
# expects _update_metadata_if_changed() to have already refreshed
|
||||
# it, which this function has checked above.
|
||||
# self._refresh_targets_metadata(role_name,
|
||||
# refresh_all_delegated_roles=False)
|
||||
|
||||
role_metadata = self._metadata[role_name]
|
||||
role_metadata = self._bundle[role_name].signed
|
||||
target = role_metadata.targets.get(target_filepath)
|
||||
|
||||
# After preorder check, add current role to set of visited roles.
|
||||
|
|
@ -610,10 +403,8 @@ def _preorder_depth_first_walk(self, target_filepath) -> Dict:
|
|||
and len(role_names) > 0
|
||||
):
|
||||
msg = (
|
||||
f"{len(role_names)} roles left to visit, ",
|
||||
"but allowed to visit at most ",
|
||||
f"{settings.MAX_NUMBER_OF_DELEGATIONS}",
|
||||
" delegations.",
|
||||
f"{len(role_names)} roles left to visit, but allowed to ",
|
||||
f"visit at most {MAX_DELEGATIONS} delegations.",
|
||||
)
|
||||
logger.debug(msg)
|
||||
|
||||
|
|
@ -749,27 +540,6 @@ def _check_hashes_obj(file_object, trusted_hashes):
|
|||
)
|
||||
|
||||
|
||||
def _check_hashes(file_content, trusted_hashes):
|
||||
"""
|
||||
TODO
|
||||
"""
|
||||
# Verify each trusted hash of 'trusted_hashes'. If all are valid, simply
|
||||
# return.
|
||||
for algorithm, trusted_hash in trusted_hashes.items():
|
||||
digest_object = sslib_hash.digest(algorithm)
|
||||
|
||||
digest_object.update(file_content)
|
||||
computed_hash = digest_object.hexdigest()
|
||||
|
||||
# Raise an exception if any of the hashes are incorrect.
|
||||
if trusted_hash != computed_hash:
|
||||
raise exceptions.BadHashError(trusted_hash, computed_hash)
|
||||
|
||||
logger.info(
|
||||
"The file's " + algorithm + " hash is" " correct: " + trusted_hash
|
||||
)
|
||||
|
||||
|
||||
def _get_filepath_hash(target_filepath, hash_function="sha256"):
|
||||
"""
|
||||
TODO
|
||||
|
|
|
|||
Loading…
Reference in a new issue