mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
This commit performs restructuring on the recently added metadata class model architecture, which shall be part of a new simple TUF API. The key change is that the Metadata class is now used as container for inner TUF metadata (Root, Timestamp, Snapshot, Targets) instead of serving as base class for these, that means we use 'composition' instead of 'inheritance'. Still, in order to aggregate common attributes of the inner Metadata (expires, version, spec_version), we use a new baseclass 'Signed', which also corresponds to the signed field of the outer metadata container. Based on prior observations in TUF's sister project in-toto, this architecture seems to more closely represent the metadata model as it is defined in the specification (see in-toto/in-toto#98 and in-toto/in-toto#142 for related discussions). Note that the proposed changes require us to now access some attributes/methods via the signed attribute of a Metadata object and not directly on the Metadata object, but it would be possible to add short-cuts. (see todo notes in doc header). Further changes include: - Add minimal doc header with TODO notes - Make attributes that correspond to fields in TUF JSON metadata public again. There doesn't seem to be a good reason to protect them with leading underscores and use setters/getters instead, it just adds more code. - Generally try to reduce code. - Remove keyring and consistent_snapshot attributes from metadata class. As discussed in #1060 they are a better fit for extra management code (also see #660) - Remove sslib schema checks (see TODO notes about validation in doc header) - Drop usage of build_dict_conforming_to_schema, it seems a lot simpler and more explicit to just code this here. - ... same goes for make_metadata_fileinfo - Adapt tests accordingly TODO: Document!!! Signed-off-by: Lukas Puehringer <lukas.puehringer@nyu.edu>
288 lines
9.8 KiB
Python
288 lines
9.8 KiB
Python
"""TUF role metadata model.
|
|
|
|
This module provides container classes for TUF role metadata, including methods
|
|
to read/serialize/write from and to JSON, perform TUF-compliant metadata
|
|
updates, and create and verify signatures.
|
|
|
|
TODO:
|
|
|
|
* Add docstrings
|
|
|
|
* Finalize/Document Verify/Sign functions (I am not fully sure about expected
|
|
behavior)
|
|
|
|
* Validation (some thoughts ...)
|
|
- Avoid schema, see secure-systems-lab/securesystemslib#183
|
|
- Provide methods to validate JSON representation (at user boundary)
|
|
- Fail on bad json metadata in read_from_json method
|
|
- Be lenient on bad/invalid metadata objects in memory, they might be
|
|
work in progress. E.g. it might be convenient to create empty metadata
|
|
and assign attributes later on.
|
|
- Fail on bad json metadata in write_to_json method, but with option to
|
|
disable check as there might be a justified reason to write WIP
|
|
metadata to json.
|
|
|
|
* It might be nice to have short-cuts on the Metadata class to methods and
|
|
attributes of the contained Signed object. If we do this, we should only do
|
|
it on common methods/attributes (e.g. version, bump_version, expires,
|
|
bump_expiration)
|
|
|
|
* Similarly, it might be nice to have a generic Metadata.read_from_json that
|
|
can load any TUF role metadata and instantiate the appropriate object based
|
|
on the json '_type' field.
|
|
|
|
|
|
"""
|
|
# Imports.
|
|
|
|
# 1st-party.
|
|
from tuf.api.keys import KeyRing
|
|
|
|
# 2nd-party.
|
|
from abc import ABC, abstractmethod
|
|
from datetime import datetime, timedelta
|
|
from typing import Any, Dict, List, Optional
|
|
|
|
import json
|
|
import tempfile
|
|
|
|
# 3rd-party.
|
|
|
|
from securesystemslib.formats import encode_canonical
|
|
from securesystemslib.util import load_json_file, persist_temp_file
|
|
from securesystemslib.storage import StorageBackendInterface
|
|
from tuf.repository_lib import (
|
|
_get_written_metadata,
|
|
_strip_version_number,
|
|
generate_snapshot_metadata,
|
|
generate_targets_metadata,
|
|
generate_timestamp_metadata,
|
|
)
|
|
|
|
import iso8601
|
|
import tuf.formats
|
|
|
|
# Types.
|
|
|
|
JsonDict = Dict[str, Any]
|
|
|
|
# Classes.
|
|
|
|
|
|
class Metadata(ABC):
|
|
def __init__(
|
|
self, signed: 'Signed' = None, signatures: list = None) -> None:
|
|
# TODO: How much init magic do we want?
|
|
self.signed = signed
|
|
self.signatures = signatures
|
|
|
|
def as_dict(self) -> JsonDict:
|
|
return {
|
|
'signatures': self.signatures,
|
|
'signed': self.signed.as_dict()
|
|
}
|
|
|
|
def __update_signature(self, signatures, keyid, signature):
|
|
updated = False
|
|
keyid_signature = {'keyid':keyid, 'sig':signature}
|
|
for idx, keyid_sig in enumerate(signatures):
|
|
if keyid_sig['keyid'] == keyid:
|
|
signatures[idx] = keyid_signature
|
|
updated = True
|
|
if not updated:
|
|
signatures.append(keyid_signature)
|
|
|
|
def sign(self, key_ring: KeyRing) -> JsonDict:
|
|
# FIXME: Needs documentation of expected behavior
|
|
signed_bytes = self.signed_bytes
|
|
signatures = self.__signatures
|
|
|
|
for key in key_ring.keys:
|
|
signature = key.sign(self.signed_bytes)
|
|
self.__update_signature(signatures, key.keyid, signature)
|
|
|
|
self.__signatures = signatures
|
|
return self.signable
|
|
|
|
def verify(self, key_ring: KeyRing) -> bool:
|
|
# FIXME: Needs documentation of expected behavior
|
|
signed_bytes = self.signed.signed_bytes
|
|
signatures = self.signatures
|
|
verified_keyids = set()
|
|
|
|
for signature in signatures:
|
|
# TODO: handle an empty keyring
|
|
for key in key_ring.keys:
|
|
keyid = key.keyid
|
|
if keyid == signature['keyid']:
|
|
try:
|
|
verified = key.verify(signed_bytes, signature)
|
|
except:
|
|
logging.exception(f'Could not verify signature for key {keyid}')
|
|
continue
|
|
else:
|
|
# Avoid https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2020-6174
|
|
verified_keyids.add(keyid)
|
|
|
|
break
|
|
|
|
return len(verified_keyids) >= key_ring.threshold.least
|
|
|
|
def write_to_json(
|
|
self, filename: str,
|
|
storage_backend: StorageBackendInterface = None) -> None:
|
|
with tempfile.TemporaryFile() as f:
|
|
f.write(_get_written_metadata(self.sign()).encode_canonical())
|
|
persist_temp_file(f, filename, storage_backend)
|
|
|
|
|
|
class Signed:
|
|
# NOTE: Signed is a stupid name, because this might not be signed yet, but
|
|
# we keep it to match spec terminology (I often refer to this as "payload",
|
|
# or "inner metadata")
|
|
|
|
# TODO: Re-think default values. It might be better to pass some things
|
|
# as args and not es kwargs. Then we'd need to pop those from
|
|
# signable["signed"] in read_from_json and pass them explicitly, which
|
|
# some say is better than implicit. :)
|
|
def __init__(
|
|
self, _type: str = None, version: int = 0,
|
|
spec_version: str = None, expires: datetime = None
|
|
) -> None:
|
|
# TODO: How much init magic do we want?
|
|
|
|
self._type = _type
|
|
self.spec_version = spec_version
|
|
|
|
# We always intend times to be UTC
|
|
# NOTE: we could do this with datetime.fromisoformat() but that is not
|
|
# available in Python 2.7's datetime
|
|
# NOTE: Store as datetime object for convenient handling, use 'expires'
|
|
# property to get the TUF metadata format representation
|
|
self.__expiration = iso8601.parse_date(expires).replace(tzinfo=None)
|
|
|
|
if version < 0:
|
|
raise ValueError(f'version must be < 0, got {version}')
|
|
self.version = version
|
|
|
|
@property
|
|
def signed_bytes(self) -> bytes:
|
|
return encode_canonical(self.as_dict()).encode('UTF-8')
|
|
|
|
@property
|
|
def expires(self) -> str:
|
|
"""The expiration property in TUF metadata format."""
|
|
return self.__expiration.isoformat() + 'Z'
|
|
|
|
def bump_expiration(self, delta: timedelta = timedelta(days=1)) -> None:
|
|
self.__expiration = self.__expiration + delta
|
|
|
|
def bump_version(self) -> None:
|
|
self.version += 1
|
|
|
|
def as_dict(self) -> JsonDict:
|
|
# NOTE: The classes should be the single source of truth about metadata
|
|
# let's define the dict representation here and not in some dubious
|
|
# build_dict_conforming_to_schema
|
|
return {
|
|
'_type': self._type,
|
|
'version': self.version,
|
|
'spec_version': self.spec_version,
|
|
'expires': self.expires
|
|
}
|
|
|
|
@classmethod
|
|
def read_from_json(
|
|
cls, filename: str,
|
|
storage_backend: Optional[StorageBackendInterface] = None
|
|
) -> Metadata:
|
|
signable = load_json_file(filename, storage_backend)
|
|
|
|
# FIXME: It feels dirty to access signable["signed"]["version"] here in
|
|
# order to do this check, and also a bit random (there are likely other
|
|
# things to check), but later we don't have the filename anymore. If we
|
|
# want to stick to the check, which seems reasonable, we should maybe
|
|
# think of a better place.
|
|
_, fn_prefix = _strip_version_number(filename, True)
|
|
if fn_prefix and fn_prefix != signable['signed']['version']:
|
|
raise ValueError(
|
|
f'version filename prefix ({fn_prefix}) must align with '
|
|
f'version in metadata ({signable["signed"]["version"]}).')
|
|
|
|
return Metadata(
|
|
signed=cls(**signable['signed']),
|
|
signatures=signable['signatures'])
|
|
|
|
|
|
class Timestamp(Signed):
|
|
def __init__(self, meta: JsonDict = None, **kwargs) -> None:
|
|
super().__init__(**kwargs)
|
|
# TODO: How much init magic do we want?
|
|
# TODO: Is there merit in creating classes for dict fields?
|
|
self.meta = meta
|
|
|
|
def as_dict(self) -> JsonDict:
|
|
json_dict = super().as_dict()
|
|
json_dict.update({
|
|
'meta': self.meta
|
|
})
|
|
return json_dict
|
|
|
|
# Update metadata about the snapshot metadata.
|
|
def update(self, version: int, length: int, hashes: JsonDict) -> None:
|
|
fileinfo = self.meta.get('snapshot.json', {})
|
|
fileinfo['version'] = version
|
|
fileinfo['length'] = length
|
|
fileinfo['hashes'] = hashes
|
|
self.meta['snapshot.json'] = fileinfo
|
|
|
|
|
|
class Snapshot(Signed):
|
|
def __init__(self, meta: JsonDict = None, **kwargs) -> None:
|
|
# TODO: How much init magic do we want?
|
|
# TODO: Is there merit in creating classes for dict fields?
|
|
super().__init__(**kwargs)
|
|
self.meta = meta
|
|
|
|
def as_dict(self) -> JsonDict:
|
|
json_dict = super().as_dict()
|
|
json_dict.update({
|
|
'meta': self.meta
|
|
})
|
|
return json_dict
|
|
|
|
# Add or update metadata about the targets metadata.
|
|
def update(
|
|
self, rolename: str, version: int, length: Optional[int] = None,
|
|
hashes: Optional[JsonDict] = None) -> None:
|
|
metadata_fn = f'{rolename}.json'
|
|
|
|
self.meta[metadata_fn] = {'version': version}
|
|
if length is not None:
|
|
self.meta[metadata_fn]['length'] = length
|
|
|
|
if hashes is not None:
|
|
self.meta[metadata_fn]['hashes'] = hashes
|
|
|
|
|
|
class Targets(Signed):
|
|
def __init__(
|
|
self, targets: JsonDict = None, delegations: JsonDict = None,
|
|
**kwargs) -> None:
|
|
# TODO: How much init magic do we want?
|
|
# TODO: Is there merit in creating classes for dict fields?
|
|
super().__init__(**kwargs)
|
|
self.targets = targets
|
|
self.delegations = delegations
|
|
|
|
def as_dict(self) -> JsonDict:
|
|
json_dict = super().as_dict()
|
|
json_dict.update({
|
|
'targets': self.targets,
|
|
'delegations': self.delegations,
|
|
})
|
|
return json_dict
|
|
|
|
# Add or update metadata about the target.
|
|
def update(self, filename: str, fileinfo: JsonDict) -> None:
|
|
self.targets[filename] = fileinfo
|