diff --git a/tests/test_metadata_bundle.py b/tests/test_metadata_bundle.py index 27b35daa..e758e6e7 100644 --- a/tests/test_metadata_bundle.py +++ b/tests/test_metadata_bundle.py @@ -15,166 +15,108 @@ logger = logging.getLogger(__name__) class TestMetadataBundle(unittest.TestCase): - @classmethod - def setUpClass(cls): - cls.temp_dir = tempfile.mkdtemp(dir=os.getcwd()) - - @classmethod - def tearDownClass(cls): - shutil.rmtree(cls.temp_dir) - - def setUp(self): - # copy metadata to "local repo" - shutil.copytree( - os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata'), - self.temp_dir, - dirs_exist_ok=True - ) - - def test_local_load(self): - - # test loading all local metadata succesfully - bundle = MetadataBundle(self.temp_dir) - bundle.root_update_finished() - bundle.load_local_timestamp() - bundle.load_local_snapshot() - bundle.load_local_targets() - bundle.load_local_delegated_targets('role1','targets') - bundle.load_local_delegated_targets('role2','role1') - - # Make sure loading metadata without its "dependencies" fails - bundle = MetadataBundle(self.temp_dir) - - with self.assertRaises(RuntimeError): - bundle.load_local_timestamp() - bundle.root_update_finished() - with self.assertRaises(RuntimeError): - bundle.load_local_snapshot() - bundle.load_local_timestamp() - with self.assertRaises(RuntimeError): - bundle.load_local_targets() - bundle.load_local_snapshot() - with self.assertRaises(RuntimeError): - bundle.load_local_delegated_targets('role1','targets') - bundle.load_local_targets() - with self.assertRaises(RuntimeError): - bundle.load_local_delegated_targets('role2','role1') - bundle.load_local_delegated_targets('role1','targets') - bundle.load_local_delegated_targets('role2','role1') def test_update(self): - remote_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata') + repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata') - # remove all but root.json from local repo - os.remove(os.path.join(self.temp_dir, "timestamp.json")) - os.remove(os.path.join(self.temp_dir, "snapshot.json")) - os.remove(os.path.join(self.temp_dir, "targets.json")) - os.remove(os.path.join(self.temp_dir, "role1.json")) - os.remove(os.path.join(self.temp_dir, "role2.json")) - - bundle = MetadataBundle(self.temp_dir) + with open(os.path.join(repo_dir, "root.json"), "rb") as f: + bundle = MetadataBundle(f.read()) bundle.root_update_finished() - # test local load failure, then updating metadata succesfully - with self.assertRaises(exceptions.RepositoryError): - bundle.load_local_timestamp() - with open(os.path.join(remote_dir, "timestamp.json"), "rb") as f: + with open(os.path.join(repo_dir, "timestamp.json"), "rb") as f: bundle.update_timestamp(f.read()) - with open(os.path.join(remote_dir, "snapshot.json"), "rb") as f: + with open(os.path.join(repo_dir, "snapshot.json"), "rb") as f: bundle.update_snapshot(f.read()) - with open(os.path.join(remote_dir, "targets.json"), "rb") as f: + with open(os.path.join(repo_dir, "targets.json"), "rb") as f: bundle.update_targets(f.read()) - with open(os.path.join(remote_dir, "role1.json"), "rb") as f: + with open(os.path.join(repo_dir, "role1.json"), "rb") as f: bundle.update_delegated_targets(f.read(), "role1", "targets") - with open(os.path.join(remote_dir, "role2.json"), "rb") as f: + with open(os.path.join(repo_dir, "role2.json"), "rb") as f: bundle.update_delegated_targets(f.read(), "role2", "role1") - # test loading the metadata (that should now be locally available) - bundle = MetadataBundle(self.temp_dir) + def test_out_of_order_ops(self): + repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata') + data={} + for md in ["root", "timestamp", "snapshot", "targets", "role1"]: + with open(os.path.join(repo_dir, f"{md}.json"), "rb") as f: + data[md] = f.read() + + bundle = MetadataBundle(data["root"]) + + # Update timestamp before root is finished + with self.assertRaises(RuntimeError): + bundle.update_timestamp(data["timestamp"]) + bundle.root_update_finished() - bundle.load_local_timestamp() - bundle.load_local_snapshot() - bundle.load_local_targets() - bundle.load_local_delegated_targets('role1','targets') - bundle.load_local_delegated_targets('role2','role1') - - # TODO test loading one version, then updating to new versions of each metadata - - def test_local_load_with_invalid_data(self): - # Test root and one of the top-level metadata files - - with tempfile.TemporaryDirectory() as tempdir: - # Missing root.json - with self.assertRaises(exceptions.RepositoryError): - MetadataBundle(tempdir) - - # root.json not a json file at all - with open(os.path.join(tempdir, "root.json"), "w") as f: - f.write("") - with self.assertRaises(exceptions.RepositoryError): - MetadataBundle(tempdir) - - # root.json does not validate - md = Metadata.from_file(os.path.join(self.temp_dir, "root.json")) - md.signed.version += 1 - md.to_file(os.path.join(tempdir, "root.json")) - with self.assertRaises(exceptions.RepositoryError): - MetadataBundle(tempdir) - - md.signed.version -= 1 - md.to_file(os.path.join(tempdir, "root.json")) - bundle = MetadataBundle(tempdir) + with self.assertRaises(RuntimeError): bundle.root_update_finished() - # Missing timestamp.json - with self.assertRaises(exceptions.RepositoryError): - bundle.load_local_timestamp() + # Update snapshot before timestamp + with self.assertRaises(RuntimeError): + bundle.update_snapshot(data["snapshot"]) - # timestamp not a json file at all - with open(os.path.join(tempdir, "timestamp.json"), "w") as f: - f.write("") - with self.assertRaises(exceptions.RepositoryError): - bundle.load_local_timestamp() + bundle.update_timestamp(data["timestamp"]) - # timestamp does not validate - md = Metadata.from_file(os.path.join(self.temp_dir, "timestamp.json")) - md.signed.version += 1 - md.to_file(os.path.join(tempdir, "timestamp.json")) - with self.assertRaises(exceptions.RepositoryError): - bundle.load_local_timestamp() + # Update targets before snapshot + with self.assertRaises(RuntimeError): + bundle.update_targets(data["targets"]) - md.signed.version -= 1 - md.to_file(os.path.join(tempdir, "timestamp.json")) - bundle.load_local_timestamp() + bundle.update_snapshot(data["snapshot"]) - def test_update_with_invalid_data(self): - # Test on of the top level metadata files + #update timestamp after snapshot + with self.assertRaises(RuntimeError): + bundle.update_timestamp(data["timestamp"]) - timestamp_md = Metadata.from_file(os.path.join(self.temp_dir, "timestamp.json")) + # Update delegated targets before targets + with self.assertRaises(RuntimeError): + bundle.update_delegated_targets(data["role1"], "role1", "targets") - # remove all but root.json from local repo - os.remove(os.path.join(self.temp_dir, "timestamp.json")) - os.remove(os.path.join(self.temp_dir, "snapshot.json")) - os.remove(os.path.join(self.temp_dir, "targets.json")) - os.remove(os.path.join(self.temp_dir, "role1.json")) - os.remove(os.path.join(self.temp_dir, "role2.json")) + bundle.update_targets(data["targets"]) + bundle.update_delegated_targets(data["role1"], "role1", "targets") - bundle = MetadataBundle(self.temp_dir) + def test_update_with_invalid_json(self): + repo_dir = os.path.join(os.getcwd(), 'repository_data', 'repository', 'metadata') + data={} + for md in ["root", "timestamp", "snapshot", "targets", "role1"]: + with open(os.path.join(repo_dir, f"{md}.json"), "rb") as f: + data[md] = f.read() + + # root.json not a json file at all + with self.assertRaises(exceptions.RepositoryError): + MetadataBundle(b"") + # root.json is invalid + root = Metadata.from_bytes(data["root"]) + root.signed.version += 1 + with self.assertRaises(exceptions.RepositoryError): + MetadataBundle(json.dumps(root.to_dict()).encode()) + + bundle = MetadataBundle(data["root"]) bundle.root_update_finished() - # timestamp not a json file at all - with self.assertRaises(exceptions.RepositoryError): - bundle.update_timestamp(b"") + top_level_md = [ + (data["timestamp"], bundle.update_timestamp), + (data["snapshot"], bundle.update_snapshot), + (data["targets"], bundle.update_targets), + ] + for metadata, update_func in top_level_md: + # metadata is not json + with self.assertRaises(exceptions.RepositoryError): + update_func(b"") + # metadata is invalid + md = Metadata.from_bytes(metadata) + md.signed.version += 1 + with self.assertRaises(exceptions.RepositoryError): + update_func(json.dumps(md.to_dict()).encode()) - # timestamp does not validate - timestamp_md.signed.version += 1 - data = timestamp_md.to_dict() - with self.assertRaises(exceptions.RepositoryError): - bundle.update_timestamp(json.dumps(data).encode()) + # metadata is of wrong type + with self.assertRaises(exceptions.RepositoryError): + update_func(data["root"]) - timestamp_md.signed.version -= 1 - data = timestamp_md.to_dict() - bundle.update_timestamp(json.dumps(data).encode()) + update_func(metadata) + + + # TODO test updating over initial metadata (new keys, newer timestamp, etc) + # TODO test the actual specification checks if __name__ == '__main__': diff --git a/tuf/client_rework/metadata_bundle.py b/tuf/client_rework/metadata_bundle.py index 7976a2c5..470f750e 100644 --- a/tuf/client_rework/metadata_bundle.py +++ b/tuf/client_rework/metadata_bundle.py @@ -6,83 +6,66 @@ MetadataBundle keeps track of current valid set of metadata for the client, and handles almost every step of the "Detailed client workflow" ( https://theupdateframework.github.io/specification/latest#detailed-client-workflow) -in the TUF specification (the remaining steps are download related). The -bundle takes care of persisting valid metadata on disk and loading local -metadata from disk. +in the TUF specification: the remaining steps are related to filesystem and +network IO which is not handled here. Loaded metadata can be accessed via the index access with rolename as key -or, in the case of top-level metadata using the helper properties like -'MetadataBundle.root' +(bundle["root"]) or, in the case of top-level metadata using the helper +properties (bundle.root). The rules for top-level metadata are * Metadata is loadable only if metadata it depends on is loaded * Metadata is immutable if any metadata depending on it has been loaded * Metadata must be loaded/updated in order: root -> timestamp -> snapshot -> targets -> (other delegated targets) - * For each metadata either local load or the remote update must succeed - * Caller should try loading local version before updating metadata from remote - (the exception is root where local data is loaded at MetadataBundle - initialization: the initialization fails if local data cannot be loaded) -Exceptions are raised if metadata fails to load in any way. The exception -to this is local loads -- only local root metadata needs to be valid: -other local metadata is allowed to be invalid (e.g. no longer signed): -it won't be loaded but there will not be an exception. +Exceptions are raised if metadata fails to load in any way. -Example (with hypothetical download function): +Example of loading root, timestamp and snapshot: ->>> # Load local root ->>> bundle = MetadataBundle("path/to/metadata") +>>> # Load local root (RepositoryErrors here stop the update) +>>> with open(root_path, "rb") as f: +>>> bundle = MetadataBundle(f.read()) >>> ->>> # update root until no more are available from remote +>>> # update root from remote until no more are available >>> with download("root", bundle.root.signed.version + 1) as f: >>> bundle.update_root(f.read()) >>> # ... >>> bundle.root_update_finished() >>> ->>> # load timestamp, then update from remote ->>> bundle.load_local_timestamp() +>>> # load local timestamp, then update from remote +>>> try: +>>> with open(timestamp_path, "rb") as f: +>>> bundle.update_timestamp(f.read()) +>>> except (RepositoryError, OSError): +>>> pass # failure to load a local file is ok +>>> >>> with download("timestamp") as f: >>> bundle.update_timestamp(f.read()) >>> ->>> # load snapshot, update from remote if needed ->>> if not bundle.load_local_snapshot(): ->>> # TODO get version from timestamp +>>> # load local snapshot, then update from remote if needed +>>> try: +>>> with open(snapshot_path, "rb") as f: +>>> bundle.update_snapshot(f.read()) +>>> except (RepositoryError, OSError): +>>> # local snapshot is not valid, load from remote +>>> # (RepositoryErrors here stop the update) >>> with download("snapshot", version) as f: >>> bundle.update_snapshot(f.read()) ->>> ->>> # load local targets, update from remote if needed ->>> if not bundle.load_local_targets(): ->>> # TODO get version from snapshot ->>> with download("targets", version) as f: ->>> bundle.update_targets(f.read()) ->>> ->>> # load local delegated role, update from remote if needed ->>> if not bundle.load_local_delegated_targets("rolename", "targets"): ->>> # TODO get version from snapshot ->>> with download("rolename", version) as f: ->>> bundle.update_targets(f.read(), "rolename", "targets") - TODO: - * exceptions are all over the place: the idea is that client could just handle + * exceptions are not final: the idea is that client could just handle a generic RepositoryError that covers every issue that server provided metadata could inflict (other errors would be user errors), but this is not yet the case * usefulness of root_update_finished() can be debated: it could be done in the beginning of load_timestamp()... - * there are some divergences from spec: in general local metadata files are - not deleted (they just won't succesfully load) - * a bit of repetition - * No tests! - * Naming maybe not final? * some metadata interactions might work better in Metadata itself * Progress through Specification update process should be documented (not sure yet how: maybe a spec_logger that logs specification events?) """ import logging -import os from collections import abc from datetime import datetime from typing import Dict, Iterator, Optional @@ -142,23 +125,16 @@ class MetadataBundle(abc.Mapping): update the metadata with the caller making decisions on what is updated. """ - def __init__(self, repository_path: str): - """Initialize by loading root metadata from disk""" - self._path = repository_path + def __init__(self, data: bytes): + """Initialize by loading trusted root metadata""" self._bundle = {} # type: Dict[str: Metadata] self.reference_time = datetime.utcnow() self._root_update_finished = False # Load and validate the local root metadata # Valid root metadata is required - logger.debug("Loading local root") - try: - with open(os.path.join(self._path, "root.json"), "rb") as f: - self._load_intermediate_root(f.read()) - except (OSError, exceptions.RepositoryError) as e: - raise exceptions.RepositoryError( - "Failed to load local root metadata" - ) from e + logger.debug("Updating initial trusted root") + self.update_root(data) # Implement Mapping def __getitem__(self, key: str) -> Metadata: @@ -187,110 +163,8 @@ def snapshot(self) -> Optional[Metadata]: def targets(self) -> Optional[Metadata]: return self._bundle.get("targets") - # Public methods + # Methods for updating metadata def update_root(self, data: bytes): - """Update root metadata with data from remote repository.""" - logger.debug("Updating root") - - self._load_intermediate_root(data) - with open(os.path.join(self._path, "root.json"), "wb") as f: - f.write(data) - - def root_update_finished(self): - """Mark root metadata as final.""" - if self._root_update_finished: - raise RuntimeError("Root update is already finished") - - if self.root.signed.is_expired(self.reference_time): - raise exceptions.ExpiredMetadataError("New root.json is expired") - - # We skip specification step 5.3.11: deleting timestamp and snapshot - # with rotated keys is not needed as they will be invalid, are not - # loaded and cannot be loaded - self._root_update_finished = True - logger.debug("Verified final root.json") - - def load_local_timestamp(self): - """Load cached timestamp metadata from local storage.""" - logger.debug("Loading local timestamp") - - try: - with open(os.path.join(self._path, "timestamp.json"), "rb") as f: - self._load_timestamp(f.read()) - except OSError as e: - raise exceptions.RepositoryError( - "Failed to load local timestamp" - ) from e - - def update_timestamp(self, data: bytes): - """Update timestamp metadata with data from remote repository.""" - logger.debug("Updating timestamp") - - self._load_timestamp(data) - with open(os.path.join(self._path, "timestamp.json"), "wb") as f: - f.write(data) - - def load_local_snapshot(self): - """Load cached snapshot metadata from local storage.""" - logger.debug("Loading local snapshot") - - try: - with open(os.path.join(self._path, "snapshot.json"), "rb") as f: - self._load_snapshot(f.read()) - except OSError as e: - raise exceptions.RepositoryError( - "Failed to load local snapshot" - ) from e - - def update_snapshot(self, data: bytes): - """Update snapshot metadata with data from remote repository.""" - logger.debug("Updating snapshot") - - self._load_snapshot(data) - with open(os.path.join(self._path, "snapshot.json"), "wb") as f: - f.write(data) - - def load_local_targets(self): - """Load cached targets metadata from local storage.""" - self.load_local_delegated_targets("targets", "root") - - def update_targets(self, data: bytes): - """Update targets metadata with data from remote repository.""" - self.update_delegated_targets(data, "targets", "root") - - def load_local_delegated_targets(self, role_name: str, delegator_name: str): - """Load cached metadata for 'role_name' from local storage. - - Metadata for 'delegator_name' must be loaded already. - """ - if self.get(role_name): - logger.debug("Local %s already loaded", role_name) - - logger.debug("Loading local %s", role_name) - - try: - with open(os.path.join(self._path, f"{role_name}.json"), "rb") as f: - self._load_delegated_targets( - f.read(), role_name, delegator_name - ) - except OSError as e: - raise exceptions.RepositoryError( - f"Failed to load local {role_name}" - ) from e - - def update_delegated_targets( - self, data: bytes, role_name: str, delegator_name: str = None - ): - """Update 'rolename' metadata with data from remote repository. - - Metadata for 'delegator_name' must be loaded already.""" - logger.debug("Updating %s", role_name) - - self._load_delegated_targets(data, role_name, delegator_name) - with open(os.path.join(self._path, f"{role_name}.json"), "wb") as f: - f.write(data) - - def _load_intermediate_root(self, data: bytes): """Verifies and loads 'data' as new root metadata. Note that an expired intermediate root is considered valid: expiry is @@ -299,6 +173,7 @@ def _load_intermediate_root(self, data: bytes): raise RuntimeError( "Cannot update root after root update is finished" ) + logger.debug("Updating root") try: new_root = Metadata.from_bytes(data) @@ -328,9 +203,23 @@ def _load_intermediate_root(self, data: bytes): ) self._bundle["root"] = new_root - logger.debug("Loaded root") + logger.debug("Updated root") - def _load_timestamp(self, data: bytes): + def root_update_finished(self): + """Mark root metadata as final.""" + if self._root_update_finished: + raise RuntimeError("Root update is already finished") + + if self.root.signed.is_expired(self.reference_time): + raise exceptions.ExpiredMetadataError("New root.json is expired") + + # We skip specification step 5.3.11: deleting timestamp and snapshot + # with rotated keys is not needed as they will be invalid, are not + # loaded and cannot be loaded + self._root_update_finished = True + logger.debug("Verified final root.json") + + def update_timestamp(self, data: bytes): """Verifies and loads 'data' as new timestamp metadata.""" if not self._root_update_finished: # root_update_finished() not called @@ -377,16 +266,17 @@ def _load_timestamp(self, data: bytes): raise exceptions.ExpiredMetadataError("New timestamp is expired") self._bundle["timestamp"] = new_timestamp - logger.debug("Loaded timestamp") + logger.debug("Updated timestamp") # TODO: remove pylint disable once the hash verification is in metadata.py - def _load_snapshot(self, data: bytes): # pylint: disable=too-many-branches + def update_snapshot(self, data: bytes): # pylint: disable=too-many-branches """Verifies and loads 'data' as new snapshot metadata.""" if self.timestamp is None: raise RuntimeError("Cannot update snapshot before timestamp") if self.targets is not None: raise RuntimeError("Cannot update snapshot after targets") + logger.debug("Updating snapshot") meta = self.timestamp.signed.meta["snapshot.json"] @@ -446,9 +336,13 @@ def _load_snapshot(self, data: bytes): # pylint: disable=too-many-branches raise exceptions.ExpiredMetadataError("New snapshot is expired") self._bundle["snapshot"] = new_snapshot - logger.debug("Loaded snapshot") + logger.debug("Updated snapshot") - def _load_delegated_targets( + def update_targets(self, data: bytes): + """Verifies and loads 'data' as new top-level targets metadata.""" + self.update_delegated_targets(data, "targets", "root") + + def update_delegated_targets( self, data: bytes, role_name: str, delegator_name: str ): """Verifies and loads 'data' as new metadata for target 'role_name'. @@ -462,6 +356,8 @@ def _load_delegated_targets( if delegator is None: raise RuntimeError("Cannot load targets before delegator") + logger.debug("Updating %s delegated by %s", role_name, delegator_name) + # Verify against the hashes in snapshot, if any meta = self.snapshot.signed.meta.get(f"{role_name}.json") if meta is None: @@ -504,4 +400,4 @@ def _load_delegated_targets( raise exceptions.ExpiredMetadataError(f"New {role_name} is expired") self._bundle[role_name] = new_delegate - logger.debug("Loaded %s delegated by %s", role_name, delegator_name) + logger.debug("Updated %s delegated by %s", role_name, delegator_name)