mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Implement root bootstrapping
Application may have a "more secure" data store than the metadata cache is: Allow application to bootstrap the Updater with this more secure root. This means the Updater must also cache the subsequent root versions (and not just the last one). * Store versioned root metadata in local cache * maintain a non versioned symlink to last known good root * When loading root metadata, look in local cache too * Add a 'bootstrap' argument to Updater: this allows initializing the Updater with known good root metadata instead of trusting the root.json in cache Additional changes to current functionality: * when using bootstrap argument, the initial root is written to cache. This write happens every time Updater is initialized with bootstrap * The "root.json" symlink is recreated at the end of every refresh() Signed-off-by: Jussi Kukkonen <jkukkonen@google.com>
This commit is contained in:
parent
f35b237739
commit
cea1745cef
3 changed files with 78 additions and 25 deletions
|
|
@ -11,7 +11,8 @@ import sys
|
|||
import traceback
|
||||
from hashlib import sha256
|
||||
from pathlib import Path
|
||||
from urllib import request
|
||||
|
||||
import urllib3
|
||||
|
||||
from tuf.api.exceptions import DownloadError, RepositoryError
|
||||
from tuf.ngclient import Updater
|
||||
|
|
@ -30,18 +31,25 @@ def build_metadata_dir(base_url: str) -> str:
|
|||
def init_tofu(base_url: str) -> bool:
|
||||
"""Initialize local trusted metadata (Trust-On-First-Use) and create a
|
||||
directory for downloads"""
|
||||
|
||||
metadata_dir = build_metadata_dir(base_url)
|
||||
|
||||
if not os.path.isdir(metadata_dir):
|
||||
os.makedirs(metadata_dir)
|
||||
|
||||
root_url = f"{base_url}/metadata/1.root.json"
|
||||
try:
|
||||
request.urlretrieve(root_url, f"{metadata_dir}/root.json")
|
||||
except OSError:
|
||||
print(f"Failed to download initial root from {root_url}")
|
||||
response = urllib3.request("GET", f"{base_url}/metadata/1.root.json")
|
||||
if response.status != 200:
|
||||
print(f"Failed to download initial root {base_url}/metadata/1.root.json")
|
||||
return False
|
||||
|
||||
Updater(
|
||||
metadata_dir=metadata_dir,
|
||||
metadata_base_url=f"{base_url}/metadata/",
|
||||
target_base_url=f"{base_url}/targets/",
|
||||
target_dir=DOWNLOAD_DIR,
|
||||
bootstrap=response.data,
|
||||
)
|
||||
|
||||
print(f"Trust-on-First-Use: Initialized new root in {metadata_dir}")
|
||||
return True
|
||||
|
||||
|
|
|
|||
|
|
@ -5,13 +5,13 @@
|
|||
|
||||
from __future__ import annotations
|
||||
|
||||
from collections.abc import Iterable
|
||||
import logging
|
||||
import os
|
||||
import shutil
|
||||
import sys
|
||||
import tempfile
|
||||
import unittest
|
||||
from collections.abc import Iterable
|
||||
from typing import TYPE_CHECKING, Callable, ClassVar
|
||||
from unittest.mock import MagicMock, patch
|
||||
|
||||
|
|
|
|||
|
|
@ -49,9 +49,9 @@
|
|||
|
||||
from tuf.api import exceptions
|
||||
from tuf.api.metadata import Root, Snapshot, TargetFile, Targets, Timestamp
|
||||
from tuf.ngclient import urllib3_fetcher
|
||||
from tuf.ngclient._internal import trusted_metadata_set
|
||||
from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet
|
||||
from tuf.ngclient.config import EnvelopeType, UpdaterConfig
|
||||
from tuf.ngclient.urllib3_fetcher import Urllib3Fetcher
|
||||
|
||||
if TYPE_CHECKING:
|
||||
from tuf.ngclient.fetcher import FetcherInterface
|
||||
|
|
@ -75,6 +75,9 @@ class Updater:
|
|||
download both metadata and targets. Default is ``Urllib3Fetcher``
|
||||
config: ``Optional``; ``UpdaterConfig`` could be used to setup common
|
||||
configuration options.
|
||||
bootstrap: ``Optional``; initial root metadata. If a boostrap root is
|
||||
not provided then the root.json in the metadata cache is used as the
|
||||
initial root.
|
||||
|
||||
Raises:
|
||||
OSError: Local root.json cannot be read
|
||||
|
|
@ -89,6 +92,7 @@ def __init__(
|
|||
target_base_url: str | None = None,
|
||||
fetcher: FetcherInterface | None = None,
|
||||
config: UpdaterConfig | None = None,
|
||||
bootstrap: bytes | None = None,
|
||||
):
|
||||
self._dir = metadata_dir
|
||||
self._metadata_base_url = _ensure_trailing_slash(metadata_base_url)
|
||||
|
|
@ -99,14 +103,12 @@ def __init__(
|
|||
self._target_base_url = _ensure_trailing_slash(target_base_url)
|
||||
|
||||
self.config = config or UpdaterConfig()
|
||||
|
||||
if fetcher is not None:
|
||||
self._fetcher = fetcher
|
||||
else:
|
||||
self._fetcher = urllib3_fetcher.Urllib3Fetcher(
|
||||
self._fetcher = Urllib3Fetcher(
|
||||
app_user_agent=self.config.app_user_agent
|
||||
)
|
||||
|
||||
supported_envelopes = [EnvelopeType.METADATA, EnvelopeType.SIMPLE]
|
||||
if self.config.envelope_type not in supported_envelopes:
|
||||
raise ValueError(
|
||||
|
|
@ -114,12 +116,15 @@ def __init__(
|
|||
f"got '{self.config.envelope_type}'"
|
||||
)
|
||||
|
||||
# Read trusted local root metadata
|
||||
data = self._load_local_metadata(Root.type)
|
||||
if not bootstrap:
|
||||
# if no root was provided, use the cached non-versioned root.json
|
||||
bootstrap = self._load_local_metadata(Root.type)
|
||||
|
||||
self._trusted_set = trusted_metadata_set.TrustedMetadataSet(
|
||||
data, self.config.envelope_type
|
||||
# Load the initial root, make sure it's cached in root_history/
|
||||
self._trusted_set = TrustedMetadataSet(
|
||||
bootstrap, self.config.envelope_type
|
||||
)
|
||||
self._persist_root(self._trusted_set.root.version, bootstrap)
|
||||
|
||||
def refresh(self) -> None:
|
||||
"""Refresh top-level metadata.
|
||||
|
|
@ -296,12 +301,31 @@ def _load_local_metadata(self, rolename: str) -> bytes:
|
|||
return f.read()
|
||||
|
||||
def _persist_metadata(self, rolename: str, data: bytes) -> None:
|
||||
"""Write metadata to disk atomically to avoid data loss."""
|
||||
temp_file_name: str | None = None
|
||||
"""Write metadata to disk atomically to avoid data loss.
|
||||
|
||||
Use a filename _not_ prefixed with version (e.g. "timestamp.json")
|
||||
. Encode the rolename to avoid issues with e.g. path separators
|
||||
"""
|
||||
|
||||
encoded_name = parse.quote(rolename, "")
|
||||
filename = os.path.join(self._dir, f"{encoded_name}.json")
|
||||
self._persist_file(filename, data)
|
||||
|
||||
def _persist_root(self, version: int, data: bytes) -> None:
|
||||
"""Write root metadata to disk atomically to avoid data loss.
|
||||
|
||||
Use a filename prefixed with version (e.g. "1.root.json").
|
||||
"""
|
||||
rootdir = os.path.join(self._dir, "root_history")
|
||||
with contextlib.suppress(FileExistsError):
|
||||
os.mkdir(rootdir)
|
||||
self._persist_file(os.path.join(rootdir, f"{version}.root.json"), data)
|
||||
|
||||
def _persist_file(self, filename: str, data: bytes) -> None:
|
||||
"""Write a file to disk atomically to avoid data loss."""
|
||||
temp_file_name = None
|
||||
|
||||
try:
|
||||
# encode the rolename to avoid issues with e.g. path separators
|
||||
encoded_name = parse.quote(rolename, "")
|
||||
filename = os.path.join(self._dir, f"{encoded_name}.json")
|
||||
with tempfile.NamedTemporaryFile(
|
||||
dir=self._dir, delete=False
|
||||
) as temp_file:
|
||||
|
|
@ -317,10 +341,10 @@ def _persist_metadata(self, rolename: str, data: bytes) -> None:
|
|||
raise e
|
||||
|
||||
def _load_root(self) -> None:
|
||||
"""Load remote root metadata.
|
||||
"""Load root metadata.
|
||||
|
||||
Sequentially load and persist on local disk every newer root metadata
|
||||
version available on the remote.
|
||||
Sequentially load and persist every newer root metadata
|
||||
version available, either locally or on the remote.
|
||||
"""
|
||||
|
||||
# Update the root role
|
||||
|
|
@ -328,6 +352,19 @@ def _load_root(self) -> None:
|
|||
upper_bound = lower_bound + self.config.max_root_rotations
|
||||
|
||||
for next_version in range(lower_bound, upper_bound):
|
||||
# look for next_version in local cache
|
||||
try:
|
||||
root_path = os.path.join(
|
||||
self._dir, "root_history", f"{next_version}.root.json"
|
||||
)
|
||||
with open(root_path, "rb") as f:
|
||||
self._trusted_set.update_root(f.read())
|
||||
continue
|
||||
except (OSError, exceptions.RepositoryError) as e:
|
||||
# this root did not exist locally or is invalid
|
||||
logger.debug("Local root is not valid: %s", e)
|
||||
|
||||
# next_version was not found locally, try remote
|
||||
try:
|
||||
data = self._download_metadata(
|
||||
Root.type,
|
||||
|
|
@ -335,7 +372,7 @@ def _load_root(self) -> None:
|
|||
next_version,
|
||||
)
|
||||
self._trusted_set.update_root(data)
|
||||
self._persist_metadata(Root.type, data)
|
||||
self._persist_root(next_version, data)
|
||||
|
||||
except exceptions.DownloadHTTPError as exception:
|
||||
if exception.status_code not in {403, 404}:
|
||||
|
|
@ -343,6 +380,14 @@ def _load_root(self) -> None:
|
|||
# 404/403 means current root is newest available
|
||||
break
|
||||
|
||||
# Make sure there's a non-versioned root.json
|
||||
linkname = os.path.join(self._dir, "root.json")
|
||||
version = self._trusted_set.root.version
|
||||
current = os.path.join("root_history", f"{version}.root.json")
|
||||
with contextlib.suppress(FileNotFoundError):
|
||||
os.remove(linkname)
|
||||
os.symlink(current, linkname)
|
||||
|
||||
def _load_timestamp(self) -> None:
|
||||
"""Load local and remote timestamp metadata."""
|
||||
try:
|
||||
|
|
|
|||
Loading…
Reference in a new issue