MetadataBundle: Don't do any file IO

Remove file IO from MetadataBundle:
* This make the bundle API very clear and easy to understand
* This means caller must now read from and persist data to disk
  but initial prototypes suggest this won't make Updater too
  complex

This change is something we can still back out from if it turns out to
be the wrong decision: the file-persisting MetadataBundle has been tested
and works fine.

Signed-off-by: Jussi Kukkonen <jkukkonen@vmware.com>
This commit is contained in:
Jussi Kukkonen 2021-05-16 12:24:17 +03:00
parent 6b53ac78d0
commit f2cff951a6
2 changed files with 136 additions and 298 deletions

View file

@ -15,166 +15,108 @@
logger = logging.getLogger(__name__)
class TestMetadataBundle(unittest.TestCase):
@classmethod
def setUpClass(cls):
cls.temp_dir = tempfile.mkdtemp(dir=os.getcwd())
@classmethod
def tearDownClass(cls):
shutil.rmtree(cls.temp_dir)
def setUp(self):
# copy metadata to "local repo"
shutil.copytree(
os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata'),
self.temp_dir,
dirs_exist_ok=True
)
def test_local_load(self):
# test loading all local metadata succesfully
bundle = MetadataBundle(self.temp_dir)
bundle.root_update_finished()
bundle.load_local_timestamp()
bundle.load_local_snapshot()
bundle.load_local_targets()
bundle.load_local_delegated_targets('role1','targets')
bundle.load_local_delegated_targets('role2','role1')
# Make sure loading metadata without its "dependencies" fails
bundle = MetadataBundle(self.temp_dir)
with self.assertRaises(RuntimeError):
bundle.load_local_timestamp()
bundle.root_update_finished()
with self.assertRaises(RuntimeError):
bundle.load_local_snapshot()
bundle.load_local_timestamp()
with self.assertRaises(RuntimeError):
bundle.load_local_targets()
bundle.load_local_snapshot()
with self.assertRaises(RuntimeError):
bundle.load_local_delegated_targets('role1','targets')
bundle.load_local_targets()
with self.assertRaises(RuntimeError):
bundle.load_local_delegated_targets('role2','role1')
bundle.load_local_delegated_targets('role1','targets')
bundle.load_local_delegated_targets('role2','role1')
def test_update(self):
remote_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata')
repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata')
# remove all but root.json from local repo
os.remove(os.path.join(self.temp_dir, "timestamp.json"))
os.remove(os.path.join(self.temp_dir, "snapshot.json"))
os.remove(os.path.join(self.temp_dir, "targets.json"))
os.remove(os.path.join(self.temp_dir, "role1.json"))
os.remove(os.path.join(self.temp_dir, "role2.json"))
bundle = MetadataBundle(self.temp_dir)
with open(os.path.join(repo_dir, "root.json"), "rb") as f:
bundle = MetadataBundle(f.read())
bundle.root_update_finished()
# test local load failure, then updating metadata succesfully
with self.assertRaises(exceptions.RepositoryError):
bundle.load_local_timestamp()
with open(os.path.join(remote_dir, "timestamp.json"), "rb") as f:
with open(os.path.join(repo_dir, "timestamp.json"), "rb") as f:
bundle.update_timestamp(f.read())
with open(os.path.join(remote_dir, "snapshot.json"), "rb") as f:
with open(os.path.join(repo_dir, "snapshot.json"), "rb") as f:
bundle.update_snapshot(f.read())
with open(os.path.join(remote_dir, "targets.json"), "rb") as f:
with open(os.path.join(repo_dir, "targets.json"), "rb") as f:
bundle.update_targets(f.read())
with open(os.path.join(remote_dir, "role1.json"), "rb") as f:
with open(os.path.join(repo_dir, "role1.json"), "rb") as f:
bundle.update_delegated_targets(f.read(), "role1", "targets")
with open(os.path.join(remote_dir, "role2.json"), "rb") as f:
with open(os.path.join(repo_dir, "role2.json"), "rb") as f:
bundle.update_delegated_targets(f.read(), "role2", "role1")
# test loading the metadata (that should now be locally available)
bundle = MetadataBundle(self.temp_dir)
def test_out_of_order_ops(self):
repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata')
data={}
for md in ["root", "timestamp", "snapshot", "targets", "role1"]:
with open(os.path.join(repo_dir, f"{md}.json"), "rb") as f:
data[md] = f.read()
bundle = MetadataBundle(data["root"])
# Update timestamp before root is finished
with self.assertRaises(RuntimeError):
bundle.update_timestamp(data["timestamp"])
bundle.root_update_finished()
bundle.load_local_timestamp()
bundle.load_local_snapshot()
bundle.load_local_targets()
bundle.load_local_delegated_targets('role1','targets')
bundle.load_local_delegated_targets('role2','role1')
# TODO test loading one version, then updating to new versions of each metadata
def test_local_load_with_invalid_data(self):
# Test root and one of the top-level metadata files
with tempfile.TemporaryDirectory() as tempdir:
# Missing root.json
with self.assertRaises(exceptions.RepositoryError):
MetadataBundle(tempdir)
# root.json not a json file at all
with open(os.path.join(tempdir, "root.json"), "w") as f:
f.write("")
with self.assertRaises(exceptions.RepositoryError):
MetadataBundle(tempdir)
# root.json does not validate
md = Metadata.from_file(os.path.join(self.temp_dir, "root.json"))
md.signed.version += 1
md.to_file(os.path.join(tempdir, "root.json"))
with self.assertRaises(exceptions.RepositoryError):
MetadataBundle(tempdir)
md.signed.version -= 1
md.to_file(os.path.join(tempdir, "root.json"))
bundle = MetadataBundle(tempdir)
with self.assertRaises(RuntimeError):
bundle.root_update_finished()
# Missing timestamp.json
with self.assertRaises(exceptions.RepositoryError):
bundle.load_local_timestamp()
# Update snapshot before timestamp
with self.assertRaises(RuntimeError):
bundle.update_snapshot(data["snapshot"])
# timestamp not a json file at all
with open(os.path.join(tempdir, "timestamp.json"), "w") as f:
f.write("")
with self.assertRaises(exceptions.RepositoryError):
bundle.load_local_timestamp()
bundle.update_timestamp(data["timestamp"])
# timestamp does not validate
md = Metadata.from_file(os.path.join(self.temp_dir, "timestamp.json"))
md.signed.version += 1
md.to_file(os.path.join(tempdir, "timestamp.json"))
with self.assertRaises(exceptions.RepositoryError):
bundle.load_local_timestamp()
# Update targets before snapshot
with self.assertRaises(RuntimeError):
bundle.update_targets(data["targets"])
md.signed.version -= 1
md.to_file(os.path.join(tempdir, "timestamp.json"))
bundle.load_local_timestamp()
bundle.update_snapshot(data["snapshot"])
def test_update_with_invalid_data(self):
# Test on of the top level metadata files
#update timestamp after snapshot
with self.assertRaises(RuntimeError):
bundle.update_timestamp(data["timestamp"])
timestamp_md = Metadata.from_file(os.path.join(self.temp_dir, "timestamp.json"))
# Update delegated targets before targets
with self.assertRaises(RuntimeError):
bundle.update_delegated_targets(data["role1"], "role1", "targets")
# remove all but root.json from local repo
os.remove(os.path.join(self.temp_dir, "timestamp.json"))
os.remove(os.path.join(self.temp_dir, "snapshot.json"))
os.remove(os.path.join(self.temp_dir, "targets.json"))
os.remove(os.path.join(self.temp_dir, "role1.json"))
os.remove(os.path.join(self.temp_dir, "role2.json"))
bundle.update_targets(data["targets"])
bundle.update_delegated_targets(data["role1"], "role1", "targets")
bundle = MetadataBundle(self.temp_dir)
def test_update_with_invalid_json(self):
repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata')
data={}
for md in ["root", "timestamp", "snapshot", "targets", "role1"]:
with open(os.path.join(repo_dir, f"{md}.json"), "rb") as f:
data[md] = f.read()
# root.json not a json file at all
with self.assertRaises(exceptions.RepositoryError):
MetadataBundle(b"")
# root.json is invalid
root = Metadata.from_bytes(data["root"])
root.signed.version += 1
with self.assertRaises(exceptions.RepositoryError):
MetadataBundle(json.dumps(root.to_dict()).encode())
bundle = MetadataBundle(data["root"])
bundle.root_update_finished()
# timestamp not a json file at all
with self.assertRaises(exceptions.RepositoryError):
bundle.update_timestamp(b"")
top_level_md = [
(data["timestamp"], bundle.update_timestamp),
(data["snapshot"], bundle.update_snapshot),
(data["targets"], bundle.update_targets),
]
for metadata, update_func in top_level_md:
# metadata is not json
with self.assertRaises(exceptions.RepositoryError):
update_func(b"")
# metadata is invalid
md = Metadata.from_bytes(metadata)
md.signed.version += 1
with self.assertRaises(exceptions.RepositoryError):
update_func(json.dumps(md.to_dict()).encode())
# timestamp does not validate
timestamp_md.signed.version += 1
data = timestamp_md.to_dict()
with self.assertRaises(exceptions.RepositoryError):
bundle.update_timestamp(json.dumps(data).encode())
# metadata is of wrong type
with self.assertRaises(exceptions.RepositoryError):
update_func(data["root"])
timestamp_md.signed.version -= 1
data = timestamp_md.to_dict()
bundle.update_timestamp(json.dumps(data).encode())
update_func(metadata)
# TODO test updating over initial metadata (new keys, newer timestamp, etc)
# TODO test the actual specification checks
if __name__ == '__main__':

View file

@ -6,83 +6,66 @@
MetadataBundle keeps track of current valid set of metadata for the client,
and handles almost every step of the "Detailed client workflow" (
https://theupdateframework.github.io/specification/latest#detailed-client-workflow)
in the TUF specification (the remaining steps are download related). The
bundle takes care of persisting valid metadata on disk and loading local
metadata from disk.
in the TUF specification: the remaining steps are related to filesystem and
network IO which is not handled here.
Loaded metadata can be accessed via the index access with rolename as key
or, in the case of top-level metadata using the helper properties like
'MetadataBundle.root'
(bundle["root"]) or, in the case of top-level metadata using the helper
properties (bundle.root).
The rules for top-level metadata are
* Metadata is loadable only if metadata it depends on is loaded
* Metadata is immutable if any metadata depending on it has been loaded
* Metadata must be loaded/updated in order:
root -> timestamp -> snapshot -> targets -> (other delegated targets)
* For each metadata either local load or the remote update must succeed
* Caller should try loading local version before updating metadata from remote
(the exception is root where local data is loaded at MetadataBundle
initialization: the initialization fails if local data cannot be loaded)
Exceptions are raised if metadata fails to load in any way. The exception
to this is local loads -- only local root metadata needs to be valid:
other local metadata is allowed to be invalid (e.g. no longer signed):
it won't be loaded but there will not be an exception.
Exceptions are raised if metadata fails to load in any way.
Example (with hypothetical download function):
Example of loading root, timestamp and snapshot:
>>> # Load local root
>>> bundle = MetadataBundle("path/to/metadata")
>>> # Load local root (RepositoryErrors here stop the update)
>>> with open(root_path, "rb") as f:
>>> bundle = MetadataBundle(f.read())
>>>
>>> # update root until no more are available from remote
>>> # update root from remote until no more are available
>>> with download("root", bundle.root.signed.version + 1) as f:
>>> bundle.update_root(f.read())
>>> # ...
>>> bundle.root_update_finished()
>>>
>>> # load timestamp, then update from remote
>>> bundle.load_local_timestamp()
>>> # load local timestamp, then update from remote
>>> try:
>>> with open(timestamp_path, "rb") as f:
>>> bundle.update_timestamp(f.read())
>>> except (RepositoryError, OSError):
>>> pass # failure to load a local file is ok
>>>
>>> with download("timestamp") as f:
>>> bundle.update_timestamp(f.read())
>>>
>>> # load snapshot, update from remote if needed
>>> if not bundle.load_local_snapshot():
>>> # TODO get version from timestamp
>>> # load local snapshot, then update from remote if needed
>>> try:
>>> with open(snapshot_path, "rb") as f:
>>> bundle.update_snapshot(f.read())
>>> except (RepositoryError, OSError):
>>> # local snapshot is not valid, load from remote
>>> # (RepositoryErrors here stop the update)
>>> with download("snapshot", version) as f:
>>> bundle.update_snapshot(f.read())
>>>
>>> # load local targets, update from remote if needed
>>> if not bundle.load_local_targets():
>>> # TODO get version from snapshot
>>> with download("targets", version) as f:
>>> bundle.update_targets(f.read())
>>>
>>> # load local delegated role, update from remote if needed
>>> if not bundle.load_local_delegated_targets("rolename", "targets"):
>>> # TODO get version from snapshot
>>> with download("rolename", version) as f:
>>> bundle.update_targets(f.read(), "rolename", "targets")
TODO:
* exceptions are all over the place: the idea is that client could just handle
* exceptions are not final: the idea is that client could just handle
a generic RepositoryError that covers every issue that server provided
metadata could inflict (other errors would be user errors), but this is not
yet the case
* usefulness of root_update_finished() can be debated: it could be done
in the beginning of load_timestamp()...
* there are some divergences from spec: in general local metadata files are
not deleted (they just won't succesfully load)
* a bit of repetition
* No tests!
* Naming maybe not final?
* some metadata interactions might work better in Metadata itself
* Progress through Specification update process should be documented
(not sure yet how: maybe a spec_logger that logs specification events?)
"""
import logging
import os
from collections import abc
from datetime import datetime
from typing import Dict, Iterator, Optional
@ -142,23 +125,16 @@ class MetadataBundle(abc.Mapping):
update the metadata with the caller making decisions on what is updated.
"""
def __init__(self, repository_path: str):
"""Initialize by loading root metadata from disk"""
self._path = repository_path
def __init__(self, data: bytes):
"""Initialize by loading trusted root metadata"""
self._bundle = {} # type: Dict[str: Metadata]
self.reference_time = datetime.utcnow()
self._root_update_finished = False
# Load and validate the local root metadata
# Valid root metadata is required
logger.debug("Loading local root")
try:
with open(os.path.join(self._path, "root.json"), "rb") as f:
self._load_intermediate_root(f.read())
except (OSError, exceptions.RepositoryError) as e:
raise exceptions.RepositoryError(
"Failed to load local root metadata"
) from e
logger.debug("Updating initial trusted root")
self.update_root(data)
# Implement Mapping
def __getitem__(self, key: str) -> Metadata:
@ -187,110 +163,8 @@ def snapshot(self) -> Optional[Metadata]:
def targets(self) -> Optional[Metadata]:
return self._bundle.get("targets")
# Public methods
# Methods for updating metadata
def update_root(self, data: bytes):
"""Update root metadata with data from remote repository."""
logger.debug("Updating root")
self._load_intermediate_root(data)
with open(os.path.join(self._path, "root.json"), "wb") as f:
f.write(data)
def root_update_finished(self):
"""Mark root metadata as final."""
if self._root_update_finished:
raise RuntimeError("Root update is already finished")
if self.root.signed.is_expired(self.reference_time):
raise exceptions.ExpiredMetadataError("New root.json is expired")
# We skip specification step 5.3.11: deleting timestamp and snapshot
# with rotated keys is not needed as they will be invalid, are not
# loaded and cannot be loaded
self._root_update_finished = True
logger.debug("Verified final root.json")
def load_local_timestamp(self):
"""Load cached timestamp metadata from local storage."""
logger.debug("Loading local timestamp")
try:
with open(os.path.join(self._path, "timestamp.json"), "rb") as f:
self._load_timestamp(f.read())
except OSError as e:
raise exceptions.RepositoryError(
"Failed to load local timestamp"
) from e
def update_timestamp(self, data: bytes):
"""Update timestamp metadata with data from remote repository."""
logger.debug("Updating timestamp")
self._load_timestamp(data)
with open(os.path.join(self._path, "timestamp.json"), "wb") as f:
f.write(data)
def load_local_snapshot(self):
"""Load cached snapshot metadata from local storage."""
logger.debug("Loading local snapshot")
try:
with open(os.path.join(self._path, "snapshot.json"), "rb") as f:
self._load_snapshot(f.read())
except OSError as e:
raise exceptions.RepositoryError(
"Failed to load local snapshot"
) from e
def update_snapshot(self, data: bytes):
"""Update snapshot metadata with data from remote repository."""
logger.debug("Updating snapshot")
self._load_snapshot(data)
with open(os.path.join(self._path, "snapshot.json"), "wb") as f:
f.write(data)
def load_local_targets(self):
"""Load cached targets metadata from local storage."""
self.load_local_delegated_targets("targets", "root")
def update_targets(self, data: bytes):
"""Update targets metadata with data from remote repository."""
self.update_delegated_targets(data, "targets", "root")
def load_local_delegated_targets(self, role_name: str, delegator_name: str):
"""Load cached metadata for 'role_name' from local storage.
Metadata for 'delegator_name' must be loaded already.
"""
if self.get(role_name):
logger.debug("Local %s already loaded", role_name)
logger.debug("Loading local %s", role_name)
try:
with open(os.path.join(self._path, f"{role_name}.json"), "rb") as f:
self._load_delegated_targets(
f.read(), role_name, delegator_name
)
except OSError as e:
raise exceptions.RepositoryError(
f"Failed to load local {role_name}"
) from e
def update_delegated_targets(
self, data: bytes, role_name: str, delegator_name: str = None
):
"""Update 'rolename' metadata with data from remote repository.
Metadata for 'delegator_name' must be loaded already."""
logger.debug("Updating %s", role_name)
self._load_delegated_targets(data, role_name, delegator_name)
with open(os.path.join(self._path, f"{role_name}.json"), "wb") as f:
f.write(data)
def _load_intermediate_root(self, data: bytes):
"""Verifies and loads 'data' as new root metadata.
Note that an expired intermediate root is considered valid: expiry is
@ -299,6 +173,7 @@ def _load_intermediate_root(self, data: bytes):
raise RuntimeError(
"Cannot update root after root update is finished"
)
logger.debug("Updating root")
try:
new_root = Metadata.from_bytes(data)
@ -328,9 +203,23 @@ def _load_intermediate_root(self, data: bytes):
)
self._bundle["root"] = new_root
logger.debug("Loaded root")
logger.debug("Updated root")
def _load_timestamp(self, data: bytes):
def root_update_finished(self):
"""Mark root metadata as final."""
if self._root_update_finished:
raise RuntimeError("Root update is already finished")
if self.root.signed.is_expired(self.reference_time):
raise exceptions.ExpiredMetadataError("New root.json is expired")
# We skip specification step 5.3.11: deleting timestamp and snapshot
# with rotated keys is not needed as they will be invalid, are not
# loaded and cannot be loaded
self._root_update_finished = True
logger.debug("Verified final root.json")
def update_timestamp(self, data: bytes):
"""Verifies and loads 'data' as new timestamp metadata."""
if not self._root_update_finished:
# root_update_finished() not called
@ -377,16 +266,17 @@ def _load_timestamp(self, data: bytes):
raise exceptions.ExpiredMetadataError("New timestamp is expired")
self._bundle["timestamp"] = new_timestamp
logger.debug("Loaded timestamp")
logger.debug("Updated timestamp")
# TODO: remove pylint disable once the hash verification is in metadata.py
def _load_snapshot(self, data: bytes): # pylint: disable=too-many-branches
def update_snapshot(self, data: bytes): # pylint: disable=too-many-branches
"""Verifies and loads 'data' as new snapshot metadata."""
if self.timestamp is None:
raise RuntimeError("Cannot update snapshot before timestamp")
if self.targets is not None:
raise RuntimeError("Cannot update snapshot after targets")
logger.debug("Updating snapshot")
meta = self.timestamp.signed.meta["snapshot.json"]
@ -446,9 +336,13 @@ def _load_snapshot(self, data: bytes): # pylint: disable=too-many-branches
raise exceptions.ExpiredMetadataError("New snapshot is expired")
self._bundle["snapshot"] = new_snapshot
logger.debug("Loaded snapshot")
logger.debug("Updated snapshot")
def _load_delegated_targets(
def update_targets(self, data: bytes):
"""Verifies and loads 'data' as new top-level targets metadata."""
self.update_delegated_targets(data, "targets", "root")
def update_delegated_targets(
self, data: bytes, role_name: str, delegator_name: str
):
"""Verifies and loads 'data' as new metadata for target 'role_name'.
@ -462,6 +356,8 @@ def _load_delegated_targets(
if delegator is None:
raise RuntimeError("Cannot load targets before delegator")
logger.debug("Updating %s delegated by %s", role_name, delegator_name)
# Verify against the hashes in snapshot, if any
meta = self.snapshot.signed.meta.get(f"{role_name}.json")
if meta is None:
@ -504,4 +400,4 @@ def _load_delegated_targets(
raise exceptions.ExpiredMetadataError(f"New {role_name} is expired")
self._bundle[role_name] = new_delegate
logger.debug("Loaded %s delegated by %s", role_name, delegator_name)
logger.debug("Updated %s delegated by %s", role_name, delegator_name)