Use crypto-related formats from SSL in tuf.formats.py

This commit is contained in:
Vladimir Diaz 2017-01-09 16:58:40 -05:00
parent 8c913b6824
commit 33920a8184

View file

@ -16,7 +16,7 @@
<Purpose>
A central location for all format-related checking of TUF objects.
Some crypto-related formats may also be defined in tuf.ssl_commons.
Some crypto-related formats may also be defined in securesystemslib.
Note: 'formats.py' depends heavily on 'schema.py', so the 'schema.py'
module should be read and understood before tackling this module.
@ -26,7 +26,7 @@
The first section deals with schemas and object matching based on format.
There are two ways of checking the format of objects. The first method
raises a 'tuf.ssl_commons.exceptions.FormatError' exception if the match
raises a 'securesystemslib.exceptions.FormatError' exception if the match
fails and the other returns a Boolean result.
tuf.formats.<SCHEMA>.check_match(object)
@ -39,8 +39,8 @@
'keyval': {'public': 'public_key',
'private': 'private_key'}
tuf.ssl_commons.formats.RSAKEY_SCHEMA.check_match(rsa_key)
tuf.ssl_commons.formats.RSAKEY_SCHEMA.matches(rsa_key)
securesystemslib.formats.RSAKEY_SCHEMA.check_match(rsa_key)
securesystemslib.formats.RSAKEY_SCHEMA.matches(rsa_key)
In this example, if a dict key or dict value is missing or incorrect,
the match fails. There are numerous variations of object checking
@ -79,8 +79,9 @@
import time
import tuf
import tuf.ssl_crypto.formats
import tuf.ssl_commons.schema as SCHEMA
import securesystemslib.formats
import securesystemslib.schema as SCHEMA
import six
@ -94,33 +95,33 @@
# role. The dict keys hold the relative file paths, and the dict values the
# corresponding version numbers and/or file information.
FILEINFODICT_SCHEMA = SCHEMA.DictOf(
key_schema = tuf.ssl_crypto.formats.RELPATH_SCHEMA,
value_schema = SCHEMA.OneOf([tuf.ssl_crypto.formats.VERSIONINFO_SCHEMA,
tuf.ssl_crypto.formats.FILEINFO_SCHEMA]))
key_schema = securesystemslib.formats.RELPATH_SCHEMA,
value_schema = SCHEMA.OneOf([securesystemslib.formats.VERSIONINFO_SCHEMA,
securesystemslib.formats.FILEINFO_SCHEMA]))
# Role object in {'keyids': [keydids..], 'name': 'ABC', 'threshold': 1,
# 'paths':[filepaths..]} format.
ROLE_SCHEMA = SCHEMA.Object(
object_name = 'ROLE_SCHEMA',
name = SCHEMA.Optional(tuf.ssl_crypto.formats.ROLENAME_SCHEMA),
keyids = tuf.ssl_crypto.formats.KEYIDS_SCHEMA,
threshold = tuf.ssl_crypto.formats.THRESHOLD_SCHEMA,
terminating = SCHEMA.Optional(tuf.ssl_crypto.formats.BOOLEAN_SCHEMA),
paths = SCHEMA.Optional(tuf.ssl_crypto.formats.RELPATHS_SCHEMA),
path_hash_prefixes = SCHEMA.Optional(tuf.ssl_crypto.formats.PATH_HASH_PREFIXES_SCHEMA))
name = SCHEMA.Optional(securesystemslib.formats.ROLENAME_SCHEMA),
keyids = securesystemslib.formats.KEYIDS_SCHEMA,
threshold = securesystemslib.formats.THRESHOLD_SCHEMA,
terminating = SCHEMA.Optional(securesystemslib.formats.BOOLEAN_SCHEMA),
paths = SCHEMA.Optional(securesystemslib.formats.RELPATHS_SCHEMA),
path_hash_prefixes = SCHEMA.Optional(securesystemslib.formats.PATH_HASH_PREFIXES_SCHEMA))
# A dictionary of ROLEDICT, where dictionary keys can be repository names, and
# dictionary values containing information for each role available on the
# repository (corresponding to the repository belonging to named repository in
# the dictionary key)
ROLEDICTDB_SCHEMA = SCHEMA.DictOf(
key_schema = tuf.ssl_crypto.formats.NAME_SCHEMA,
value_schema = tuf.ssl_crypto.formats.ROLEDICT_SCHEMA)
key_schema = securesystemslib.formats.NAME_SCHEMA,
value_schema = securesystemslib.formats.ROLEDICT_SCHEMA)
# Command argument list, as used by the CLI tool.
# Example: {'keytype': ed25519, 'expires': 365,}
COMMAND_SCHEMA = SCHEMA.DictOf(
key_schema = tuf.ssl_crypto.formats.NAME_SCHEMA,
key_schema = securesystemslib.formats.NAME_SCHEMA,
value_schema = SCHEMA.Any())
# A dictionary holding version information.
@ -268,17 +269,61 @@
SNAPSHOT_SCHEMA = SCHEMA.Object(
object_name = 'SNAPSHOT_SCHEMA',
_type = SCHEMA.String('Snapshot'),
version = tuf.ssl_crypto.formats.METADATAVERSION_SCHEMA,
expires = tuf.ssl_crypto.formats.ISO8601_DATETIME_SCHEMA,
version = securesystemslib.formats.METADATAVERSION_SCHEMA,
expires = securesystemslib.formats.ISO8601_DATETIME_SCHEMA,
meta = FILEINFODICT_SCHEMA)
# Timestamp role: indicates the latest version of the snapshot file.
TIMESTAMP_SCHEMA = SCHEMA.Object(
object_name = 'TIMESTAMP_SCHEMA',
_type = SCHEMA.String('Timestamp'),
version = tuf.ssl_crypto.formats.METADATAVERSION_SCHEMA,
expires = tuf.ssl_crypto.formats.ISO8601_DATETIME_SCHEMA,
meta = tuf.ssl_crypto.formats.FILEDICT_SCHEMA)
version = securesystemslib.formats.METADATAVERSION_SCHEMA,
expires = securesystemslib.formats.ISO8601_DATETIME_SCHEMA,
meta = securesystemslib.formats.FILEDICT_SCHEMA)
# project.cfg file: stores information about the project in a json dictionary
PROJECT_CFG_SCHEMA = SCHEMA.Object(
object_name = 'PROJECT_CFG_SCHEMA',
project_name = SCHEMA.AnyString(),
layout_type = SCHEMA.OneOf([SCHEMA.String('repo-like'), SCHEMA.String('flat')]),
targets_location = securesystemslib.formats.PATH_SCHEMA,
metadata_location = securesystemslib.formats.PATH_SCHEMA,
prefix = securesystemslib.formats.PATH_SCHEMA,
public_keys = securesystemslib.formats.KEYDICT_SCHEMA,
threshold = SCHEMA.Integer(lo = 0, hi = 2)
)
# A schema containing information a repository mirror may require,
# such as a url, the path of the directory metadata files, etc.
MIRROR_SCHEMA = SCHEMA.Object(
object_name = 'MIRROR_SCHEMA',
url_prefix = securesystemslib.formats.URL_SCHEMA,
metadata_path = securesystemslib.formats.RELPATH_SCHEMA,
targets_path = securesystemslib.formats.RELPATH_SCHEMA,
confined_target_dirs = securesystemslib.formats.RELPATHS_SCHEMA,
custom = SCHEMA.Optional(SCHEMA.Object()))
# A dictionary of mirrors where the dict keys hold the mirror's name and
# and the dict values the mirror's data (i.e., 'MIRROR_SCHEMA').
# The repository class of 'updater.py' accepts dictionaries
# of this type provided by the TUF client.
MIRRORDICT_SCHEMA = SCHEMA.DictOf(
key_schema = SCHEMA.AnyString(),
value_schema = MIRROR_SCHEMA)
# A Mirrorlist: indicates all the live mirrors, and what documents they
# serve.
MIRRORLIST_SCHEMA = SCHEMA.Object(
object_name = 'MIRRORLIST_SCHEMA',
_type = SCHEMA.String('Mirrors'),
version = METADATAVERSION_SCHEMA,
expires = securesystemslib.formats.ISO8601_DATETIME_SCHEMA,
mirrors = SCHEMA.ListOf(MIRROR_SCHEMA))
# Any of the role schemas (e.g., TIMESTAMP_SCHEMA, SNAPSHOT_SCHEMA, etc.)
ANYROLE_SCHEMA = SCHEMA.OneOf([ROOT_SCHEMA, TARGETS_SCHEMA, SNAPSHOT_SCHEMA,
TIMESTAMP_SCHEMA, MIRROR_SCHEMA])
@ -329,7 +374,7 @@ def __init__(self, version, expires, filedict):
@staticmethod
def from_metadata(object):
# Is 'object' a Timestamp metadata file?
# Raise tuf.ssl_commons.exceptions.FormatError if not.
# Raise securesystemslib.exceptions.FormatError if not.
TIMESTAMP_SCHEMA.check_match(object)
version = object['version']
@ -347,7 +392,7 @@ def make_metadata(version, expiration_date, filedict):
result['meta'] = filedict
# Is 'result' a Timestamp metadata file?
# Raise 'tuf.ssl_commons.exceptions.FormatError' if not.
# Raise 'securesystemslib.exceptions.FormatError' if not.
TIMESTAMP_SCHEMA.check_match(result)
return result
@ -369,7 +414,7 @@ def __init__(self, version, expires, keys, roles, consistent_snapshot,
@staticmethod
def from_metadata(object):
# Is 'object' a Root metadata file?
# Raise 'tuf.ssl_commons.exceptions.FormatError' if not.
# Raise 'securesystemslib.exceptions.FormatError' if not.
tuf.ssl_crypto.formats.ROOT_SCHEMA.check_match(object)
version = object['version']
@ -395,8 +440,8 @@ def make_metadata(version, expiration_date, keydict, roledict,
result['compression_algorithms'] = compression_algorithms
# Is 'result' a Root metadata file?
# Raise 'tuf.ssl_commons.exceptions.FormatError' if not.
tuf.ssl_crypto.formats.ROOT_SCHEMA.check_match(result)
# Raise 'securesystemslib.exceptions.FormatError' if not.
ROOT_SCHEMA.check_match(result)
return result
@ -414,7 +459,7 @@ def __init__(self, version, expires, versiondict):
@staticmethod
def from_metadata(object):
# Is 'object' a Snapshot metadata file?
# Raise 'tuf.ssl_commons.exceptions.FormatError' if not.
# Raise 'securesystemslib.exceptions.FormatError' if not.
SNAPSHOT_SCHEMA.check_match(object)
version = object['version']
@ -432,7 +477,7 @@ def make_metadata(version, expiration_date, versiondict):
result['meta'] = versiondict
# Is 'result' a Snapshot metadata file?
# Raise 'tuf.ssl_commons.exceptions.FormatError' if not.
# Raise 'securesystemslib.exceptions.FormatError' if not.
SNAPSHOT_SCHEMA.check_match(result)
return result
@ -456,7 +501,7 @@ def __init__(self, version, expires, filedict=None, delegations=None):
@staticmethod
def from_metadata(object):
# Is 'object' a Targets metadata file?
# Raise tuf.ssl_commons.exceptions.FormatError if not.
# Raise securesystemslib.exceptions.FormatError if not.
tuf.ssl_crypto.formats.TARGETS_SCHEMA.check_match(object)
version = object['version']
@ -470,7 +515,7 @@ def from_metadata(object):
@staticmethod
def make_metadata(version, expiration_date, filedict=None, delegations=None):
if filedict is None and delegations is None:
raise tuf.ssl_commons.exceptions.Error('We don\'t allow completely'
raise securesystemslib.exceptions.Error('We don\'t allow completely'
' empty targets metadata.')
result = {'_type' : 'Targets'}
@ -483,7 +528,7 @@ def make_metadata(version, expiration_date, filedict=None, delegations=None):
result['delegations'] = delegations
# Is 'result' a Targets metadata file?
# Raise 'tuf.ssl_commons.exceptions.FormatError' if not.
# Raise 'securesystemslib.exceptions.FormatError' if not.
tuf.ssl_crypto.formats.TARGETS_SCHEMA.check_match(result)
return result
@ -510,11 +555,11 @@ def make_metadata():
# A dict holding the recognized schemas for the top-level roles.
SCHEMAS_BY_TYPE = {
'Root' : tuf.ssl_crypto.formats.ROOT_SCHEMA,
'Targets' : tuf.ssl_crypto.formats.TARGETS_SCHEMA,
'Root' : ROOT_SCHEMA,
'Targets' : TARGETS_SCHEMA,
'Snapshot' : SNAPSHOT_SCHEMA,
'Timestamp' : TIMESTAMP_SCHEMA,
'Mirrors' : tuf.ssl_crypto.formats.MIRRORLIST_SCHEMA}
'Mirrors' : MIRRORLIST_SCHEMA}
# A dict holding the recognized class names for the top-level roles.
# That is, the role classes listed in this module (e.g., class TargetsFile()).
@ -544,7 +589,7 @@ def datetime_to_unix_timestamp(datetime_object):
The datetime.datetime() object to convert to a Unix timestamp.
<Exceptions>
tuf.ssl_commons.exceptions.FormatError, if 'datetime_object' is not a
securesystemslib.exceptions.FormatError, if 'datetime_object' is not a
datetime.datetime() object.
<Side Effects>
@ -555,10 +600,10 @@ def datetime_to_unix_timestamp(datetime_object):
"""
# Is 'datetime_object' a datetime.datetime() object?
# Raise 'tuf.ssl_commons.exceptions.FormatError' if not.
# Raise 'securesystemslib.exceptions.FormatError' if not.
if not isinstance(datetime_object, datetime.datetime):
message = repr(datetime_object) + ' is not a datetime.datetime() object.'
raise tuf.ssl_commons.exceptions.FormatError(message)
raise securesystemslib.exceptions.FormatError(message)
unix_timestamp = calendar.timegm(datetime_object.timetuple())
@ -582,10 +627,10 @@ def unix_timestamp_to_datetime(unix_timestamp):
<Arguments>
unix_timestamp:
An integer representing the time (e.g., 1445455680). Conformant to
'tuf.ssl_crypto.formats.UNIX_TIMESTAMP_SCHEMA'.
'securesystemslib.formats.UNIX_TIMESTAMP_SCHEMA'.
<Exceptions>
tuf.ssl_commons.exceptions.FormatError, if 'unix_timestamp' is improperly
securesystemslib.exceptions.FormatError, if 'unix_timestamp' is improperly
formatted.
<Side Effects>
@ -596,8 +641,8 @@ def unix_timestamp_to_datetime(unix_timestamp):
"""
# Is 'unix_timestamp' properly formatted?
# Raise 'tuf.ssl_commons.exceptions.FormatError' if there is a mismatch.
tuf.ssl_crypto.formats.UNIX_TIMESTAMP_SCHEMA.check_match(unix_timestamp)
# Raise 'securesystemslib.exceptions.FormatError' if there is a mismatch.
securesystemslib.formats.UNIX_TIMESTAMP_SCHEMA.check_match(unix_timestamp)
# Convert 'unix_timestamp' to a 'time.struct_time', in UTC. The Daylight
# Savings Time (DST) flag is set to zero. datetime.fromtimestamp() is not
@ -622,7 +667,7 @@ def format_base64(data):
Binary or buffer of data to convert.
<Exceptions>
tuf.ssl_commons.exceptions.FormatError, if the base64 encoding fails or the
securesystemslib.exceptions.FormatError, if the base64 encoding fails or the
argument is invalid.
<Side Effects>
@ -636,7 +681,7 @@ def format_base64(data):
return binascii.b2a_base64(data).decode('utf-8').rstrip('=\n ')
except (TypeError, binascii.Error) as e:
raise tuf.ssl_commons.exceptions.FormatError('Invalid base64'
raise securesystemslib.exceptions.FormatError('Invalid base64'
' encoding: ' + str(e))
@ -652,7 +697,7 @@ def parse_base64(base64_string):
A string holding a base64 value.
<Exceptions>
tuf.ssl_commons.exceptions.FormatError, if 'base64_string' cannot be parsed
securesystemslib.exceptions.FormatError, if 'base64_string' cannot be parsed
due to an invalid base64 encoding.
<Side Effects>
@ -665,7 +710,7 @@ def parse_base64(base64_string):
if not isinstance(base64_string, six.string_types):
message = 'Invalid argument: '+repr(base64_string)
raise tuf.ssl_commons.exceptions.FormatError(message)
raise securesystemslib.exceptions.FormatError(message)
extra = len(base64_string) % 4
if extra:
@ -676,7 +721,7 @@ def parse_base64(base64_string):
return binascii.a2b_base64(base64_string.encode('utf-8'))
except (TypeError, binascii.Error) as e:
raise tuf.ssl_commons.exceptions.FormatError('Invalid base64'
raise securesystemslib.exceptions.FormatError('Invalid base64'
' encoding: ' + str(e))
@ -702,13 +747,13 @@ def make_fileinfo(length, hashes, version=None, custom=None):
An optional object providing additional information about the file.
<Exceptions>
tuf.ssl_commons.exceptions.FormatError, if the 'FILEINFO_SCHEMA' to be
securesystemslib.exceptions.FormatError, if the 'FILEINFO_SCHEMA' to be
returned does not have the correct format.
<Side Effects>
If any of the arguments are incorrectly formatted, the dict
returned will be checked for formatting errors, and if found,
will raise a 'tuf.ssl_commons.exceptions.FormatError' exception.
will raise a 'securesystemslib.exceptions.FormatError' exception.
<Returns>
A dictionary conformant to 'FILEINFO_SCHEMA', representing the file
@ -723,8 +768,8 @@ def make_fileinfo(length, hashes, version=None, custom=None):
if custom is not None:
fileinfo['custom'] = custom
# Raise 'tuf.ssl_commons.exceptions.FormatError' if the check fails.
tuf.ssl_crypto.formats.FILEINFO_SCHEMA.check_match(fileinfo)
# Raise 'securesystemslib.exceptions.FormatError' if the check fails.
securesystemslib.formats.FILEINFO_SCHEMA.check_match(fileinfo)
return fileinfo
@ -743,7 +788,7 @@ def make_versioninfo(version_number):
in Snapshot metadata.
<Exceptions>
tuf.ssl_commons.exceptions.FormatError, if the dict to be returned does not
securesystemslib.exceptions.FormatError, if the dict to be returned does not
have the correct format (i.e., VERSIONINFO_SCHEMA).
<Side Effects>
@ -756,7 +801,7 @@ def make_versioninfo(version_number):
versioninfo = {'version': version_number}
# Raise 'tuf.ssl_commons.exceptions.FormatError' if 'versioninfo' is
# Raise 'securesystemslib.exceptions.FormatError' if 'versioninfo' is
# improperly formatted.
try:
tuf.ssl_crypto.formats.VERSIONINFO_SCHEMA.check_match(versioninfo)
@ -800,12 +845,12 @@ def make_role_metadata(keyids, threshold, name=None, paths=None,
target files.
<Exceptions>
tuf.ssl_commons.exceptions.FormatError, if the returned role meta is
securesystemslib.exceptions.FormatError, if the returned role meta is
formatted incorrectly.
<Side Effects>
If any of the arguments do not have a proper format, a
tuf.ssl_commons.exceptions.FormatError exception is raised when the
securesystemslib.exceptions.FormatError exception is raised when the
'ROLE_SCHEMA' dict is created.
<Returns>
@ -826,7 +871,7 @@ def make_role_metadata(keyids, threshold, name=None, paths=None,
# we must do it for ourselves.
if paths is not None and path_hash_prefixes is not None:
raise tuf.ssl_commons.exceptions.FormatError('Both "paths" and'
raise securesystemslib.exceptions.FormatError('Both "paths" and'
' "path_hash_prefixes" are specified.')
if path_hash_prefixes is not None:
@ -858,7 +903,7 @@ def get_role_class(expected_rolename):
to return.
<Exceptions>
tuf.ssl_commons.exceptions.FormatError, if 'expected_rolename' is not a
securesystemslib.exceptions.FormatError, if 'expected_rolename' is not a
supported role.
<Side Effects>
@ -872,15 +917,15 @@ def get_role_class(expected_rolename):
# Does 'expected_rolename' have the correct type?
# This check ensures 'expected_rolename' conforms to
# 'tuf.ssl_crypto.formats.NAME_SCHEMA'.
# Raise 'tuf.ssl_commons.exceptions.FormatError' if there is a mismatch.
tuf.ssl_crypto.formats.NAME_SCHEMA.check_match(expected_rolename)
# 'securesystemslib.formats.NAME_SCHEMA'.
# Raise 'securesystemslib.exceptions.FormatError' if there is a mismatch.
securesystemslib.formats.NAME_SCHEMA.check_match(expected_rolename)
try:
role_class = ROLE_CLASSES_BY_TYPE[expected_rolename]
except KeyError:
raise tuf.ssl_commons.exceptions.FormatError(repr(expected_rolename) + ' '
raise securesystemslib.exceptions.FormatError(repr(expected_rolename) + ' '
'not supported.')
else:
@ -904,7 +949,7 @@ def expected_meta_rolename(meta_rolename):
E.g., 'root', 'targets'.
<Exceptions>
tuf.ssl_commons.exceptions.FormatError, if 'meta_rolename' is improperly
securesystemslib.exceptions.FormatError, if 'meta_rolename' is improperly
formatted.
<Side Effects>
@ -916,9 +961,9 @@ def expected_meta_rolename(meta_rolename):
# Does 'meta_rolename' have the correct type?
# This check ensures 'meta_rolename' conforms to
# 'tuf.ssl_crypto.formats.NAME_SCHEMA'.
# Raise 'tuf.ssl_commons.exceptions.FormatError' if there is a mismatch.
tuf.ssl_crypto.formats.NAME_SCHEMA.check_match(meta_rolename)
# 'securesystemslib.formats.NAME_SCHEMA'.
# Raise 'securesystemslib.exceptions.FormatError' if there is a mismatch.
securesystemslib.formats.NAME_SCHEMA.check_match(meta_rolename)
return string.capwords(meta_rolename)
@ -928,9 +973,9 @@ def check_signable_object_format(object):
"""
<Purpose>
Ensure 'object' is properly formatted, conformant to
'tuf.ssl_crypto.formats.SIGNABLE_SCHEMA'. Return the signing role on
'SIGNABLE_SCHEMA'. Return the signing role on
success. Note: The 'signed' field of a 'SIGNABLE_SCHEMA' is checked
against tuf.ssl_commons.schema.Any(). The 'signed' field, however, should
against securesystemslib.schema.Any(). The 'signed' field, however, should
actually hold one of the supported role schemas (e.g., 'ROOT_SCHEMA',
'TARGETS_SCHEMA'). The role schemas all differ in their format, so this
function determines exactly which schema is listed in the 'signed' field.
@ -940,7 +985,7 @@ def check_signable_object_format(object):
The object compare against 'SIGNABLE.SCHEMA'.
<Exceptions>
tuf.ssl_commons.exceptions.FormatError, if 'object' does not have the
securesystemslib.exceptions.FormatError, if 'object' does not have the
correct format.
<Side Effects>
@ -953,23 +998,23 @@ def check_signable_object_format(object):
# Does 'object' have the correct type?
# This check ensures 'object' conforms to
# 'tuf.ssl_crypto.formats.SIGNABLE_SCHEMA'.
tuf.ssl_crypto.formats.SIGNABLE_SCHEMA.check_match(object)
# 'SIGNABLE_SCHEMA'.
SIGNABLE_SCHEMA.check_match(object)
try:
role_type = object['signed']['_type']
except (KeyError, TypeError):
raise tuf.ssl_commons.exceptions.FormatError('Untyped object')
raise securesystemslib.exceptions.FormatError('Untyped object')
try:
schema = SCHEMAS_BY_TYPE[role_type]
except KeyError:
raise tuf.ssl_commons.exceptions.FormatError('Unrecognized'
raise securesystemslib.exceptions.FormatError('Unrecognized'
' type ' + repr(role_type))
# 'tuf.ssl_commons.exceptions.FormatError' raised if 'object' does not have a
# 'securesystemslib.exceptions.FormatError' raised if 'object' does not have a
# properly formatted role schema.
schema.check_match(object['signed'])