New API: Add MetaFile class

In the top-level metadata classes, there are complex attributes such as
"meta" in Targets and Snapshot, "key" and "roles" in Root etc.
We want to represent those complex attributes with a class to allow
easier verification and support for metadata with unrecognized fields.
For more context read ADR 0004 and ADR 0008 in the docs/adr folder.

Additionally, after adding the MetaFile class, when we create an object
we are now calling from dict twice - one for the main class (Timestamp,
Snapshot) and one for the pacticular complex attribute -
MetaFile.from_dict(). Given that the "from_dict" methods have the
side effect of destroying the given dictionary, we would need to
start using deepcopy() for our tests.

Signed-off-by: Martin Vrachev <mvrachev@vmware.com>
This commit is contained in:
Martin Vrachev 2021-03-30 17:39:10 +03:00
parent 8348523b77
commit 3771a77ffe
2 changed files with 111 additions and 73 deletions

View file

@ -29,6 +29,7 @@
Targets,
Key,
Role,
MetaFile,
Delegations,
DelegatedRole,
)
@ -246,28 +247,31 @@ def test_metadata_snapshot(self):
self.repo_dir, 'metadata', 'snapshot.json')
snapshot = Metadata.from_file(snapshot_path)
# Create a dict representing what we expect the updated data to be
fileinfo = copy.deepcopy(snapshot.signed.meta)
# Create a MetaFile instance representing what we expect
# the updated data to be.
hashes = {'sha256': 'c2986576f5fdfd43944e2b19e775453b96748ec4fe2638a6d2f32f1310967095'}
fileinfo['role1.json']['version'] = 2
fileinfo['role1.json']['hashes'] = hashes
fileinfo['role1.json']['length'] = 123
fileinfo = MetaFile(2, 123, hashes)
self.assertNotEqual(snapshot.signed.meta, fileinfo)
self.assertNotEqual(
snapshot.signed.meta['role1.json'].to_dict(), fileinfo.to_dict()
)
snapshot.signed.update('role1', 2, 123, hashes)
self.assertEqual(snapshot.signed.meta, fileinfo)
self.assertEqual(
snapshot.signed.meta['role1.json'].to_dict(), fileinfo.to_dict()
)
# Update only version. Length and hashes are optional.
snapshot.signed.update('role2', 3)
fileinfo['role2.json'] = {'version': 3}
self.assertEqual(snapshot.signed.meta, fileinfo)
fileinfo = MetaFile(3)
self.assertEqual(
snapshot.signed.meta['role2.json'].to_dict(), fileinfo.to_dict()
)
# Test from_dict and to_dict without hashes and length.
snapshot_dict = snapshot.to_dict()
test_dict = snapshot_dict['signed'].copy()
del test_dict['meta']['role1.json']['length']
del test_dict['meta']['role1.json']['hashes']
del snapshot_dict['signed']['meta']['role1.json']['length']
del snapshot_dict['signed']['meta']['role1.json']['hashes']
test_dict = copy.deepcopy(snapshot_dict['signed'])
snapshot = Snapshot.from_dict(test_dict)
self.assertEqual(snapshot_dict['signed'], snapshot.to_dict())
@ -295,28 +299,33 @@ def test_metadata_timestamp(self):
timestamp.signed.bump_expiration(delta)
self.assertEqual(timestamp.signed.expires, datetime(2036, 1, 3, 0, 0))
# Create a MetaFile instance representing what we expect
# the updated data to be.
hashes = {'sha256': '0ae9664468150a9aa1e7f11feecb32341658eb84292851367fea2da88e8a58dc'}
fileinfo = copy.deepcopy(timestamp.signed.meta['snapshot.json'])
fileinfo['hashes'] = hashes
fileinfo['version'] = 2
fileinfo['length'] = 520
fileinfo = MetaFile(2, 520, hashes)
self.assertNotEqual(timestamp.signed.meta['snapshot.json'], fileinfo)
self.assertNotEqual(
timestamp.signed.meta['snapshot.json'].to_dict(), fileinfo.to_dict()
)
timestamp.signed.update(2, 520, hashes)
self.assertEqual(timestamp.signed.meta['snapshot.json'], fileinfo)
self.assertEqual(
timestamp.signed.meta['snapshot.json'].to_dict(), fileinfo.to_dict()
)
# Test from_dict and to_dict without hashes and length.
timestamp_dict = timestamp.to_dict()
test_dict = timestamp_dict['signed'].copy()
del test_dict['meta']['snapshot.json']['length']
del test_dict['meta']['snapshot.json']['hashes']
del timestamp_dict['signed']['meta']['snapshot.json']['length']
del timestamp_dict['signed']['meta']['snapshot.json']['hashes']
test_dict = copy.deepcopy(timestamp_dict['signed'])
timestamp_test = Timestamp.from_dict(test_dict)
self.assertEqual(timestamp_dict['signed'], timestamp_test.to_dict())
# Update only version. Length and hashes are optional.
timestamp.signed.update(3)
fileinfo = {'version': 3}
self.assertEqual(timestamp.signed.meta['snapshot.json'], fileinfo)
fileinfo = MetaFile(version=3)
self.assertEqual(
timestamp.signed.meta['snapshot.json'].to_dict(), fileinfo.to_dict()
)
def test_key_class(self):
keys = {

View file

@ -598,6 +598,60 @@ def remove_key(self, role: str, keyid: str) -> None:
del self.keys[keyid]
class MetaFile:
"""A container with information about a particular metadata file.
Attributes:
version: An integer indicating the version of the metadata file.
length: An optional integer indicating the length of the metadata file.
hashes: An optional dictionary mapping hash algorithms to the
hashes resulting from applying them over the metadata file
contents.::
'hashes': {
'<HASH ALGO 1>': '<METADATA FILE HASH 1>',
'<HASH ALGO 2>': '<METADATA FILE HASH 2>',
...
}
unrecognized_fields: Dictionary of all unrecognized fields.
"""
def __init__(
self,
version: int,
length: Optional[int] = None,
hashes: Optional[Dict[str, str]] = None,
unrecognized_fields: Optional[Mapping[str, Any]] = None,
) -> None:
self.version = version
self.length = length
self.hashes = hashes
self.unrecognized_fields: Mapping[str, Any] = unrecognized_fields or {}
@classmethod
def from_dict(cls, meta_dict: Dict[str, Any]) -> "MetaFile":
"""Creates MetaFile object from its dict representation."""
version = meta_dict.pop("version")
length = meta_dict.pop("length", None)
hashes = meta_dict.pop("hashes", None)
# All fields left in the meta_dict are unrecognized.
return cls(version, length, hashes, meta_dict)
def to_dict(self) -> Dict[str, Any]:
"""Returns the dictionary representation of self."""
res_dict = {"version": self.version, **self.unrecognized_fields}
if self.length is not None:
res_dict["length"] = self.length
if self.hashes is not None:
res_dict["hashes"] = self.hashes
return res_dict
class Timestamp(Signed):
"""A container for the signed part of timestamp metadata.
@ -605,15 +659,7 @@ class Timestamp(Signed):
meta: A dictionary that contains information about snapshot metadata::
{
'snapshot.json': {
'version': <SNAPSHOT METADATA VERSION NUMBER>,
'length': <SNAPSHOT METADATA FILE SIZE>, // optional
'hashes': {
'<HASH ALGO 1>': '<SNAPSHOT METADATA FILE HASH 1>',
'<HASH ALGO 2>': '<SNAPSHOT METADATA FILE HASH 2>',
...
} // optional
}
'snapshot.json': <MetaFile INSTANCE>
}
"""
@ -625,26 +671,28 @@ def __init__(
version: int,
spec_version: str,
expires: datetime,
meta: Dict[str, Any],
meta: Dict[str, MetaFile],
unrecognized_fields: Optional[Mapping[str, Any]] = None,
) -> None:
super().__init__(version, spec_version, expires, unrecognized_fields)
# TODO: Add class for meta
self.meta = meta
@classmethod
def from_dict(cls, timestamp_dict: Dict[str, Any]) -> "Timestamp":
"""Creates Timestamp object from its dict representation."""
common_args = cls._common_fields_from_dict(timestamp_dict)
meta = timestamp_dict.pop("meta")
meta_dict = timestamp_dict.pop("meta")
meta = {"snapshot.json": MetaFile.from_dict(meta_dict["snapshot.json"])}
# All fields left in the timestamp_dict are unrecognized.
return cls(*common_args, meta, timestamp_dict)
def to_dict(self) -> Dict[str, Any]:
"""Returns the dict representation of self."""
timestamp_dict = self._common_fields_to_dict()
timestamp_dict.update({"meta": self.meta})
return timestamp_dict
res_dict = self._common_fields_to_dict()
res_dict["meta"] = {
"snapshot.json": self.meta["snapshot.json"].to_dict()
}
return res_dict
# Modification.
def update(
@ -654,13 +702,7 @@ def update(
hashes: Optional[Dict[str, Any]] = None,
) -> None:
"""Assigns passed info about snapshot metadata to meta dict."""
self.meta["snapshot.json"] = {"version": version}
if length is not None:
self.meta["snapshot.json"]["length"] = length
if hashes is not None:
self.meta["snapshot.json"]["hashes"] = hashes
self.meta["snapshot.json"] = MetaFile(version, length, hashes)
class Snapshot(Signed):
@ -670,22 +712,9 @@ class Snapshot(Signed):
meta: A dictionary that contains information about targets metadata::
{
'targets.json': {
'version': <TARGETS METADATA VERSION NUMBER>,
'length': <TARGETS METADATA FILE SIZE>, // optional
'hashes': {
'<HASH ALGO 1>': '<TARGETS METADATA FILE HASH 1>',
'<HASH ALGO 2>': '<TARGETS METADATA FILE HASH 2>',
...
} // optional
},
'<DELEGATED TARGETS ROLE 1>.json': {
...
},
'<DELEGATED TARGETS ROLE 2>.json': {
...
},
...
'targets.json': <MetaFile INSTANCE>,
'<DELEGATED TARGETS ROLE 1>.json': <MetaFile INSTANCE>,
'<DELEGATED TARGETS ROLE 2>.json': <MetaFile INSTANCE>,
}
"""
@ -697,25 +726,31 @@ def __init__(
version: int,
spec_version: str,
expires: datetime,
meta: Dict[str, Any],
meta: Dict[str, MetaFile],
unrecognized_fields: Optional[Mapping[str, Any]] = None,
) -> None:
super().__init__(version, spec_version, expires, unrecognized_fields)
# TODO: Add class for meta
self.meta = meta
@classmethod
def from_dict(cls, snapshot_dict: Dict[str, Any]) -> "Snapshot":
"""Creates Snapshot object from its dict representation."""
common_args = cls._common_fields_from_dict(snapshot_dict)
meta = snapshot_dict.pop("meta")
meta_dicts = snapshot_dict.pop("meta")
meta = {}
for meta_path, meta_dict in meta_dicts.items():
meta[meta_path] = MetaFile.from_dict(meta_dict)
# All fields left in the snapshot_dict are unrecognized.
return cls(*common_args, meta, snapshot_dict)
def to_dict(self) -> Dict[str, Any]:
"""Returns the dict representation of self."""
snapshot_dict = self._common_fields_to_dict()
snapshot_dict.update({"meta": self.meta})
meta_dict = {}
for meta_path, meta_info in self.meta.items():
meta_dict[meta_path] = meta_info.to_dict()
snapshot_dict["meta"] = meta_dict
return snapshot_dict
# Modification.
@ -728,13 +763,7 @@ def update(
) -> None:
"""Assigns passed (delegated) targets role info to meta dict."""
metadata_fn = f"{rolename}.json"
self.meta[metadata_fn] = {"version": version}
if length is not None:
self.meta[metadata_fn]["length"] = length
if hashes is not None:
self.meta[metadata_fn]["hashes"] = hashes
self.meta[metadata_fn] = MetaFile(version, length, hashes)
class DelegatedRole(Role):