#!/usr/bin/env python # Copyright 2018, New York University and the TUF contributors # SPDX-License-Identifier: MIT OR Apache-2.0 """ repo.py Vladimir Diaz January 2018. See LICENSE-MIT OR LICENSE for licensing information. Provide a command-line interface to create and modify TUF repositories. The CLI removes the need to write Python code when creating or modifying repositories, which is the case with repository_tool.py and developer_tool.py. Note: 'pip install securesystemslib[crypto,pynacl]' is required by the CLI, which installs the 3rd-party dependencies: cryptography, pynacl, and colorama. Note: arguments within brackets are optional. $ repo.py --init [--consistent, --bare, --path, --root_pw, --targets_pw, --snapshot_pw, --timestamp_pw] $ repo.py --add ... [--path, --recursive] $ repo.py --remove $ repo.py --distrust --pubkeys [--role] $ repo.py --trust --pubkeys [--role] $ repo.py --sign [--role ] $ repo.py --key [--filename --path , --pw [my_password]] $ repo.py --delegate --delegatee --pubkeys [--role --terminating --threshold --sign ] $ repo.py --revoke --delegatee [--role --sign ] $ repo.py --verbose <0-5> $ repo.py --clean [--path] --init: Create new TUF repository in current working or specified directory. --consistent: Enable consistent snapshots for newly created TUF repository. --bare: Specify creation of bare TUF repository with no key created or set. --path: Choose specified path location of a TUF repository or key(s). --role: Specify top-level role(s) affected by the main command-line option. --pubkeys: Indicate location of key(s) affected by the main command-line option. --root_pw: Set password for encrypting top-level key file of root role. --targets_pw: Set password for encrypting top-level key file of targets role. --snapshot_pw: Set password for encrypting top-level key file of snapshot role. --timestamp_pw: Set password for encrypting top-level key file of timestamp role. --add: Add file specified by to the Targets metadata. --recursive: Include files in subdirectories of specified directory . --remove: Remove target files from Targets metadata matching . --distrust: Discontinue trust of keys located in directory of a role. --trust: Indicate trusted keys located in directory of a role. --sign: Sign metadata of target role(s) with keys in specified directory. --key: Generate cryptographic key of specified type (default: Ed25519). --filename: Specify filename associated with generated top-level key. --pw: Set password for the generated key of specified type . --delegate: Delegate trust of target files from Targets role (or specified in --role) to --delegatee role with specified . --delegatee: Specify role that is targetted by delegator in --role to sign for target files matching delegated or in revocation of trust. --terminating: Mark delegation to --delegatee role from delegator as a terminating one. --threshold: Specify signature threshold of --delegatee role as the value . --revoke: Revoke trust of target files from delegated role (--delegatee) --verbose: Set the verbosity level of logging messages. Accepts values 1-5. --clean: Delete repo in current working or specified directory. """ # Help with Python 2+3 compatibility, where the print statement is a function, # an implicit relative import is invalid, and the '/' operator performs true # division. Example: print 'hello world' raises a 'SyntaxError' exception. from __future__ import print_function from __future__ import absolute_import from __future__ import division from __future__ import unicode_literals import os import sys import logging import argparse import shutil import time import fnmatch import tuf import tuf.log import tuf.formats import tuf.repository_tool as repo_tool # 'pip install securesystemslib[crypto,pynacl]' is required for the CLI, # which installs the cryptography, pynacl, and colorama dependencies. import securesystemslib from colorama import Fore import six # See 'log.py' to learn how logging is handled in TUF. logger = logging.getLogger('tuf.scripts.repo') repo_tool.disable_console_log_messages() PROG_NAME = 'repo.py' REPO_DIR = 'tufrepo' CLIENT_DIR = 'tufclient' KEYSTORE_DIR = 'tufkeystore' ROOT_KEY_NAME = 'root_key' TARGETS_KEY_NAME = 'targets_key' SNAPSHOT_KEY_NAME = 'snapshot_key' TIMESTAMP_KEY_NAME = 'timestamp_key' STAGED_METADATA_DIR = 'metadata.staged' METADATA_DIR = 'metadata' # The keytype strings, as expected on the command line. ED25519_KEYTYPE = 'ed25519' ECDSA_KEYTYPE = 'ecdsa' RSA_KEYTYPE = 'rsa' SUPPORTED_CLI_KEYTYPES = (ECDSA_KEYTYPE, ED25519_KEYTYPE, RSA_KEYTYPE) # The supported keytype strings (as they appear in metadata) are listed here # because they won't necessarily match the key types supported by # securesystemslib. SUPPORTED_KEY_TYPES = ('ed25519', 'ecdsa-sha2-nistp256', 'rsa') def process_command_line_arguments(parsed_arguments): """ Perform the relevant operations on the repo according to the chosen command-line options. Which functions are executed depends on 'parsed_arguments'. For instance, the --init and --clean options will cause the init_repo() and clean_repo() functions to be called. Multiple operations can be executed in one invocation of the CLI. parsed_arguments: The parsed arguments returned by argparse.parse_args(). securesystemslib.exceptions.Error, if any of the arguments are improperly formatted or if any of the argument could not be processed. None. None. """ # Do we have a valid argparse Namespace? if not isinstance(parsed_arguments, argparse.Namespace): raise tuf.exceptions.Error('Invalid namespace: ' + repr(parsed_arguments)) else: logger.debug('We have a valid argparse Namespace.') # TODO: Make sure the order that the arguments are processed allows for the # most convenient use of multiple options in one invocation of the CLI. For # instance, it might be best for --clean to be processed first before --init # so that a user can do the following: repo.py --clean --init (that is, first # clear the repo in the current working directory, and then initialize a new # one. if parsed_arguments.clean: clean_repo(parsed_arguments) if parsed_arguments.init: init_repo(parsed_arguments) if parsed_arguments.remove: remove_targets(parsed_arguments) if parsed_arguments.add: add_targets(parsed_arguments) if parsed_arguments.distrust: remove_verification_key(parsed_arguments) if parsed_arguments.trust: add_verification_key(parsed_arguments) if parsed_arguments.key: gen_key(parsed_arguments) if parsed_arguments.revoke: revoke(parsed_arguments) if parsed_arguments.delegate: delegate(parsed_arguments) # --sign should be processed last, after the other options, so that metadata # is signed last after potentially being modified by the other options. if parsed_arguments.sign: sign_role(parsed_arguments) def delegate(parsed_arguments): if not parsed_arguments.delegatee: raise tuf.exceptions.Error( '--delegatee must be set to perform the delegation.') if parsed_arguments.delegatee in ('root', 'snapshot', 'timestamp', 'targets'): raise tuf.exceptions.Error( 'Cannot delegate to the top-level role: ' + repr(parsed_arguments.delegatee)) if not parsed_arguments.pubkeys: raise tuf.exceptions.Error( '--pubkeys must be set to perform the delegation.') public_keys = [] for public_key in parsed_arguments.pubkeys: imported_pubkey = import_publickey_from_file(public_key) public_keys.append(imported_pubkey) repository = repo_tool.load_repository( os.path.join(parsed_arguments.path, REPO_DIR)) if parsed_arguments.role == 'targets': repository.targets.delegate(parsed_arguments.delegatee, public_keys, parsed_arguments.delegate, parsed_arguments.threshold, parsed_arguments.terminating, list_of_targets=None, path_hash_prefixes=None) targets_private = import_privatekey_from_file( os.path.join(parsed_arguments.path, KEYSTORE_DIR, TARGETS_KEY_NAME), parsed_arguments.targets_pw) repository.targets.load_signing_key(targets_private) # A delegated (non-top-level-Targets) role. else: repository.targets(parsed_arguments.role).delegate( parsed_arguments.delegatee, public_keys, parsed_arguments.delegate, parsed_arguments.threshold, parsed_arguments.terminating, list_of_targets=None, path_hash_prefixes=None) # Update the required top-level roles, Snapshot and Timestamp, to make a new # release. Automatically making a new release can be disabled via # --no_release. if not parsed_arguments.no_release: snapshot_private = import_privatekey_from_file( os.path.join(parsed_arguments.path, KEYSTORE_DIR, SNAPSHOT_KEY_NAME), parsed_arguments.snapshot_pw) timestamp_private = import_privatekey_from_file( os.path.join(parsed_arguments.path, KEYSTORE_DIR, TIMESTAMP_KEY_NAME), parsed_arguments.timestamp_pw) repository.snapshot.load_signing_key(snapshot_private) repository.timestamp.load_signing_key(timestamp_private) consistent_snapshot = tuf.roledb.get_roleinfo('root', repository._repository_name)['consistent_snapshot'] repository.writeall(consistent_snapshot=consistent_snapshot) # Move staged metadata directory to "live" metadata directory. write_to_live_repo(parsed_arguments) def revoke(parsed_arguments): repository = repo_tool.load_repository( os.path.join(parsed_arguments.path, REPO_DIR)) if parsed_arguments.role == 'targets': repository.targets.revoke(parsed_arguments.delegatee) targets_private = import_privatekey_from_file( os.path.join(parsed_arguments.path, KEYSTORE_DIR, TARGETS_KEY_NAME), parsed_arguments.targets_pw) repository.targets.load_signing_key(targets_private) # A non-top-level role. else: repository.targets(parsed_arguments.role).revoke(parsed_arguments.delegatee) role_privatekey = import_privatekey_from_file(parsed_arguments.sign) repository.targets(parsed_arguments.role).load_signing_key(role_privatekey) # Update the required top-level roles, Snapshot and Timestamp, to make a new # release. Automatically making a new release can be disabled via # --no_release. if not parsed_arguments.no_release: snapshot_private = import_privatekey_from_file( os.path.join(parsed_arguments.path, KEYSTORE_DIR, SNAPSHOT_KEY_NAME), parsed_arguments.snapshot_pw) timestamp_private = import_privatekey_from_file( os.path.join(parsed_arguments.path, KEYSTORE_DIR, TIMESTAMP_KEY_NAME), parsed_arguments.timestamp_pw) repository.snapshot.load_signing_key(snapshot_private) repository.timestamp.load_signing_key(timestamp_private) consistent_snapshot = tuf.roledb.get_roleinfo('root', repository._repository_name)['consistent_snapshot'] repository.writeall(consistent_snapshot=consistent_snapshot) # Move staged metadata directory to "live" metadata directory. write_to_live_repo(parsed_arguments) def gen_key(parsed_arguments): if parsed_arguments.filename: parsed_arguments.filename = os.path.join(parsed_arguments.path, KEYSTORE_DIR, parsed_arguments.filename) keypath = None if parsed_arguments.key not in SUPPORTED_CLI_KEYTYPES: tuf.exceptions.Error( 'Invalid key type: ' + repr(parsed_arguments.key) + '. Supported' ' key types: ' + repr(SUPPORTED_CLI_KEYTYPES)) elif parsed_arguments.key == ECDSA_KEYTYPE: keypath = securesystemslib.interface.generate_and_write_ecdsa_keypair( parsed_arguments.filename, password=parsed_arguments.pw) elif parsed_arguments.key == ED25519_KEYTYPE: keypath = securesystemslib.interface.generate_and_write_ed25519_keypair( parsed_arguments.filename, password=parsed_arguments.pw) # RSA key.. else: keypath = securesystemslib.interface.generate_and_write_rsa_keypair( parsed_arguments.filename, password=parsed_arguments.pw) # If a filename is not given, the generated keypair is saved to the current # working directory. By default, the keypair is written to .pub # and (private key). if not parsed_arguments.filename: privkey_repo_path = os.path.join(parsed_arguments.path, KEYSTORE_DIR, os.path.basename(keypath)) pubkey_repo_path = os.path.join(parsed_arguments.path, KEYSTORE_DIR, os.path.basename(keypath + '.pub')) securesystemslib.util.ensure_parent_dir(privkey_repo_path) securesystemslib.util.ensure_parent_dir(pubkey_repo_path) # Move them from the CWD to the repo's keystore. shutil.move(keypath, privkey_repo_path) shutil.move(keypath + '.pub', pubkey_repo_path) def import_privatekey_from_file(keypath, password=None): # Note: should securesystemslib support this functionality (import any # privatekey type)? # If the caller does not provide a password argument, prompt for one. # Password confirmation is disabled here, which should ideally happen only # when creating encrypted key files. if password is None: # pragma: no cover # It is safe to specify the full path of 'filepath' in the prompt and not # worry about leaking sensitive information about the key's location. # However, care should be taken when including the full path in exceptions # and log files. password = securesystemslib.interface.get_password('Enter a password for' ' the encrypted key (' + Fore.RED + repr(keypath) + Fore.RESET + '): ', confirm=False) # Does 'password' have the correct format? securesystemslib.formats.PASSWORD_SCHEMA.check_match(password) # Store the encrypted contents of 'filepath' prior to calling the decryption # routine. encrypted_key = None with open(keypath, 'rb') as file_object: encrypted_key = file_object.read() # Decrypt the loaded key file, calling the 'cryptography' library to generate # the derived encryption key from 'password'. Raise # 'securesystemslib.exceptions.CryptoError' if the decryption fails. try: key_object = securesystemslib.keys.decrypt_key(encrypted_key.decode('utf-8'), password) except securesystemslib.exceptions.CryptoError: try: logger.debug( 'Decryption failed. Attempting to import a private PEM instead.') key_object = securesystemslib.keys.import_rsakey_from_private_pem( encrypted_key, 'rsassa-pss-sha256', password) except securesystemslib.exceptions.CryptoError: raise tuf.exceptions.Error(repr(keypath) + ' cannot be imported, possibly' ' because an invalid key file is given or the decryption password is' ' incorrect.') if key_object['keytype'] not in SUPPORTED_KEY_TYPES: raise tuf.exceptions.Error('Trying to import an unsupported key' ' type: ' + repr(key_object['keytype'] + '.' ' Supported key types: ' + repr(SUPPORTED_KEY_TYPES))) else: # Add "keyid_hash_algorithms" so that equal keys with different keyids can # be associated using supported keyid_hash_algorithms. key_object['keyid_hash_algorithms'] = securesystemslib.settings.HASH_ALGORITHMS return key_object def import_publickey_from_file(keypath): try: key_metadata = securesystemslib.util.load_json_file(keypath) # An RSA public key is saved to disk in PEM format (not JSON), so the # load_json_file() call above can fail for this reason. Try to potentially # load the PEM string in keypath if an exception is raised. except securesystemslib.exceptions.Error: key_metadata = securesystemslib.interface.import_rsa_publickey_from_file( keypath) key_object, junk = securesystemslib.keys.format_metadata_to_key(key_metadata) if key_object['keytype'] not in SUPPORTED_KEY_TYPES: raise tuf.exceptions.Error('Trying to import an unsupported key' ' type: ' + repr(key_object['keytype'] + '.' ' Supported key types: ' + repr(SUPPORTED_KEY_TYPES))) else: return key_object def add_verification_key(parsed_arguments): if not parsed_arguments.pubkeys: raise tuf.exceptions.Error('--pubkeys must be given with --trust.') repository = repo_tool.load_repository( os.path.join(parsed_arguments.path, REPO_DIR)) for keypath in parsed_arguments.pubkeys: imported_pubkey = import_publickey_from_file(keypath) if parsed_arguments.role not in ('root', 'targets', 'snapshot', 'timestamp'): raise tuf.exceptions.Error('The given --role is not a top-level role.') elif parsed_arguments.role == 'root': repository.root.add_verification_key(imported_pubkey) elif parsed_arguments.role == 'targets': repository.targets.add_verification_key(imported_pubkey) elif parsed_arguments.role == 'snapshot': repository.snapshot.add_verification_key(imported_pubkey) # The timestamp role.. else: repository.timestamp.add_verification_key(imported_pubkey) consistent_snapshot = tuf.roledb.get_roleinfo('root', repository._repository_name)['consistent_snapshot'] repository.write('root', consistent_snapshot=consistent_snapshot, increment_version_number=False) # Move staged metadata directory to "live" metadata directory. write_to_live_repo(parsed_arguments) def remove_verification_key(parsed_arguments): if not parsed_arguments.pubkeys: raise tuf.exceptions.Error('--pubkeys must be given with --distrust.') repository = repo_tool.load_repository( os.path.join(parsed_arguments.path, REPO_DIR)) for keypath in parsed_arguments.pubkeys: imported_pubkey = import_publickey_from_file(keypath) try: if parsed_arguments.role not in ('root', 'targets', 'snapshot', 'timestamp'): raise tuf.exceptions.Error('The given --role is not a top-level role.') elif parsed_arguments.role == 'root': repository.root.remove_verification_key(imported_pubkey) elif parsed_arguments.role == 'targets': repository.targets.remove_verification_key(imported_pubkey) elif parsed_arguments.role == 'snapshot': repository.snapshot.remove_verification_key(imported_pubkey) # The Timestamp key.. else: repository.timestamp.remove_verification_key(imported_pubkey) # It is assumed remove_verification_key() only raises # securesystemslib.exceptions.Error and # securesystemslib.exceptions.FormatError, and the latter is not raised # bacause a valid key should have been returned by # import_publickey_from_file(). except securesystemslib.exceptions.Error: print(repr(keypath) + ' is not a trusted key. Skipping.') consistent_snapshot = tuf.roledb.get_roleinfo('root', repository._repository_name)['consistent_snapshot'] repository.write('root', consistent_snapshot=consistent_snapshot, increment_version_number=False) # Move staged metadata directory to "live" metadata directory. write_to_live_repo(parsed_arguments) def sign_role(parsed_arguments): repository = repo_tool.load_repository( os.path.join(parsed_arguments.path, REPO_DIR)) consistent_snapshot = tuf.roledb.get_roleinfo('root', repository._repository_name)['consistent_snapshot'] for keypath in parsed_arguments.sign: role_privatekey = import_privatekey_from_file(keypath) if parsed_arguments.role == 'targets': repository.targets.load_signing_key(role_privatekey) elif parsed_arguments.role == 'root': repository.root.load_signing_key(role_privatekey) elif parsed_arguments.role == 'snapshot': repository.snapshot.load_signing_key(role_privatekey) elif parsed_arguments.role == 'timestamp': repository.timestamp.load_signing_key(role_privatekey) else: # TODO: repository_tool.py will be refactored to clean up the following # code, which adds and signs for a non-existent role. if not tuf.roledb.role_exists(parsed_arguments.role): # Load the private key keydb and set the roleinfo in roledb so that # metadata can be written with repository.write(). tuf.keydb.remove_key(role_privatekey['keyid'], repository_name = repository._repository_name) tuf.keydb.add_key( role_privatekey, repository_name = repository._repository_name) # Set the delegated metadata file to expire in 3 months. expiration = tuf.formats.unix_timestamp_to_datetime( int(time.time() + 7889230)) expiration = expiration.isoformat() + 'Z' roleinfo = {'name': parsed_arguments.role, 'keyids': [role_privatekey['keyid']], 'signing_keyids': [role_privatekey['keyid']], 'partial_loaded': False, 'paths': {}, 'signatures': [], 'version': 1, 'expires': expiration, 'delegations': {'keys': {}, 'roles': []}} tuf.roledb.add_role(parsed_arguments.role, roleinfo, repository_name=repository._repository_name) # Generate the Targets object of --role, and add it to the top-level # 'targets' object. new_targets_object = repo_tool.Targets(repository._targets_directory, parsed_arguments.role, roleinfo, repository_name=repository._repository_name) repository.targets._delegated_roles[parsed_arguments.role] = new_targets_object else: repository.targets(parsed_arguments.role).load_signing_key(role_privatekey) # Write the Targets metadata now that it's been modified. Once write() is # called on a role, it is no longer considered "dirty" and the role will not # be written again if another write() or writeall() were subsequently made. repository.write(parsed_arguments.role, consistent_snapshot=consistent_snapshot, increment_version_number=False) # Write the updated top-level roles, if any. Also write Snapshot and # Timestamp to make a new release. Automatically making a new release can be # disabled via --no_release. if not parsed_arguments.no_release: snapshot_private = import_privatekey_from_file( os.path.join(parsed_arguments.path, KEYSTORE_DIR, SNAPSHOT_KEY_NAME), parsed_arguments.snapshot_pw) timestamp_private = import_privatekey_from_file( os.path.join(parsed_arguments.path, KEYSTORE_DIR, TIMESTAMP_KEY_NAME), parsed_arguments.timestamp_pw) repository.snapshot.load_signing_key(snapshot_private) repository.timestamp.load_signing_key(timestamp_private) repository.writeall(consistent_snapshot=consistent_snapshot) # Move staged metadata directory to "live" metadata directory. write_to_live_repo(parsed_arguments) def clean_repo(parsed_arguments): repo_dir = os.path.join(parsed_arguments.path, REPO_DIR) client_dir = os.path.join(parsed_arguments.path, CLIENT_DIR) keystore_dir = os.path.join(parsed_arguments.path, KEYSTORE_DIR) shutil.rmtree(repo_dir, ignore_errors=True) shutil.rmtree(client_dir, ignore_errors=True) shutil.rmtree(keystore_dir, ignore_errors=True) def write_to_live_repo(parsed_arguments): staged_meta_directory = os.path.join( parsed_arguments.path, REPO_DIR, STAGED_METADATA_DIR) live_meta_directory = os.path.join( parsed_arguments.path, REPO_DIR, METADATA_DIR) shutil.rmtree(live_meta_directory, ignore_errors=True) shutil.copytree(staged_meta_directory, live_meta_directory) def add_target_to_repo(parsed_arguments, target_path, repo_targets_path, repository, custom=None): """ (1) Copy 'target_path' to 'repo_targets_path'. (2) Add 'target_path' to Targets metadata of 'repository'. """ if custom is None: custom = {} if not os.path.exists(target_path): logger.debug(repr(target_path) + ' does not exist. Skipping.') else: securesystemslib.util.ensure_parent_dir( os.path.join(repo_targets_path, target_path)) shutil.copy(target_path, os.path.join(repo_targets_path, target_path)) roleinfo = tuf.roledb.get_roleinfo( parsed_arguments.role, repository_name=repository._repository_name) # It is assumed we have a delegated role, and that the caller has made # sure to reject top-level roles specified with --role. if target_path not in roleinfo['paths']: logger.debug('Adding new target: ' + repr(target_path)) roleinfo['paths'].update({target_path: custom}) else: logger.debug('Replacing target: ' + repr(target_path)) roleinfo['paths'].update({target_path: custom}) tuf.roledb.update_roleinfo(parsed_arguments.role, roleinfo, mark_role_as_dirty=True, repository_name=repository._repository_name) def remove_target_files_from_metadata(parsed_arguments, repository): if parsed_arguments.role in ('root', 'snapshot', 'timestamp'): raise tuf.exceptions.Error( 'Invalid rolename specified: ' + repr(parsed_arguments.role) + '.' ' It must be "targets" or a delegated rolename.') else: # NOTE: The following approach of using tuf.roledb to update the target # files will be modified in the future when the repository tool's API is # refactored. roleinfo = tuf.roledb.get_roleinfo( parsed_arguments.role, repository._repository_name) for glob_pattern in parsed_arguments.remove: for path in list(six.iterkeys(roleinfo['paths'])): if fnmatch.fnmatch(path, glob_pattern): del roleinfo['paths'][path] else: logger.debug('Delegated path ' + repr(path) + ' does not match' ' given path/glob pattern ' + repr(glob_pattern)) continue tuf.roledb.update_roleinfo( parsed_arguments.role, roleinfo, mark_role_as_dirty=True, repository_name=repository._repository_name) def add_targets(parsed_arguments): repo_targets_path = os.path.join(parsed_arguments.path, REPO_DIR, 'targets') repository = repo_tool.load_repository( os.path.join(parsed_arguments.path, REPO_DIR)) # Copy the target files in --path to the repo directory, and # add them to Targets metadata. Make sure to also copy & add files # in directories (and subdirectories, if --recursive is True). for target_path in parsed_arguments.add: if os.path.isdir(target_path): for sub_target_path in repository.get_filepaths_in_directory( target_path, parsed_arguments.recursive): add_target_to_repo(parsed_arguments, sub_target_path, repo_targets_path, repository) else: add_target_to_repo(parsed_arguments, target_path, repo_targets_path, repository) consistent_snapshot = tuf.roledb.get_roleinfo('root', repository._repository_name)['consistent_snapshot'] if parsed_arguments.role == 'targets': # Load the top-level, non-root, keys to make a new release. targets_private = import_privatekey_from_file( os.path.join(parsed_arguments.path, KEYSTORE_DIR, TARGETS_KEY_NAME), parsed_arguments.targets_pw) repository.targets.load_signing_key(targets_private) elif parsed_arguments.role not in ('root', 'snapshot', 'timestamp'): repository.write(parsed_arguments.role, consistent_snapshot=consistent_snapshot, increment_version_number=True) return # Update the required top-level roles, Snapshot and Timestamp, to make a new # release. Automatically making a new release can be disabled via # --no_release. if not parsed_arguments.no_release: snapshot_private = import_privatekey_from_file( os.path.join(parsed_arguments.path, KEYSTORE_DIR, SNAPSHOT_KEY_NAME), parsed_arguments.snapshot_pw) timestamp_private = import_privatekey_from_file( os.path.join(parsed_arguments.path, KEYSTORE_DIR, TIMESTAMP_KEY_NAME), parsed_arguments.timestamp_pw) repository.snapshot.load_signing_key(snapshot_private) repository.timestamp.load_signing_key(timestamp_private) repository.writeall(consistent_snapshot=consistent_snapshot) # Move staged metadata directory to "live" metadata directory. write_to_live_repo(parsed_arguments) def remove_targets(parsed_arguments): repository = repo_tool.load_repository( os.path.join(parsed_arguments.path, REPO_DIR)) # Remove target files from the Targets metadata (or the role specified in # --role) that match the glob patterns specified in --remove. remove_target_files_from_metadata(parsed_arguments, repository) # Examples of how the --pw command-line option is interpreted: # repo.py --init': parsed_arguments.pw = 'pw' # repo.py --init --pw my_password: parsed_arguments.pw = 'my_password' # repo.py --init --pw: The user is prompted for a password, as follows: if not parsed_arguments.pw: parsed_arguments.pw = securesystemslib.interface.get_password( prompt='Enter a password for the top-level role keys: ', confirm=True) targets_private = import_privatekey_from_file( os.path.join(parsed_arguments.path, KEYSTORE_DIR, TARGETS_KEY_NAME), parsed_arguments.targets_pw) repository.targets.load_signing_key(targets_private) # Load the top-level keys for Snapshot and Timestamp to make a new release. # Automatically making a new release can be disabled via --no_release. if not parsed_arguments.no_release: snapshot_private = import_privatekey_from_file( os.path.join(parsed_arguments.path, KEYSTORE_DIR, SNAPSHOT_KEY_NAME), parsed_arguments.snapshot_pw) timestamp_private = import_privatekey_from_file( os.path.join(parsed_arguments.path, KEYSTORE_DIR, TIMESTAMP_KEY_NAME), parsed_arguments.timestamp_pw) repository.snapshot.load_signing_key(snapshot_private) repository.timestamp.load_signing_key(timestamp_private) consistent_snapshot = tuf.roledb.get_roleinfo('root', repository._repository_name)['consistent_snapshot'] repository.writeall(consistent_snapshot=consistent_snapshot) # Move staged metadata directory to "live" metadata directory. write_to_live_repo(parsed_arguments) def init_repo(parsed_arguments): """ Create a repo at the specified location in --path (the current working directory, by default). Each top-level role has one key, if --bare' is False (default). """ repo_path = os.path.join(parsed_arguments.path, REPO_DIR) repository = repo_tool.create_new_repository(repo_path) if not parsed_arguments.bare: set_top_level_keys(repository, parsed_arguments) repository.writeall(consistent_snapshot=parsed_arguments.consistent) else: repository.write( 'root', consistent_snapshot=parsed_arguments.consistent) repository.write('targets', consistent_snapshot=parsed_arguments.consistent) repository.write('snapshot', consistent_snapshot=parsed_arguments.consistent) repository.write('timestamp', consistent_snapshot=parsed_arguments.consistent) write_to_live_repo(parsed_arguments) # Create the client files. The client directory contains the required # directory structure and metadata files for clients to successfully perform # an update. repo_tool.create_tuf_client_directory( os.path.join(parsed_arguments.path, REPO_DIR), os.path.join(parsed_arguments.path, CLIENT_DIR, REPO_DIR)) def set_top_level_keys(repository, parsed_arguments): """ Generate, write, and set the top-level keys. 'repository' is modified. """ # Examples of how the --pw command-line option is interpreted: # repo.py --init': parsed_arguments.pw = 'pw' # repo.py --init --pw my_pw: parsed_arguments.pw = 'my_pw' # repo.py --init --pw: The user is prompted for a password, here. if not parsed_arguments.pw: parsed_arguments.pw = securesystemslib.interface.get_password( prompt='Enter a password for the top-level role keys: ', confirm=True) repo_tool.generate_and_write_ed25519_keypair( os.path.join(parsed_arguments.path, KEYSTORE_DIR, ROOT_KEY_NAME), password=parsed_arguments.root_pw) repo_tool.generate_and_write_ed25519_keypair( os.path.join(parsed_arguments.path, KEYSTORE_DIR, TARGETS_KEY_NAME), password=parsed_arguments.targets_pw) repo_tool.generate_and_write_ed25519_keypair( os.path.join(parsed_arguments.path, KEYSTORE_DIR, SNAPSHOT_KEY_NAME), password=parsed_arguments.snapshot_pw) repo_tool.generate_and_write_ed25519_keypair( os.path.join(parsed_arguments.path, KEYSTORE_DIR, TIMESTAMP_KEY_NAME), password=parsed_arguments.timestamp_pw) # Import the private keys. They are needed to generate the signatures # included in metadata. root_private = import_privatekey_from_file( os.path.join(parsed_arguments.path, KEYSTORE_DIR, ROOT_KEY_NAME), parsed_arguments.root_pw) targets_private = import_privatekey_from_file( os.path.join(parsed_arguments.path, KEYSTORE_DIR, TARGETS_KEY_NAME), parsed_arguments.targets_pw) snapshot_private = import_privatekey_from_file( os.path.join(parsed_arguments.path, KEYSTORE_DIR, SNAPSHOT_KEY_NAME), parsed_arguments.snapshot_pw) timestamp_private = import_privatekey_from_file( os.path.join(parsed_arguments.path, KEYSTORE_DIR, TIMESTAMP_KEY_NAME), parsed_arguments.timestamp_pw) # Import the public keys. They are needed so that metadata roles are # assigned verification keys, which clients need in order to verify the # signatures created by the corresponding private keys. root_public = import_publickey_from_file( os.path.join(parsed_arguments.path, KEYSTORE_DIR, ROOT_KEY_NAME) + '.pub') targets_public = import_publickey_from_file( os.path.join(parsed_arguments.path, KEYSTORE_DIR, TARGETS_KEY_NAME) + '.pub') snapshot_public = import_publickey_from_file( os.path.join(parsed_arguments.path, KEYSTORE_DIR, SNAPSHOT_KEY_NAME) + '.pub') timestamp_public = import_publickey_from_file( os.path.join(parsed_arguments.path, KEYSTORE_DIR, TIMESTAMP_KEY_NAME) + '.pub') # Add the verification keys to the top-level roles. repository.root.add_verification_key(root_public) repository.targets.add_verification_key(targets_public) repository.snapshot.add_verification_key(snapshot_public) repository.timestamp.add_verification_key(timestamp_public) # Load the previously imported signing keys for the top-level roles so that # valid metadata can be written. repository.root.load_signing_key(root_private) repository.targets.load_signing_key(targets_private) repository.snapshot.load_signing_key(snapshot_private) repository.timestamp.load_signing_key(timestamp_private) def parse_arguments(): """ Parse the command-line arguments. Also set the logging level, as specified via the --verbose argument (2, by default). Example: # Create a TUF repository in the current working directory. The # top-level roles are created, each containing one key. $ repo.py --init $ repo.py --init --bare --consistent --verbose 3 If a required argument is unset, a parser error is printed and the script exits. None. None. Sets the logging level for TUF logging. A tuple ('options.REPOSITORY_PATH', command, command_arguments). 'command' 'command_arguments' correspond to a repository tool fuction. """ parser = argparse.ArgumentParser( description='Create or modify a TUF repository.') parser.add_argument('-i', '--init', action='store_true', help='Create a repository. The "tufrepo", "tufkeystore", and' ' "tufclient" directories are created in the current working' ' directory, unless --path is specified.') parser.add_argument('-p', '--path', nargs='?', default='.', metavar='', help='Specify a repository path. If used' ' with --init, the initialized repository is saved to the given' ' path.') parser.add_argument('-b', '--bare', action='store_true', help='If initializing a repository, neither create nor set keys' ' for any of the top-level roles. False, by default.') parser.add_argument('--no_release', action='store_true', help='Do not automatically sign Snapshot and Timestamp metadata.' ' False, by default.') parser.add_argument('--consistent', action='store_true', help='Set consistent snapshots for an initialized repository.' ' Consistent snapshot is False by default.') parser.add_argument('-c', '--clean', type=str, nargs='?', const='.', metavar='', help='Delete the repo files from the' ' specified directory. If a directory is not specified, the current' ' working directory is cleaned.') parser.add_argument('-a', '--add', type=str, nargs='+', metavar='', help='Add one or more target files to the' ' "targets" role (or the role specified in --role). If a directory' ' is given, all files in the directory are added.') parser.add_argument('--remove', type=str, nargs='+', metavar='', help='Remove one or more target files from the' ' "targets" role (or the role specified in --role).') parser.add_argument('--role', nargs='?', type=str, const='targets', default='targets', metavar='', help='Specify a rolename.' ' The rolename "targets" is used by default.') parser.add_argument('-r', '--recursive', action='store_true', help='By setting -r, any directory specified with --add is processed' ' recursively. If unset, the default behavior is to not add target' ' files in subdirectories.') parser.add_argument('-k', '--key', type=str, nargs='?', const=ED25519_KEYTYPE, default=None, choices=[ECDSA_KEYTYPE, ED25519_KEYTYPE, RSA_KEYTYPE], help='Generate an ECDSA, Ed25519, or RSA key. An Ed25519 key is' ' created if the key type is unspecified.') parser.add_argument('--filename', nargs='?', default=None, const=None, metavar='', help='Specify a filename. This option can' ' be used to name a generated key file. The top-level keys should' ' be named "root_key", "targets_key", "snapshot_key", "timestamp_key."') parser.add_argument('--trust', action='store_true', help='Indicate the trusted key(s) (via --pubkeys) for the role in --role.' ' This action modifies Root metadata with the trusted key(s).') parser.add_argument('--distrust', action='store_true', help='Discontinue trust of key(s) (via --pubkeys) for the role in --role.' ' This action modifies Root metadata by removing trusted key(s).') parser.add_argument('--sign', nargs='+', type=str, metavar='', help='Sign the "targets"' ' metadata (or the one for --role) with the specified key(s).') parser.add_argument('--pw', nargs='?', default='pw', metavar='', help='Specify a password. "pw" is used if --pw is unset, or a' ' password can be entered via a prompt by specifying --pw by itself.' ' This option can be used with --sign and --key.') parser.add_argument('--root_pw', nargs='?', default='pw', metavar='', help='Specify a Root password. "pw" is used if --pw is unset, or a' ' password can be entered via a prompt by specifying --pw by itself.') parser.add_argument('--targets_pw', nargs='?', default='pw', metavar='', help='Specify a Targets password. "pw" is used if --pw is unset, or a' ' password can be entered via a prompt by specifying --pw by itself.') parser.add_argument('--snapshot_pw', nargs='?', default='pw', metavar='', help='Specify a Snapshot password. "pw" is used if --pw is unset, or a' ' password can be entered via a prompt by specifying --pw by itself.') parser.add_argument('--timestamp_pw', nargs='?', default='pw', metavar='', help='Specify a Timestamp password. "pw" is used if --pw is unset, or a' ' password can be entered via a prompt by specifying --pw by itself.') parser.add_argument('-d', '--delegate', type=str, nargs='+', metavar='', help='Delegate trust of target files' ' from the "targets" role (or --role) to some other role (--delegatee).' ' The named delegatee is trusted to sign for the target files that' ' match the glob pattern(s).') parser.add_argument('--delegatee', nargs='?', type=str, const=None, default=None, metavar='', help='Specify the rolename' ' of the delegated role. Can be used with --delegate.') parser.add_argument('-t', '--terminating', action='store_true', help='Set the terminating flag to True. Can be used with --delegate.') parser.add_argument('--threshold', type=int, default=1, metavar='', help='Set the threshold number of signatures' ' needed to validate a metadata file. Can be used with --delegate.') parser.add_argument('--pubkeys', type=str, nargs='+', metavar='', help='Specify one or more public keys' ' for the delegated role. Can be used with --delegate.') parser.add_argument('--revoke', action='store_true', help='Revoke trust of target files from a delegated role.') # Add the parser arguments supported by PROG_NAME. parser.add_argument('-v', '--verbose', type=int, default=2, choices=range(0, 6), help='Set the verbosity level of logging messages.' ' The lower the setting, the greater the verbosity. Supported logging' ' levels: 0=UNSET, 1=DEBUG, 2=INFO, 3=WARNING, 4=ERROR,' ' 5=CRITICAL') # Should we include usage examples in the help output? parsed_args = parser.parse_args() # Set the logging level. logging_levels = [logging.NOTSET, logging.DEBUG, logging.INFO, logging.WARNING, logging.ERROR, logging.CRITICAL] tuf.log.set_log_level(logging_levels[parsed_args.verbose]) return parsed_args if __name__ == '__main__': # Parse the arguments and set the logging level. arguments = parse_arguments() # Create or modify the repository depending on the option specified on the # command line. For example, the following adds the 'foo.bar.gz' to the # default repository and updates the relevant metadata (i.e., Targets, # Snapshot, and Timestamp metadata are updated): # $ repo.py --add foo.bar.gz try: process_command_line_arguments(arguments) except (tuf.exceptions.Error) as e: sys.stderr.write('Error: ' + str(e) + '\n') sys.exit(1) # Successfully created or updated the TUF repository. sys.exit(0)