This commit is contained in:
dachshund 2013-03-01 11:11:53 -05:00
commit f7bec19e46
12 changed files with 1021 additions and 1056 deletions

View file

@ -31,8 +31,3 @@
# which already exists and within that directory should have the file
# 'metadata/current/root.txt'. This must be set!
repository_directory = None
# A directory where you may find certificate authorities
# https://en.wikipedia.org/wiki/Certificate_authority
# http://docs.python.org/2/library/ssl.html#certificates
ca_certs = None

View file

@ -25,15 +25,12 @@
import urllib2
import logging
import tuf.conf
import tuf.hash
import tuf.util
import tuf.formats
import tuf.urllib2_ssl
# See 'log.py' to learn how logging is handled in TUF.
logger = logging.getLogger('tuf.download')
_opener = None
def _open_connection(url):
@ -73,28 +70,13 @@ def _open_connection(url):
# 'Mozilla/4.0 (compatible; MSIE 7.0; Windows NT 5.1)' this can be useful if
# servers do not recognize connections that originates from
# Python-urllib/x.y.
global _opener
if _opener is None:
# If user has not asked for SSL certificate verification,
# use default opener.
if tuf.conf.ca_certs is None:
_opener = urllib2.build_opener()
# Otherwise, use an opener which will provide SSL certificate
# verification.
else:
_opener = urllib2.build_opener(
tuf.urllib2_ssl.HTTPSHandler(
ca_certs = tuf.conf.ca_certs
)
)
response = _opener.open( url )
request = urllib2.Request(url)
connection = urllib2.urlopen(request)
# urllib2.urlopen returns a file-like object: a handle to the remote data.
return connection
except Exception, e:
raise tuf.DownloadError(e)
# urllib2.urlopen returns a file-like object: a handle to the remote data.
return response

View file

@ -33,7 +33,7 @@
of the repository. For example, the repository owner wants to change the
'targets.txt' signing key. The owner would run 'signercli.py' to
generate a new RSA key, add the new key to the configuration file created
by 'quickstart.py', and then run 'signercli' to update the metadata files.
by 'quickstart.py', and then run 'signercli.py' to update the metadata files.
<Usage>
$ python signercli.py --<option> <keystore_directory>
@ -140,14 +140,45 @@ def _get_metadata_directory():
def _list_keyids(keystore_directory):
def _list_keyids(keystore_directory, metadata_directory):
"""
List the key files found in 'keystore_directory'.
It is assumed the directory exists and has been validated by
the caller. The keyids are listed without the '.key' extension.
It is assumed the directory arguments exist and have been validated by
the caller. The keyids are listed without the '.key' extension,
along with their associated roles.
"""
# Determine the 'root.txt' filename. This metadata file is needed
# to extract the keyids belonging to the top-level roles.
filenames = tuf.repo.signerlib.get_metadata_filenames(metadata_directory)
root_filename = filenames['root']
# Load the root metadata file. The loaded object should conform to
# 'tuf.formats.SIGNABLE_SCHEMA'.
metadata_signable = tuf.util.load_json_file(root_filename)
# Ensure the loaded json object is properly formatted.
try:
tuf.formats.check_signable_object_format(metadata_signable)
except tuf.FormatError, e:
message = 'Invalid metadata format: '+repr(root_filename)+'.'
raise tuf.RepositoryError(message)
# Extract the 'signed' role object from 'metadata_signable'.
root_metadata = metadata_signable['signed']
# Extract the 'roles' dict, where the dict keys are top-level roles and dict
# values a dictionary containing a list of corresponding keyids and a
# threshold.
top_level_keyids = root_metadata['roles']
# Determine the keyids associated with all the targets roles.
try:
targets_keyids = tuf.repo.signerlib.get_target_keyids(metadata_directory)
except tuf.FormatError, e:
raise tuf.RepositoryError('Format error: '+str(e))
# Extract the key files ending in a '.key' extension.
key_paths = []
for filename in os.listdir(keystore_directory):
@ -155,10 +186,32 @@ def _list_keyids(keystore_directory):
if filename.endswith('.key') and not os.path.isdir(full_path):
key_paths.append(filename)
# Print the keys without the '.key' extension.
logger.info('Listing the keyids in '+repr(keystore_directory))
# For each keyid listed in the keystore, search 'top_level_keyids'
# and 'targets_keyids' for a possible entry. 'keyids_dict' stores
# the associated roles for each keyid.
keyids_dict = {}
for keyid in key_paths:
logger.info(keyid[0:keyid.rfind('.key')])
# Strip the '.key' extension. These raw keyids are needed to search
# for the roles attached to them in the metadata files.
keyid = keyid[0:keyid.rfind('.key')]
keyids_dict[keyid] = []
# Is 'keyid' listed in any of the top-level roles?
for top_level_role in top_level_keyids:
if keyid in top_level_keyids[top_level_role]['keyids']:
# To avoid a duplicate, ignore the 'targets.txt' role for now.
# 'targets_keyids' will also contain the keyids for this top-level role.
if top_level_role != 'targets':
keyids_dict[keyid].append(top_level_role)
# Is 'keyid' listed in any of the targets roles?
for targets_role, keyids in targets_keyids.items():
if keyid in keyids:
keyids_dict[keyid].append(targets_role)
# Print the keyids without the '.key' extension and the roles
# associated with them.
logger.info('Listing the keyids in '+repr(keystore_directory))
for keyid in keyids_dict:
logger.info(keyid+' : '+str(keyids_dict[keyid]))
@ -371,9 +424,18 @@ def change_password(keystore_directory):
# Verify the 'keystore_directory' argument.
keystore_directory = _check_directory(keystore_directory)
# Retrieve the metadata directory. The 'root.txt' and all the targets
# metadata are needed to extract rolenames and their corresponding
# keyids.
try:
metadata_directory = _get_metadata_directory()
except (tuf.FormatError, tuf.Error), e:
message = str(e)+'\n'
raise tuf.RepositoryError(message)
# List the keyids in the keystore and prompt the user for the keyid they
# wish to modify.
_list_keyids(keystore_directory)
_list_keyids(keystore_directory, metadata_directory)
# Retrieve the keyid from the user.
message = '\nEnter the keyid for the password you wish to change: '
@ -460,8 +522,8 @@ def generate_rsa_key(keystore_directory):
def list_signing_keys(keystore_directory):
"""
<Purpose>
Print the key IDs of the signing keys listed in the keystore
directory.
Print the key IDs of the signing keys listed in the keystore directory.
The associated roles of each keyid is also listed.
<Arguments>
keystore_directory:
@ -469,7 +531,8 @@ def list_signing_keys(keystore_directory):
in '.key').
<Exceptions>
tuf.RepositoryError, if the keystore directory is invalid.
tuf.RepositoryError, if the keystore directory is invalid or if the
required metadata files cannot be read.
<Side Effects>
None.
@ -482,7 +545,16 @@ def list_signing_keys(keystore_directory):
# Verify the 'keystore_directory' argument.
keystore_directory = _check_directory(keystore_directory)
_list_keyids(keystore_directory)
# Retrieve the metadata directory. The 'root.txt' file and all the metadata
# for the targets roles are needed to extract rolenames and their associated
# keyids.
try:
metadata_directory = _get_metadata_directory()
except (tuf.FormatError, tuf.Error), e:
message = str(e)+'\n'
raise tuf.RepositoryError(message)
_list_keyids(keystore_directory, metadata_directory)
@ -518,8 +590,17 @@ def dump_key(keystore_directory):
# Verify the 'keystore_directory' argument.
keystore_directory = _check_directory(keystore_directory)
# Retrieve the metadata directory. The 'root.txt' and all the targets
# role metadata files are needed to extract rolenames and their corresponding
# keyids.
try:
metadata_directory = _get_metadata_directory()
except (tuf.FormatError, tuf.Error), e:
message = str(e)+'\n'
raise tuf.RepositoryError(message)
# List the keyids found in 'keystore_directory', minus the '.key' extension.
_list_keyids(keystore_directory)
_list_keyids(keystore_directory, metadata_directory)
# Retrieve the keyid and password from the user.
message = '\nEnter the keyid for the signing key you wish to dump: '
@ -823,8 +904,17 @@ def sign_metadata_file(keystore_directory):
# Verify the 'keystore_directory' argument.
keystore_directory = _check_directory(keystore_directory)
# Retrieve the metadata directory. The 'root.txt' and all the targets
# role metadata files are needed to extract rolenames and their corresponding
# keyids.
try:
metadata_directory = _get_metadata_directory()
except (tuf.FormatError, tuf.Error), e:
message = str(e)+'\n'
raise tuf.RepositoryError(message)
# List the keyids available in the keystore.
_list_keyids(keystore_directory)
_list_keyids(keystore_directory, metadata_directory)
# Retrieve the keyids of the signing keys from the user.
logger.info('The keyids that will sign the metadata file must be loaded.')
@ -890,7 +980,7 @@ def make_delegation(keystore_directory):
# Get the delegated role's target directory, which should be located within
# the repository's targets directory. We need this directory to generate the
# delegated role's target paths.
prompt = '\nNOTE: The directory entered below should be located within the '+\
prompt = '\nThe directory entered below should be located within the '+\
'repository\'s targets directory.\nEnter the directory containing the '+\
'delegated role\'s target files: '
delegated_targets_directory = _prompt(prompt, str)
@ -912,7 +1002,8 @@ def make_delegation(keystore_directory):
# Load the delegated role specified by the user. The delegated role must be
# loaded so its metadata file can be created.
delegated_role, delegated_keyids = _get_delegated_role(keystore_directory)
delegated_role, delegated_keyids = _get_delegated_role(keystore_directory,
metadata_directory)
# Create, sign, and write the delegated role's metadata file.
delegated_paths = _make_delegated_metadata(metadata_directory,
@ -986,7 +1077,7 @@ def _load_parent_role(metadata_directory, keystore_directory, targets_roles):
def _get_delegated_role(keystore_directory):
def _get_delegated_role(keystore_directory, metadata_directory):
"""
Get the delegated role specified by the user. The user is presented with
a list of keyids available in the keystore and asked to enter the keyid
@ -1000,7 +1091,7 @@ def _get_delegated_role(keystore_directory):
# List the keyids available in the keystore. The user will next
# identify the keyids for the new delegated role.
_list_keyids(keystore_directory)
_list_keyids(keystore_directory, metadata_directory)
# Retrieve the delegated role\'s keyids from the user.
logger.info('The keyid of the delegated role must be loaded.')

View file

@ -797,7 +797,7 @@ def get_target_keyids(metadata_directory):
# Read the 'targets.txt' file. This file must exist.
targets_filepath = os.path.join(metadata_directory, 'targets.txt')
if not os.path.exists(targets_filepath):
raise RepositoryError('"targets.txt" not found')
raise tuf.RepositoryError('"targets.txt" not found')
# Read the contents of 'targets.txt' and save the signable.
targets_signable = tuf.util.load_json_file(targets_filepath)
@ -806,7 +806,7 @@ def get_target_keyids(metadata_directory):
try:
tuf.formats.check_signable_object_format(targets_signable)
except tuf.FormatError, e:
raise RepositoryError('"targets.txt" is improperly formatted')
raise tuf.RepositoryError('"targets.txt" is improperly formatted')
# Store the keyids of the 'targets' role. This target role is
# required.

View file

@ -1,107 +0,0 @@
"""
<Program Name>
replay_attack_setup.py
<Author>
Konstantin Andrianov
<Started>
February 22, 2012
<Copyright>
See LICENSE for licensing information.
<Purpose>
A helper module that provides a method that simulates a replay
attack.Simulate a replay attack.
"""
import os
import shutil
import tempfile
import test_system_setup as setup
def _tuf_update_meta_and_download_files(target):
"""
Client performs metadata update and downloads the files.
Method returns file content of the 'target'.
'target' should be a dictionary key from main rapository i.e.
one of setup['repo']['repo_files'] elements.
"""
setup.tuf_client_refresh_metadata()
setup.tuf_client_download_updates()
filename = setup.setup_info['repo'][target][0]
targetpath = os.path.join(setup.setup_info['dest_path'], filename)
msg = '[TUF] Failed to download files.'
assert os.path.exists(targetpath), msg
return setup.read_file_content(targetpath)
def replay_attack():
"""
<Purpose>
Illustrate replay attack vulnerability.
"""
# try block is used here to remove temporary files.
try:
# Internal setup.
repo_files = setup.setup_info['repo']['repo_files']
# Attacker saves on of the initial file, in which he found a voulnerability.
evil_dir = tempfile.mkdtemp(prefix='evil_', dir=os.getcwd())
shutil.copy(setup.setup_info['repo'][repo_files[0]][1], evil_dir)
# Client performs initial updates.
if setup.setup_info['tuf']:
downloaded_file_content = _tuf_update_meta_and_download_files(repo_files[0])
else:
downloaded_file_content = setup.client_download(repo_files[0])
# Content of the file at the repository.
file_content_at_repo = \
setup.read_file_content(setup.setup_info['repo'][repo_files[0]][1])
msg = '[Initial Updata] Failed to download the file.'
assert file_content_at_repo == downloaded_file_content, msg
# Developer patches 'repo_files[0]' file and updates the repository.
new_data = 'NewData'
setup.add_or_change_file_at_repository(repo_file=repo_files[0], data=new_data)
if setup.setup_info['tuf']:
# If TUF is implemented, the developer needs to refresh tuf repository.
setup.refresh_tuf_repository()
# Client downloads the patched file.
if setup.setup_info['tuf']:
downloaded_file_content = _tuf_update_meta_and_download_files(repo_files[0])
else:
downloaded_file_content = setup.client_download(repo_files[0])
msg = '[Updata] Failed to update the file.'
assert new_data == downloaded_file_content, msg
# Attacker tries to be clever, redirects clients to his repo.
rel_evil_dir = os.path.basename(evil_dir)
setup.setup_info['url'] = \
'http://localhost:'+str(setup.setup_info['port'])+'/'+rel_evil_dir+'/'
# Client downloads the updated file 'repo_files[0]' one more time.
if setup.setup_info['tuf']:
downloaded_file_content = _tuf_update_meta_and_download_files(repo_files[0])
else:
downloaded_file_content = setup.client_download(repo_files[0])
# Check whether the attack succeeded by inspecting the content of the
# update. The update should contain 'new_data'.
msg = 'Replay attack was succeeded!\n'
assert new_data == downloaded_file_content, msg
finally:
shutil.rmtree(evil_dir)

View file

@ -1,48 +0,0 @@
"""
<Program Name>
test_replay_attack.py
<Author>
Konstantin Andrianov
<Started>
February 22, 2012
<Copyright>
See LICENSE for licensing information.
<Purpose>
Simulate a replay attack. A simple client update vs. client update
implementing TUF.
Note: It's assumed that attacker does NOT have access to metadata signing
keys. Keep them safe!
"""
import test_system_setup
import replay_attack_setup
test_system_setup.init_repo(tuf=False)
try:
replay_attack_setup.replay_attack()
except AssertionError, e:
print 'Expected Failure: '+repr(e)
else:
print 'Unexpected Failure!'
test_system_setup.cleanup()
test_system_setup.init_repo(tuf=True)
try:
replay_attack_setup.replay_attack()
except AssertionError, e:
print 'Unexpected Failure: '+repr(e)
else:
print 'Expected Success!'
test_system_setup.cleanup()

View file

@ -1,617 +0,0 @@
"""
<Program Name>
test_system_setup.py
<Author>
Konstantin Andrianov
<Started>
February 19, 2012
<Copyright>
See LICENSE for licensing information.
<Purpose>
Provide automatic setup and clean-up functionality.
Initial repository looks like this:
simple server --> repository_dir
|
--------------------------
| |
file0 file1
This modules uses unittest module to provide easy setup and tear down
capability.
Essentially there are two choices: either a system that simply performs
update downloads without any protections or a system that utilizes TUF to
perform secure update downloads.
A structure that does NOT implementing TUF. A direct download over http.
Repository + Server <---------------> Client
The TUF structure is described bellow in the class and tuf_tearDown() docs.
Repository + TUF + Server <---------> TUF + Client
"""
# Repository setup. Repository will consist of a temporary directory
# with few files in it.
import os
import sys
import time
import shutil
import random
import urllib2
import logging
import tempfile
import subprocess
import tuf.client.updater
import tuf.repo.signerlib as signerlib
# Disable/Enable logging. Comment-out to Enable logging.
logging.getLogger('tuf')
logging.disable(logging.CRITICAL)
"""
client_download(repo_file):
Downloads a file ('repo_file') from the 'url' (described bellow).
Returns the contents of the file.
add_or_change_file_at_repository(filename, data):
Allows to add or change a file on the repository ('repo_path').
Modifies 'setup_info' and returns full path of the added/changed file.
delete_file_at_repository(filename):
Deletes a file on the repository ('repository_dir').
refresh_tuf_repository()
Updates TUF metadata and targets paths.
setup_info:
Main dictionary where all setup related information is stored.
Ex: setup_info['repo_path']
repo_path:
Repository directory, where updates are located.
repo:
file#:
Stores a tuple of file's basename and full path of the file.
setup_info['repo']['file0'] = (filename, fullpath)
Note:
*Access the files using the 'file#' (or whatever str) dictionary keys.
*'filename' is appended to the url in order to get the file.
repo_files:
A list that stores all dictionary keys of files available at the
repository that is all 'file#' dictionary keys described above.
setup_info['repo']['repo_files'] = ['file0', 'file1']
server_proc:
A subprocess object.
Ex: setup_info['server_proc']
url:
A loop-back address pointing to the repository directory.
Ex: setup_info['url']
TUF related variables. Refer to the diagram in init_tuf().
tuf_repo_path:
A tuf repository that contains all tuf related directories, such as
metadata, targets, and keystore. (Read docs on how to handle keys!)
tuf_repo:
metadata:
Metadata directory that contains metadata files and is located in the
tuf repository directory ('tuf_repository_dir').
Ex: setup_info['tuf_repo']['metadata']
keystore:
Contains all tuf keys, i.e. keys that are used to sign metadata roles.
It's located in the tuf repository directory ('tuf_repo_path').
Ex: setup_info['tuf_repo']['keystore']
targets:
All target files (updates) are stored in tuf targets directory.
Basically contains all 'repo_path' files (devel's repository).
It's located in the tuf repository directory ('tuf_repository_dir').
Ex: setup_info['tuf_repo']['targets']
tuf_client_path:
Client side tuf directory. It contains current and previous metadata
files.
Ex: setup_info['tuf_client_path']
tuf_client:
metadata_path:
Stores path of client's metadata directory.
Ex; setup_info['tuf_client']['metadata_path']
metadata:
Client needs to keep track of metadata files.
current:
Latest known to client metadata is stored here.
Ex: setup_info['tuf_client']['metadata']['current']
previous:
Previous metadata is stored here. It's used during metadata update
process.
Ex: setup_info['tuf_client']['metadata']['previous']
tuf_client_metadata_dir:
Contains current and previous versions of metadata files.
Note: metadata files are root.txt, targets.txt, release.txt and
timestamp.txt. There could be more metadata files such us mirrors.txt.
The metadata files are signed by their corresponding roles i.e. root,
targets etc.
More documentation is provided in comments and doc blocks.
"""
setup_info = {}
def init_repo(tuf=False):
# Repository directory with few files in it.
setup_info['tuf'] = tuf
setup_info['repo_path'] = tempfile.mkdtemp(dir=os.getcwd())
setup_info['dest_path'] = tempfile.mkdtemp(dir=os.getcwd())
setup_info['repo'] = {'repo_files':[]}
add_or_change_file_at_repository(repo_file='file0', data='SystemTestFile0')
add_or_change_file_at_repository(repo_file='file1', data='SystemTestFile1')
# Start a simple server pointing to the repository directory.
setup_info['port'] = port = random.randint(30000, 45000)
command = ['python', '-m', 'SimpleHTTPServer', str(port)]
setup_info['server_proc'] = subprocess.Popen(command, stderr=subprocess.PIPE)
# Tailor url for the repository.
repo_relpath = os.path.basename(setup_info['repo_path'])
setup_info['url'] = 'http://localhost:'+str(port)+'/'+repo_relpath+'/'
# NOTE: The delay is needed to make up for asynchronous subprocess.
# Otherwise following error might be raised:
# <urlopen error [Errno 111] Connection refused>
time.sleep(.1)
if tuf:
init_tuf()
def cleanup():
if not setup_info:
msg = 'init_repo() must be called before cleanup().\n'
sys.exit(msg)
if setup_info['server_proc'].returncode is None:
setup_info['server_proc'].kill()
print 'Server terminated.\n'
# Removing repository directory.
shutil.rmtree(setup_info['repo_path'])
shutil.rmtree(setup_info['dest_path'])
if setup_info['tuf']:
cleanup_tuf()
def add_or_change_file_at_repository(repo_file, data=None):
"""
<Purpose>
Adds or changes a file at the repository setup_info[repo_path].
<Arguments>
repo_file:
A key to setup_info[repo] dictionary.
data:
A string to write to the indicated file. If None, 'test' string is
used.
"""
if isinstance(repo_file, basestring):
if not setup_info['repo'].has_key(repo_file):
junk, filepath = tempfile.mkstemp(dir=setup_info['repo_path'])
filename = os.path.basename(filepath)
setup_info['repo'][repo_file] = [filename, filepath]
setup_info['repo']['repo_files'].append(repo_file)
fileobj = open(setup_info['repo'][repo_file][1], 'wb')
if data is None:
data = 'test'
fileobj.write(data)
fileobj.close()
return setup_info['repo'][repo_file][1]
msg = 'Nothing was added or changed. Provide a valid string.\n'
sys.exit(msg)
def delete_file_at_repository(repo_file):
"""
<Purpose>
Attempt to delete a file at the repository setup_info['repo_path'].
"""
if isinstance(repo_file, basestring):
if not setup_info['repo'].has_key(repo_file):
msg = 'Provide a valid dictionary key to remove the file.\n'
sys.exit(msg)
os.remove(setup_info['repo'][repo_file][1])
del setup_info['repo'][repo_file]
def _open_connection(url):
try:
request = urllib2.Request(url)
connection = urllib2.urlopen(request)
except Exception, e:
msg = 'Couldn\'t open connection: ' + repr(e)
sys.exit(msg)
return connection
def client_download(repo_file):
"""
<Purpose>
Attempt to download a file from repository without TUF.
"""
if not isinstance(repo_file, basestring) or \
not setup_info['repo'].has_key(repo_file):
msg = 'Provide a valid key for a file that exists on the repository.\n'
sys.exit(msg)
connection = _open_connection(setup_info['url']+setup_info['repo'][repo_file][0])
data = connection.read()
connection.close()
return data
def read_file_content(filepath):
try:
fileobj = open(filepath, 'rb')
except Exception, e:
raise
data = fileobj.read()
fileobj.close()
return data
def init_tuf():
"""
<Purpose>
Setup TUF directory structure and populated it with TUF metadata and
congfiguration files.
tuf_repository_dir
|
--------------------------------------------
| | |
keystore metadata targets
| | |
key.key files role.txt files targets (updates)
... ... ...
tuf_client_dir
|
metadata
|
---------------------------
| |
current previous
| |
role.txt files role.txt files
... ...
"""
passwd = 'test'
threshold = 1
# Setup TUF-repo directory structure.
setup_info['tuf_repo_path'] = \
tempfile.mkdtemp(prefix='tuf_repo_', dir=os.getcwd())
keystore_dir = os.path.join(setup_info['tuf_repo_path'], 'keystore')
metadata_dir = os.path.join(setup_info['tuf_repo_path'], 'metadata')
targets_dir = os.path.join(setup_info['tuf_repo_path'], 'targets')
setup_info['tuf_repo'] = {'keystore':keystore_dir,
'metadata':metadata_dir,
'targets':targets_dir}
os.mkdir(setup_info['tuf_repo']['keystore'])
os.mkdir(setup_info['tuf_repo']['metadata'])
shutil.copytree(setup_info['repo_path'], setup_info['tuf_repo']['targets'])
# Setting TUF-client directory structure.
# 'tuf.client.updater.py' expects the 'current' and 'previous'
# directories to exist under client's 'metadata' directory.
setup_info['tuf_client_path'] = tempfile.mkdtemp(suffix='tuf_client_', dir=os.getcwd())
tuf_client_metadata_dir = os.path.join(setup_info['tuf_client_path'], 'metadata')
current_dir = os.path.join(setup_info['tuf_client_path'], 'metadata', 'current')
previous_dir = os.path.join(setup_info['tuf_client_path'], 'metadata', 'previous')
setup_info['tuf_client'] = {'metadata_path': tuf_client_metadata_dir,
'metadata': {'current':current_dir,
'previous':previous_dir}}
os.mkdir(setup_info['tuf_client']['metadata_path'])
# Generate at least one rsa key.
key = signerlib.generate_and_save_rsa_key(keystore_dir, passwd)
keyid = [key['keyid']]
setup_info['keyid'] = keyid
# Set role info.
info = {'keyids': [key['keyid']], 'threshold': threshold}
# 'role_info' dictionary looks like this:
# {role : {'keyids : [keyid1, ...] , 'threshold' : 1}}
# In our case 'role_info[keyids]' will only have on entry since only one
# is being used.
role_info = {}
role_list = ['root', 'targets', 'release', 'timestamp']
for role in role_list:
role_info[role] = info
# At this point there is enough information to create TUF configuration
# and metadata files.
# Build the configuration file.
conf_path = signerlib.build_config_file(metadata_dir, 365, role_info)
# Generate the 'root.txt' metadata file.
signerlib.build_root_file(conf_path, keyid, metadata_dir)
# Generate the 'targets.txt' metadata file.
signerlib.build_targets_file(targets_dir, keyid, metadata_dir)
# Generate the 'release.txt' metadata file.
signerlib.build_release_file(keyid, metadata_dir)
# Generate the 'timestamp.txt' metadata file.
signerlib.build_timestamp_file(keyid, metadata_dir)
# Move the metadata to the client's 'current' and 'previous' directories.
shutil.copytree(metadata_dir, current_dir)
shutil.copytree(metadata_dir, previous_dir)
# The repository is now setup!
# Here is a mirrors dictionary that will allow a client to seek out
# places to download the metadata and targets from.
tuf_repo_relpath = os.path.basename(setup_info['tuf_repo_path'])
setup_info['tuf_url'] = 'http://localhost:'+ \
str(setup_info['port'])+'/'+tuf_repo_relpath+'/'
setup_info['mirrors'] = {'mirror1':
{'url_prefix': setup_info['tuf_url'],
'metadata_path': 'metadata',
'targets_path': 'targets',
'confined_target_dirs': ['']}}
# Adjusting configuration file (tuf.conf.py).
tuf.conf.repository_directory = setup_info['tuf_client_path']
# Instantiate an updater.
setup_info['tuf_client']['updater'] = \
tuf.client.updater.Updater('updater', setup_info['mirrors'])
def cleanup_tuf():
"""
<Purpose>
Clean-up method, removes all TUF directories created using
tuf_init().
"""
shutil.rmtree(setup_info['tuf_repo_path'])
shutil.rmtree(setup_info['tuf_client_path'])
def refresh_tuf_repository():
"""
<Purpose>
Update TUF metadata files. Call this method whenever targets files have
changed in the 'repository_dir'.
"""
if not setup_info['tuf']:
msg = 'TUF needs to be initialized.\n'
sys.exist(msg)
shutil.rmtree(setup_info['tuf_repo']['targets'])
shutil.copytree(setup_info['repo_path'], setup_info['tuf_repo']['targets'])
# Regenerate the 'targets.txt' metadata file.
signerlib.build_targets_file(setup_info['tuf_repo']['targets'],
setup_info['keyid'],
setup_info['tuf_repo']['metadata'])
# Regenerate the 'release.txt' metadata file.
signerlib.build_release_file(setup_info['keyid'],
setup_info['tuf_repo']['metadata'])
# Regenerate the 'timestamp.txt' metadata file.
signerlib.build_timestamp_file(setup_info['keyid'],
setup_info['tuf_repo']['metadata'])
def tuf_client_refresh_metadata():
if not setup_info['tuf']:
msg = 'TUF needs to be initialized.\n'
sys.exist(msg)
# Update all metadata.
setup_info['tuf_client']['updater'].refresh()
def tuf_client_download_updates():
if not setup_info['tuf']:
msg = 'TUF needs to be initialized.\n'
sys.exist(msg)
# Get the latest information on targets.
targets = setup_info['tuf_client']['updater'].all_targets()
# Determine which targets have changed or are new.
updated_targets = \
setup_info['tuf_client']['updater'].updated_targets(targets, setup_info['dest_path'])
# Download new/changed targets and store them in the destination
# directory 'destination_dir'.
for target in updated_targets:
setup_info['tuf_client']['updater'].download_target(target, setup_info['dest_path'])
#===========================================#
# Bellow are few quick tests to make sure #
# that everything works smoothly. #
#===========================================#
def test_client_download():
init_repo()
data = client_download('file0')
assert data == 'SystemTestFile0'
def test_tuf_setup():
init_repo(tuf=True)
# Verify that all necessary TUF-paths exist.
for role in ['root', 'targets', 'release', 'timestamp']:
# Repository side.
role_file = os.path.join(setup_info['tuf_repo']['metadata'], role+'.txt')
msg = repr(role)+'repository metadata file missing!'
assert os.path.isfile(role_file), msg
# Client side.
current_dir = setup_info['tuf_client']['metadata']['current']
role_file = os.path.join(current_dir, role+'.txt')
msg = repr(role)+'client metadata file missing!'
assert os.path.isfile(role_file), msg
targets_dir = setup_info['tuf_repo']['targets']
target1 = os.path.join(targets_dir, setup_info['repo']['file0'][0])
target2 = os.path.join(targets_dir, setup_info['repo']['file1'][0])
msg = 'missing target file!'
assert os.path.isfile(target1), msg
assert os.path.isfile(target2), msg
def test_methods():
"""
Making sure following methods work as intended:
- add_or_change_file_at_repository()
- delete_file_at_repository()
- refresh_tuf_repository()
"""
init_repo(tuf=True)
new_file = add_or_change_file_at_repository('file2')
fileobj = open(new_file, 'rb')
msg = 'add_or_change_file_at_repository() failed on file creation.'
assert os.path.exists(new_file), msg
msg = 'content of the new file did not match expected data.'
assert 'test' == fileobj.read(), msg
fileobj.close()
old_file = add_or_change_file_at_repository(repo_file='file1')
fileobj = open(old_file, 'rb')
msg = 'add_or_change_file_at_repository() failed on changing content of a file.'
assert os.path.exists(old_file), msg
msg = 'content of the changed file did not match expected data.'
assert 'test' == fileobj.read(), msg
fileobj.close()
old_file = add_or_change_file_at_repository(repo_file='file1', data='1234')
fileobj = open(old_file, 'rb')
msg = 'add_or_change_file_at_repository() failed on changing content of a file.'
assert os.path.exists(old_file), msg
msg = 'content of the changed file did not match expected data.'
assert '1234' == fileobj.read(), msg
fileobj.close()
refresh_tuf_repository()
targets_dir = setup_info['tuf_repo']['targets']
new_target = os.path.join(targets_dir, os.path.basename(new_file))
msg = 'failed to add a target to the tuf targets directory on refresh.'
assert os.path.exists(new_target)
# Here it's assumed that all relevant metadata has been updated
# successfully. This is tested in signerlib and other unit tests.
delete_file_at_repository('file2')
msg = 'failed to delete a file on the repository.'
assert not os.path.exists(new_file), msg
if __name__ == '__main__':
tests = [test_client_download, test_tuf_setup, test_methods]
for test in tests:
try:
test()
print repr(test)+'....... OKAY'
except Exception, e:
raise
finally:
cleanup()
setup_info = {}

View file

@ -0,0 +1,153 @@
"""
<Program Name>
test_replay_attack.py
<Author>
Konstantin Andrianov
<Started>
February 22, 2012
<Copyright>
See LICENSE for licensing information.
<Purpose>
Simulate a replay attack. A simple client update vs. client update
implementing TUF.
Note: There is no difference between 'updates' and 'target' files.
Note: If TUF is implemented - you would NOT use urllib like it's done here
for the testing purposes. TUF handles the downloads.
"""
import os
import shutil
import urllib
import tempfile
import util_test_tools
def test_replay_attack(tuf=False):
"""
<Arguments>
tuf:
If set to 'False' all directories that start with 'tuf_' are ignored,
indicating that tuf is not implemented.
<Purpose>
Illustrate replay attack vulnerability.
"""
# Setup.
temp_root, url = util_test_tools.init_repo(tuf=tuf)
repo = os.path.join(temp_root, 'repo')
tuf_repo = os.path.join(temp_root, 'tuf_repo')
downloads =os.path.join(temp_root, 'downloads')
# Add file to 'repo' directory: {temp_root}
filepath = util_test_tools.add_file_to_repository('Test A')
file_basename = os.path.basename(filepath)
url_to_repo = url+'repo/'+file_basename
downloaded_file = os.path.join(downloads, file_basename)
# Client performs initial update.
if tuf:
util_test_tools.tuf_refresh_and_download()
else:
urllib.urlretrieve(url_to_repo, downloaded_file)
# Content of the downloaded file.
# Downloads are stored in the same directory '{temp_root}/downloads/'
# independent of who stores there (tuf or regular client). See warning
# in util_test_tools.init_repo().
downloaded_content = util_test_tools.read_file_content(downloaded_file)
msg = '[Initial Updata] Failed to download the file.'
assert 'Test A' == downloaded_content, msg
# Attacker finds a vulnerability in the file.
evil_dir = tempfile.mkdtemp(dir=temp_root)
vulnerable_file = os.path.join(evil_dir, file_basename)
urllib.urlretrieve(url_to_repo, vulnerable_file)
# Developer patches the file and updates the repository.
util_test_tools.modify_file_at_repository(filepath, 'Test NOT A')
# Client downloads the patched file.
if tuf:
util_test_tools.tuf_refresh_and_download()
else:
urllib.urlretrieve(url_to_repo, downloaded_file)
# Content of the downloaded file.
downloaded_content = util_test_tools.read_file_content(downloaded_file)
msg = '[Updata] Failed to update the file.'
assert 'Test NOT A' == downloaded_content, msg
# Attacker tries to be clever, he manages to modifies tuf targets directory
# by replacing a patched file with an old one.
#
# Since we don't really have any restriction where regular download
# retrieves the files from, this works fine. On the other hand, when
# tuf is used this will guarantee that tuf-client will be retrieving the
# attacker's file. This happens, because mirror's list is pointing to
# the tuf repository.
#
# If tuf is False none of the tuf directories are created, but attacker
# needs tuf targets directory in order to be able to attack both tuf and
# non-tuf clients. For this purpose he creates an artificial tuf targets
# directory (Remember: the tuf is not setup at this point!).
targets_dir = os.path.join(tuf_repo, 'targets')
if not os.path.isdir(targets_dir):
os.makedirs(targets_dir)
shutil.copy(vulnerable_file, targets_dir)
url_to_tuf = url+'tuf_repo/targets/'+file_basename
# Verify that 'target' is an old, un-patched file.
target = os.path.join(targets_dir, file_basename)
target_content = util_test_tools.read_file_content(target)
msg = 'The \'target\' file contains new data!'
assert 'Test A' == target_content, msg
# Client downloads the file once time.
if tuf:
util_test_tools.tuf_refresh_and_download()
else:
urllib.urlretrieve(url_to_tuf, downloaded_file)
# Check whether the attack succeeded by inspecting the content of the
# update. The update should contain 'Test NOT A'.
downloaded_content = util_test_tools.read_file_content(downloaded_file)
msg = 'Replay attack was successful!\n'
assert 'Test NOT A' == downloaded_content, msg
try:
test_replay_attack(tuf=False)
except AssertionError, e:
print 'Expected Failure: '+repr(e)
else:
print 'Unexpected Failure!'
finally:
util_test_tools.cleanup()
try:
test_replay_attack(tuf=True)
except AssertionError, e:
print 'Unexpected Failure: '+repr(e)
else:
print 'Expected Success!'
finally:
util_test_tools.cleanup()

View file

@ -0,0 +1,153 @@
"""
<Program Name>
test_util_test_tools.py
<Author>
Konstantin Andrianov
<Started>
February 26, 2012
<Copyright>
See LICENSE for licensing information.
<Purpose>
Test util_test_tools.
"""
import os
import urllib
import unittest
import util_test_tools
class test_UtilTestTools(unittest.TestCase):
def setUp(self):
unittest.TestCase.setUp(self)
self.temp_root, self.url = util_test_tools.init_repo(tuf=True)
def tearDown(self):
unittest.TestCase.tearDown(self)
util_test_tools.cleanup()
#================================================#
# Bellow are few quick tests to make sure that #
# everything works smoothly in util_test_tools. #
#================================================#
# A few quick internal tests to see if everything runs smoothly.
def test_direct_download(self):
# Setup.
downloads = os.path.join(self.temp_root, 'downloads')
filepath = util_test_tools.add_file_to_repository('Test')
file_basename = os.path.basename(filepath)
url = self.url+'repo/'+file_basename
downloaded_file = os.path.join(downloads, file_basename)
# Test direct download using 'urllib.urlretrieve'.
urllib.urlretrieve(url, downloaded_file)
self.assertTrue(os.path.isfile(downloaded_file))
# Verify the content of the downloaded file.
downloaded_content = util_test_tools.read_file_content(downloaded_file)
self.assertEquals('Test', downloaded_content)
def test_correct_directory_structure(self):
# Verify following directories exists: '{temp_root}/repo/',
# '{temp_root}/downloads/.
self.assertTrue(os.path.isdir(os.path.join(self.temp_root, 'repo')))
self.assertTrue(os.path.isdir(os.path.join(self.temp_root, 'downloads')))
# Verify that all necessary TUF-paths exist.
tuf_repo = os.path.join(self.temp_root, 'tuf_repo')
tuf_client = os.path.join(self.temp_root, 'tuf_client')
metadata_dir = os.path.join(tuf_repo, 'metadata')
current_dir = os.path.join(tuf_client, 'metadata', 'current')
# Verify '{temp_root}/tuf_repo/metadata/role.txt' paths exists.
for role in ['root', 'targets', 'release', 'timestamp']:
# Repository side.
role_file = os.path.join(metadata_dir, role+'.txt')
self.assertTrue(os.path.isfile(role_file))
# Client side.
role_file = os.path.join(current_dir, role+'.txt')
self.assertTrue(os.path.isfile(role_file))
# Verify '{temp_root}/tuf_repo/keystore/keyid.key' exists.
keys_list = os.listdir(os.path.join(tuf_repo, 'keystore'))
self.assertEquals(len(keys_list), 1)
# Verify '{temp_root}/tuf_repo/targets/' directory exists.
self.assertTrue(os.path.isdir(os.path.join(tuf_repo, 'targets')))
def test_methods(self):
"""
Making sure following methods work as intended:
- add_file_to_repository(data)
- modify_file_at_repository(filepath, data)
- delete_file_at_repository(filepath)
- read_file_content(filepath)
- tuf_refresh_repo()
- tuf_refresh_and_download()
Note: here file at the 'filepath' and the 'target' file at tuf-targets
directory are identical files.
Ex: filepath = '{temp_root}/repo/file.txt'
target = '{temp_root}/tuf_repo/targets/file.txt'
"""
repo = os.path.join(self.temp_root, 'repo')
tuf_repo = os.path.join(self.temp_root, 'tuf_repo')
downloads = os.path.join(self.temp_root, 'downloads')
# Test 'add_file_to_repository(data)' and read_file_content(filepath)
# methods
filepath = util_test_tools.add_file_to_repository('Test')
self.assertTrue(os.path.isfile(filepath))
self.assertEquals(os.path.dirname(filepath), repo)
filepath_content = util_test_tools.read_file_content(filepath)
self.assertEquals('Test', filepath_content)
# Test 'modify_file_at_repository(filepath, data)' method.
filepath = util_test_tools.modify_file_at_repository(filepath, 'Modify')
self.assertTrue(os.path.exists(filepath))
filepath_content = util_test_tools.read_file_content(filepath)
self.assertEquals('Modify', filepath_content)
# Test 'tuf_refresh_repo' method.
util_test_tools.tuf_refresh_repo()
file_basename = os.path.basename(filepath)
target = os.path.join(tuf_repo, 'targets', file_basename)
self.assertTrue(os.path.isfile(target))
# Test 'tuf_refresh_and_download()' method.
util_test_tools.tuf_refresh_and_download()
target = os.path.join(downloads, file_basename)
self.assertTrue(os.path.isfile(target))
# Test 'delete_file_at_repository(filepath)' method.
util_test_tools.delete_file_at_repository(filepath)
self.assertFalse(os.path.exists(filepath))
# Test 'tuf_refresh_repo' method once more.
util_test_tools.tuf_refresh_repo()
file_basename = os.path.basename(filepath)
target = os.path.join(tuf_repo, 'targets', file_basename)
self.assertFalse(os.path.isfile(target))
if __name__ == '__main__':
unittest.main()

View file

@ -0,0 +1,456 @@
"""
<Program Name>
util_test_tools.py
<Author>
Konstantin Andrianov
<Started>
February 19, 2012
<Copyright>
See LICENSE for licensing information.
<Purpose>
A utility modules that provides convenient methods to make the laborious
process of test construction a bit easier.
A structure that does NOT implementing TUF. A direct download over http.
Repository + Server <---------------> Client
The TUF structure is described bellow in the class and tuf_tearDown() docs.
Repository + TUF + Server <---------> TUF + Client
<Directories>
Initialized by init_repo()
The server is pointing to 'temp_root' directory, including the '/'.
temp_root
|
-------------------------------------------------------
| | | | |
repo tuf_repo tuf_client tuf_downloads downloads
'{temp_repo}/downloads/': stores all direct downloads made by the client.
'{temp_repo}/tuf_downloads/': stores all downloads made by the client using
tuf.
repo
|
-----------------------------
| | ... |
file(1) file(2) ... file(n)
'{temp_repo}/repo/': main developer's repository that contains files or
updates that need to be distributed.
tuf_repo
|
------------------------------------------
| | |
keystore metadata targets
| | |
key1.key ... role.txt ... file(1) ...
'{temp_repo}/tuf_repo/': developer's tuf-repository directory containing
following subdirectories:
'{temp_repo}/tuf_repo/keystore/': directory where all signing keys are
stored.
'{temp_repo}/tuf_repo/metadata/': directory where all metadata signed
metadata files are stored.
'{temp_repo}/tuf_repo/targets/': directory where all tuf verified files
are stored.
tuf_client
|
metadata
|
---------------------------
| |
current previous
| |
role.txt ... role.txt ...
'{temp_repo}/tuf_cleint/': client directory containing tuf metadata.
'{temp_repo}/tuf_cleint/metadata/current': directory where client stores
latest metadata files.
'{temp_repo}/tuf_cleint/metadata/current': directory where client stores
previous metadata files.
<Methods>
init_repo(tuf=True):
Initializes the repositories (depicted in the diagram above) and
starts the server process. init_repo takes one boolean argument
which when True sets-up tuf repository i.e. adds all of the
directories that start with 'tuf_' in the temp_root (depicted above).
Returns a tuple - full path of the 'temp_root' directory, and the url.
This should be sufficient to construct the tests.
cleanup():
Deletes all of the created repositories and shuts down the server.
add_file_to_repository(data):
Adds a file to the 'repo' directory and writes 'data' into it.
Returns full file path of the new file.
modify_file_at_repository(filepath, data):
Modifies a file at the 'repo' directory by writing 'data' into it.
'filepath' has to be an existing file at the 'repo' directory.
Returns full file path of the modified file.
delete_file_at_repository(filepath):
Deletes a file at the 'repo' directory.
'filepath' has to be an existing file at the 'repo' directory.
read_file_content(filepath):
Returns data string of the 'filepath' content.
init_tuf():
Builds tuf repository creating all necessary directories, metadata files,
and keys.
tuf_refresh_repo():
Refreshes metadata files at the 'tuf_repo' directory i.e. role.txt's at
'{temp_root}/tuf_repo/metadata/'. Following roles are refreshed:
targets, release and timestamp. Also, the whole 'repo' directory is
copied to targets directory i.e. '{temp_root}/tuf_repo/targets/'.
tuf_refresh_client_metadata():
Downloads latests metadata files from '{temp_root}/tuf_repo/metadata/'
into '{temp_root}/tuf_client/metadata/current/'.
tuf_download_updates()
Downloads files in the secure manner and then performs all tuf security
checks i.e. length and hash comparisons based on the information in the
metadata files.
tuf_refresh_and_download()
Combines tuf_refresh_repo(), tuf_refresh_client_metadata(), and
tuf_download_updates().
Returns 'tuf_downloads' directory where all tuf downloaded files are
located.
Note: metadata files are root.txt, targets.txt, release.txt and
timestamp.txt (denoted as 'role.txt in the diagrams'). There could be
more metadata files such us mirrors.txt. The metadata files are signed
by their corresponding roles i.e. root, targets etc.
More documentation is provided in the comment and doc blocks.
"""
import os
import sys
import time
import shutil
import random
import logging
import tempfile
import subprocess
import tuf.client.updater
import tuf.repo.signerlib as signerlib
# Disable/Enable logging. Comment-out to Enable logging.
logging.getLogger('tuf')
logging.disable(logging.CRITICAL)
# 'setup_info' stores all important setup information like the path of the
# 'temp_root' directory, etc.
setup_info = {}
def init_repo(tuf=False):
setup_info['tuf'] = tuf
# Temp root directory for regular and tuf repositories.
# WARNING: tuf client stores files in '{temp_root}/downloads/' directory!
# Make sure regular download are NOT stored in the that directory when
# tuf stores its downloads there. If regular download needs to happen at
# the time when tuf has or will have tuf downloads stored there just create
# a separate directory in {temp_root} to store regular downloads in.
# Ex: mkdir(temp_root, 'reg_downloads').
setup_info['temp_root'] = temp_root = tempfile.mkdtemp(dir=os.getcwd())
setup_info['repo'] = os.path.join(temp_root, 'repo')
setup_info['downloads'] = os.path.join(temp_root, 'downloads')
os.mkdir(setup_info['repo'])
os.mkdir(setup_info['downloads'])
# Start a simple server pointing to the repository directory.
port = random.randint(30000, 45000)
command = ['python', '-m', 'SimpleHTTPServer', str(port)]
setup_info['server_proc'] = subprocess.Popen(command, stderr=subprocess.PIPE)
# Tailor url for the repository. In order to download a 'file.txt'
# from 'repo' do: url+'repo/file.txt'
relpath = os.path.basename(temp_root)
setup_info['url'] = url = 'http://localhost:'+str(port)+'/'+relpath+'/'
# NOTE: The delay is needed to make up for asynchronous subprocess.
# Otherwise following error might be raised:
# <urlopen error [Errno 111] Connection refused>
time.sleep(.1)
if tuf:
init_tuf()
return temp_root, url
def cleanup():
if not setup_info:
msg = 'init_repo() must be called before cleanup().\n'
sys.exit(msg)
if setup_info['server_proc'].returncode is None:
setup_info['server_proc'].kill()
print 'Server terminated.\n'
# Removing repository directory.
try:
shutil.rmtree(setup_info['temp_root'])
except OSError, e:
pass
def add_file_to_repository(data='Test String'):
junk, filepath = tempfile.mkstemp(dir=setup_info['repo'])
fileobj = open(filepath, 'wb')
fileobj.write(data)
fileobj.close()
return filepath
def modify_file_at_repository(filepath, data='Modified String'):
repo = os.path.dirname(filepath)
if repo != setup_info['repo'] or not os.path.isfile(filepath):
msg = 'Provide a valid file on the repository to modify.'
sys.exit(msg)
fileobj = open(filepath, 'wb')
fileobj.write(data)
fileobj.close()
return filepath
def delete_file_at_repository(filepath):
"""
<Purpose>
Attempt to delete a file at the repository setup_info['repo_path'].
"""
repo = os.path.dirname(filepath)
if repo != setup_info['repo'] or not os.path.isfile(filepath):
msg = 'Provide a valid file on the repository to delete.'
sys.exit(msg)
os.remove(filepath)
def read_file_content(filepath):
if not os.path.isfile(filepath):
msg = 'Provide a valid file to read.'
sys.exit(msg)
fileobj = open(filepath, 'rb')
data = fileobj.read()
fileobj.close()
return data
def init_tuf():
"""
<Purpose>
Setup TUF directory structure and populated it with TUF metadata and
congfiguration files.
"""
passwd = 'test'
threshold = 1
# Setup TUF-repo directory structure.
setup_info['tuf_repo'] = tuf_repo = \
os.path.join(setup_info['temp_root'], 'tuf_repo')
keystore_dir = os.path.join(tuf_repo, 'keystore')
metadata_dir = os.path.join(tuf_repo, 'metadata')
targets_dir = os.path.join(tuf_repo, 'targets')
os.mkdir(tuf_repo)
os.mkdir(keystore_dir)
os.mkdir(metadata_dir)
shutil.copytree(setup_info['repo'], targets_dir)
# Setting TUF-client directory structure.
# 'tuf.client.updater.py' expects the 'current' and 'previous'
# directories to exist under client's 'metadata' directory.
setup_info['tuf_client'] = tuf_client = \
os.path.join(setup_info['temp_root'], 'tuf_client')
tuf_client_metadata_dir = os.path.join(tuf_client, 'metadata')
current_dir = os.path.join(tuf_client_metadata_dir, 'current')
previous_dir = os.path.join(tuf_client_metadata_dir, 'previous')
os.makedirs(tuf_client_metadata_dir)
# Generate at least one rsa key.
key = signerlib.generate_and_save_rsa_key(keystore_dir, passwd)
keyid = [key['keyid']]
setup_info['keyid'] = keyid
# Set role info.
info = {'keyids': [key['keyid']], 'threshold': threshold}
# 'role_info' dictionary looks like this:
# {role : {'keyids : [keyid1, ...] , 'threshold' : 1}}
# In our case 'role_info[keyids]' will only have on entry since only one
# is being used.
role_info = {}
role_list = ['root', 'targets', 'release', 'timestamp']
for role in role_list:
role_info[role] = info
# At this point there is enough information to create TUF configuration
# and metadata files.
# Build the configuration file.
conf_path = signerlib.build_config_file(metadata_dir, 365, role_info)
# Generate the 'root.txt' metadata file.
signerlib.build_root_file(conf_path, keyid, metadata_dir)
# Generate the 'targets.txt' metadata file.
signerlib.build_targets_file(targets_dir, keyid, metadata_dir)
# Generate the 'release.txt' metadata file.
signerlib.build_release_file(keyid, metadata_dir)
# Generate the 'timestamp.txt' metadata file.
signerlib.build_timestamp_file(keyid, metadata_dir)
# Move the metadata to the client's 'current' and 'previous' directories.
shutil.copytree(metadata_dir, current_dir)
shutil.copytree(metadata_dir, previous_dir)
# The repository is now setup!
# Here is a mirrors dictionary that will allow a client to seek out
# places to download the metadata and targets from.
tuf_repo_relpath = os.path.basename(tuf_repo)
url_prefix = setup_info['url']+tuf_repo_relpath+'/'
setup_info['mirrors'] = {'mirror1':
{'url_prefix': url_prefix,
'metadata_path': 'metadata',
'targets_path': 'targets',
'confined_target_dirs': ['']}}
# Adjusting configuration file (tuf.conf.py).
tuf.conf.repository_directory = setup_info['tuf_client']
# Instantiate an updater.
setup_info['updater'] = \
tuf.client.updater.Updater('updater', setup_info['mirrors'])
def tuf_refresh_repo():
"""
<Purpose>
Update TUF metadata files. Call this method whenever targets files have
changed in the 'repo'.
"""
if not setup_info['tuf']:
msg = 'TUF needs to be initialized.\n'
sys.exist(msg)
keyid = setup_info['keyid']
metadata_dir = os.path.join(setup_info['tuf_repo'], 'metadata')
targets_dir = os.path.join(setup_info['tuf_repo'], 'targets')
shutil.rmtree(targets_dir)
shutil.copytree(setup_info['repo'], targets_dir)
# Regenerate the 'targets.txt' metadata file.
signerlib.build_targets_file(targets_dir, keyid, metadata_dir)
# Regenerate the 'release.txt' metadata file.
signerlib.build_release_file(keyid, metadata_dir)
# Regenerate the 'timestamp.txt' metadata file.
signerlib.build_timestamp_file(keyid, metadata_dir)
def tuf_refresh_client_metadata():
if not setup_info['tuf']:
msg = 'TUF needs to be initialized.\n'
sys.exist(msg)
# Update all metadata.
setup_info['updater'].refresh()
def tuf_download_updates():
"""
Here it is assumed that client has already downloaded latest metadata files.
"""
if not setup_info['tuf']:
msg = 'TUF needs to be initialized.\n'
sys.exist(msg)
# Get the latest information on targets.
targets = setup_info['updater'].all_targets()
# Create destination directory for the tuf targets.
dest = setup_info['downloads']
# Determine which targets have changed or are new.
updated_targets = \
setup_info['updater'].updated_targets(targets, dest)
# Download new/changed targets and store them in the 'tuf_downloads' dir.
for target in updated_targets:
setup_info['updater'].download_target(target, dest)
def tuf_refresh_and_download():
"""
Combines tuf_refresh_repo(), tuf_refresh_client_metadata(), and
tuf_download_updates().
Returns 'tuf_downloads' directory.
"""
tuf_refresh_repo()
tuf_refresh_client_metadata()
tuf_download_updates()
return setup_info['downloads']

View file

@ -213,15 +213,82 @@ def test_2__get_metadata_directory(self):
def test_1__list_keyids(self):
def test_4__list_keyids(self):
# SETUP
# The 'root.txt' and 'targets.txt' metadata files are
# needed for _list_keyids() to determine the roles
# associated with each keyid.
keystore_dir = self.create_temp_keystore_directory()
repo_dir = self.make_temp_directory()
# Create temp directory for config file.
config_dir = self.make_temp_directory()
# Build a config file.
config_filepath = signerlib.build_config_file(config_dir, 365,
self.top_level_role_info)
# Create the metadata directory needed by _list_keyids().
meta_dir = self.make_temp_directory()
# Patch signercli._get_metadata_directory().
self.mock_get_metadata_directory(directory=meta_dir)
# Patch signercli._prompt().
self.mock_prompt(config_filepath)
# Patch signercli._get_password().
self.get_passwords()
# Create the root metadata file that will be loaded by _list_keyids()
# to extract the keyids for the top-level roles.
signercli.make_root_metadata(keystore_dir)
# Create a directory containing target files.
targets_dir, targets_paths =\
self.make_temp_directory_with_data_files(directory=repo_dir)
# Mock method for signercli._prompt().
self.make_metadata_mock_prompts(targ_dir=targets_dir,
conf_path=config_filepath)
# Create the target metadata file that will be loaded by _list_keyids()
# to extract the keyids for all the targets roles.
signercli.make_targets_metadata(keystore_dir)
# TESTS
# Test: normal case.
signercli._list_keyids(keystore_dir)
signercli._list_keyids(keystore_dir, meta_dir)
# Test: Improperly formatted 'root.txt' file.
root_filename = os.path.join(meta_dir, 'root.txt')
root_signable = tuf.util.load_json_file(root_filename)
saved_roles = root_signable['signed']['roles']
del root_signable['signed']['roles']
tuf.repo.signerlib.write_metadata_file(root_signable, root_filename)
self.assertRaises(tuf.RepositoryError,
signercli._list_keyids, keystore_dir, meta_dir)
# Restore the properly formatted 'root.txt' file.
root_signable['signed']['roles'] = saved_roles
tuf.repo.signerlib.write_metadata_file(root_signable, root_filename)
# Test: Improperly formatted 'targets.txt' file.
targets_filename = os.path.join(meta_dir, 'targets.txt')
targets_signable = tuf.util.load_json_file(targets_filename)
saved_targets = targets_signable['signed']['targets']
del targets_signable['signed']['targets']
tuf.repo.signerlib.write_metadata_file(targets_signable, targets_filename)
self.assertRaises(tuf.RepositoryError,
signercli._list_keyids, keystore_dir, meta_dir)
# Restore the properly formatted 'targets.txt' file.
targets_signable['signed']['targets'] = saved_targets
tuf.repo.signerlib.write_metadata_file(targets_signable, targets_filename)
@ -459,15 +526,44 @@ def test_1__sign_and_write_metadata(self):
def test_2_change_password(self):
def test_4_change_password(self):
# SETUP
# Create keystore and repo directories.
keystore_dir = self.create_temp_keystore_directory()
repo_dir = self.make_temp_directory()
# Create temp directory for config file.
config_dir = self.make_temp_directory()
# Build a config file.
config_filepath = signerlib.build_config_file(config_dir, 365,
self.top_level_role_info)
# Create a temp metadata directory.
meta_dir = self.make_temp_directory()
# Patch signercli._get_metadata_directory().
self.mock_get_metadata_directory(directory=meta_dir)
# Patch signercli._prompt().
self.mock_prompt(config_filepath)
signercli.make_root_metadata(keystore_dir)
# Create a directory containing target files.
targets_dir, targets_paths =\
self.make_temp_directory_with_data_files(directory=repo_dir)
# Mock method for signercli._prompt().
self.make_metadata_mock_prompts(targ_dir=targets_dir,
conf_path=config_filepath)
signercli.make_targets_metadata(keystore_dir)
test_keyid = self.rsa_keyids[0]
self.mock_prompt(test_keyid)
# Create keystore directory.
keystore_dir = self.create_temp_keystore_directory()
# Specify old password and create a new password.
old_password = self.rsa_passwords[test_keyid]
new_password = self.random_string()
@ -542,9 +638,45 @@ def _mock_get_password(junk, confirm=False):
def test_2_dump_key(self):
def test_4_dump_key(self):
# SETUP
# Create keystore and repo directories.
keystore_dir = self.create_temp_keystore_directory()
repo_dir = self.make_temp_directory()
# Create temp directory for config file.
config_dir = self.make_temp_directory()
# Build a config file.
config_filepath = signerlib.build_config_file(config_dir, 365,
self.top_level_role_info)
# Create a temp metadata directory.
meta_dir = self.make_temp_directory()
# Patch signercli._get_metadata_directory().
self.mock_get_metadata_directory(directory=meta_dir)
# Patch signercli._get_password().
self.get_passwords()
# Patch signercli._prompt().
self.mock_prompt(config_filepath)
signercli.make_root_metadata(keystore_dir)
# Create a directory containing target files.
targets_dir, targets_paths =\
self.make_temp_directory_with_data_files(directory=repo_dir)
# Mock method for signercli._prompt().
self.make_metadata_mock_prompts(targ_dir=targets_dir,
conf_path=config_filepath)
signercli.make_targets_metadata(keystore_dir)
keyid = self.rsa_keyids[0]
password = self.rsa_passwords[keyid]
show_priv = 'private'
@ -567,9 +699,6 @@ def _mock_prompt(msg, junk):
# Patch signercli._prompt().
signercli._prompt = _mock_prompt
# Create keystore directory.
keystore_dir = self.create_temp_keystore_directory()
# TESTS
# Test: normal case.
@ -1108,6 +1237,9 @@ def test_7_make_delegation(self):
# Load keystore.
load_keystore = keystore.load_keystore_from_keyfiles
# Build the root metadata file (root.txt).
signercli.make_root_metadata(keystore_dir)
# Build targets metadata file (targets.txt).
signercli.make_targets_metadata(keystore_dir)
@ -1116,7 +1248,7 @@ def test_7_make_delegation(self):
# Mock method for signercli._prompt().
def _mock_prompt(msg, junk):
if msg.startswith('\nNOTE: The directory entered'):
if msg.startswith('\nThe directory entered'):
return delegated_targets_dir
elif msg.startswith('\nChoose and enter the parent'):
return parent_role

View file

@ -1,225 +0,0 @@
# Thanks to https://gist.github.com/zed/1347055
"""SSL client/server certificates verification for `urllib2`.
It works on Python 2.6, 2.7, 3.1, 3.2
It also works on Python 2.4, 2.5 if `ssl` is installed (``pip install ssl``)
Example::
>>> import urllib2, urllib2_ssl
>>> opener = urllib2.build_opener(urllib2_ssl.HTTPSHandler(
... key_file='clientkey.pem',
... cert_file='clientcert.pem',
... ca_certs='cacrt.pem'))
>>> opener.open('https://example.com/').read()
"""
__all__ = ['match_hostname', 'CertificateError']
import sys
import socket
if not hasattr(socket, 'create_connection'): # for Python 2.4
_GLOBAL_DEFAULT_TIMEOUT = getattr(socket, '_GLOBAL_DEFAULT_TIMEOUT', object())
# copy-paste from stdlib's socket.py (py2.6)
def create_connection(address, timeout=_GLOBAL_DEFAULT_TIMEOUT,
source_address=None):
"""Connect to *address* and return the socket object.
Convenience function. Connect to *address* (a 2-tuple ``(host,
port)``) and return the socket object. Passing the optional
*timeout* parameter will set the timeout on the socket instance
before attempting to connect. If no *timeout* is supplied, the
global default timeout setting returned by :func:`getdefaulttimeout`
is used.
"""
host, port = address
err = None
for res in socket.getaddrinfo(host, port, 0, socket.SOCK_STREAM):
af, socktype, proto, canonname, sa = res
sock = None
try:
sock = socket.socket(af, socktype, proto)
if timeout is not _GLOBAL_DEFAULT_TIMEOUT:
sock.settimeout(timeout)
if source_address:
sock.bind(source_address)
sock.connect(sa)
return sock
except socket.error:
err = sys.exc_info()[1]
if sock is not None:
sock.close()
if err is not None:
raise err
else:
raise socket.error("getaddrinfo returns an empty list")
# monkey-patch socket module
socket.create_connection = create_connection
# copy-paste from stdlib's ssl.py (py3.2)
class CertificateError(ValueError):
pass
def match_hostname(cert, hostname):
"""Verify that *cert* (in decoded format as returned by
SSLSocket.getpeercert()) matches the *hostname*. RFC 2818 rules
are mostly followed, but IP addresses are not accepted for *hostname*.
CertificateError is raised on failure. On success, the function
returns nothing.
XXX this version differ from ssl.match_hostname in python 3.2
it checks subject even if subjectAltName is not empty
"""
if not cert:
raise ValueError("empty or no certificate")
dnsnames = []
san = cert.get('subjectAltName', ())
for key, value in san:
if key == 'DNS':
if _dnsname_to_pat(value).match(hostname):
return
dnsnames.append(value)
if not dnsnames:
#XXX check subject even if subjectAltName is not empty
for sub in cert.get('subject', ()):
for key, value in sub:
# XXX according to RFC 2818, the most specific Common Name
# must be used.
if key == 'commonName':
if _dnsname_to_pat(value).match(hostname):
return
dnsnames.append(value)
if len(dnsnames) > 1:
raise CertificateError("hostname %r "
"doesn't match either of %s"
% (hostname, ', '.join(map(repr, dnsnames))))
elif len(dnsnames) == 1:
raise CertificateError("hostname %r "
"doesn't match %r"
% (hostname, dnsnames[0]))
else:
raise CertificateError("no appropriate commonName or "
"subjectAltName fields were found")
def _dnsname_to_pat(dn):
pats = []
for frag in dn.split(r'.'):
if frag == '*':
# When '*' is a fragment by itself, it matches a non-empty dotless
# fragment.
pats.append('[^.]+')
else:
# Otherwise, '*' matches any dotless fragment.
frag = re.escape(frag)
pats.append(frag.replace(r'\*', '[^.]*'))
return re.compile(r'\A' + r'\.'.join(pats) + r'\Z', re.IGNORECASE)
try: import ssl
except ImportError:
ssl = None
import warnings
msg = ("Can't import ssl. HTTPS won't work."
"Run `pip install ssl` if Python < 2.6")
# use python -Wd to see this warning (it is ignored by default)
try:
ImportWarning
except NameError:
warnings.warn(msg) # Python < 2.5
else:
warnings.warn(msg, ImportWarning)
else: # ssl is available
# see http://www.muchtooscrawled.com/2010/03/https-certificate-verification-in-python-with-urllib2/
try: from http import client # py3k
except ImportError:
import httplib as client # py < 3.x
import re
try: import urllib2 as request # py < 3.x
except ImportError:
from urllib import request # py3k
class HTTPSConnection(client.HTTPSConnection):
def __init__(self, host, **kwargs):
self.ca_certs = kwargs.pop('ca_certs', None)
self.checker = kwargs.pop('checker', match_hostname)
# for python < 2.6
self.timeout = kwargs.get('timeout', socket.getdefaulttimeout())
client.HTTPSConnection.__init__(self, host, **kwargs)
def connect(self):
# overrides the version in httplib so that we do
# certificate verification
args = [(self.host, self.port), self.timeout,]
if hasattr(self, 'source_address'):
args.append(self.source_address)
sock = socket.create_connection(*args)
if getattr(self, '_tunnel_host', None):
self.sock = sock
self._tunnel()
# wrap the socket using verification with the root
# certs in self.ca_certs
kwargs = {}
if self.ca_certs is not None:
kwargs.update(
cert_reqs=ssl.CERT_REQUIRED,
ca_certs=self.ca_certs)
self.sock = ssl.wrap_socket(sock,
keyfile=self.key_file,
certfile=self.cert_file,
**kwargs)
if self.checker is not None:
try:
self.checker(self.sock.getpeercert(), self.host)
except CertificateError:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
# wraps https connections with ssl certificate verification
class HTTPSHandler(request.HTTPSHandler):
# see http://www.threepillarglobal.com/soap_client_auth
# HTTPS Client Auth solution for urllib2, inspired by
# http://bugs.python.org/issue3466 and improved by David
# Norton of Three Pillar Software. In this implementation,
# we use properties passed in rather than static module
# fields.
def __init__(self, key_file=None, cert_file=None, ca_certs=None,
checker=match_hostname):
request.HTTPSHandler.__init__(self)
# see http://docs.python.org/library/ssl.html#certificates
self.key_file = key_file
self.cert_file = cert_file
self.ca_certs = ca_certs
self.checker = checker
def https_open(self, req):
# Rather than pass in a reference to a connection class, we pass in
# a reference to a function which, for all intents and purposes,
# will behave as a constructor
return self.do_open(self.getConnection, req)
def getConnection(self, host, **kwargs):
d = dict(cert_file=self.cert_file,
key_file=self.key_file,
ca_certs=self.ca_certs,
checker=self.checker)
d.update(kwargs)
return HTTPSConnection(host, **d)
__all__.append('HTTPSHandler')