From 0c072e47f2bd4d7caeb747f7af3cce5a834ad212 Mon Sep 17 00:00:00 2001 From: dachshund Date: Fri, 8 Mar 2013 20:46:17 -0500 Subject: [PATCH] Follow Seattle coding style. --- tuf/interposition/__init__.py | 1 + tuf/interposition/configuration.py | 410 ++++++++++++++--------------- 2 files changed, 206 insertions(+), 205 deletions(-) diff --git a/tuf/interposition/__init__.py b/tuf/interposition/__init__.py index ef5dfdcf..086e78f1 100644 --- a/tuf/interposition/__init__.py +++ b/tuf/interposition/__init__.py @@ -154,6 +154,7 @@ def __urllib2_urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): def configure(filename="tuf.interposition.json", parent_repository_directory=None, parent_ssl_certificates_directory=None): + """ The optional parent_repository_directory parameter is used to specify the containing parent directory of the "repository_directory" specified in a diff --git a/tuf/interposition/configuration.py b/tuf/interposition/configuration.py index e7160495..41a43ecf 100644 --- a/tuf/interposition/configuration.py +++ b/tuf/interposition/configuration.py @@ -3,233 +3,233 @@ import types import urlparse + +# We import them directly into our namespace so that there is no name conflict. from utility import Logger, InterpositionException -class InvalidConfiguration( InterpositionException ): - """User configuration is invalid.""" - - pass -class Configuration( object ): - def __init__( - self, - hostname, - port, - repository_directory, - repository_mirrors, - target_paths, - ssl_certificates - ): - """This constructor assumes that its parameters are valid.""" - self.hostname = hostname - self.port = port - self.network_location = \ - "{hostname}:{port}".format( hostname = hostname, port = port ) - self.repository_directory = repository_directory - self.repository_mirrors = repository_mirrors - self.target_paths = target_paths - self.ssl_certificates = ssl_certificates - self.tempdir = tempfile.mkdtemp() +################################ GLOBAL CLASSES ################################ -class ConfigurationParser( object ): - def __init__( - self, - network_location, - configuration, - parent_repository_directory = None, - parent_ssl_certificates_directory = None - ): - self.network_location = network_location - self.configuration = configuration - self.parent_repository_directory = parent_repository_directory - self.parent_ssl_certificates_directory = parent_ssl_certificates_directory - def get_network_location( self ): - """Check network location.""" - INVALID_NETWORK_LOCATION = \ - "Invalid network location {network_location}!" - - network_location_tokens = self.network_location.split( ':', 1 ) - hostname = network_location_tokens[ 0 ] - port = 80 - - if len( network_location_tokens ) > 1: - port = int( network_location_tokens[ 1 ], 10 ) - if port <= 0 or port >= 2**16: - raise InvalidConfiguration( - INVALID_NETWORK_LOCATION.format( - network_location = self.network_location - ) - ) - - return hostname, port +class InvalidConfiguration(InterpositionException): + """User configuration is invalid.""" + pass - def get_repository_directory( self ): - """Locate TUF client metadata repository.""" - - INVALID_PARENT_REPOSITORY_DIRECTORY = "Invalid " + \ - "parent_repository_directory for {network_location}!" - - repository_directory = self.configuration[ "repository_directory" ] - if self.parent_repository_directory is not None: - parent_repository_directory = \ - os.path.abspath( self.parent_repository_directory ) - if os.path.isdir( parent_repository_directory ): - repository_directory = os.path.join( - parent_repository_directory, - repository_directory - ) - # TODO: assert os.path.isdir( repository_directory ) - else: - raise InvalidConfiguration( - INVALID_PARENT_REPOSITORY_DIRECTORY.format( - network_location = self.network_location - ) - ) - - return repository_directory - def get_ssl_certificates( self ): - """Get any PEM certificate bundle.""" - INVALID_SSL_CERTIFICATES = "Invalid ssl_certificates " + \ - "for {network_location}!" - INVALID_PARENT_SSL_CERTIFICATES_DIRECTORY = "Invalid " + \ - "parent_ssl_certificates_directory for {network_location}!" - - ssl_certificates = self.configuration.get( "ssl_certificates" ) - - if ssl_certificates is not None: - if self.parent_ssl_certificates_directory is not None: - parent_ssl_certificates_directory = \ - os.path.abspath( self.parent_ssl_certificates_directory ) - if os.path.isdir( parent_ssl_certificates_directory ): - ssl_certificates = os.path.join( - parent_ssl_certificates_directory, - ssl_certificates - ) - if not os.path.isfile( ssl_certificates ): - raise InvalidConfiguration( - INVALID_SSL_CERTIFICATES.format( - network_location = self.network_location - ) - ) - else: - raise InvalidConfiguration( - INVALID_PARENT_SSL_CERTIFICATES_DIRECTORY.format( - network_location = self.network_location - ) - ) - - return ssl_certificates +class Configuration(object): + """Holds TUF interposition configuration information about a network + location which is important to an updater for that network location.""" - def get_repository_mirrors( self, hostname, port, ssl_certificates ): - """Parse TUF server repository mirrors.""" + def __init__(self, hostname, port, repository_directory, repository_mirrors, + target_paths, ssl_certificates): - INVALID_REPOSITORY_MIRROR = \ - "Invalid repository mirror {repository_mirror}!" + """Constructor assumes that its parameters are valid.""" - repository_mirrors = self.configuration[ "repository_mirrors" ] - repository_mirror_network_locations = set() - - for repository_mirror in repository_mirrors: - mirror_configuration = repository_mirrors[ repository_mirror ] - try: - url_prefix = mirror_configuration[ "url_prefix" ] - parsed_url = urlparse.urlparse( url_prefix ) - mirror_hostname = parsed_url.hostname - mirror_port = parsed_url.port or 80 - mirror_scheme = parsed_url.scheme - mirror_netloc = "{hostname}:{port}".format( - hostname = mirror_hostname, - port = mirror_port - ) - - # TODO: warn is ssl_certificates is specified, - # but there is no mirror_scheme == "https" - if mirror_scheme == "https": - assert os.path.isfile( ssl_certificates ) - - # No single-edge cycle in interposition. - # GOOD: A -> { A:XYZ, ... } - # BAD: A -> { A, ... } - assert not ( mirror_hostname == hostname and mirror_port == port ) - - # Unique network location over repository mirrors. - # GOOD: A -> { A:X, A:Y, ... } - # BAD: A -> { A:X, A:X, ... } - assert mirror_netloc not in repository_mirror_network_locations - - # Remember this mirror's network location to check the rest of the mirrors. - repository_mirror_network_locations.add( mirror_netloc ) - except: - error_message = INVALID_REPOSITORY_MIRROR.format( - repository_mirror = repository_mirror - ) - Logger.error( error_message ) - raise InvalidConfiguration( error_message ) - - return repository_mirrors + self.hostname = hostname + self.port = port + self.network_location = \ + "{hostname}:{port}".format( hostname = hostname, port = port ) + self.repository_directory = repository_directory + self.repository_mirrors = repository_mirrors + self.target_paths = target_paths + self.ssl_certificates = ssl_certificates + self.tempdir = tempfile.mkdtemp() - def get_target_paths( self ): - """ - Within a network_location, we match URLs with this list of regular - expressions, which tell us to map from a source URL to a target URL. - If there are multiple regular expressions which match a source URL, - the order of appearance will be used to resolve ambiguity. - """ - - INVALID_TARGET_PATH = "Invalid target path in {network_location}!" - # An "identity" capture from source URL to target URL. - WILD_TARGET_PATH = { "(.*)": "{0}" } - - target_paths = \ - self.configuration.get( "target_paths", [ WILD_TARGET_PATH ] ) - - # target_paths: [ target_path, ... ] - assert isinstance( target_paths, types.ListType ) - for target_path in target_paths: - try: - # target_path: { "regex_with_groups", "target_with_group_captures" } - # e.g. { ".*(/some/directory)/$", "{0}/index.html" } - assert isinstance( target_path, types.DictType ) - assert len( target_path ) == 1 - except: - error_message = INVALID_TARGET_PATH.format( - network_location = self.network_location - ) - Logger.error( error_message ) - raise InvalidConfiguration( error_message ) - - return target_paths - # TODO: more input sanity checks? - def parse( self ): - # Parse, check and get the required configuration parameters. - hostname, port = self.get_network_location() - ssl_certificates = self.get_ssl_certificates() - repository_directory = self.get_repository_directory() - repository_mirrors = \ - self.get_repository_mirrors( hostname, port, ssl_certificates ) - target_paths = self.get_target_paths() - # If everything passes, we return a Configuration. - return Configuration( - hostname, - port, - repository_directory, - repository_mirrors, - target_paths, - ssl_certificates - ) +class ConfigurationParser(object): + """Parses TUF interposition configuration information about a network + location, stored as a JSON object, and returns it as a Configuration.""" + + + def __init__(self, network_location, configuration, + parent_repository_directory=None, + parent_ssl_certificates_directory=None): + + self.network_location = network_location + self.configuration = configuration + self.parent_repository_directory = parent_repository_directory + self.parent_ssl_certificates_directory = parent_ssl_certificates_directory + + + def get_network_location(self): + """Check network location.""" + + INVALID_NETWORK_LOCATION = "Invalid network location {network_location}!" + + network_location_tokens = self.network_location.split(':', 1) + hostname = network_location_tokens[0] + port = 80 + + if len(network_location_tokens) > 1: + port = int(network_location_tokens[1], 10) + if port <= 0 or port >= 2**16: + raise InvalidConfiguration(INVALID_NETWORK_LOCATION.format( + network_location=self.network_location)) + + return hostname, port + + + def get_repository_directory(self): + """Locate TUF client metadata repository.""" + + INVALID_PARENT_REPOSITORY_DIRECTORY = \ + "Invalid parent_repository_directory for {network_location}!" + + repository_directory = self.configuration["repository_directory"] + + if self.parent_repository_directory is not None: + parent_repository_directory = \ + os.path.abspath(self.parent_repository_directory) + + if os.path.isdir(parent_repository_directory): + repository_directory = os.path.join(parent_repository_directory, + repository_directory) + # TODO: assert os.path.isdir(repository_directory) + + else: + raise InvalidConfiguration(INVALID_PARENT_REPOSITORY_DIRECTORY.format( + network_location=self.network_location)) + + return repository_directory + + + def get_ssl_certificates(self): + """Get any PEM certificate bundle.""" + + INVALID_SSL_CERTIFICATES = \ + "Invalid ssl_certificates for {network_location}!" + INVALID_PARENT_SSL_CERTIFICATES_DIRECTORY = \ + "Invalid parent_ssl_certificates_directory for {network_location}!" + + ssl_certificates = self.configuration.get("ssl_certificates") + + if ssl_certificates is not None: + if self.parent_ssl_certificates_directory is not None: + parent_ssl_certificates_directory = \ + os.path.abspath(self.parent_ssl_certificates_directory) + + if os.path.isdir(parent_ssl_certificates_directory): + ssl_certificates = os.path.join(parent_ssl_certificates_directory, + ssl_certificates) + + if not os.path.isfile(ssl_certificates): + raise InvalidConfiguration(INVALID_SSL_CERTIFICATES.format( + network_location=self.network_location)) + + else: + raise InvalidConfiguration( + INVALID_PARENT_SSL_CERTIFICATES_DIRECTORY.format( + network_location=self.network_location)) + + return ssl_certificates + + + def get_repository_mirrors(self, hostname, port, ssl_certificates): + """Parse TUF server repository mirrors.""" + + INVALID_REPOSITORY_MIRROR = "Invalid repository mirror {repository_mirror}!" + + repository_mirrors = self.configuration["repository_mirrors"] + repository_mirror_network_locations = set() + + for repository_mirror in repository_mirrors: + mirror_configuration = repository_mirrors[repository_mirror] + + try: + url_prefix = mirror_configuration["url_prefix"] + parsed_url = urlparse.urlparse(url_prefix) + mirror_hostname = parsed_url.hostname + mirror_port = parsed_url.port or 80 + mirror_scheme = parsed_url.scheme + mirror_netloc = "{hostname}:{port}".format(hostname = mirror_hostname, + port = mirror_port) + + # TODO: warn is ssl_certificates is specified, + # but there is no mirror_scheme == "https" + if mirror_scheme == "https": + assert os.path.isfile(ssl_certificates) + + # No single-edge cycle in interposition. + # GOOD: A -> { A:XYZ, ... } + # BAD: A -> { A, ... } + assert not (mirror_hostname == hostname and mirror_port == port) + + # Unique network location over repository mirrors. + # GOOD: A -> { A:X, A:Y, ... } + # BAD: A -> { A:X, A:X, ... } + assert mirror_netloc not in repository_mirror_network_locations + + # Remember this mirror's network location to check the rest of the mirrors. + repository_mirror_network_locations.add(mirror_netloc) + + except: + error_message = \ + INVALID_REPOSITORY_MIRROR.format(repository_mirror=repository_mirror) + Logger.error(error_message) + raise InvalidConfiguration(error_message) + + return repository_mirrors + + + def get_target_paths(self): + """ + Within a network_location, we match URLs with this list of regular + expressions, which tell us to map from a source URL to a target URL. + If there are multiple regular expressions which match a source URL, + the order of appearance will be used to resolve ambiguity. + """ + + INVALID_TARGET_PATH = "Invalid target path in {network_location}!" + + # An "identity" capture from source URL to target URL. + WILD_TARGET_PATH = { "(.*)": "{0}" } + + target_paths = self.configuration.get("target_paths", [WILD_TARGET_PATH]) + + # target_paths: [ target_path, ... ] + assert isinstance(target_paths, types.ListType) + + for target_path in target_paths: + try: + # target_path: { "regex_with_groups", "target_with_group_captures" } + # e.g. { ".*(/some/directory)/$", "{0}/index.html" } + assert isinstance(target_path, types.DictType) + assert len(target_path) == 1 + + except: + error_message = \ + INVALID_TARGET_PATH.format(network_location=self.network_location) + Logger.error(error_message) + raise InvalidConfiguration(error_message) + + return target_paths + + + # TODO: more input sanity checks? + def parse(self): + """Parse, check and get the required configuration parameters.""" + + hostname, port = self.get_network_location() + ssl_certificates = self.get_ssl_certificates() + repository_directory = self.get_repository_directory() + target_paths = self.get_target_paths() + + repository_mirrors = \ + self.get_repository_mirrors(hostname, port, ssl_certificates) + + # If everything passes, we return a Configuration. + return Configuration(hostname, port, repository_directory, repository_mirrors, + target_paths, ssl_certificates)