Remove interposition

Signed-off-by: Vladimir Diaz <vladimir.v.diaz@gmail.com>
This commit is contained in:
Vladimir Diaz 2017-11-16 12:37:28 -05:00
parent a0852f6965
commit f0aa76aeaf
No known key found for this signature in database
GPG key ID: 5DEE9B97B0E2289A
4 changed files with 0 additions and 2166 deletions

View file

@ -1,264 +0,0 @@
## Interposition
The interposition package (tuf/interposition/) can be used to integrate TUF
into a software updater. It is an integration method that requires the least
amount of effort from developers who are performing the integration. The
integration method used by interposition is considered high-level because the
integrator does not explicitly call TUF methods to refresh metadata and
download target files. For example, performing a low-level integration with
*tuf/client/updater.py* requires the integrator to instantiate an updater
object, call updater.refresh() to refresh TUF metadata, and
updater.download_target() to download target files referenced in TUF metadata.
In contrast, an integrator may utilize interposition to load some configuration
settings to indicate which URLs requested by Python urllib calls should be
interposed by TUF. This means that all the update calls for metadata and
target requests are made transparently by the low level *tuf/client/updater.py*
module.
### Interposition Examples
To use interposition, integrators must:
1. Create an interposition configuration file.
2. Import interposition, and load the configuration file with configure().
3. Perform updater urllib calls that may be interposed.
4. Deconfigure interposition.
## Option 1
```python
from tuf.interposition import urllib_tuf as urllib
from tuf.interposition import urllib2_tuf as urllib2
# configure() loads the interposition configuration file that indicates which
# URLs should be interposed by TUF. Any urllib calls that occur after
# configure() are subject to interposition.
configuration = tuf.interposition.configure()
url = 'http://example.com/path/to/document'
urllib.urlopen(url)
urllib.urlretrieve(url, 'mytarget')
urllib2.urlopen(url)
# deconfigure() is used to stop interposition. Any urllib calls that occur
# after deconfigure() are not interposed.
tuf.interposition.deconfigure(configuration)
```
## Option 2
```python
@tuf.interposition.open_url
def instancemethod(self, url, ...):
...
```
Note: tuf.interposition.refresh(configuration) may be called to force a
refresh of the TUF metadata. Interposition normally performs a refresh of TUF
metadata when configure() is called.
## Configuration
A *configuration* is simply a JSON object which tells `tuf.interposition` which
URLs to intercept, how to transform them (if necessary), and where to forward
them (possibly over SSL) for secure responses via TUF.
By default, the configuration object is expected to be situated in the current
working directory in the file with the name "tuf.interposition.json". You may
change this like so:
```python
tuf.interposition.configure(filename="/path/to/json")
```
### Examples
#### Basic
```javascript
{
// This is a required root object.
"configurations": {
// Which network location should we intercept?
// Network locations may be specified as "hostname" or "hostname:port".
"seattle.poly.edu": {
// Where do we find the client copy of the TUF server metadata?
"repository_directory": "client/",
// Where do we forward the requests to seattle.poly.edu?
"repository_mirrors" : {
"mirror1": {
// In this case, we forward them to http://tuf.seattle.poly.edu
"url_prefix": "http://tuf.seattle.poly.edu",
// You do not have to worry about these default parameters.
"metadata_path": "metadata",
"targets_path": "targets",
"confined_target_dirs": [ "" ]
},
// You could specify more repository mirrors.
...
}
}
},
// You could specify more network locations.
...
}
```
*Network locations* must be unique across configurations; this restriction
prevents interposition cycles, amongst other things.
Note that, presently, a network request to a specified network location will be
intercepted no matter what its protocol scheme (e.g http, https) may be.
If you choose to specify `repository_directory` as a relative path, then how
would you determine its absolute path at runtime? You should then configure
interposition with this additional parameter:
```python
tuf.interposition.configure(parent_repository_directory="/path/to/parent/to/repository_directory")
```
#### Matching and transforming URL paths with regular expressions
Given a network location, you might want `tuf.interposition` to intercept only
URL paths that match certain patterns. A related problem is that you might wish
to transform a source path into another target path, perhaps because TUF might
not recognize it readily (e.g. it is an implicit reference to file) or because
you want to hide server-side changes from the client for its convenience. You
can solve both of these problems by matching and transforming URL paths with
regular expressions.
```javascript
{
"configurations": {
"pypi.python.org": {
"repository_mirrors" : {
"mirror1": {
"url_prefix": "http://pypi.updateframework.com",
...
},
...
},
...,
"target_paths": [
{ ".*/(simple/\\w+)/$": "{0}/index.html" },
{ ".*/(packages/.+)$": "{0}" }
]
}
}
}
```
In Javascript lingo, `target_paths` is an array of objects, wherein each object
has exactly a single property mapping the transformation of a *source* path
pattern S to a *target* path pattern T. Given a URL path U, `tuf.interposition`
will attempt to match U against every pattern S in order of appearance in this
array. If a match is found, then the
[groups](http://docs.python.org/2/library/re.html#match-objects) captured with S
will be applied to the [format
string](http://docs.python.org/2/library/string.html#string-formatting) T;
otherwise, or in case of an error, `tuf.interposition` will log a warning that
it will not interpose for U.
This brings us to the following important note. `target_paths` is optional: if
you do not configure a network location with this parameter, interposition will
work over *any* path under the given network location. However, if you do
specify this parameter, then you are implicitly telling `tuf.interposition` how
to transform a specified path into another one, and `tuf.interposition` will
*not* recognize any unspecified path for the given network location, *unless*
you add a wildcard regular expression like so:
```javascript
"target_paths": [
...,
{ "(.*)", "{0}" }
]
```
(Internally, this wildcard regular expression is added when `target_paths` is
left unspecified; this is why interposition will then apply to *any* path given
a specified network location.)
In the example above, we will apply the following transformations:
- "http://pypi.python.org/simple/Django/" => "http://pypi.updateframework.com/simple/Django/index.html"
- "http://pypi.python.org/packages/source/D/Django/Django-1.4.5.tar.gz" => "http://pypi.updateframework.com/packages/source/D/Django/Django-1.4.5.tar.gz"
(Actually, there is an implied "targets" root directory on the TUF server, but
we ignore it for pedagogical purposes.)
However, we will not match, and hence apply any transformation towards the
following URLs patterns, or interpose for them:
- "http://pypi.python.org/search"
- "http://pypi.python.org/serversig/(.+)"
Note: We are considering replacing this feature with a simpler, and hence more
provably secure, mechanism. Please follow issue
[#32](https://github.com/akonst/tuf/issues/32) for more details.
#### Mirror SSL certificate verification
For additional security, you may wish to configure a network location such that
a repository mirror must communicate over the HTTPS protocol. You may do this
by specifying the "https" protocol scheme in the `url_prefix` of a repository
mirror.
Furthermore, you may require `tuf.interposition` to verify the purported SSL
certificate of a repository mirror with the `ssl_certificates` parameter.
```javascript
{
"configurations": {
"pypi.python.org": {
"repository_mirrors" : {
"main": {
"url_prefix": "https://pypi.updateframework.com",
...
}
},
...
"ssl_certificates": "cacert.pem"
}
}
}
```
If any `url_prefix` begins with "https://", then `ssl_certificates` is a
required parameter; it must point to a file of
[certificates](http://docs.python.org/2/library/ssl.html#certificates) bundled
as [PEM](https://www.ietf.org/rfc/rfc1422).
If you choose to specify `ssl_certificates` as a relative path, then how
would you determine its absolute path at runtime? You should then configure
interposition with this additional parameter:
```python
tuf.interposition.configure(parent_ssl_certificates_directory="/path/to/parent/to/ssl_certificates")
```
## Applications
### Seattle + TUF
We have a demonstration of the [Seattle](https://seattle.poly.edu/)
software updater over TUF, which we expect to publish soon.
### PyPI + TUF + pip
We have a demonstration of the Python package manager, [pip, over
TUF](https://github.com/theupdateframework/pip/wiki/pip-over-TUF).
## Limitations (at the time of writing)
- The entire `urllib` or `urllib2` contract is not honoured.
- Downloads are not thread-safe.
- Uses some Python features (e.g. string formatting) that are not backwards-compatible (e.g. with Python < 2.6).
- Uses some Python features (e.g. `urllib, urllib2, urlparse`) that are not forwards-compatible (e.g. with Python >= 3).

View file

@ -1,426 +0,0 @@
"""
<Program Name>
__init__.py
<Author>
Trishank Kuppusamy.
Pankhuri Goyal <pankhurigoyal02@gmail.com>
<Started>
<Copyright>
See LICENSE for licensing information.
<Purpose>
TODO: Add pros / cons of using interposition. Also, should we move code
here to its own module (instead of __init__.py)?
"""
# Help with Python 3 compatibility, where the print statement is a function, an
# implicit relative import is invalid, and the '/' operator performs true
# division. Example: print 'hello world' raises a 'SyntaxError' exception.
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import functools
import imp
import json
import socket
import logging
import tuf.log
import six
# We import the following directly into our namespace so that there is no name
# conflict.
from tuf.interposition.configuration import ConfigurationParser
from tuf.interposition.updater import UpdaterController
logger = logging.getLogger('tuf.interposition.__init__')
# Export nothing when: from tuf.interposition import *
__all__ = []
# TODO:
# - Document design decisions.
# - Interposition: Honour urllib/urllib2 contract.
# - Review security issues resulting from regular expressions
# (e.g. complexity attacks).
# - Warn user when TUF is used without any configuration.
# - Override other default (e.g. HTTPS) urllib2 handlers.
# - Failsafe: If TUF fails, offer option to unsafely resort back to
# urllib/urllib2?
############################## GLOBAL VARIABLES ################################
# Constants
NON_GET_HTTP_METHOD_MESSAGE = \
"Skipping {method} request to {url} because it is not a GET request."
# Our own public copies of the urllib and urllib2 modules.
# We use None as sentinel values.
urllib_tuf = None
urllib2_tuf = None
# A private, global Controller of Updaters.
__updater_controller = UpdaterController()
########################## GLOBAL PRIVATE FUNCTIONS ############################
def __monkey_patch():
"""
Build and monkey patch public copies of the urllib and urllib2 modules.
We prefer simplicity, which leads to easier proof of security, even if it may
come at the cost of not honouring some provisions of the urllib and urllib2
module contracts unrelated to security.
References:
http://stackoverflow.com/a/11285504
http://docs.python.org/2/library/imp.html
"""
global urllib_tuf
global urllib2_tuf
if urllib_tuf is None:
urllib_module_name = 'urllib'
if six.PY3:
urllib_module_name = 'urllib/request'
try:
module_file, pathname, description = imp.find_module(urllib_module_name)
urllib_tuf = \
imp.load_module('urllib_tuf', module_file, pathname, description)
module_file.close()
except:
raise
else:
urllib_tuf.urlopen = __urllib_urlopen
urllib_tuf.urlretrieve = __urllib_urlretrieve
if urllib2_tuf is None:
urllib2_module_name = 'urllib2'
if six.PY3:
urllib2_module_name = 'urllib/request'
try:
module_file, pathname, description = imp.find_module(urllib2_module_name)
urllib2_tuf = \
imp.load_module('urllib2_tuf', module_file, pathname, description)
module_file.close()
except:
raise
else:
urllib2_tuf.urlopen = __urllib2_urlopen
def __urllib_urlopen(url, data=None, proxies=None):
"""Create a file-like object for the specified URL to read from."""
updater = __updater_controller.get(url)
if updater is None:
return six.moves.urllib.request.urlopen(url, data=data, proxies=proxies)
else:
return updater.open(url, data=data)
def __urllib_urlretrieve(url, filename=None, reporthook=None, data=None):
"""Copy a network object denoted by a URL to a local file, if necessary."""
updater = __updater_controller.get(url)
if updater is None:
return six.moves.urllib.request.urlretrieve(url, filename=filename, reporthook=reporthook, data=data)
else:
return updater.retrieve(url, filename=filename, reporthook=reporthook, data=data)
def __urllib2_urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
"""Create a file-like object for the specified URL to read from."""
# We assume that the first argument to instancemethod is a URL-like object;
# that is, either a string or a urllib2.Request.
updater = None
# If this is a urllib2.Request...
if isinstance(url, six.moves.urllib.request.Request):
# If this is a GET HTTP method...
if url.get_method() == "GET":
# ...then you should check with TUF.
updater = __updater_controller.get(url.get_full_url())
else:
# ...otherwise, revert to default behaviour.
logger.warn(NON_GET_HTTP_METHOD_MESSAGE.format(method=url.get_method(),
url=url.get_full_url()))
return six.moves.urllib.request.urlopen(url, data=data, timeout=timeout)
else:
# ...otherwise, we assume this is a string.
updater = __updater_controller.get(url)
if updater is None:
return six.moves.urllib.request.urlopen(url, data=data, timeout=timeout)
else:
response = updater.open(url, data=data)
# See urllib2.AbstractHTTPHandler.do_open
# TODO: let Updater handle this
response.msg = ""
return response
########################### GLOBAL PUBLIC FUNCTIONS ############################
def __read_configuration(configuration_handler,
filename="tuf.interposition.json",
parent_repository_directory=None,
parent_ssl_certificates_directory=None):
"""
A generic function to read TUF interposition configurations off a file, and
then handle those configurations with a given function.
configuration_handler must be a function which accepts a
tuf.interposition.Configuration instance.
Returns the parsed configurations as a dictionary of configurations indexed
by hostnames.
"""
INVALID_TUF_CONFIGURATION = "Invalid configuration for {network_location}!"
INVALID_TUF_INTERPOSITION_JSON = "Invalid configuration in {filename}!"
NO_CONFIGURATIONS = "No configurations found in configuration in {filename}!"
# Configurations indexed by hostnames.
parsed_configurations = {}
try:
with open(filename) as tuf_interposition_json:
tuf_interpositions = json.load(tuf_interposition_json)
configurations = tuf_interpositions.get("configurations", {})
if len(configurations) == 0:
raise tuf.ssl_commons.exceptions.InvalidConfigurationError(NO_CONFIGURATIONS.format(filename=filename))
else:
for network_location, configuration in six.iteritems(configurations):
try:
configuration_parser = ConfigurationParser(network_location,
configuration, parent_repository_directory=parent_repository_directory,
parent_ssl_certificates_directory=parent_ssl_certificates_directory)
# configuration_parser.parse() returns a
# 'tuf.interposition.Configuration' object, which interposition
# uses to determine which URLs should be interposed.
configuration = configuration_parser.parse()
configuration_handler(configuration)
parsed_configurations[configuration.hostname] = configuration
except:
logger.exception(INVALID_TUF_CONFIGURATION.format(network_location=network_location))
raise
except:
logger.exception(INVALID_TUF_INTERPOSITION_JSON.format(filename=filename))
raise
else:
return parsed_configurations
# TODO: Is parent_repository_directory a security risk? For example, would it
# allow the user to overwrite another TUF repository metadata on the filesystem?
# On the other hand, it is beyond TUF's scope to handle filesystem permissions.
# TODO: Ditto for the parent_ssl_certificates_directory parameter.
def configure(filename="tuf.interposition.json",
parent_repository_directory=None,
parent_ssl_certificates_directory=None):
"""
The optional parent_repository_directory parameter is used to specify the
containing parent directory of the "repository_directory" specified in a
configuration for *all* network locations, because sometimes the absolute
location of the "repository_directory" is only known at runtime. If you
need to specify a different parent_repository_directory for other
network locations, simply call this method again with different parameters.
Ditto for the optional parent_ssl_certificates_directory parameter.
Example of a TUF interposition configuration JSON object:
{
"configurations": {
"seattle.cs.washington.edu": {
"repository_directory": "client/",
"repository_mirrors" : {
"mirror1": {
"url_prefix": "http://seattle-tuf.cs.washington.edu",
"metadata_path": "metadata",
"targets_path": "targets",
"confined_target_dirs": [ "" ]
}
},
("target_paths": [
{ ".*/(simple/\\w+)/$": "{0}/index.html" },
{ ".*/(packages/.+)$": "{0}" }
],
"ssl_certificates": "cacert.pem")
}
}
}
"target_paths" is optional: If you do not tell TUF to selectively match
paths with regular expressions, TUF will work over any path under the given
network location. However, if you do specify it, you are then telling TUF
how to transform a specified path into another one, and TUF will *not*
recognize any unspecified path for the given network location.
Unless any "url_prefix" begins with "https://", "ssl_certificates" is
optional; it must specify certificates bundled as PEM (RFC 1422).
Returns the parsed configurations as a dictionary of configurations indexed
by hostnames.
"""
configurations = \
__read_configuration(__updater_controller.add, filename=filename,
parent_repository_directory=parent_repository_directory,
parent_ssl_certificates_directory=parent_ssl_certificates_directory)
return configurations
def refresh(configurations):
"""Refresh the top-level metadata for previously read configurations."""
# Get the updater and refresh its top-level metadata. In the majority of
# integrations, a software updater integrating TUF with interposition will
# usually only require an initial refresh() (i.e., when configure() is
# called). A series of target file requests may then occur, which are all
# referenced by the latest top-level metadata updated by configure().
# Although interposition was designed to remain transparent, for software
# updaters that require an explicit refresh of top-level metadata, this
# method is provided.
for configuration in six.itervalues(configurations):
__updater_controller.refresh(configuration)
def deconfigure(configurations):
"""Remove TUF interposition for previously read configurations."""
for configuration in six.itervalues(configurations):
__updater_controller.remove(configuration)
def open_url(instancemethod):
"""
Decorate an instance method of the form
instancemethod(self, url, ...) with me in order to pass it to TUF.
"""
@functools.wraps(instancemethod)
def wrapper(self, *args, **kwargs):
# We assume that the first argument to instancemethod is a URL-like object;
# that is, either a string or a urllib2.Request.
url_object = args[0]
data = kwargs.get("data")
# If this is a urllib2.Request...
if isinstance(url_object, six.moves.request.Request):
# If this is a GET HTTP method...
if url_object.get_method() == "GET":
# ...then you should check with TUF.
url = url_object.get_full_url()
else:
# ...otherwise, revert to default behaviour.
logger.warn(NON_GET_HTTP_METHOD_MESSAGE.format(method=url_object.get_method(),
url=url_object.get_full_url()))
return instancemethod(self, *args, **kwargs)
# ...otherwise, we assume this is a string.
else:
url = url_object
updater = __updater_controller.get(url)
# If TUF has not been configured for this URL...
if updater is None:
# ...then revert to default behaviour.
return instancemethod(self, *args, **kwargs)
else:
# ...otherwise, use TUF to get this document.
return updater.open(url, data=data)
return wrapper
############################## GLOBAL SIDE EFFECTS #############################
# Build and monkey patch public copies of the urllib and urllib2 modules.
__monkey_patch()

View file

@ -1,406 +0,0 @@
"""
<Program Name>
configuration.py
<Author>
Trishank Kuppusamy
Pankhuri Goyal <pankhurigoyal02@gmail.com>
Vladimir Diaz <vladimir.v.diaz@gmail.com>
<Started>
<Copyright>
See LICENSE for licensing information.
<Purpose>
"""
# Help with Python 3 compatibility where the print statement is a function, an
# implicit relative import is invalid, and the '/' operator performs true
# division. Example: print 'hello world' raises a 'SyntaxError' exception.
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
import os.path
import logging
import tuf.log
import six
logger = logging.getLogger('tuf.interposition.configuration')
class Configuration(object):
"""
<Purpose>
Holds TUF interposition configuration information about a network
location which is important to an updater for that network location.
"""
def __init__(self, hostname, port, repository_directory, repository_mirrors,
target_paths, ssl_certificates):
"""
<Purpose>
Constructor assumes that its parameters are valid.
<Arguments>
hostname:
port:
repository_directory:
repository_mirrors:
target_paths:
ssl_certificates:
<Exceptions>
<Side Effects>
<Returns>
"""
self.hostname = hostname
self.port = port
self.network_location = \
"{hostname}:{port}".format( hostname = hostname, port = port )
self.repository_directory = repository_directory
self.repository_mirrors = repository_mirrors
self.target_paths = target_paths
self.ssl_certificates = ssl_certificates
def __repr__(self):
MESSAGE = "network location: {network_location}"
return MESSAGE.format(network_location=self.network_location)
def get_repository_mirror_hostnames(self):
"""
<Purpose>
Get a set of hostnames of every repository mirror of this configuration.
<Arguments>
None.
<Exceptions>
<Side Effects>
<Returns>
"""
# Parse TUF server repository mirrors.
repository_mirrors = self.repository_mirrors
repository_mirror_hostnames = set()
for repository_mirror in repository_mirrors:
mirror_configuration = repository_mirrors[repository_mirror]
url_prefix = mirror_configuration["url_prefix"]
parsed_url = six.moves.urllib.parse.urlparse(url_prefix)
mirror_hostname = parsed_url.hostname
mirror_port = parsed_url.port
mirror_network_location = \
"{hostname}:{port}".format(hostname=mirror_hostname, port = mirror_port)
repository_mirror_hostnames.add(mirror_network_location)
return repository_mirror_hostnames
class ConfigurationParser(object):
"""
<Purpose>
Parses TUF interposition configuration information about a network
location, stored as a JSON object, and returns it as a Configuration.
"""
def __init__(self, network_location, configuration,
parent_repository_directory=None,
parent_ssl_certificates_directory=None):
"""
<Purpose>
<Arguments>
network_location:
configuration:
parent_repository_directory:
parent_ssl_certificates_directory:
<Exceptions>
<Side Effects>
<Returns>
None.
"""
self.network_location = network_location
self.configuration = configuration
self.parent_repository_directory = parent_repository_directory
self.parent_ssl_certificates_directory = parent_ssl_certificates_directory
def get_network_location(self):
"""
<Purpose>
Check network location.
<Arguments>
None.
<Exceptions>
<Side Effects>
<Returns>
"""
INVALID_NETWORK_LOCATION = "Invalid network location {network_location}!"
network_location_tokens = self.network_location.split(':', 1)
hostname = network_location_tokens[0]
port = 80
if len(network_location_tokens) > 1:
port = int(network_location_tokens[1], 10)
if port <= 0 or port >= 2**16:
raise tuf.ssl_commons.exceptions.InvalidConfigurationError(INVALID_NETWORK_LOCATION.format(
network_location=self.network_location))
return hostname, port
def get_repository_directory(self):
"""
<Purpose>
Locate TUF client metadata repository.
<Arguments>
None.
<Exceptions>
<Side Effects>
<Returns>
"""
INVALID_PARENT_REPOSITORY_DIRECTORY = \
"Invalid parent_repository_directory for {network_location}!"
repository_directory = self.configuration["repository_directory"]
if self.parent_repository_directory is not None:
parent_repository_directory = \
os.path.abspath(self.parent_repository_directory)
if os.path.isdir(parent_repository_directory):
repository_directory = os.path.join(parent_repository_directory,
repository_directory)
# TODO: assert os.path.isdir(repository_directory)
else:
raise tuf.ssl_commons.exceptions.InvalidConfigurationError(INVALID_PARENT_REPOSITORY_DIRECTORY.format(
network_location=self.network_location))
return repository_directory
def get_ssl_certificates(self):
"""
<Purpose>
Get any PEM certificate bundle.
<Arguments>
None.
<Exceptions>
<Side Effects>
<Returns>
"""
INVALID_SSL_CERTIFICATES = \
"Invalid ssl_certificates for {network_location}!"
INVALID_PARENT_SSL_CERTIFICATES_DIRECTORY = \
"Invalid parent_ssl_certificates_directory for {network_location}!"
ssl_certificates = self.configuration.get("ssl_certificates")
if ssl_certificates is not None:
if self.parent_ssl_certificates_directory is not None:
parent_ssl_certificates_directory = \
os.path.abspath(self.parent_ssl_certificates_directory)
if os.path.isdir(parent_ssl_certificates_directory):
ssl_certificates = os.path.join(parent_ssl_certificates_directory,
ssl_certificates)
if not os.path.isfile(ssl_certificates):
raise tuf.ssl_commons.exceptions.InvalidConfigurationError(INVALID_SSL_CERTIFICATES.format(
network_location=self.network_location))
else:
raise tuf.ssl_commons.exceptions.InvalidConfigurationError(
INVALID_PARENT_SSL_CERTIFICATES_DIRECTORY.format(
network_location=self.network_location))
return ssl_certificates
def get_repository_mirrors(self, hostname, port, ssl_certificates):
"""
<Purpose>
Parse TUF server repository mirrors.
<Arguments>
hostname:
port:
ssl_certificates:
<Exceptions>
<Side Effects>
<Returns>
"""
INVALID_REPOSITORY_MIRROR = "Invalid repository mirror {repository_mirror}!"
repository_mirrors = self.configuration["repository_mirrors"]
repository_mirror_network_locations = set()
for repository_mirror in repository_mirrors:
mirror_configuration = repository_mirrors[repository_mirror]
try:
url_prefix = mirror_configuration["url_prefix"]
parsed_url = six.moves.urllib.parse.urlparse(url_prefix)
mirror_hostname = parsed_url.hostname
mirror_port = parsed_url.port or 80
mirror_scheme = parsed_url.scheme
mirror_netloc = "{hostname}:{port}".format(hostname = mirror_hostname,
port = mirror_port)
# TODO: warn is ssl_certificates is specified,
# but there is no mirror_scheme == "https"
if mirror_scheme == "https":
assert os.path.isfile(ssl_certificates)
# No single-edge cycle in interposition.
# GOOD: A -> { A:XYZ, ... }
# BAD: A -> { A, ... }
assert not (mirror_hostname == hostname and mirror_port == port)
# Unique network location over repository mirrors.
# GOOD: A -> { A:X, A:Y, ... }
# BAD: A -> { A:X, A:X, ... }
assert mirror_netloc not in repository_mirror_network_locations
# Remember this mirror's network location to check the rest of the mirrors.
repository_mirror_network_locations.add(mirror_netloc)
except:
error_message = \
INVALID_REPOSITORY_MIRROR.format(repository_mirror=repository_mirror)
logger.exception(error_message)
raise tuf.ssl_commons.exceptions.InvalidConfigurationError(error_message)
return repository_mirrors
def get_target_paths(self):
"""
<Purpose>
Within a network_location, we match URLs with this list of regular
expressions, which tell us to map from a source URL to a target URL.
If there are multiple regular expressions which match a source URL,
the order of appearance will be used to resolve ambiguity.
<Arguments>
None.
<Exceptions>
<Side Effects>
<Returns>
"""
INVALID_TARGET_PATH = "Invalid target path in {network_location}!"
# An "identity" capture from source URL to target URL.
WILD_TARGET_PATH = { "(.*)": "{0}" }
target_paths = self.configuration.get("target_paths", [WILD_TARGET_PATH])
# target_paths: [ target_path, ... ]
assert isinstance(target_paths, list)
for target_path in target_paths:
try:
# target_path: { "regex_with_groups", "target_with_group_captures" }
# e.g. { ".*(/some/directory)/$", "{0}/index.html" }
assert isinstance(target_path, dict)
assert len(target_path) == 1
except:
error_message = \
INVALID_TARGET_PATH.format(network_location=self.network_location)
logger.exception(error_message)
raise tuf.ssl_commons.exceptions.InvalidConfigurationError(error_message)
return target_paths
# TODO: more input sanity checks?
def parse(self):
"""
<Purpose>
Parse, check, and get the required configuration parameters.
<Arguments>
None.
<Exceptions>
<Side Effects>
<Returns>
"""
hostname, port = self.get_network_location()
ssl_certificates = self.get_ssl_certificates()
repository_directory = self.get_repository_directory()
target_paths = self.get_target_paths()
repository_mirrors = \
self.get_repository_mirrors(hostname, port, ssl_certificates)
# If everything passes, we return a Configuration.
return Configuration(hostname, port, repository_directory,
repository_mirrors, target_paths, ssl_certificates)

File diff suppressed because it is too large Load diff