python-tuf/tuf/repository_lib.py
Vladimir Diaz 4ee3537d6d
Prompt for password if one isn't given and the key file is encrypted
Signed-off-by: Vladimir Diaz <vladimir.v.diaz@gmail.com>
2018-06-21 14:58:36 -04:00

2139 lines
77 KiB
Python
Executable file

#!/usr/bin/env python
# Copyright 2014 - 2017, New York University and the TUF contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
"""
<Program Name>
repository_lib.py
<Author>
Vladimir Diaz <vladimir.v.diaz@gmail.com>
<Started>
June 1, 2014.
<Copyright>
See LICENSE-MIT OR LICENSE for licensing information.
<Purpose>
Provide a library for the repository tool that can create a TUF repository.
The repository tool can be used with the Python interpreter in interactive
mode, or imported directly into a Python module. See 'tuf/README' for the
complete guide to using 'tuf.repository_tool.py'.
"""
# Help with Python 3 compatibility, where the print statement is a function, an
# implicit relative import is invalid, and the '/' operator performs true
# division. Example: print 'hello world' raises a 'SyntaxError' exception.
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import os
import errno
import time
import logging
import shutil
import json
import platform
import tuf
import tuf.formats
import tuf.exceptions
import tuf.keydb
import tuf.roledb
import tuf.sig
import tuf.log
import tuf.settings
import securesystemslib
import securesystemslib.interface
import iso8601
import six
# See 'log.py' to learn how logging is handled in TUF.
logger = logging.getLogger('tuf.repository_lib')
# Disable 'iso8601' logger messages to prevent 'iso8601' from clogging the
# log file.
iso8601_logger = logging.getLogger('iso8601')
iso8601_logger.disabled = True
# Recommended RSA key sizes:
# http://www.emc.com/emc-plus/rsa-labs/historical/twirl-and-rsa-key-size.htm#table1
# According to the document above, revised May 6, 2003, RSA keys of
# size 3072 provide security through 2031 and beyond. 2048-bit keys
# are the recommended minimum and are good from the present through 2030.
DEFAULT_RSA_KEY_BITS = 3072
# The extension of TUF metadata.
METADATA_EXTENSION = '.json'
# The targets and metadata directory names. Metadata files are written
# to the staged metadata directory instead of the "live" one.
METADATA_STAGED_DIRECTORY_NAME = 'metadata.staged'
METADATA_DIRECTORY_NAME = 'metadata'
TARGETS_DIRECTORY_NAME = 'targets'
# The metadata filenames of the top-level roles.
ROOT_FILENAME = 'root' + METADATA_EXTENSION
TARGETS_FILENAME = 'targets' + METADATA_EXTENSION
SNAPSHOT_FILENAME = 'snapshot' + METADATA_EXTENSION
TIMESTAMP_FILENAME = 'timestamp' + METADATA_EXTENSION
# Log warning when metadata expires in n days, or less.
# root = 1 month, snapshot = 1 day, targets = 10 days, timestamp = 1 day.
ROOT_EXPIRES_WARN_SECONDS = 2630000
SNAPSHOT_EXPIRES_WARN_SECONDS = 86400
TARGETS_EXPIRES_WARN_SECONDS = 864000
TIMESTAMP_EXPIRES_WARN_SECONDS = 86400
# Supported key types.
SUPPORTED_KEY_TYPES = ['rsa', 'ed25519', 'ecdsa-sha2-nistp256']
def _generate_and_write_metadata(rolename, metadata_filename,
targets_directory, metadata_directory, consistent_snapshot=False,
filenames=None, allow_partially_signed=False, increment_version_number=True,
repository_name='default'):
"""
Non-public function that can generate and write the metadata for the
specified 'rolename'. It also increments the version number of 'rolename' if
the 'increment_version_number' argument is True.
"""
metadata = None
# Retrieve the roleinfo of 'rolename' to extract the needed metadata
# attributes, such as version number, expiration, etc.
roleinfo = tuf.roledb.get_roleinfo(rolename, repository_name)
previous_keyids = roleinfo.get('previous_keyids', [])
previous_threshold = roleinfo.get('previous_threshold', 1)
signing_keyids = list(set(roleinfo['signing_keyids']))
# Generate the appropriate role metadata for 'rolename'.
if rolename == 'root':
metadata = generate_root_metadata(roleinfo['version'], roleinfo['expires'],
consistent_snapshot, repository_name)
_log_warning_if_expires_soon(ROOT_FILENAME, roleinfo['expires'],
ROOT_EXPIRES_WARN_SECONDS)
elif rolename == 'snapshot':
root_filename = ROOT_FILENAME[:-len(METADATA_EXTENSION)]
targets_filename = TARGETS_FILENAME[:-len(METADATA_EXTENSION)]
metadata = generate_snapshot_metadata(metadata_directory,
roleinfo['version'], roleinfo['expires'], root_filename,
targets_filename, consistent_snapshot, repository_name)
_log_warning_if_expires_soon(SNAPSHOT_FILENAME, roleinfo['expires'],
SNAPSHOT_EXPIRES_WARN_SECONDS)
elif rolename == 'timestamp':
snapshot_filename = filenames['snapshot']
metadata = generate_timestamp_metadata(snapshot_filename, roleinfo['version'],
roleinfo['expires'], repository_name)
_log_warning_if_expires_soon(TIMESTAMP_FILENAME, roleinfo['expires'],
TIMESTAMP_EXPIRES_WARN_SECONDS)
# All other roles are either the top-level 'targets' role, or
# a delegated role.
else:
# Only print a warning if the top-level 'targets' role expires soon.
if rolename == 'targets':
_log_warning_if_expires_soon(TARGETS_FILENAME, roleinfo['expires'],
TARGETS_EXPIRES_WARN_SECONDS)
metadata = generate_targets_metadata(targets_directory, roleinfo['paths'],
roleinfo['version'], roleinfo['expires'], roleinfo['delegations'],
consistent_snapshot)
# Before writing 'rolename' to disk, automatically increment its version
# number (if 'increment_version_number' is True) so that the caller does not
# have to manually perform this action. The version number should be
# incremented in both the metadata file and roledb (required so that Snapshot
# references the latest version).
# Store the 'current_version' in case the version number must be restored
# (e.g., if 'rolename' cannot be written to disk because its metadata is not
# properly signed).
current_version = metadata['version']
if increment_version_number:
roleinfo = tuf.roledb.get_roleinfo(rolename, repository_name)
metadata['version'] = metadata['version'] + 1
roleinfo['version'] = roleinfo['version'] + 1
tuf.roledb.update_roleinfo(rolename, roleinfo,
repository_name=repository_name)
else:
logger.debug('Not incrementing ' + repr(rolename) + '\'s version number.')
if rolename in ['root', 'targets', 'snapshot', 'timestamp'] and not allow_partially_signed:
# Verify that the top-level 'rolename' is fully signed. Only a delegated
# role should not be written to disk without full verification of its
# signature(s), since it can only be considered fully signed depending on
# the delegating role.
signable = sign_metadata(metadata, signing_keyids, metadata_filename,
repository_name)
def should_write():
# Root must be signed by its previous keys and threshold.
if rolename == 'root' and len(previous_keyids) > 0:
if not tuf.sig.verify(signable, rolename, repository_name,
previous_threshold, previous_keyids):
return False
else:
logger.debug('Root is signed by a threshold of its previous keyids.')
# In the normal case, we should write metadata if the threshold is met.
return tuf.sig.verify(signable, rolename, repository_name,
roleinfo['threshold'], roleinfo['signing_keyids'])
if should_write():
_remove_invalid_and_duplicate_signatures(signable, repository_name)
# Root should always be written as if consistent_snapshot is True (i.e.,
# write <version>.root.json and root.json to disk).
if rolename == 'root':
consistent_snapshot = True
filename = write_metadata_file(signable, metadata_filename,
metadata['version'], consistent_snapshot)
# 'signable' contains an invalid threshold of signatures.
else:
# Since new metadata cannot be successfully written, restore the current
# version number.
roleinfo = tuf.roledb.get_roleinfo(rolename, repository_name)
roleinfo['version'] = current_version
tuf.roledb.update_roleinfo(rolename, roleinfo,
repository_name=repository_name)
# Note that 'signable' is an argument to tuf.UnsignedMetadataError().
raise tuf.exceptions.UnsignedMetadataError('Not enough'
' signatures for ' + repr(metadata_filename), signable)
# 'rolename' is a delegated role or a top-level role that is partially
# signed, and thus its signatures should not be verified.
else:
signable = sign_metadata(metadata, signing_keyids, metadata_filename,
repository_name)
_remove_invalid_and_duplicate_signatures(signable, repository_name)
# Root should always be written as if consistent_snapshot is True (i.e.,
# <version>.root.json and root.json).
if rolename == 'root':
filename = write_metadata_file(signable, metadata_filename,
metadata['version'], consistent_snapshot=True)
else:
filename = write_metadata_file(signable, metadata_filename,
metadata['version'], consistent_snapshot)
return signable, filename
def _metadata_is_partially_loaded(rolename, signable, repository_name):
"""
Non-public function that determines whether 'rolename' is loaded with
at least zero good signatures, but an insufficient threshold (which means
'rolename' was written to disk with repository.write_partial()). A repository
maintainer may write partial metadata without including a valid signature.
Howerver, the final repository.write() must include a threshold number of
signatures.
If 'rolename' is found to be partially loaded, mark it as partially loaded in
its 'tuf.roledb' roleinfo. This function exists to assist in deciding whether
a role's version number should be incremented when write() or write_parital()
is called. Return True if 'rolename' was partially loaded, False otherwise.
"""
# The signature status lists the number of good signatures, including
# bad, untrusted, unknown, etc.
status = tuf.sig.get_signature_status(signable, rolename, repository_name)
if len(status['good_sigs']) < status['threshold'] and \
len(status['good_sigs']) >= 0:
return True
else:
return False
def _check_directory(directory):
"""
<Purpose>
Non-public function that ensures 'directory' is valid and it exists. This
is not a security check, but a way for the caller to determine the cause of
an invalid directory provided by the user. If the directory argument is
valid, it is returned normalized and as an absolute path.
<Arguments>
directory:
The directory to check.
<Exceptions>
securesystemslib.exceptions.Error, if 'directory' could not be validated.
securesystemslib.exceptions.FormatError, if 'directory' is not properly
formatted.
<Side Effects>
None.
<Returns>
The normalized absolutized path of 'directory'.
"""
# Does 'directory' have the correct format?
# Raise 'securesystemslib.exceptions.FormatError' if there is a mismatch.
securesystemslib.formats.PATH_SCHEMA.check_match(directory)
# Check if the directory exists.
if not os.path.isdir(directory):
raise securesystemslib.exceptions.Error(repr(directory) + ' directory does not exist.')
directory = os.path.abspath(directory)
return directory
def _check_role_keys(rolename, repository_name):
"""
Non-public function that verifies the public and signing keys of 'rolename'.
If either contain an invalid threshold of keys, raise an exception.
"""
# Extract the total number of public and private keys of 'rolename' from its
# roleinfo in 'tuf.roledb'.
roleinfo = tuf.roledb.get_roleinfo(rolename, repository_name)
total_keyids = len(roleinfo['keyids'])
threshold = roleinfo['threshold']
total_signatures = len(roleinfo['signatures'])
total_signing_keys = len(roleinfo['signing_keyids'])
# Raise an exception for an invalid threshold of public keys.
if total_keyids < threshold:
raise securesystemslib.exceptions.InsufficientKeysError(repr(rolename) + ' role contains'
' ' + repr(total_keyids) + ' / ' + repr(threshold) + ' public keys.')
# Raise an exception for an invalid threshold of signing keys.
if total_signatures == 0 and total_signing_keys < threshold:
raise securesystemslib.exceptions.InsufficientKeysError(repr(rolename) + ' role contains'
' ' + repr(total_signing_keys) + ' / ' + repr(threshold) + ' signing keys.')
def _remove_invalid_and_duplicate_signatures(signable, repository_name):
"""
Non-public function that removes invalid or duplicate signatures from
'signable'. 'signable' may contain signatures (invalid) from previous
versions of the metadata that were loaded with load_repository(). Invalid,
or duplicate signatures, are removed from 'signable'.
"""
# Store the keyids of valid signatures. 'signature_keyids' is checked for
# duplicates rather than comparing signature objects because PSS may generate
# duplicate valid signatures for the same data, yet contain different
# signatures.
signature_keyids = []
for signature in signable['signatures']:
signed = signable['signed']
keyid = signature['keyid']
key = None
# Remove 'signature' from 'signable' if the listed keyid does not exist
# in 'tuf.keydb'.
try:
key = tuf.keydb.get_key(keyid, repository_name=repository_name)
except securesystemslib.exceptions.UnknownKeyError:
signable['signatures'].remove(signature)
continue
# Remove 'signature' from 'signable' if it is an invalid signature.
if not securesystemslib.keys.verify_signature(key, signature, signed):
logger.debug('Removing invalid signature for ' + repr(keyid))
signable['signatures'].remove(signature)
# Although valid, it may still need removal if it is a duplicate. Check
# the keyid, rather than the signature, to remove duplicate PSS signatures.
# PSS may generate multiple different signatures for the same keyid.
else:
if keyid in signature_keyids:
signable['signatures'].remove(signature)
# 'keyid' is valid and not a duplicate, so add it to 'signature_keyids'.
else:
signature_keyids.append(keyid)
def _delete_obsolete_metadata(metadata_directory, snapshot_metadata,
consistent_snapshot, repository_name):
"""
Non-public function that deletes metadata files marked as removed by
'repository_tool.py'. Revoked metadata files are not actually deleted until
this function is called. Obsolete metadata should *not* be retained in
"metadata.staged", otherwise they may be re-loaded by 'load_repository()'.
Note: Obsolete metadata may not always be easily detected (by inspecting
top-level metadata during loading) due to partial metadata and top-level
metadata that have not been written yet.
"""
# Walk the repository's metadata sub-directory, which is where all metadata
# is stored (including delegated roles). The 'django.json' role (e.g.,
# delegated by Targets) would be located in the
# '{repository_directory}/metadata/' directory.
if os.path.exists(metadata_directory) and os.path.isdir(metadata_directory):
for directory_path, junk, files in os.walk(metadata_directory):
# 'files' here is a list of target file names.
for basename in files:
# If we encounter 'root.json', skip it. We don't ever delete root.json
# files, since they should it always exist.
if basename.endswith('root.json'):
continue
metadata_path = os.path.join(directory_path, basename)
# Strip the metadata dirname and the leading path separator.
# '{repository_directory}/metadata/django.json' -->
# 'django.json'
metadata_name = \
metadata_path[len(metadata_directory):].lstrip(os.path.sep)
# Strip the version number if 'consistent_snapshot' is True. Example:
# '10.django.json' --> 'django.json'. Consistent and non-consistent
# metadata might co-exist if write() and
# write(consistent_snapshot=True) are mixed, so ensure only
# '<version_number>.filename' metadata is stripped.
# Should we check if 'consistent_snapshot' is True? It might have been
# set previously, but 'consistent_snapshot' can potentially be False
# now. We'll proceed with the understanding that 'metadata_name' can
# have a prepended version number even though the repository is now
# a non-consistent one.
if metadata_name not in snapshot_metadata['meta']:
metadata_name, junk = _strip_version_number(metadata_name,
consistent_snapshot)
else:
logger.debug(repr(metadata_name) + ' found in the snapshot role.')
# Strip metadata extension from filename. The role database does not
# include the metadata extension.
if metadata_name.endswith(METADATA_EXTENSION):
metadata_name = metadata_name[:-len(METADATA_EXTENSION)]
else:
logger.debug(repr(metadata_name) + ' does not match'
' supported extension ' + repr(METADATA_EXTENSION))
if metadata_name in ['root', 'targets', 'snapshot', 'timestamp']:
return
# Delete the metadata file if it does not exist in 'tuf.roledb'.
# 'repository_tool.py' might have removed 'metadata_name,'
# but its metadata file is not actually deleted yet. Do it now.
if not tuf.roledb.role_exists(metadata_name, repository_name):
logger.info('Removing outdated metadata: ' + repr(metadata_path))
os.remove(metadata_path)
else:
logger.debug('Not removing metadata: ' + repr(metadata_path))
# TODO: Should we delete outdated consistent snapshots, or does it make
# more sense for integrators to remove outdated consistent snapshots?
else:
logger.debug('Metadata directory does not exist: ' + repr(metadata_directory))
def _get_written_metadata(metadata_signable):
"""
Non-public function that returns the actual content of written metadata.
"""
# Explicitly specify the JSON separators for Python 2 + 3 consistency.
written_metadata_content = json.dumps(metadata_signable, indent=1,
separators=(',', ': '), sort_keys=True).encode('utf-8')
return written_metadata_content
def _strip_version_number(metadata_filename, consistent_snapshot):
"""
Strip from 'metadata_filename' any version number (in the
expected '{dirname}/<version_number>.rolename.<ext>' format) that
it may contain, and return the stripped filename and version number,
as a tuple. 'consistent_snapshot' is a boolean indicating if a version
number is prepended to 'metadata_filename'.
"""
# Strip the version number if 'consistent_snapshot' is True.
# Example: '10.django.json' --> 'django.json'
if consistent_snapshot:
dirname, basename = os.path.split(metadata_filename)
version_number, basename = basename.split('.', 1)
stripped_metadata_filename = os.path.join(dirname, basename)
if not version_number.isdigit():
return metadata_filename, ''
else:
return stripped_metadata_filename, version_number
else:
return metadata_filename, ''
def _load_top_level_metadata(repository, top_level_filenames, repository_name):
"""
Load the metadata of the Root, Timestamp, Targets, and Snapshot roles. At a
minimum, the Root role must exist and load successfully.
"""
root_filename = top_level_filenames[ROOT_FILENAME]
targets_filename = top_level_filenames[TARGETS_FILENAME]
snapshot_filename = top_level_filenames[SNAPSHOT_FILENAME]
timestamp_filename = top_level_filenames[TIMESTAMP_FILENAME]
root_metadata = None
targets_metadata = None
snapshot_metadata = None
timestamp_metadata = None
# Load 'root.json'. A Root role file without a version number is always
# written.
if os.path.exists(root_filename):
# Initialize the key and role metadata of the top-level roles.
signable = securesystemslib.util.load_json_file(root_filename)
tuf.formats.check_signable_object_format(signable)
root_metadata = signable['signed']
tuf.keydb.create_keydb_from_root_metadata(root_metadata, repository_name)
tuf.roledb.create_roledb_from_root_metadata(root_metadata, repository_name)
# Load Root's roleinfo and update 'tuf.roledb'.
roleinfo = tuf.roledb.get_roleinfo('root', repository_name)
roleinfo['consistent_snapshot'] = root_metadata['consistent_snapshot']
roleinfo['signatures'] = []
for signature in signable['signatures']:
if signature not in roleinfo['signatures']:
roleinfo['signatures'].append(signature)
else:
logger.debug('Found a Root signature that is already loaded:'
' ' + repr(signature))
# By default, roleinfo['partial_loaded'] of top-level roles should be set
# to False in 'create_roledb_from_root_metadata()'. Update this field, if
# necessary, now that we have its signable object.
if _metadata_is_partially_loaded('root', signable, repository_name):
roleinfo['partial_loaded'] = True
else:
logger.debug('Root was not partially loaded.')
_log_warning_if_expires_soon(ROOT_FILENAME, roleinfo['expires'],
ROOT_EXPIRES_WARN_SECONDS)
tuf.roledb.update_roleinfo('root', roleinfo, mark_role_as_dirty=False,
repository_name=repository_name)
# Ensure the 'consistent_snapshot' field is extracted.
consistent_snapshot = root_metadata['consistent_snapshot']
else:
raise securesystemslib.exceptions.RepositoryError('Cannot load the required'
' root file: ' + repr(root_filename))
# Load 'timestamp.json'. A Timestamp role file without a version number is
# always written.
if os.path.exists(timestamp_filename):
signable = securesystemslib.util.load_json_file(timestamp_filename)
timestamp_metadata = signable['signed']
for signature in signable['signatures']:
repository.timestamp.add_signature(signature, mark_role_as_dirty=False)
# Load Timestamp's roleinfo and update 'tuf.roledb'.
roleinfo = tuf.roledb.get_roleinfo('timestamp', repository_name)
roleinfo['expires'] = timestamp_metadata['expires']
roleinfo['version'] = timestamp_metadata['version']
if _metadata_is_partially_loaded('timestamp', signable, repository_name):
roleinfo['partial_loaded'] = True
else:
logger.debug('The Timestamp role was not partially loaded.')
_log_warning_if_expires_soon(TIMESTAMP_FILENAME, roleinfo['expires'],
TIMESTAMP_EXPIRES_WARN_SECONDS)
tuf.roledb.update_roleinfo('timestamp', roleinfo, mark_role_as_dirty=False,
repository_name=repository_name)
else:
logger.debug('Cannot load the Timestamp file: ' + repr(timestamp_filename))
# Load 'snapshot.json'. A consistent snapshot.json must be calculated if
# 'consistent_snapshot' is True.
# The Snapshot and Root roles are both accessed by their hashes.
if consistent_snapshot:
snapshot_version = timestamp_metadata['meta'][SNAPSHOT_FILENAME]['version']
dirname, basename = os.path.split(snapshot_filename)
basename = basename.split(METADATA_EXTENSION, 1)[0]
snapshot_filename = os.path.join(dirname,
str(snapshot_version) + '.' + basename + METADATA_EXTENSION)
if os.path.exists(snapshot_filename):
signable = securesystemslib.util.load_json_file(snapshot_filename)
tuf.formats.check_signable_object_format(signable)
snapshot_metadata = signable['signed']
for signature in signable['signatures']:
repository.snapshot.add_signature(signature, mark_role_as_dirty=False)
# Load Snapshot's roleinfo and update 'tuf.roledb'.
roleinfo = tuf.roledb.get_roleinfo('snapshot', repository_name)
roleinfo['expires'] = snapshot_metadata['expires']
roleinfo['version'] = snapshot_metadata['version']
if _metadata_is_partially_loaded('snapshot', signable, repository_name):
roleinfo['partial_loaded'] = True
else:
logger.debug('Snapshot was not partially loaded.')
_log_warning_if_expires_soon(SNAPSHOT_FILENAME, roleinfo['expires'],
SNAPSHOT_EXPIRES_WARN_SECONDS)
tuf.roledb.update_roleinfo('snapshot', roleinfo, mark_role_as_dirty=False,
repository_name=repository_name)
else:
logger.debug('The Snapshot file cannot be loaded: ' + repr(snapshot_filename))
# Load 'targets.json'. A consistent snapshot of the Targets role must be
# calculated if 'consistent_snapshot' is True.
if consistent_snapshot:
targets_version = snapshot_metadata['meta'][TARGETS_FILENAME]['version']
dirname, basename = os.path.split(targets_filename)
targets_filename = os.path.join(dirname, str(targets_version) + '.' + basename)
if os.path.exists(targets_filename):
signable = securesystemslib.util.load_json_file(targets_filename)
tuf.formats.check_signable_object_format(signable)
targets_metadata = signable['signed']
for signature in signable['signatures']:
repository.targets.add_signature(signature, mark_role_as_dirty=False)
# Update 'targets.json' in 'tuf.roledb.py'
roleinfo = tuf.roledb.get_roleinfo('targets', repository_name)
for filepath, fileinfo in six.iteritems(targets_metadata['targets']):
roleinfo['paths'].update({filepath: fileinfo.get('custom', {})})
roleinfo['version'] = targets_metadata['version']
roleinfo['expires'] = targets_metadata['expires']
roleinfo['delegations'] = targets_metadata['delegations']
if _metadata_is_partially_loaded('targets', signable, repository_name):
roleinfo['partial_loaded'] = True
else:
logger.debug('Targets file was not partially loaded.')
_log_warning_if_expires_soon(TARGETS_FILENAME, roleinfo['expires'],
TARGETS_EXPIRES_WARN_SECONDS)
tuf.roledb.update_roleinfo('targets', roleinfo, mark_role_as_dirty=False,
repository_name=repository_name)
# Add the keys specified in the delegations field of the Targets role.
for key_metadata in six.itervalues(targets_metadata['delegations']['keys']):
# The repo may have used hashing algorithms for the generated keyids
# that doesn't match the client's set of hash algorithms. Make sure
# to only used the repo's selected hashing algorithms.
hash_algorithms = securesystemslib.settings.HASH_ALGORITHMS
securesystemslib.settings.HASH_ALGORITHMS = key_metadata['keyid_hash_algorithms']
key_object, keyids = securesystemslib.keys.format_metadata_to_key(key_metadata)
securesystemslib.settings.HASH_ALGORITHMS = hash_algorithms
# Add 'key_object' to the list of recognized keys. Keys may be shared,
# so do not raise an exception if 'key_object' has already been loaded.
# In contrast to the methods that may add duplicate keys, do not log
# a warning as there may be many such duplicate key warnings. The
# repository maintainer should have also been made aware of the duplicate
# key when it was added.
try:
for keyid in keyids: #pragma: no branch
key_object['keyid'] = keyid
tuf.keydb.add_key(key_object, keyid=None,
repository_name=repository_name)
except securesystemslib.exceptions.KeyAlreadyExistsError:
pass
else:
logger.debug('The Targets file cannot be loaded: ' + repr(targets_filename))
return repository, consistent_snapshot
def _log_warning_if_expires_soon(rolename, expires_iso8601_timestamp,
seconds_remaining_to_warn):
"""
Non-public function that logs a warning if 'rolename' expires in
'seconds_remaining_to_warn' seconds, or less.
"""
# Metadata stores expiration datetimes in ISO8601 format. Convert to
# unix timestamp, subtract from from current time.time() (also in POSIX time)
# and compare against 'seconds_remaining_to_warn'. Log a warning message
# to console if 'rolename' expires soon.
datetime_object = iso8601.parse_date(expires_iso8601_timestamp)
expires_unix_timestamp = \
tuf.formats.datetime_to_unix_timestamp(datetime_object)
seconds_until_expires = expires_unix_timestamp - int(time.time())
if seconds_until_expires <= seconds_remaining_to_warn:
if seconds_until_expires <= 0:
logger.warning(
repr(rolename) + ' expired ' + repr(datetime_object.ctime() + ' (UTC).'))
else:
days_until_expires = seconds_until_expires / 86400
logger.warning(repr(rolename) + ' expires ' + datetime_object.ctime() + ''
' (UTC). ' + repr(days_until_expires) + ' day(s) until it expires.')
else:
pass
def generate_and_write_rsa_keypair(filepath, bits=DEFAULT_RSA_KEY_BITS,
password=None):
"""
<Purpose>
Generate an RSA key file, create an encrypted PEM string (using 'password'
as the pass phrase), and store it in 'filepath'. The public key portion of
the generated RSA key is stored in <'filepath'>.pub.
<Arguments>
filepath:
The public and private key files are saved to <filepath>.pub, <filepath>,
respectively.
bits:
The number of bits of the generated RSA key.
password:
The password used to encrypt 'filepath'.
<Exceptions>
securesystemslib.exceptions.FormatError, if the arguments are improperly
formatted.
<Side Effects>
Writes key files to '<filepath>' and '<filepath>.pub'.
<Returns>
None.
"""
securesystemslib.interface.generate_and_write_rsa_keypair(
filepath, bits, password)
def import_rsa_privatekey_from_file(filepath, password=None):
"""
<Purpose>
Import the encrypted PEM file in 'filepath', decrypt it, and return the key
object in 'securesystemslib.RSAKEY_SCHEMA' format.
<Arguments>
filepath:
<filepath> file, an RSA encrypted PEM file. Unlike the public RSA PEM
key file, 'filepath' does not have an extension.
password:
The passphrase to decrypt 'filepath'.
<Exceptions>
securesystemslib.exceptions.FormatError, if the arguments are improperly
formatted.
securesystemslib.exceptions.CryptoError, if 'filepath' is not a valid
encrypted key file.
<Side Effects>
The contents of 'filepath' is read, decrypted, and the key stored.
<Returns>
An RSA key object, conformant to 'securesystemslib.RSAKEY_SCHEMA'.
"""
# Note: securesystemslib.interface.import_rsa_privatekey_from_file() does not
# allow both 'password' and 'prompt' to be True, nor does it automatically
# prompt for a password if the key file is encrypted and a password isn't
# given.
try:
private_key = securesystemslib.interface.import_rsa_privatekey_from_file(
filepath, password)
# The user might not have given a password for an encrypted private key.
# Prompt for a password for convenience.
except securesystemslib.exceptions.CryptoError:
if password is None:
private_key = securesystemslib.interface.import_rsa_privatekey_from_file(
filepath, password, prompt=True)
else:
raise
return private_key
def import_rsa_publickey_from_file(filepath):
"""
<Purpose>
Import the RSA key stored in 'filepath'. The key object returned is a TUF
key, specifically 'securesystemslib.RSAKEY_SCHEMA'. If the RSA PEM
in 'filepath' contains a private key, it is discarded.
<Arguments>
filepath:
<filepath>.pub file, an RSA PEM file.
<Exceptions>
securesystemslib.exceptions.FormatError, if 'filepath' is improperly formatted.
securesystemslib.exceptions.Error, if a valid RSA key object cannot be
generated. This may be caused by an improperly formatted PEM file.
<Side Effects>
'filepath' is read and its contents extracted.
<Returns>
An RSA key object conformant to 'securesystemslib.RSAKEY_SCHEMA'.
"""
return securesystemslib.interface.import_rsa_publickey_from_file(filepath)
def generate_and_write_ed25519_keypair(filepath, password=None):
"""
<Purpose>
Generate an Ed25519 key file, create an encrypted TUF key (using 'password'
as the pass phrase), and store it in 'filepath'. The public key portion of
the generated ED25519 key is stored in <'filepath'>.pub. Which cryptography
library performs the cryptographic decryption is determined by the string
set in 'settings.ED25519_CRYPTO_LIBRARY'.
The Ed25519 private key is encrypted with AES-256 and CTR the mode of
operation. The password is strengthened with PBKDF2-HMAC-SHA256.
<Arguments>
filepath:
The public and private key files are saved to <filepath>.pub and
<filepath>, respectively.
password:
The password, or passphrase, to encrypt the private portion of the
generated ed25519 key. A symmetric encryption key is derived from
'password', so it is not directly used.
<Exceptions>
securesystemslib.exceptions.FormatError, if the arguments are improperly
formatted.
securesystemslib.exceptions.CryptoError, if 'filepath' cannot be encrypted.
securesystemslib.exceptions.UnsupportedLibraryError, if 'filepath' cannot be
encrypted due to an invalid configuration setting (i.e., invalid
'tuf.settings.py' setting).
<Side Effects>
Writes key files to '<filepath>' and '<filepath>.pub'.
<Returns>
None.
"""
securesystemslib.interface.generate_and_write_ed25519_keypair(
filepath, password)
def import_ed25519_publickey_from_file(filepath):
"""
<Purpose>
Load the ED25519 public key object (conformant to
'securesystemslib.KEY_SCHEMA') stored in 'filepath'. Return
'filepath' in securesystemslib.ED25519KEY_SCHEMA format.
If the TUF key object in 'filepath' contains a private key, it is discarded.
<Arguments>
filepath:
<filepath>.pub file, a TUF public key file.
<Exceptions>
securesystemslib.exceptions.FormatError, if 'filepath' is improperly
formatted or is an unexpected key type.
<Side Effects>
The contents of 'filepath' is read and saved.
<Returns>
An ED25519 key object conformant to
'securesystemslib.ED25519KEY_SCHEMA'.
"""
return securesystemslib.interface.import_ed25519_publickey_from_file(filepath)
def import_ed25519_privatekey_from_file(filepath, password=None):
"""
<Purpose>
Import the encrypted ed25519 TUF key file in 'filepath', decrypt it, and
return the key object in 'securesystemslib.ED25519KEY_SCHEMA' format.
The TUF private key (may also contain the public part) is encrypted with
AES 256 and CTR the mode of operation. The password is strengthened with
PBKDF2-HMAC-SHA256.
<Arguments>
filepath:
<filepath> file, an RSA encrypted TUF key file.
password:
The password, or passphrase, to import the private key (i.e., the
encrypted key file 'filepath' must be decrypted before the ed25519 key
object can be returned.
<Exceptions>
securesystemslib.exceptions.FormatError, if the arguments are improperly
formatted or the imported key object contains an invalid key type (i.e.,
not 'ed25519').
securesystemslib.exceptions.CryptoError, if 'filepath' cannot be decrypted.
securesystemslib.exceptions.UnsupportedLibraryError, if 'filepath' cannot be
decrypted due to an invalid configuration setting (i.e., invalid
'tuf.settings.py' setting).
<Side Effects>
'password' is used to decrypt the 'filepath' key file.
<Returns>
An ed25519 key object of the form: 'securesystemslib.ED25519KEY_SCHEMA'.
"""
# Note: securesystemslib.interface.import_ed25519_privatekey_from_file() does
# not allow both 'password' and 'prompt' to be True, nor does it
# automatically prompt for a password if the key file is encrypted and a
# password isn't given.
try:
private_key = securesystemslib.interface.import_ed25519_privatekey_from_file(
filepath, password)
# The user might not have given a password for an encrypted private key.
# Prompt for a password for convenience.
except securesystemslib.exceptions.CryptoError:
if password is None:
private_key = securesystemslib.interface.import_ed25519_privatekey_from_file(
filepath, password, prompt=True)
else:
raise
return private_key
def get_metadata_filenames(metadata_directory=None):
"""
<Purpose>
Return a dictionary containing the filenames of the top-level roles.
If 'metadata_directory' is set to 'metadata', the dictionary
returned would contain:
filenames = {'root.json': 'metadata/root.json',
'targets.json': 'metadata/targets.json',
'snapshot.json': 'metadata/snapshot.json',
'timestamp.json': 'metadata/timestamp.json'}
If 'metadata_directory' is not set by the caller, the current directory is
used.
<Arguments>
metadata_directory:
The directory containing the metadata files.
<Exceptions>
securesystemslib.exceptions.FormatError, if 'metadata_directory' is
improperly formatted.
<Side Effects>
None.
<Returns>
A dictionary containing the expected filenames of the top-level
metadata files, such as 'root.json' and 'snapshot.json'.
"""
if metadata_directory is None:
metadata_directory = os.getcwd()
# Does 'metadata_directory' have the correct format?
# Ensure the arguments have the appropriate number of objects and object
# types, and that all dict keys are properly named.
# Raise 'securesystemslib.exceptions.FormatError' if there is a mismatch.
securesystemslib.formats.PATH_SCHEMA.check_match(metadata_directory)
# Store the filepaths of the top-level roles, including the
# 'metadata_directory' for each one.
filenames = {}
filenames[ROOT_FILENAME] = \
os.path.join(metadata_directory, ROOT_FILENAME)
filenames[TARGETS_FILENAME] = \
os.path.join(metadata_directory, TARGETS_FILENAME)
filenames[SNAPSHOT_FILENAME] = \
os.path.join(metadata_directory, SNAPSHOT_FILENAME)
filenames[TIMESTAMP_FILENAME] = \
os.path.join(metadata_directory, TIMESTAMP_FILENAME)
return filenames
def get_metadata_fileinfo(filename, custom=None):
"""
<Purpose>
Retrieve the file information of 'filename'. The object returned
conforms to 'tuf.formats.FILEINFO_SCHEMA'. The information
generated for 'filename' is stored in metadata files like 'targets.json'.
The fileinfo object returned has the form:
fileinfo = {'length': 1024,
'hashes': {'sha256': 1233dfba312, ...},
'custom': {...}}
<Arguments>
filename:
The metadata file whose file information is needed. It must exist.
custom:
An optional object providing additional information about the file.
<Exceptions>
securesystemslib.exceptions.FormatError, if 'filename' is improperly
formatted.
securesystemslib.exceptions.Error, if 'filename' doesn't exist.
<Side Effects>
The file is opened and information about the file is generated,
such as file size and its hash.
<Returns>
A dictionary conformant to 'tuf.formats.FILEINFO_SCHEMA'. This
dictionary contains the length, hashes, and custom data about the
'filename' metadata file. SHA256 hashes are generated by default.
"""
# Does 'filename' and 'custom' have the correct format?
# Ensure the arguments have the appropriate number of objects and object
# types, and that all dict keys are properly named.
# Raise 'securesystemslib.exceptions.FormatError' if there is a mismatch.
securesystemslib.formats.PATH_SCHEMA.check_match(filename)
if custom is not None:
tuf.formats.CUSTOM_SCHEMA.check_match(custom)
if not os.path.isfile(filename):
message = repr(filename) + ' is not a file.'
raise securesystemslib.exceptions.Error(message)
# Note: 'filehashes' is a dictionary of the form
# {'sha256': 1233dfba312, ...}. 'custom' is an optional
# dictionary that a client might define to include additional
# file information, such as the file's author, version/revision
# numbers, etc.
filesize, filehashes = securesystemslib.util.get_file_details(filename,
securesystemslib.settings.HASH_ALGORITHMS)
return tuf.formats.make_fileinfo(filesize, filehashes, custom=custom)
def get_metadata_versioninfo(rolename, repository_name):
"""
<Purpose>
Retrieve the version information of 'rolename'. The object returned
conforms to 'securesystemslib.VERSIONINFO_SCHEMA'. The information
generated for 'rolename' is stored in 'snapshot.json'.
The versioninfo object returned has the form:
versioninfo = {'version': 14}
<Arguments>
rolename:
The metadata role whose versioninfo is needed. It must exist, otherwise
a 'securesystemslib.exceptions.UnknownRoleError' exception is raised.
repository_name:
The name of the repository. If not supplied, 'rolename' is added to the
'default' repository.
<Exceptions>
securesystemslib.exceptions.FormatError, if 'rolename' is improperly
formatted.
securesystemslib.exceptions.UnknownRoleError, if 'rolename' does not exist.
<Side Effects>
None.
<Returns>
A dictionary conformant to 'securesystemslib.VERSIONINFO_SCHEMA'.
This dictionary contains the version number of 'rolename'.
"""
# Does 'rolename' have the correct format?
# Ensure the arguments have the appropriate number of objects and object
# types, and that all dict keys are properly named.
tuf.formats.ROLENAME_SCHEMA.check_match(rolename)
roleinfo = tuf.roledb.get_roleinfo(rolename, repository_name)
versioninfo = {'version': roleinfo['version']}
return versioninfo
def get_target_hash(target_filepath):
"""
<Purpose>
Compute the hash of 'target_filepath'. This is useful in conjunction with
the "path_hash_prefixes" attribute in a delegated targets role, which
tells us which paths it is implicitly responsible for.
The repository may optionally organize targets into hashed bins to ease
target delegations and role metadata management. The use of consistent
hashing allows for a uniform distribution of targets into bins.
<Arguments>
target_filepath:
The path to the target file on the repository. This will be relative to
the 'targets' (or equivalent) directory on a given mirror.
<Exceptions>
None.
<Side Effects>
None.
<Returns>
The hash of 'target_filepath'.
"""
return securesystemslib.util.get_target_hash(target_filepath)
def generate_root_metadata(version, expiration_date, consistent_snapshot,
repository_name='default'):
"""
<Purpose>
Create the root metadata. 'tuf.roledb.py' and 'tuf.keydb.py'
are read and the information returned by these modules is used to generate
the root metadata object.
<Arguments>
version:
The metadata version number. Clients use the version number to
determine if the downloaded version is newer than the one currently
trusted.
expiration_date:
The expiration date of the metadata file. Conformant to
'securesystemslib.formats.ISO8601_DATETIME_SCHEMA'.
consistent_snapshot:
Boolean. If True, a file digest is expected to be prepended to the
filename of any target file located in the targets directory. Each digest
is stripped from the target filename and listed in the snapshot metadata.
repository_name:
The name of the repository. If not supplied, 'rolename' is added to the
'default' repository.
<Exceptions>
securesystemslib.exceptions.FormatError, if the generated root metadata
object could not be generated with the correct format.
securesystemslib.exceptions.Error, if an error is encountered while
generating the root metadata object (e.g., a required top-level role not
found in 'tuf.roledb'.)
<Side Effects>
The contents of 'tuf.keydb.py' and 'tuf.roledb.py' are read.
<Returns>
A root metadata object, conformant to 'tuf.formats.ROOT_SCHEMA'.
"""
# Do the arguments have the correct format?
# Ensure the arguments have the appropriate number of objects and object
# types, and that all dict keys are properly named. Raise
# 'securesystemslib.exceptions.FormatError' if any of the arguments are
# improperly formatted.
tuf.formats.METADATAVERSION_SCHEMA.check_match(version)
securesystemslib.formats.ISO8601_DATETIME_SCHEMA.check_match(expiration_date)
securesystemslib.formats.BOOLEAN_SCHEMA.check_match(consistent_snapshot)
securesystemslib.formats.NAME_SCHEMA.check_match(repository_name)
# The role and key dictionaries to be saved in the root metadata object.
# Conformant to 'ROLEDICT_SCHEMA' and 'KEYDICT_SCHEMA', respectively.
roledict = {}
keydict = {}
# Extract the role, threshold, and keyid information of the top-level roles,
# which Root stores in its metadata. The necessary role metadata is generated
# from this information.
for rolename in ['root', 'targets', 'snapshot', 'timestamp']:
# If a top-level role is missing from 'tuf.roledb.py', raise an exception.
if not tuf.roledb.role_exists(rolename, repository_name):
raise securesystemslib.exceptions.Error(repr(rolename) + ' not in'
' "tuf.roledb".')
# Keep track of the keys loaded to avoid duplicates.
keyids = []
# Generate keys for the keyids listed by the role being processed.
for keyid in tuf.roledb.get_role_keyids(rolename, repository_name):
key = tuf.keydb.get_key(keyid, repository_name=repository_name)
# If 'key' is an RSA key, it would conform to
# 'securesystemslib.formats.RSAKEY_SCHEMA', and have the form:
# {'keytype': 'rsa',
# 'keyid': keyid,
# 'keyval': {'public': '-----BEGIN RSA PUBLIC KEY----- ...',
# 'private': '-----BEGIN RSA PRIVATE KEY----- ...'}}
keyid = key['keyid']
if keyid not in keydict:
# This appears to be a new keyid. Generate the key for it.
if key['keytype'] in ['rsa', 'ed25519', 'ecdsa-sha2-nistp256']:
keytype = key['keytype']
keyval = key['keyval']
scheme = key['scheme']
keydict[keyid] = \
securesystemslib.keys.format_keyval_to_metadata(keytype,
scheme, keyval, private=False)
# This is not a recognized key. Raise an exception.
else:
raise securesystemslib.exceptions.Error('Unsupported keytype:'
' ' + key['keytype'])
# Do we have a duplicate?
if keyid in keyids:
raise securesystemslib.exceptions.Error('Same keyid listed twice:'
' ' + keyid)
# Add the loaded keyid for the role being processed.
keyids.append(keyid)
# Generate and store the role data belonging to the processed role.
role_threshold = tuf.roledb.get_role_threshold(rolename, repository_name)
role_metadata = tuf.formats.make_role_metadata(keyids, role_threshold)
roledict[rolename] = role_metadata
# Generate the root metadata object.
root_metadata = tuf.formats.RootFile.make_metadata(version, expiration_date,
keydict, roledict, consistent_snapshot)
return root_metadata
def generate_targets_metadata(targets_directory, target_files, version,
expiration_date, delegations=None, write_consistent_targets=False):
"""
<Purpose>
Generate the targets metadata object. The targets in 'target_files' must
exist at the same path they should on the repo. 'target_files' is a list
of targets. The 'custom' field of the targets metadata is not currently
supported.
<Arguments>
targets_directory:
The directory containing the target files and directories of the
repository.
target_files:
The target files tracked by 'targets.json'. 'target_files' is a
dictionary of target paths that are relative to the targets directory and
an optional custom value (e.g., {'file1.txt': {'custom_data: 0755},
'Django/module.py': {}}).
version:
The metadata version number. Clients use the version number to
determine if the downloaded version is newer than the one currently
trusted.
expiration_date:
The expiration date of the metadata file. Conformant to
'securesystemslib.formats.ISO8601_DATETIME_SCHEMA'.
delegations:
The delegations made by the targets role to be generated. 'delegations'
must match 'tuf.formats.DELEGATIONS_SCHEMA'.
write_consistent_targets:
Boolean that indicates whether file digests should be prepended to the
target files.
<Exceptions>
securesystemslib.exceptions.FormatError, if an error occurred trying to
generate the targets metadata object.
securesystemslib.exceptions.Error, if any of the target files cannot be
read.
<Side Effects>
The target files are read and file information generated about them. If
'write_consistent_targets' is True, each target in 'target_files' will be
copied to a file with a digest prepended to its filename. For example, if
'some_file.txt' is one of the targets of 'target_files', consistent targets
<sha-2 hash>.some_file.txt, <sha-3 hash>.some_file.txt, etc., are created
and the content of 'some_file.txt' will be copied into them.
<Returns>
A targets metadata object, conformant to
'tuf.formats.TARGETS_SCHEMA'.
"""
# Do the arguments have the correct format?
# Ensure the arguments have the appropriate number of objects and object
# types, and that all dict keys are properly named.
# Raise 'securesystemslib.exceptions.FormatError' if there is a mismatch.
securesystemslib.formats.PATH_SCHEMA.check_match(targets_directory)
securesystemslib.formats.PATH_FILEINFO_SCHEMA.check_match(target_files)
tuf.formats.METADATAVERSION_SCHEMA.check_match(version)
securesystemslib.formats.ISO8601_DATETIME_SCHEMA.check_match(expiration_date)
securesystemslib.formats.BOOLEAN_SCHEMA.check_match(write_consistent_targets)
if delegations is not None:
tuf.formats.DELEGATIONS_SCHEMA.check_match(delegations)
# Store the file attributes of targets in 'target_files'. 'filedict',
# conformant to 'tuf.formats.FILEDICT_SCHEMA', is added to the
# targets metadata object returned.
filedict = {}
# Ensure the user is aware of a non-existent 'target_directory', and convert
# it to its abosolute path, if it exists.
targets_directory = _check_directory(targets_directory)
# Generate the fileinfo of all the target files listed in 'target_files'.
for target, custom in six.iteritems(target_files):
# The root-most folder of the targets directory should not be included in
# target paths listed in targets metadata.
# (e.g., 'targets/more_targets/somefile.txt' -> 'more_targets/somefile.txt')
relative_targetpath = target
# Note: join() discards 'targets_directory' if 'target' contains a leading
# path separator (i.e., is treated as an absolute path).
target_path = os.path.join(targets_directory, target.lstrip(os.sep))
# Ensure all target files listed in 'target_files' exist. If just one of
# these files does not exist, raise an exception.
if not os.path.exists(target_path):
raise securesystemslib.exceptions.Error(repr(target_path) + ' cannot'
' be read. Unable to generate targets metadata.')
# Add 'custom' if it has been provided. Custom data about the target is
# optional and will only be included in metadata (i.e., a 'custom' field in
# the target's fileinfo dictionary) if specified here.
custom_data = None
if len(custom):
custom_data = custom
filedict[relative_targetpath.replace('\\', '/').lstrip('/')] = \
get_metadata_fileinfo(target_path, custom_data)
# Copy 'target_path' to 'digest_target' if consistent hashing is enabled.
if write_consistent_targets:
for target_digest in six.itervalues(filedict[relative_targetpath]['hashes']):
dirname, basename = os.path.split(target_path)
digest_filename = target_digest + '.' + basename
digest_target = os.path.join(dirname, digest_filename)
shutil.copyfile(target_path, digest_target)
# Generate the targets metadata object.
targets_metadata = tuf.formats.TargetsFile.make_metadata(version,
expiration_date, filedict, delegations)
return targets_metadata
def generate_snapshot_metadata(metadata_directory, version, expiration_date,
root_filename, targets_filename, consistent_snapshot=False,
repository_name='default'):
"""
<Purpose>
Create the snapshot metadata. The minimum metadata must exist (i.e.,
'root.json' and 'targets.json'). This function searches
'metadata_directory' and the resulting snapshot file will list all the
delegated roles found there.
<Arguments>
metadata_directory:
The directory containing the 'root.json' and 'targets.json' metadata
files.
version:
The metadata version number. Clients use the version number to
determine if the downloaded version is newer than the one currently
trusted.
expiration_date:
The expiration date of the metadata file.
Conformant to 'securesystemslib.formats.ISO8601_DATETIME_SCHEMA'.
root_filename:
The filename of the top-level root role. The hash and file size of this
file is listed in the snapshot role.
targets_filename:
The filename of the top-level targets role. The hash and file size of
this file is listed in the snapshot role.
consistent_snapshot:
Boolean. If True, a file digest is expected to be prepended to the
filename of any target file located in the targets directory. Each digest
is stripped from the target filename and listed in the snapshot metadata.
repository_name:
The name of the repository. If not supplied, 'rolename' is added to the
'default' repository.
<Exceptions>
securesystemslib.exceptions.FormatError, if the arguments are improperly
formatted.
securesystemslib.exceptions.Error, if an error occurred trying to generate
the snapshot metadata object.
<Side Effects>
The 'root.json' and 'targets.json' files are read.
<Returns>
The snapshot metadata object, conformant to 'tuf.formats.SNAPSHOT_SCHEMA'.
"""
# Do the arguments have the correct format?
# This check ensures arguments have the appropriate number of objects and
# object types, and that all dict keys are properly named.
# Raise 'securesystemslib.exceptions.FormatError' if the check fails.
securesystemslib.formats.PATH_SCHEMA.check_match(metadata_directory)
tuf.formats.METADATAVERSION_SCHEMA.check_match(version)
securesystemslib.formats.ISO8601_DATETIME_SCHEMA.check_match(expiration_date)
securesystemslib.formats.PATH_SCHEMA.check_match(root_filename)
securesystemslib.formats.PATH_SCHEMA.check_match(targets_filename)
securesystemslib.formats.BOOLEAN_SCHEMA.check_match(consistent_snapshot)
securesystemslib.formats.NAME_SCHEMA.check_match(repository_name)
metadata_directory = _check_directory(metadata_directory)
# Snapshot's 'fileinfodict' shall contain the version number of Root,
# Targets, and all delegated roles fo the repository.
fileinfodict = {}
fileinfodict[ROOT_FILENAME] = get_metadata_versioninfo(root_filename,
repository_name)
fileinfodict[TARGETS_FILENAME] = get_metadata_versioninfo(targets_filename,
repository_name)
# We previously also stored the compressed versions of roles in
# snapshot.json, however, this is no longer needed as their hashes and
# lengths are not used and their version numbers match the uncompressed role
# files.
# Search the metadata directory and generate the versioninfo of all the role
# files found there. This information is stored in the 'meta' field of
# 'snapshot.json'.
for metadata_filename in sorted(os.listdir(metadata_directory), reverse=True):
# Strip the version number if 'consistent_snapshot' is True.
# Example: '10.django.json' --> 'django.json'
metadata_name, junk = _strip_version_number(metadata_filename,
consistent_snapshot)
# All delegated roles are added to the snapshot file.
if metadata_filename.endswith(METADATA_EXTENSION):
rolename = metadata_filename[:-len(METADATA_EXTENSION)]
# Obsolete role files may still be found. Ensure only roles loaded
# in the roledb are included in the Snapshot metadata. Since the
# snapshot and timestamp roles are not listed in snapshot.json, do not
# list these roles found in the metadata directory.
if tuf.roledb.role_exists(rolename, repository_name) and \
rolename not in ['root', 'snapshot', 'timestamp', 'targets']:
fileinfodict[metadata_name] = get_metadata_versioninfo(rolename,
repository_name)
else:
logger.debug('Metadata file has an unsupported file'
' extension: ' + metadata_filename)
# Generate the Snapshot metadata object.
snapshot_metadata = tuf.formats.SnapshotFile.make_metadata(version,
expiration_date, fileinfodict)
return snapshot_metadata
def generate_timestamp_metadata(snapshot_filename, version, expiration_date,
repository_name):
"""
<Purpose>
Generate the timestamp metadata object. The 'snapshot.json' file must
exist.
<Arguments>
snapshot_filename:
The required filename of the snapshot metadata file. The timestamp role
needs to the calculate the file size and hash of this file.
version:
The timestamp's version number. Clients use the version number to
determine if the downloaded version is newer than the one currently
trusted.
expiration_date:
The expiration date of the metadata file, conformant to
'securesystemslib.formats.ISO8601_DATETIME_SCHEMA'.
repository_name:
The name of the repository. If not supplied, 'rolename' is added to the
'default' repository.
<Exceptions>
securesystemslib.exceptions.FormatError, if the generated timestamp metadata
object cannot be formatted correctly, or one of the arguments is improperly
formatted.
<Side Effects>
None.
<Returns>
A timestamp metadata object, conformant to 'tuf.formats.TIMESTAMP_SCHEMA'.
"""
# Do the arguments have the correct format?
# This check ensures arguments have the appropriate number of objects and
# object types, and that all dict keys are properly named.
# Raise 'securesystemslib.exceptions.FormatError' if the check fails.
securesystemslib.formats.PATH_SCHEMA.check_match(snapshot_filename)
tuf.formats.METADATAVERSION_SCHEMA.check_match(version)
securesystemslib.formats.ISO8601_DATETIME_SCHEMA.check_match(expiration_date)
securesystemslib.formats.NAME_SCHEMA.check_match(repository_name)
# Retrieve the versioninfo of the Snapshot metadata file.
snapshot_fileinfo = {}
length, hashes = securesystemslib.util.get_file_details(snapshot_filename)
snapshot_version = get_metadata_versioninfo('snapshot', repository_name)
snapshot_fileinfo[SNAPSHOT_FILENAME] = \
tuf.formats.make_fileinfo(length, hashes, version=snapshot_version['version'])
# We previously saved the versioninfo of the compressed versions of
# 'snapshot.json' in 'versioninfo'. Since version numbers are now stored,
# the version numbers of compressed roles do not change and can thus be
# excluded.
# Generate the timestamp metadata object.
timestamp_metadata = tuf.formats.TimestampFile.make_metadata(version,
expiration_date, snapshot_fileinfo)
return timestamp_metadata
def sign_metadata(metadata_object, keyids, filename, repository_name):
"""
<Purpose>
Sign a metadata object. If any of the keyids have already signed the file,
the old signature is replaced. The keys in 'keyids' must already be
loaded in 'tuf.keydb'.
<Arguments>
metadata_object:
The metadata object to sign. For example, 'metadata' might correspond to
'tuf.formats.ROOT_SCHEMA' or
'tuf.formats.TARGETS_SCHEMA'.
keyids:
The keyids list of the signing keys.
filename:
The intended filename of the signed metadata object.
For example, 'root.json' or 'targets.json'. This function
does NOT save the signed metadata to this filename.
repository_name:
The name of the repository. If not supplied, 'rolename' is added to the
'default' repository.
<Exceptions>
securesystemslib.exceptions.FormatError, if a valid 'signable' object could
not be generated or the arguments are improperly formatted.
securesystemslib.exceptions.Error, if an invalid keytype was found in the
keystore.
<Side Effects>
None.
<Returns>
A signable object conformant to 'tuf.formats.SIGNABLE_SCHEMA'.
"""
# Do the arguments have the correct format?
# This check ensures arguments have the appropriate number of objects and
# object types, and that all dict keys are properly named.
# Raise 'securesystemslib.exceptions.FormatError' if the check fails.
tuf.formats.ANYROLE_SCHEMA.check_match(metadata_object)
securesystemslib.formats.KEYIDS_SCHEMA.check_match(keyids)
securesystemslib.formats.PATH_SCHEMA.check_match(filename)
securesystemslib.formats.NAME_SCHEMA.check_match(repository_name)
# Make sure the metadata is in 'signable' format. That is,
# it contains a 'signatures' field containing the result
# of signing the 'signed' field of 'metadata' with each
# keyid of 'keyids'.
signable = tuf.formats.make_signable(metadata_object)
# Sign the metadata with each keyid in 'keyids'. 'signable' should have
# zero signatures (metadata_object contained none).
for keyid in keyids:
# Load the signing key.
key = tuf.keydb.get_key(keyid, repository_name=repository_name)
# Generate the signature using the appropriate signing method.
if key['keytype'] in SUPPORTED_KEY_TYPES:
if 'private' in key['keyval']:
signed = signable['signed']
try:
signature = securesystemslib.keys.create_signature(key, signed)
signable['signatures'].append(signature)
except Exception:
logger.warning('Unable to create signature for keyid: ' + repr(keyid))
else:
logger.debug('Private key unset. Skipping: ' + repr(keyid))
else:
raise securesystemslib.exceptions.Error('The keydb contains a key with'
' an invalid key type.' + repr(key['keytype']))
# Raise 'securesystemslib.exceptions.FormatError' if the resulting 'signable'
# is not formatted correctly.
tuf.formats.check_signable_object_format(signable)
return signable
def write_metadata_file(metadata, filename, version_number, consistent_snapshot):
"""
<Purpose>
If necessary, write the 'metadata' signable object to 'filename'.
<Arguments>
metadata:
The object that will be saved to 'filename', conformant to
'tuf.formats.SIGNABLE_SCHEMA'.
filename:
The filename of the metadata to be written (e.g., 'root.json').
version_number:
The version number of the metadata file to be written. The version
number is needed for consistent snapshots, which prepend the version
number to 'filename'.
consistent_snapshot:
Boolean that determines whether the metadata file's digest should be
prepended to the filename.
<Exceptions>
securesystemslib.exceptions.FormatError, if the arguments are improperly
formatted.
securesystemslib.exceptions.Error, if the directory of 'filename' does not
exist.
Any other runtime (e.g., IO) exception.
<Side Effects>
The 'filename' file is created, or overwritten if it exists.
<Returns>
The filename of the written file.
"""
# Do the arguments have the correct format?
# This check ensures arguments have the appropriate number of objects and
# object types, and that all dict keys are properly named.
# Raise 'securesystemslib.exceptions.FormatError' if the check fails.
tuf.formats.SIGNABLE_SCHEMA.check_match(metadata)
securesystemslib.formats.PATH_SCHEMA.check_match(filename)
tuf.formats.METADATAVERSION_SCHEMA.check_match(version_number)
securesystemslib.formats.BOOLEAN_SCHEMA.check_match(consistent_snapshot)
# Verify the directory of 'filename', and convert 'filename' to its absolute
# path so that temporary files are moved to their expected destinations.
filename = os.path.abspath(filename)
written_filename = filename
_check_directory(os.path.dirname(filename))
# Generate the actual metadata file content of 'metadata'. Metadata is
# saved as JSON and includes formatting, such as indentation and sorted
# objects. The new digest of 'metadata' is also calculated to help determine
# if re-saving is required.
file_content = _get_written_metadata(metadata)
# We previously verified whether new metadata needed to be written (i.e., has
# not been previously written or has changed). It is now assumed that the
# caller intends to write changes that have been marked as dirty.
# The 'metadata' object is written to 'file_object'. To avoid partial
# metadata from being written, 'metadata' is first written to a temporary
# location (i.e., 'file_object') and then moved to 'filename'.
file_object = securesystemslib.util.TempFile()
# Serialize 'metadata' to the file-like object and then write 'file_object'
# to disk. The dictionary keys of 'metadata' are sorted and indentation is
# used. The 'securesystemslib.util.TempFile' file-like object is automically
# closed after the final move.
file_object.write(file_content)
if consistent_snapshot:
dirname, basename = os.path.split(written_filename)
basename = basename.split(METADATA_EXTENSION, 1)[0]
version_and_filename = str(version_number) + '.' + basename + METADATA_EXTENSION
written_consistent_filename = os.path.join(dirname, version_and_filename)
# If we were to point consistent snapshots to 'written_filename', they
# would always point to the current version. Example: 1.root.json and
# 2.root.json -> root.json. If consistent snapshot is True, we should save
# the consistent snapshot and point 'written_filename' to it.
logger.debug('Creating a consistent file for ' + repr(written_filename))
logger.debug('Saving ' + repr(written_consistent_filename))
file_object.move(written_consistent_filename)
# For GitHub issue #374 https://github.com/theupdateframework/tuf/issues/374
# We provide the option of either (1) creating a link via os.link() to the
# consistent file or (2) creating a copy of the consistent file and saving
# to its expected filename (e.g., root.json). The option of either
# creating a copy or link should be configurable in tuf.settings.py.
if tuf.settings.CONSISTENT_METHOD == 'copy' or platform.system() == 'Windows':
logger.debug('Pointing ' + repr(filename) + ' to the consistent'
' file: ' + repr(written_consistent_filename))
shutil.copyfile(written_consistent_filename, written_filename)
elif tuf.settings.CONSISTENT_METHOD == 'hard_link':
logger.info('Hard linking ' + repr(written_consistent_filename))
# 'written_filename' must not exist, otherwise os.link() complains.
if os.path.exists(written_filename):
os.remove(written_filename)
else:
logger.debug(repr(written_filename) + ' does not exist.')
os.link(written_consistent_filename, written_filename)
else:
raise securesystemslib.exceptions.InvalidConfigurationError('The'
' consistent method specified in tuf.settings.py is not supported, try'
' either "copy" or "hard_link"')
else:
logger.debug('Not creating a consistent snapshot for ' + repr(written_filename))
logger.debug('Saving ' + repr(written_filename))
file_object.move(written_filename)
return written_filename
def _log_status_of_top_level_roles(targets_directory, metadata_directory,
repository_name):
"""
Non-public function that logs whether any of the top-level roles contain an
invalid number of public and private keys, or an insufficient threshold of
signatures. Considering that the top-level metadata have to be verified in
the expected root -> targets -> snapshot -> timestamp order, this function
logs the error message and returns as soon as a required metadata file is
found to be invalid. It is assumed here that the delegated roles have been
written and verified. Example output:
'root' role contains 1 / 1 signatures.
'targets' role contains 1 / 1 signatures.
'snapshot' role contains 1 / 1 signatures.
'timestamp' role contains 1 / 1 signatures.
Note: Temporary metadata is generated so that file hashes & sizes may be
computed and verified against the attached signatures. 'metadata_directory'
should be a directory in a temporary repository directory.
"""
# The expected full filenames of the top-level roles needed to write them to
# disk.
filenames = get_metadata_filenames(metadata_directory)
root_filename = filenames[ROOT_FILENAME]
targets_filename = filenames[TARGETS_FILENAME]
snapshot_filename = filenames[SNAPSHOT_FILENAME]
timestamp_filename = filenames[TIMESTAMP_FILENAME]
# Verify that the top-level roles contain a valid number of public keys and
# that their corresponding private keys have been loaded.
for rolename in ['root', 'targets', 'snapshot', 'timestamp']:
try:
_check_role_keys(rolename, repository_name)
except securesystemslib.exceptions.InsufficientKeysError as e:
logger.info(str(e))
# Do the top-level roles contain a valid threshold of signatures? Top-level
# metadata is verified in Root -> Targets -> Snapshot -> Timestamp order.
# Verify the metadata of the Root role.
dirty_rolenames = tuf.roledb.get_dirty_roles(repository_name)
root_roleinfo = tuf.roledb.get_roleinfo('root', repository_name)
root_is_dirty = None
if 'root' in dirty_rolenames:
root_is_dirty = True
else:
root_is_dirty = False
try:
signable, root_filename = \
_generate_and_write_metadata('root', root_filename, targets_directory,
metadata_directory, repository_name=repository_name)
_log_status('root', signable, repository_name)
# 'tuf.exceptions.UnsignedMetadataError' raised if metadata contains an
# invalid threshold of signatures. log the valid/threshold message, where
# valid < threshold.
except tuf.exceptions.UnsignedMetadataError as e:
_log_status('root', e.signable, repository_name)
return
finally:
tuf.roledb.unmark_dirty(['root'], repository_name)
tuf.roledb.update_roleinfo('root', root_roleinfo,
mark_role_as_dirty=root_is_dirty, repository_name=repository_name)
# Verify the metadata of the Targets role.
targets_roleinfo = tuf.roledb.get_roleinfo('targets', repository_name)
targets_is_dirty = None
if 'targets' in dirty_rolenames:
targets_is_dirty = True
else:
targets_is_dirty = False
try:
signable, targets_filename = \
_generate_and_write_metadata('targets', targets_filename,
targets_directory, metadata_directory, repository_name=repository_name)
_log_status('targets', signable, repository_name)
except tuf.exceptions.UnsignedMetadataError as e:
_log_status('targets', e.signable, repository_name)
return
finally:
tuf.roledb.unmark_dirty(['targets'], repository_name)
tuf.roledb.update_roleinfo('targets', targets_roleinfo,
mark_role_as_dirty=targets_is_dirty, repository_name=repository_name)
# Verify the metadata of the snapshot role.
snapshot_roleinfo = tuf.roledb.get_roleinfo('snapshot', repository_name)
snapshot_is_dirty = None
if 'snapshot' in dirty_rolenames:
snapshot_is_dirty = True
else:
snapshot_is_dirty = False
filenames = {'root': root_filename, 'targets': targets_filename}
try:
signable, snapshot_filename = \
_generate_and_write_metadata('snapshot', snapshot_filename,
targets_directory, metadata_directory, False, filenames,
repository_name=repository_name)
_log_status('snapshot', signable, repository_name)
except tuf.exceptions.UnsignedMetadataError as e:
_log_status('snapshot', e.signable, repository_name)
return
finally:
tuf.roledb.unmark_dirty(['snapshot'], repository_name)
tuf.roledb.update_roleinfo('snapshot', snapshot_roleinfo,
mark_role_as_dirty=snapshot_is_dirty, repository_name=repository_name)
# Verify the metadata of the Timestamp role.
timestamp_roleinfo = tuf.roledb.get_roleinfo('timestamp', repository_name)
timestamp_is_dirty = None
if 'timestamp' in dirty_rolenames:
timestamp_is_dirty = True
else:
timestamp_is_dirty = False
filenames = {'snapshot': snapshot_filename}
try:
signable, timestamp_filename = \
_generate_and_write_metadata('timestamp', timestamp_filename,
targets_directory, metadata_directory, False, filenames,
repository_name=repository_name)
_log_status('timestamp', signable, repository_name)
except tuf.exceptions.UnsignedMetadataError as e:
_log_status('timestamp', e.signable, repository_name)
return
finally:
tuf.roledb.unmark_dirty(['timestamp'], repository_name)
tuf.roledb.update_roleinfo('timestamp', timestamp_roleinfo,
mark_role_as_dirty=timestamp_is_dirty, repository_name=repository_name)
def _log_status(rolename, signable, repository_name):
"""
Non-public function logs the number of (good/threshold) signatures of
'rolename'.
"""
status = tuf.sig.get_signature_status(signable, rolename, repository_name)
logger.info(repr(rolename) + ' role contains ' + \
repr(len(status['good_sigs'])) + ' / ' + repr(status['threshold']) + \
' signatures.')
def create_tuf_client_directory(repository_directory, client_directory):
"""
<Purpose>
Create a client directory structure that the 'tuf.interposition' package
and 'tuf.client.updater' module expect of clients. Metadata files
downloaded from a remote TUF repository are saved to 'client_directory'.
The Root file must initially exist before an update request can be
satisfied. create_tuf_client_directory() ensures the minimum metadata
is copied and that required directories ('previous' and 'current') are
created in 'client_directory'. Software updaters integrating TUF may
use the client directory created as an initial copy of the repository's
metadadata.
<Arguments>
repository_directory:
The path of the root repository directory. The 'metadata' and 'targets'
sub-directories should be available in 'repository_directory'. The
metadata files of 'repository_directory' are copied to 'client_directory'.
client_directory:
The path of the root client directory. The 'current' and 'previous'
sub-directies are created and will store the metadata files copied
from 'repository_directory'. 'client_directory' will store metadata
and target files downloaded from a TUF repository.
<Exceptions>
securesystemslib.exceptions.FormatError, if the arguments are improperly
formatted.
securesystemslib.exceptions.RepositoryError, if the metadata directory in
'client_directory' already exists.
<Side Effects>
Copies metadata files and directories from 'repository_directory' to
'client_directory'. Parent directories are created if they do not exist.
<Returns>
None.
"""
# Do the arguments have the correct format?
# This check ensures arguments have the appropriate number of objects and
# object types, and that all dict keys are properly named.
# Raise 'securesystemslib.exceptions.FormatError' if the check fails.
securesystemslib.formats.PATH_SCHEMA.check_match(repository_directory)
securesystemslib.formats.PATH_SCHEMA.check_match(client_directory)
# Set the absolute path of the Repository's metadata directory. The metadata
# directory should be the one served by the Live repository. At a minimum,
# the repository's root file must be copied.
repository_directory = os.path.abspath(repository_directory)
metadata_directory = os.path.join(repository_directory,
METADATA_DIRECTORY_NAME)
# Set the client's metadata directory, which will store the metadata copied
# from the repository directory set above.
client_directory = os.path.abspath(client_directory)
client_metadata_directory = os.path.join(client_directory,
METADATA_DIRECTORY_NAME)
# If the client's metadata directory does not already exist, create it and
# any of its parent directories, otherwise raise an exception. An exception
# is raised to avoid accidently overwritting previous metadata.
try:
os.makedirs(client_metadata_directory)
except OSError as e:
if e.errno == errno.EEXIST:
message = 'Cannot create a fresh client metadata directory: ' +\
repr(client_metadata_directory) + '. Already exists.'
raise securesystemslib.exceptions.RepositoryError(message)
# Testing of non-errno.EEXIST exceptions have been verified on all
# supported OSs. An unexpected exception (the '/' directory exists, rather
# than disallowed path) is possible on Travis, so the '#pragma: no branch'
# below is included to prevent coverage failure.
else: #pragma: no branch
raise
# Move all metadata to the client's 'current' and 'previous' directories.
# The root metadata file MUST exist in '{client_metadata_directory}/current'.
# 'tuf.interposition' and 'tuf.client.updater.py' expect the 'current' and
# 'previous' directories to exist under 'metadata'.
client_current = os.path.join(client_metadata_directory, 'current')
client_previous = os.path.join(client_metadata_directory, 'previous')
shutil.copytree(metadata_directory, client_current)
shutil.copytree(metadata_directory, client_previous)
def disable_console_log_messages():
"""
<Purpose>
Disable logger messages printed to the console. For example, repository
maintainers may want to call this function if many roles will be sharing
keys, otherwise detected duplicate keys will continually log a warning
message.
<Arguments>
None.
<Exceptions>
None.
<Side Effects>
Removes the 'tuf.log' console handler, added by default when
'tuf.repository_tool.py' is imported.
<Returns>
None.
"""
tuf.log.remove_console_handler()
if __name__ == '__main__':
# The interactive sessions of the documentation strings can
# be tested by running repository_tool.py as a standalone module:
# $ python repository_lib.py.
import doctest
doctest.testmod()