Embed modification function helpers

Instead of using general abstract modification functions embed smaller
modification functions inside each test where it's needed and
create modify_metadata function that does all of the common stuff like:
- instantiating a metadata object
- calling the modification function
- signing the modified object
- serializing back to bytes.

Signed-off-by: Martin Vrachev <mvrachev@vmware.com>
This commit is contained in:
Martin Vrachev 2021-07-21 17:34:23 +03:00
parent 71838562dc
commit 11531caf42

View file

@ -1,12 +1,12 @@
import logging
from typing import Optional, Union
from typing import Optional, Union, Callable
import os
import sys
import unittest
from datetime import datetime
from tuf import exceptions
from tuf.api.metadata import Metadata, MetaFile
from tuf.api.metadata import Metadata, Signed, Timestamp, Snapshot, MetaFile
from tuf.ngclient._internal.trusted_metadata_set import TrustedMetadataSet
from securesystemslib.signer import SSlibSigner
@ -84,41 +84,15 @@ def _update_all_besides_targets(
snapshot_bytes = snapshot_bytes or self.metadata["snapshot"]
self.trusted_set.update_snapshot(snapshot_bytes)
def _modify_timestamp_meta(self, version: Optional[int] = 1):
"""Remove hashes and length from timestamp.meta["snapshot.json"].
Create a timestamp.meta["snapshot.json"] containing only version.
Args:
version:
Version used when instantiating MetaFile for timestamp.meta.
"""
timestamp = Metadata.from_bytes(self.metadata["timestamp"])
timestamp.signed.meta["snapshot.json"] = MetaFile(version)
timestamp.sign(self.keystore["timestamp"])
return timestamp.to_bytes()
def _modify_snapshot_meta(
self, version: Union[int, None] = 1, length: Optional[int]= None
):
"""Modify hashes and length from snapshot_meta.meta["snapshot.json"].
If version and length is None, then snapshot meta will be an empty dictionary.
Args:
version:
Version used when instantiating MetaFile for snapshot.meta.
length:
Length used when instantiating MetaFile for snapshot.meta.
"""
snapshot = Metadata.from_bytes(self.metadata["snapshot"])
if version is None and length is None:
snapshot.signed.meta = {}
else:
for metafile_path in snapshot.signed.meta:
snapshot.signed.meta[metafile_path] = MetaFile(version, length)
snapshot.sign(self.keystore["snapshot"])
return snapshot.to_bytes()
def modify_metadata(
self, rolename: str, modification_func: Callable[["Signed"], None]
):
"""Instantiate metadata from rolename type, call modification_func and
sign it again with self.keystore[rolename] signer."""
metadata = Metadata.from_bytes(self.metadata[rolename])
modification_func(metadata.signed)
metadata.sign(self.keystore[rolename])
return metadata.to_bytes()
def test_update(self):
self.trusted_set.root_update_finished()
@ -255,7 +229,10 @@ def test_update_timestamp_new_timestamp_ver_below_trusted_ver(self):
self.trusted_set.update_timestamp(self.metadata["timestamp"])
def test_update_timestamp_snapshot_ver_below_trusted_snapshot_ver(self):
modified_timestamp = self._modify_timestamp_meta(version=3)
def version_modifier(timestamp: Timestamp):
timestamp.version = 3
modified_timestamp = self.modify_metadata("timestamp", version_modifier)
self._root_updated_and_update_timestamp(modified_timestamp)
# new_timestamp.snapshot.version < trusted_timestamp.snapshot.version
with self.assertRaises(exceptions.ReplayedMetadataError):
@ -272,16 +249,27 @@ def test_update_timestamp_expired(self):
def test_update_snapshot_cannot_verify_snapshot_with_threshold(self):
modified_timestamp = self._modify_timestamp_meta()
self._root_updated_and_update_timestamp(modified_timestamp)
def hashes_length_modifier(timestamp: Timestamp):
timestamp.meta["snapshot.json"].hashes = None
timestamp.meta["snapshot.json"].length = None
timestamp = self.modify_metadata("timestamp", hashes_length_modifier)
self._root_updated_and_update_timestamp(timestamp)
snapshot = Metadata.from_bytes(self.metadata["snapshot"])
snapshot.signatures.clear()
with self.assertRaises(exceptions.UnsignedMetadataError):
self.trusted_set.update_snapshot(snapshot.to_bytes())
def test_update_snapshot_version_different_timestamp_snapshot_version(self):
modified_timestamp = self._modify_timestamp_meta(version=2)
self._root_updated_and_update_timestamp(modified_timestamp)
def hashes_length_version_modifier(timestamp: Timestamp):
timestamp.meta["snapshot.json"].hashes = None
timestamp.meta["snapshot.json"].length = None
timestamp.meta["snapshot.json"].version = 2
timestamp = self.modify_metadata(
"timestamp", hashes_length_version_modifier
)
self._root_updated_and_update_timestamp(timestamp)
# new_snapshot.version != trusted timestamp.meta["snapshot"].version
snapshot = Metadata.from_bytes(self.metadata["snapshot"])
snapshot.signed.version = 3
@ -290,8 +278,12 @@ def test_update_snapshot_version_different_timestamp_snapshot_version(self):
self.trusted_set.update_snapshot(snapshot.to_bytes())
def test_update_snapshot_after_successful_update_new_snapshot_no_meta(self):
modified_timestamp = self._modify_timestamp_meta()
self._update_all_besides_targets(modified_timestamp)
def hashes_length_modifier(timestamp: Timestamp):
timestamp.meta["snapshot.json"].hashes = None
timestamp.meta["snapshot.json"].length = None
timestamp = self.modify_metadata("timestamp", hashes_length_modifier)
self._update_all_besides_targets(timestamp)
# Test removing a meta_file in new_snapshot compared to the old snapshot
snapshot = Metadata.from_bytes(self.metadata["snapshot"])
snapshot.signed.meta = {}
@ -300,8 +292,12 @@ def test_update_snapshot_after_successful_update_new_snapshot_no_meta(self):
self.trusted_set.update_snapshot(snapshot.to_bytes())
def test_update_snapshot_after_succesfull_update_new_snapshot_meta_version_different(self):
modified_timestamp = self._modify_timestamp_meta()
self._root_updated_and_update_timestamp(modified_timestamp)
def hashes_length_modifier(timestamp: Timestamp):
timestamp.meta["snapshot.json"].hashes = None
timestamp.meta["snapshot.json"].length = None
timestamp = self.modify_metadata("timestamp", hashes_length_modifier)
self._root_updated_and_update_timestamp(timestamp)
# snapshot.meta["project1"].version != new_snapshot.meta["project1"].version
snapshot = Metadata.from_bytes(self.metadata["snapshot"])
for metafile_path in snapshot.signed.meta.keys():
@ -312,8 +308,12 @@ def test_update_snapshot_after_succesfull_update_new_snapshot_meta_version_diffe
self.trusted_set.update_snapshot(self.metadata["snapshot"])
def test_update_snapshot_expired_new_snapshot(self):
modified_timestamp = self._modify_timestamp_meta()
self._root_updated_and_update_timestamp(modified_timestamp)
def hashes_length_modifier(timestamp: Timestamp):
timestamp.meta["snapshot.json"].hashes = None
timestamp.meta["snapshot.json"].length = None
timestamp = self.modify_metadata("timestamp", hashes_length_modifier)
self._root_updated_and_update_timestamp(timestamp)
# new_snapshot has expired
snapshot = Metadata.from_bytes(self.metadata["snapshot"])
snapshot.signed.expires = datetime(1970, 1, 1)
@ -323,34 +323,48 @@ def test_update_snapshot_expired_new_snapshot(self):
def test_update_targets_no_meta_in_snapshot(self):
modified_timestamp = self._modify_timestamp_meta()
modified_snapshot = self._modify_snapshot_meta(version=None)
self._update_all_besides_targets(
timestamp_bytes = modified_timestamp,
snapshot_bytes= modified_snapshot
)
def hashes_length_modifier(timestamp: Timestamp):
timestamp.meta["snapshot.json"].hashes = None
timestamp.meta["snapshot.json"].length = None
timestamp = self.modify_metadata("timestamp", hashes_length_modifier)
def no_meta_modifier(snapshot: Snapshot):
snapshot.meta = {}
snapshot = self.modify_metadata("snapshot", no_meta_modifier)
self._update_all_besides_targets(timestamp, snapshot)
# remove meta information with information about targets from snapshot
with self.assertRaises(exceptions.RepositoryError):
self.trusted_set.update_targets(self.metadata["targets"])
def test_update_targets_hash_different_than_snapshot_meta_hash(self):
modified_timestamp = self._modify_timestamp_meta()
modified_snapshot = self._modify_snapshot_meta(version=1, length=1)
self._update_all_besides_targets(
timestamp_bytes = modified_timestamp,
snapshot_bytes= modified_snapshot
)
def hashes_length_modifier(timestamp: Timestamp):
timestamp.meta["snapshot.json"].hashes = None
timestamp.meta["snapshot.json"].length = None
timestamp = self.modify_metadata("timestamp", hashes_length_modifier)
def meta_length_modifier(snapshot: Snapshot):
for metafile_path in snapshot.meta:
snapshot.meta[metafile_path] = MetaFile(version=1, length=1)
snapshot = self.modify_metadata("snapshot", meta_length_modifier)
self._update_all_besides_targets(timestamp, snapshot)
# observed_hash != stored hash in snapshot meta for targets
with self.assertRaises(exceptions.RepositoryError):
self.trusted_set.update_targets(self.metadata["targets"])
def test_update_targets_version_different_snapshot_meta_version(self):
modified_timestamp = self._modify_timestamp_meta()
modified_snapshot = self._modify_snapshot_meta(version=2)
self._update_all_besides_targets(
timestamp_bytes = modified_timestamp,
snapshot_bytes= modified_snapshot
)
def hashes_length_modifier(timestamp: Timestamp):
timestamp.meta["snapshot.json"].hashes = None
timestamp.meta["snapshot.json"].length = None
timestamp = self.modify_metadata("timestamp", hashes_length_modifier)
def meta_modifier(snapshot: Snapshot):
for metafile_path in snapshot.meta:
snapshot.meta[metafile_path] = MetaFile(version=2)
snapshot = self.modify_metadata("snapshot", meta_modifier)
self._update_all_besides_targets(timestamp, snapshot)
# new_delegate.signed.version != meta.version stored in snapshot
with self.assertRaises(exceptions.BadVersionNumberError):
self.trusted_set.update_targets(self.metadata["targets"])