mirror of
https://github.com/theupdateframework/python-tuf
synced 2026-05-24 10:08:28 +00:00
Merge pull request #264 from vladimir-v-diaz/review-pankh-interposition
Review interposition PR #254
This commit is contained in:
commit
e5492dcf7b
13 changed files with 1736 additions and 324 deletions
1
.gitignore
vendored
1
.gitignore
vendored
|
|
@ -11,3 +11,4 @@ build/*
|
|||
*.egg-info
|
||||
.coverage
|
||||
.tox/*
|
||||
tests/htmlcov/*
|
||||
|
|
|
|||
472
tests/test_interpose_updater.py
Executable file
472
tests/test_interpose_updater.py
Executable file
|
|
@ -0,0 +1,472 @@
|
|||
#!/usr/bin/env python
|
||||
|
||||
"""
|
||||
<Program Name>
|
||||
test_interpose_updater.py
|
||||
|
||||
<Author>
|
||||
Pankhuri Goyal <pankhurigoyal02@gmail.com>
|
||||
|
||||
<Started>
|
||||
August 2014.
|
||||
|
||||
<Copyright>
|
||||
See LICENSE for licensing information.
|
||||
|
||||
<Purpose>
|
||||
Unit test for 'tuf.interposition.updater.py'.
|
||||
"""
|
||||
|
||||
from __future__ import print_function
|
||||
from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import os
|
||||
import sys
|
||||
import tempfile
|
||||
import subprocess
|
||||
import random
|
||||
import shutil
|
||||
import logging
|
||||
import time
|
||||
import copy
|
||||
import json
|
||||
|
||||
import tuf
|
||||
import tuf.util
|
||||
import tuf.conf
|
||||
import tuf.log
|
||||
import tuf.interposition.updater as updater
|
||||
import tuf.interposition.configuration as configuration
|
||||
import tuf.unittest_toolbox as unittest_toolbox
|
||||
|
||||
if sys.version_info >= (2, 7):
|
||||
import unittest
|
||||
|
||||
else:
|
||||
import unittest2 as unittest
|
||||
|
||||
|
||||
logger = logging.getLogger('tuf.test_interpose_updater')
|
||||
|
||||
|
||||
class TestUpdaterController(unittest_toolbox.Modified_TestCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
# This method is called before tests in individual class are executed.
|
||||
|
||||
# Create a temporary directory to store the repository, metadata, and
|
||||
# target files. 'temporary_directory' must be deleted in TearDownModule()
|
||||
# so that temporary files are always removed, even when exceptions occur.
|
||||
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
|
||||
|
||||
# Launch a SimpleHTTPServer (serves files in the current directory).
|
||||
# Test cases will request metadata and target files that have been
|
||||
# pre-generated in 'tuf/tests/repository_data', which will be served
|
||||
# by the SimpleHTTPServer launched here. The test cases of
|
||||
# 'test_updater.py' assume the pre-generated metadata files have a specific
|
||||
# structure, such as a delegated role 'targets/role1', three target
|
||||
# files, five key files, etc.
|
||||
cls.SERVER_PORT = random.randint(30000, 45000)
|
||||
command = ['python', 'simple_server.py', str(cls.SERVER_PORT)]
|
||||
cls.server_process = subprocess.Popen(command, stderr=subprocess.PIPE)
|
||||
logger.info('\n\tServer process started.')
|
||||
logger.info('\tServer process id: '+str(cls.server_process.pid))
|
||||
logger.info('\tServing on port: '+str(cls.SERVER_PORT))
|
||||
cls.url = 'http://localhost:'+str(cls.SERVER_PORT) + os.path.sep
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
# Remove the temporary directory after all the tests are done.
|
||||
shutil.rmtree(cls.temporary_directory)
|
||||
|
||||
# Kill the SimpleHTTPServer Process
|
||||
if cls.server_process is None:
|
||||
message = '\tServer process ' + str(cls.server_process.pid) + \
|
||||
' terminated.'
|
||||
logger.info(message)
|
||||
cls.server_process.kill()
|
||||
|
||||
|
||||
|
||||
def setUp(self):
|
||||
# We are inheriting from custom class.
|
||||
unittest_toolbox.Modified_TestCase.setUp(self)
|
||||
|
||||
# Copy the original repository files provided in the test folder so that
|
||||
# any modifications made to repository files are restricted to the copies.
|
||||
# The 'repository_data' directory is expected to exist in 'tuf.tests/'.
|
||||
original_repository_files = os.path.join(os.getcwd(), 'repository_data')
|
||||
temporary_repository_root = \
|
||||
self.make_temp_directory(directory=self.temporary_directory)
|
||||
|
||||
# The original repository, keystore, and client directories will be copied
|
||||
# for each test case.
|
||||
original_repository = os.path.join(original_repository_files, 'repository')
|
||||
original_keystore = os.path.join(original_repository_files, 'keystore')
|
||||
original_client = os.path.join(original_repository_files, 'client')
|
||||
|
||||
# Save references to the often-needed client repository directories.
|
||||
# Test cases need these references to access metadata and target files.
|
||||
self.repository_directory = \
|
||||
os.path.join(temporary_repository_root, 'repository')
|
||||
self.keystore_directory = \
|
||||
os.path.join(temporary_repository_root, 'keystore')
|
||||
self.client_directory = os.path.join(temporary_repository_root, 'client')
|
||||
self.client_metadata = os.path.join(self.client_directory, 'metadata')
|
||||
self.client_metadata_current = os.path.join(self.client_metadata, 'current')
|
||||
self.client_metadata_previous = \
|
||||
os.path.join(self.client_metadata, 'previous')
|
||||
|
||||
# Copy the original 'repository', 'client', and 'keystore' directories
|
||||
# to the temporary repository the test cases can use.
|
||||
shutil.copytree(original_repository, self.repository_directory)
|
||||
shutil.copytree(original_client, self.client_directory)
|
||||
shutil.copytree(original_keystore, self.keystore_directory)
|
||||
|
||||
# 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'.
|
||||
repository_basepath = self.repository_directory[len(os.getcwd()):]
|
||||
|
||||
# Test Set 1 -
|
||||
port = self.SERVER_PORT
|
||||
url_prefix = 'http://localhost:' + str(port) + repository_basepath
|
||||
|
||||
# Setting 'tuf.conf.repository_directory' with the temporary client
|
||||
# directory copied from the original repository files.
|
||||
tuf.conf.repository_directory = self.client_directory
|
||||
|
||||
self.repository_mirrors = {'mirror': {'url_prefix': url_prefix,
|
||||
'metadata_path': 'metadata',
|
||||
'targets_path': 'targets',
|
||||
'confined_target_dirs': ['']}
|
||||
}
|
||||
|
||||
self.target_filepath = [{".*/targets":"/file1.txt"}]
|
||||
|
||||
self.good_configuration = configuration.Configuration('localhost', 8001,
|
||||
self.client_directory,
|
||||
self.repository_mirrors,
|
||||
self.target_filepath, None)
|
||||
|
||||
self.test1_configuration = configuration.Configuration('localhost', port,
|
||||
self.client_directory,
|
||||
self.repository_mirrors,
|
||||
'targets', None)
|
||||
|
||||
self.test2_configuration = configuration.Configuration('localhost', 8002,
|
||||
self.client_directory,
|
||||
self.repository_mirrors,
|
||||
'targets', None)
|
||||
|
||||
test_server_port=random.randint(30000, 45000)
|
||||
|
||||
self.test3_configuration = configuration.Configuration('localhost',
|
||||
test_server_port,
|
||||
self.client_directory,
|
||||
self.repository_mirrors,
|
||||
'targets', None)
|
||||
|
||||
url_prefix_test = \
|
||||
'http://localhost:' + str(test_server_port) + repository_basepath
|
||||
|
||||
|
||||
self.repository_mirrors = {'mirror': {'url_prefix': url_prefix_test,
|
||||
'metadata_path': 'metadata',
|
||||
'targets_path': 'targets',
|
||||
'confined_target_dirs': ['']}
|
||||
}
|
||||
|
||||
self.test4_configuration = configuration.Configuration('localhost', 8004,
|
||||
self.client_directory,
|
||||
self.repository_mirrors,
|
||||
'targets', None)
|
||||
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
# We are inheriting from custom class.
|
||||
unittest_toolbox.Modified_TestCase.tearDown(self)
|
||||
|
||||
|
||||
|
||||
# Unit Tests
|
||||
def test_add(self):
|
||||
updater_controller = updater.UpdaterController()
|
||||
|
||||
# Given good configuration, the UpdaterController.add() should work.
|
||||
updater_controller.add(self.good_configuration)
|
||||
|
||||
# Instead of configuration, if some number is given.
|
||||
self.assertRaises(tuf.InvalidConfigurationError, updater_controller.add, 8)
|
||||
|
||||
# Hostname already exists, should raise exception.
|
||||
self.assertRaises(tuf.FormatError, updater_controller.add,
|
||||
self.good_configuration)
|
||||
|
||||
# Hostname already exists as a mirror, should raise an exception.
|
||||
self.assertRaises(tuf.FormatError, updater_controller.add,
|
||||
self.test1_configuration)
|
||||
|
||||
# Repository mirror already exists as another mirror.
|
||||
self.assertRaises(tuf.FormatError, updater_controller.add,
|
||||
self.test2_configuration)
|
||||
|
||||
# Remove the old updater.
|
||||
updater_controller.remove(self.good_configuration)
|
||||
|
||||
# Add a new updater for this test.
|
||||
updater_controller.add(self.test3_configuration)
|
||||
|
||||
# Repository mirror already exists as an updater.
|
||||
self.assertRaises(tuf.FormatError, updater_controller.add,
|
||||
self.test4_configuration)
|
||||
|
||||
# Remove the updater once the testing is completed.
|
||||
updater_controller.remove(self.test3_configuration)
|
||||
|
||||
|
||||
def test_refresh(self):
|
||||
updater_controller = updater.UpdaterController()
|
||||
|
||||
# To check refresh() method, add a configuration for test.
|
||||
updater_controller.add(self.good_configuration)
|
||||
|
||||
updater_controller.refresh(self.good_configuration)
|
||||
|
||||
# Check for invalid configuration error.
|
||||
self.assertRaises(tuf.InvalidConfigurationError,
|
||||
updater_controller.refresh, 8)
|
||||
|
||||
# Check if the updater not added in the updater list is refreshed, gives an
|
||||
# error or not.
|
||||
self.assertRaises(tuf.NotFoundError, updater_controller.refresh,
|
||||
self.test1_configuration)
|
||||
|
||||
# Giving the same port number and network location as good_configuration.
|
||||
self.test4_configuration.port = 8001
|
||||
self.test4_configuration.network_location = 'localhost:8001'
|
||||
|
||||
# Check if the mirror not added is refreshed, gives an error or not.
|
||||
self.assertRaises(tuf.NotFoundError, updater_controller.refresh,
|
||||
self.test4_configuration)
|
||||
|
||||
# Make an object of tuf.interposition.updater.Updater of good configuration
|
||||
# for testing.
|
||||
good_updater = updater.Updater(self.good_configuration)
|
||||
good_updater.refresh()
|
||||
|
||||
self.good_configuration.repository_mirrors['mirror']['url_prefix'] = \
|
||||
'http://localhost:99999999'
|
||||
|
||||
# To check if a bad url_prefix of a mirror raises an exception or not.
|
||||
self.assertRaises(tuf.NoWorkingMirrorError, good_updater.refresh)
|
||||
|
||||
|
||||
|
||||
def test_get(self):
|
||||
updater_controller = updater.UpdaterController()
|
||||
|
||||
updater_controller.add(self.good_configuration)
|
||||
|
||||
url = 'http://localhost:8001'
|
||||
updater_controller.get(url)
|
||||
|
||||
wrong_url = 'http://localhost:9999'
|
||||
updater_controller.get(wrong_url)
|
||||
|
||||
good_updater = updater.Updater(self.good_configuration)
|
||||
self.assertRaises(tuf.URLMatchesNoPatternError,
|
||||
good_updater.get_target_filepath, url)
|
||||
|
||||
|
||||
|
||||
def test_remove(self):
|
||||
updater_controller = updater.UpdaterController()
|
||||
|
||||
# To check remove() method, add a configuration for test.
|
||||
updater_controller.add(self.good_configuration)
|
||||
|
||||
# Check for invalid configuration error.
|
||||
self.assertRaises(tuf.InvalidConfigurationError,
|
||||
updater_controller.remove, 8)
|
||||
|
||||
self.assertRaises(tuf.NotFoundError, updater_controller.remove,
|
||||
self.test1_configuration)
|
||||
|
||||
# Giving the same port number and network location as good_configuration.
|
||||
self.test4_configuration.port = 8001
|
||||
self.test4_configuration.network_location = 'localhost:8001'
|
||||
|
||||
self.assertRaises(tuf.NotFoundError, updater_controller.remove,
|
||||
self.test4_configuration)
|
||||
|
||||
|
||||
class TestUpdater(unittest_toolbox.Modified_TestCase):
|
||||
|
||||
@classmethod
|
||||
def setUpClass(cls):
|
||||
# This method is called before tests in individual class are executed.
|
||||
|
||||
# Create a temporary directory to store the repository, metadata, and
|
||||
# target files. 'temporary_directory' must be deleted in TearDownModule()
|
||||
# so that temporary files are always removed, even when exceptions occur.
|
||||
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
|
||||
|
||||
# Launch a SimpleHTTPServer (serves files in the current directory).
|
||||
# Test cases will request metadata and target files that have been
|
||||
# pre-generated in 'tuf/tests/repository_data', which will be served
|
||||
# by the SimpleHTTPServer launched here. The test cases of
|
||||
# 'test_updater.py' assume the pre-generated metadata files have a specific
|
||||
# structure, such as a delegated role 'targets/role1', three target
|
||||
# files, five key files, etc.
|
||||
cls.SERVER_PORT = random.randint(30000, 45000)
|
||||
command = ['python', 'simple_server.py', str(cls.SERVER_PORT)]
|
||||
cls.server_process = subprocess.Popen(command, stderr=subprocess.PIPE)
|
||||
logger.info('\n\tServer process started.')
|
||||
logger.info('\tServer process id: '+str(cls.server_process.pid))
|
||||
logger.info('\tServing on port: '+str(cls.SERVER_PORT))
|
||||
cls.url = 'http://localhost:'+str(cls.SERVER_PORT) + os.path.sep
|
||||
|
||||
time.sleep(1)
|
||||
|
||||
@classmethod
|
||||
def tearDownClass(cls):
|
||||
# Remove the temporary directory after all the tests are done.
|
||||
shutil.rmtree(cls.temporary_directory)
|
||||
|
||||
# Kill the SimpleHTTPServer Process
|
||||
if cls.server_process is None:
|
||||
message = '\tServer process ' + str(cls.server_process.pid) + \
|
||||
' terminated.'
|
||||
logger.info(message)
|
||||
cls.server_process.kill()
|
||||
|
||||
|
||||
def setUp(self):
|
||||
# We are inheriting from custom class.
|
||||
unittest_toolbox.Modified_TestCase.setUp(self)
|
||||
|
||||
# Copy the original repository files provided in the test folder so that
|
||||
# any modifications made to repository files are restricted to the copies.
|
||||
# The 'repository_data' directory is expected to exist in 'tuf.tests/'.
|
||||
original_repository_files = os.path.join(os.getcwd(), 'repository_data')
|
||||
temporary_repository_root = \
|
||||
self.make_temp_directory(directory=self.temporary_directory)
|
||||
|
||||
# The original repository, keystore, and client directories will be copied
|
||||
# for each test case.
|
||||
original_repository = os.path.join(original_repository_files, 'repository')
|
||||
original_keystore = os.path.join(original_repository_files, 'keystore')
|
||||
original_client = os.path.join(original_repository_files, 'client')
|
||||
|
||||
# Save references to the often-needed client repository directories.
|
||||
# Test cases need these references to access metadata and target files.
|
||||
self.repository_directory = \
|
||||
os.path.join(temporary_repository_root, 'repository')
|
||||
self.keystore_directory = \
|
||||
os.path.join(temporary_repository_root, 'keystore')
|
||||
self.client_directory = os.path.join(temporary_repository_root, 'client')
|
||||
self.client_metadata = os.path.join(self.client_directory, 'metadata')
|
||||
self.client_metadata_current = os.path.join(self.client_metadata, 'current')
|
||||
self.client_metadata_previous = \
|
||||
os.path.join(self.client_metadata, 'previous')
|
||||
|
||||
# Copy the original 'repository', 'client', and 'keystore' directories
|
||||
# to the temporary repository the test cases can use.
|
||||
shutil.copytree(original_repository, self.repository_directory)
|
||||
shutil.copytree(original_client, self.client_directory)
|
||||
shutil.copytree(original_keystore, self.keystore_directory)
|
||||
|
||||
# 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'.
|
||||
repository_basepath = self.repository_directory[len(os.getcwd()):]
|
||||
|
||||
# Test Set 1 -
|
||||
port = self.SERVER_PORT
|
||||
url_prefix = 'http://localhost:' + str(port) + repository_basepath
|
||||
|
||||
# Setting 'tuf.conf.repository_directory' with the temporary client
|
||||
# directory copied from the original repository files.
|
||||
tuf.conf.repository_directory = self.client_directory
|
||||
|
||||
self.repository_mirrors = {'mirror': {'url_prefix': url_prefix,
|
||||
'metadata_path': 'metadata',
|
||||
'targets_path': 'targets',
|
||||
'confined_target_dirs': ['']}
|
||||
}
|
||||
|
||||
self.target_paths = [{".*/targets":"/file1.txt"}]
|
||||
|
||||
self.good_configuration = configuration.Configuration('localhost', 8001,
|
||||
self.client_directory,
|
||||
self.repository_mirrors,
|
||||
self.target_paths, None)
|
||||
|
||||
|
||||
|
||||
def tearDown(self):
|
||||
# We are inheriting from custom class.
|
||||
unittest_toolbox.Modified_TestCase.tearDown(self)
|
||||
|
||||
|
||||
# Unit Tests
|
||||
def test_download_target(self):
|
||||
myUpdater = updater.Updater(self.good_configuration)
|
||||
|
||||
target_filepath = 'file.txt'
|
||||
self.assertRaises(tuf.UnknownTargetError, myUpdater.download_target,
|
||||
target_filepath)
|
||||
|
||||
self.assertRaises(tuf.FormatError, myUpdater.download_target, 8)
|
||||
|
||||
target_filepath = 'file1.txt'
|
||||
myUpdater.download_target(target_filepath)
|
||||
|
||||
|
||||
def test_get_target_filepath(self):
|
||||
myUpdater = updater.Updater(self.good_configuration)
|
||||
|
||||
self.assertRaises(AttributeError, myUpdater.get_target_filepath, 8)
|
||||
|
||||
test_source_url = 'http://localhost:9999'
|
||||
self.assertRaises(tuf.URLMatchesNoPatternError,
|
||||
myUpdater.get_target_filepath, test_source_url)
|
||||
|
||||
test_source_url = 'http://localhost:8001/targets/file.txt'
|
||||
myUpdater.get_target_filepath(test_source_url)
|
||||
|
||||
|
||||
def test_open(self):
|
||||
myUpdater = updater.Updater(self.good_configuration)
|
||||
|
||||
self.assertRaises(AttributeError, myUpdater.open, 8)
|
||||
|
||||
url = 'http://localhost:8001/targets/file1.txt'
|
||||
myUpdater.open(url, 'interposition.json')
|
||||
|
||||
|
||||
def test_retrieve(self):
|
||||
myUpdater = updater.Updater(self.good_configuration)
|
||||
|
||||
self.assertRaises(AttributeError, myUpdater.retrieve, 8)
|
||||
|
||||
test_source_url = 'http://localhost:8001/targets/file1.txt'
|
||||
myUpdater.retrieve(test_source_url, 'interposition.json')
|
||||
|
||||
#self.assertRaises(tuf.NoWorkingMirrorError, myUpdater.retrieve, test_source_url)
|
||||
|
||||
test_source_url = 'http://6767:localhost'
|
||||
self.assertRaises(tuf.URLMatchesNoPatternError, myUpdater.retrieve,
|
||||
test_source_url)
|
||||
|
||||
test_source_url = 'http://localhost:8001/targets/file1.txt'
|
||||
myUpdater.retrieve(test_source_url)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
|
@ -243,18 +243,25 @@ def test_1__init__exceptions(self):
|
|||
shutil.move(self.client_metadata_previous, previous_backup)
|
||||
self.assertRaises(tuf.RepositoryError, updater.Updater, 'test_repository',
|
||||
self.repository_mirrors)
|
||||
|
||||
# Restore the client's previous directory. The required 'current' directory
|
||||
# is still missing.
|
||||
shutil.move(previous_backup, self.client_metadata_previous)
|
||||
|
||||
|
||||
# Test: repository with only a '{repository_directory/metadata/previous'
|
||||
# Test: repository with only a '{repository_directory}/metadata/previous'
|
||||
# directory.
|
||||
self.assertRaises(tuf.RepositoryError, updater.Updater, 'test_repository',
|
||||
self.repository_mirrors)
|
||||
# Restore the client's current directory.
|
||||
shutil.move(current_backup, self.client_metadata_current)
|
||||
|
||||
# Test: repository with a '{repository_directory}/metadata/current'
|
||||
# directory, but the 'previous' directory is missing.
|
||||
shutil.move(self.client_metadata_previous, previous_backup)
|
||||
self.assertRaises(tuf.RepositoryError, updater.Updater, 'test_repository',
|
||||
self.repository_mirrors)
|
||||
shutil.move(previous_backup, self.client_metadata_previous)
|
||||
|
||||
# Test: repository missing the required 'root.json' file.
|
||||
client_root_file = os.path.join(self.client_metadata_current, 'root.json')
|
||||
backup_root_file = client_root_file + '.backup'
|
||||
|
|
@ -290,10 +297,16 @@ def test_1__load_metadata_from_file(self):
|
|||
# (i.e., only the 'root.json' file should have been loaded.
|
||||
self.assertEqual(len(self.repository_updater.metadata['current']), 5)
|
||||
|
||||
# Verify that the content of root metadata is valid.
|
||||
# Verify that the content of root metadata is valid.
|
||||
self.assertEqual(self.repository_updater.metadata['current']['targets/role1'],
|
||||
role1_meta['signed'])
|
||||
|
||||
# Test invalid metadata set argument (must be either
|
||||
# 'current' or 'previous'.)
|
||||
self.assertRaises(tuf.Error,
|
||||
self.repository_updater._load_metadata_from_file,
|
||||
'bad_metadata_set', 'targets/role1')
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
@ -413,7 +426,40 @@ def test_2__import_delegations(self):
|
|||
for keyid in keyids:
|
||||
self.assertTrue(keyid in tuf.keydb._keydb_dict)
|
||||
|
||||
# Verify that _import_delegations() ignores invalid keytypes in the 'keys'
|
||||
# field of parent role's 'delegations'
|
||||
existing_keyid = keyids[0]
|
||||
|
||||
self.repository_updater.metadata['current']['targets']\
|
||||
['delegations']['keys'][existing_keyid]['keytype'] = 'bad_keytype'
|
||||
self.repository_updater._import_delegations('targets')
|
||||
# Restore the keytype of 'existing_keyid'.
|
||||
self.repository_updater.metadata['current']['targets']\
|
||||
['delegations']['keys'][existing_keyid]['keytype'] = 'rsa'
|
||||
|
||||
# Verify that _import_delegations() raises an exception if any key in
|
||||
# 'delegations' is improperly formatted (i.e., bad keyid.)
|
||||
tuf.keydb.clear_keydb()
|
||||
self.repository_updater.metadata['current']['targets']\
|
||||
['delegations']['keys'][existing_keyid]['keyid'] = '123'
|
||||
|
||||
print(repr(self.repository_updater.metadata['current']['targets']\
|
||||
['delegations']['keys'][existing_keyid]['keyid']))
|
||||
self.repository_updater._import_delegations('targets')
|
||||
#self.assertRaises(tuf.Error, self.repository_updater._import_delegations,
|
||||
# 'targets')
|
||||
|
||||
# Restore the keyid of 'existing_keyids2'.
|
||||
self.repository_updater.metadata['current']['targets']\
|
||||
['delegations']['keys'][existing_keyid]['keyid'] = existing_keyid
|
||||
|
||||
# Verify that _import_delegations() raises an exception if it fails to add
|
||||
# one of the roles loaded from parent role's 'delegations'.
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -394,6 +394,7 @@ def test_C2_find_delegated_role(self):
|
|||
self.assertTrue(tuf.formats.ROLELIST_SCHEMA.matches(role_list))
|
||||
self.assertEqual(tuf.util.find_delegated_role(role_list, 'targets/tuf'), 1)
|
||||
self.assertEqual(tuf.util.find_delegated_role(role_list, 'targets/warehouse'), 0)
|
||||
|
||||
# Test for non-existent role. 'find_delegated_role()' returns 'None'
|
||||
# if the role is not found.
|
||||
self.assertEqual(tuf.util.find_delegated_role(role_list, 'targets/non-existent'),
|
||||
|
|
|
|||
|
|
@ -293,7 +293,7 @@ class UnknownTargetError(Error):
|
|||
|
||||
|
||||
class InvalidNameError(Error):
|
||||
"""Indicate an error while trying to validate any type of named object"""
|
||||
"""Indicate an error while trying to validate any type of named object."""
|
||||
pass
|
||||
|
||||
|
||||
|
|
@ -345,3 +345,18 @@ def __str__(self):
|
|||
all_errors += '\n ' + repr(mirror_netloc) + ': ' + repr(mirror_error)
|
||||
|
||||
return all_errors
|
||||
|
||||
|
||||
class NotFoundError(Error):
|
||||
"""If a required configuration or resource is not found."""
|
||||
pass
|
||||
|
||||
|
||||
class URLMatchesNoPatternError(Error):
|
||||
"""If a URL does not match a user-specified regular expression."""
|
||||
pass
|
||||
|
||||
|
||||
class InvalidConfigurationError(Error):
|
||||
"""If a configuration object does not match the expected format."""
|
||||
pass
|
||||
|
|
|
|||
|
|
@ -324,7 +324,7 @@ def __init__(self, updater_name, repository_mirrors):
|
|||
|
||||
# Ensure the current path is valid/exists before saving it.
|
||||
if not os.path.exists(current_path):
|
||||
message = 'Missing '+repr(current_path)+'. This path must exist and, ' \
|
||||
message = 'Missing ' + repr(current_path) + '. This path must exist and, ' \
|
||||
'at a minimum, contain the root metadata file.'
|
||||
raise tuf.RepositoryError(message)
|
||||
self.metadata_directory['current'] = current_path
|
||||
|
|
@ -334,7 +334,7 @@ def __init__(self, updater_name, repository_mirrors):
|
|||
|
||||
# Ensure the previous path is valid/exists.
|
||||
if not os.path.exists(previous_path):
|
||||
message = 'Missing '+repr(previous_path)+'. This path must exist.'
|
||||
message = 'Missing ' + repr(previous_path) + '. This path must exist.'
|
||||
raise tuf.RepositoryError(message)
|
||||
self.metadata_directory['previous'] = previous_path
|
||||
|
||||
|
|
@ -402,7 +402,7 @@ def _load_metadata_from_file(self, metadata_set, metadata_role):
|
|||
|
||||
# Ensure we have a valid metadata set.
|
||||
if metadata_set not in ['current', 'previous']:
|
||||
raise tuf.Error('Invalid metadata set: '+repr(metadata_set))
|
||||
raise tuf.Error('Invalid metadata set: ' + repr(metadata_set))
|
||||
|
||||
# Save and construct the full metadata path.
|
||||
metadata_directory = self.metadata_directory[metadata_set]
|
||||
|
|
@ -515,7 +515,7 @@ def _import_delegations(self, parent_role):
|
|||
keys_info = current_parent_metadata['delegations'].get('keys', {})
|
||||
roles_info = current_parent_metadata['delegations'].get('roles', [])
|
||||
|
||||
logger.debug('Adding roles delegated from '+repr(parent_role)+'.')
|
||||
logger.debug('Adding roles delegated from ' + repr(parent_role) + '.')
|
||||
|
||||
# Iterate through the keys of the delegated roles of 'parent_role'
|
||||
# and load them.
|
||||
|
|
@ -532,12 +532,12 @@ def _import_delegations(self, parent_role):
|
|||
pass
|
||||
|
||||
except (tuf.FormatError, tuf.Error) as e:
|
||||
logger.exception('Failed to add keyid: '+repr(keyid)+'.')
|
||||
logger.error('Aborting role delegation for parent role '+parent_role+'.')
|
||||
logger.exception('Invalid key for keyid: ' + repr(keyid) + '.')
|
||||
logger.error('Aborting role delegation for parent role ' + parent_role + '.')
|
||||
raise
|
||||
|
||||
else:
|
||||
logger.warning('Invalid key type for '+repr(keyid)+'.')
|
||||
logger.warning('Invalid key type for ' + repr(keyid) + '.')
|
||||
continue
|
||||
|
||||
# Add the roles to the role database.
|
||||
|
|
@ -546,14 +546,14 @@ def _import_delegations(self, parent_role):
|
|||
# NOTE: tuf.roledb.add_role will take care of the case where rolename
|
||||
# is None.
|
||||
rolename = roleinfo.get('name')
|
||||
logger.debug('Adding delegated role: '+str(rolename)+'.')
|
||||
logger.debug('Adding delegated role: ' + str(rolename) + '.')
|
||||
tuf.roledb.add_role(rolename, roleinfo)
|
||||
|
||||
except tuf.RoleAlreadyExistsError as e:
|
||||
logger.warning('Role already exists: '+rolename)
|
||||
logger.warning('Role already exists: ' + rolename)
|
||||
|
||||
except:
|
||||
logger.exception('Failed to add delegated role: '+rolename+'.')
|
||||
logger.exception('Failed to add delegated role: ' + rolename + '.')
|
||||
raise
|
||||
|
||||
|
||||
|
|
|
|||
|
|
@ -1,28 +1,56 @@
|
|||
## Examples
|
||||
## Interposition
|
||||
|
||||
```python
|
||||
import tuf.interposition
|
||||
# Configurations are simply a JSON object which allows you to answer these questions:
|
||||
# - Which network locations get intercepted?
|
||||
# - Given a network location, which TUF mirrors should we forward requests to?
|
||||
# - Given a network location, which paths should be intercepted?
|
||||
# - Given a TUF mirror, how do we verify its SSL certificate?
|
||||
tuf.interposition.configure()
|
||||
```
|
||||
The interposition package (tuf/interposition/) can be used to integrate TUF
|
||||
into a software updater. It is an integration method that requires the least
|
||||
amount of effort from developers who are performing the integration. The
|
||||
integration method used by interposition is considered high-level because the
|
||||
integrator does not explicitly call TUF methods to refresh metadata and
|
||||
download target files. For example, performing a low-level integration with
|
||||
*tuf/client/updater.py* requires the integrator to instantiate an updater
|
||||
object, call updater.refresh() to refresh TUF metadata, and
|
||||
updater.download_target() to download target files referenced in TUF metadata.
|
||||
In contrast, an integrator may utilize interposition to load some configuration
|
||||
settings to indicate which URLs requested by Python urllib calls should be
|
||||
interposed by TUF. This means that all the update calls for metadata and
|
||||
target requests are made transparently by the low level *tuf/client/updater.py*
|
||||
module.
|
||||
|
||||
### Option one
|
||||
|
||||
### Interposition Examples
|
||||
|
||||
To use interposition, integrators must:
|
||||
|
||||
1. Create an interposition configuration file.
|
||||
2. Import interposition, and load the configuration file with configure().
|
||||
3. Perform updater urllib calls that may be interposed.
|
||||
4. Deconfigure interposition.
|
||||
|
||||
|
||||
## Option 1
|
||||
|
||||
```python
|
||||
from tuf.interposition import urllib_tuf as urllib
|
||||
from tuf.interposition import urllib2_tuf as urllib2
|
||||
|
||||
# configure() loads the interposition configuration file that indicates which
|
||||
# URLs should be interposed by TUF. Any urllib calls that occur after
|
||||
# configure() are subject to interposition.
|
||||
|
||||
configuration = tuf.interposition.configure()
|
||||
|
||||
url = 'http://example.com/path/to/document'
|
||||
|
||||
urllib.urlopen(url)
|
||||
urllib.urlretrieve(url)
|
||||
urllib.urlretrieve(url, 'mytarget')
|
||||
urllib2.urlopen(url)
|
||||
|
||||
# deconfigure() is used to stop interposition. Any urllib calls that occur
|
||||
# after deconfigure() are not interposed.
|
||||
tuf.interposition.deconfigure(configuration)
|
||||
|
||||
```
|
||||
|
||||
### Option two
|
||||
## Option 2
|
||||
|
||||
```python
|
||||
@tuf.interposition.open_url
|
||||
|
|
@ -30,6 +58,12 @@ def instancemethod(self, url, ...):
|
|||
...
|
||||
```
|
||||
|
||||
|
||||
Note: tuf.interposition.refresh(configuration) may be called to force a
|
||||
refresh of the TUF metadata. Interposition normally performs a refresh of TUF
|
||||
metadata when configure() is called.
|
||||
|
||||
|
||||
## Configuration
|
||||
|
||||
A *configuration* is simply a JSON object which tells `tuf.interposition` which
|
||||
|
|
|
|||
154
tuf/interposition/__init__.py
Normal file → Executable file
154
tuf/interposition/__init__.py
Normal file → Executable file
|
|
@ -1,16 +1,45 @@
|
|||
"""
|
||||
<Program Name>
|
||||
__init__.py
|
||||
|
||||
<Author>
|
||||
Trishank Kuppusamy.
|
||||
Pankhuri Goyal <pankhurigoyal02@gmail.com>
|
||||
|
||||
<Started>
|
||||
|
||||
<Copyright>
|
||||
See LICENSE for licensing information.
|
||||
|
||||
<Purpose>
|
||||
TODO: Add pros / cons of using interposition. Also, should we move code
|
||||
here to its own module (instead of __init__.py)?
|
||||
"""
|
||||
|
||||
# Help with Python 3 compatibility, where the print statement is a function, an
|
||||
# implicit relative import is invalid, and the '/' operator performs true
|
||||
# division. Example: print 'hello world' raises a 'SyntaxError' exception.
|
||||
from __future__ import print_function
|
||||
from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import unicode_literals
|
||||
|
||||
|
||||
import functools
|
||||
import imp
|
||||
import json
|
||||
import socket
|
||||
import urllib
|
||||
import urllib2
|
||||
import logging
|
||||
|
||||
import tuf.log
|
||||
import tuf._vendor.six as six
|
||||
|
||||
# We import them directly into our namespace so that there is no name conflict.
|
||||
from configuration import ConfigurationParser, InvalidConfiguration
|
||||
from utility import Logger
|
||||
from updater import UpdaterController
|
||||
# We import the following directly into our namespace so that there is no name
|
||||
# conflict.
|
||||
from tuf.interposition.configuration import ConfigurationParser
|
||||
from tuf.interposition.updater import UpdaterController
|
||||
|
||||
logger = logging.getLogger('tuf.interposition.__init__')
|
||||
|
||||
# Export nothing when: from tuf.interposition import *
|
||||
__all__ = []
|
||||
|
|
@ -19,10 +48,12 @@
|
|||
# TODO:
|
||||
# - Document design decisions.
|
||||
# - Interposition: Honour urllib/urllib2 contract.
|
||||
# - Review security issues resulting from regular expressions (e.g. complexity attacks).
|
||||
# - Review security issues resulting from regular expressions
|
||||
# (e.g. complexity attacks).
|
||||
# - Warn user when TUF is used without any configuration.
|
||||
# - Override other default (e.g. HTTPS) urllib2 handlers.
|
||||
# - Failsafe: If TUF fails, offer option to unsafely resort back to urllib/urllib2?
|
||||
# - Failsafe: If TUF fails, offer option to unsafely resort back to
|
||||
# urllib/urllib2?
|
||||
|
||||
|
||||
|
||||
|
|
@ -59,39 +90,53 @@
|
|||
|
||||
|
||||
def __monkey_patch():
|
||||
"""Build and monkey patch public copies of the urllib and urllib2 modules.
|
||||
"""
|
||||
Build and monkey patch public copies of the urllib and urllib2 modules.
|
||||
|
||||
We prefer simplicity, which leads to easier proof of security, even if it may
|
||||
come at the cost of not honouring some provisions of the urllib and urllib2
|
||||
module contracts unrelated to security.
|
||||
We prefer simplicity, which leads to easier proof of security, even if it may
|
||||
come at the cost of not honouring some provisions of the urllib and urllib2
|
||||
module contracts unrelated to security.
|
||||
|
||||
References:
|
||||
http://stackoverflow.com/a/11285504
|
||||
http://docs.python.org/2/library/imp.html"""
|
||||
References:
|
||||
http://stackoverflow.com/a/11285504
|
||||
http://docs.python.org/2/library/imp.html
|
||||
"""
|
||||
|
||||
global urllib_tuf
|
||||
global urllib2_tuf
|
||||
|
||||
if urllib_tuf is None:
|
||||
urllib_module_name = 'urllib'
|
||||
if six.PY3:
|
||||
urllib_module_name = 'urllib/request'
|
||||
|
||||
try:
|
||||
module_file, pathname, description = imp.find_module("urllib")
|
||||
module_file, pathname, description = imp.find_module(urllib_module_name)
|
||||
urllib_tuf = \
|
||||
imp.load_module( "urllib_tuf", module_file, pathname, description)
|
||||
imp.load_module('urllib_tuf', module_file, pathname, description)
|
||||
module_file.close()
|
||||
|
||||
except:
|
||||
raise
|
||||
|
||||
else:
|
||||
urllib_tuf.urlopen = __urllib_urlopen
|
||||
urllib_tuf.urlretrieve = __urllib_urlretrieve
|
||||
|
||||
if urllib2_tuf is None:
|
||||
urllib2_module_name = 'urllib2'
|
||||
if six.PY3:
|
||||
urllib2_module_name = 'urllib/request'
|
||||
|
||||
try:
|
||||
module_file, pathname, description = imp.find_module("urllib2")
|
||||
module_file, pathname, description = imp.find_module(urllib2_module_name)
|
||||
urllib2_tuf = \
|
||||
imp.load_module( "urllib2_tuf", module_file, pathname, description)
|
||||
imp.load_module('urllib2_tuf', module_file, pathname, description)
|
||||
module_file.close()
|
||||
|
||||
except:
|
||||
raise
|
||||
|
||||
else:
|
||||
urllib2_tuf.urlopen = __urllib2_urlopen
|
||||
|
||||
|
|
@ -105,7 +150,8 @@ def __urllib_urlopen(url, data=None, proxies=None):
|
|||
updater = __updater_controller.get(url)
|
||||
|
||||
if updater is None:
|
||||
return urllib.urlopen(url, data=data, proxies=proxies)
|
||||
return six.moves.urllib.request.urlopen(url, data=data, proxies=proxies)
|
||||
|
||||
else:
|
||||
return updater.open(url, data=data)
|
||||
|
||||
|
|
@ -119,7 +165,8 @@ def __urllib_urlretrieve(url, filename=None, reporthook=None, data=None):
|
|||
updater = __updater_controller.get(url)
|
||||
|
||||
if updater is None:
|
||||
return urllib.urlretrieve(url, filename=filename, reporthook=reporthook, data=data)
|
||||
return six.moves.urllib.request.urlretrieve(url, filename=filename, reporthook=reporthook, data=data)
|
||||
|
||||
else:
|
||||
return updater.retrieve(url, filename=filename, reporthook=reporthook, data=data)
|
||||
|
||||
|
|
@ -136,27 +183,32 @@ def __urllib2_urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
|
|||
updater = None
|
||||
|
||||
# If this is a urllib2.Request...
|
||||
if isinstance(url, urllib2.Request):
|
||||
if isinstance(url, six.moves.urllib.request.Request):
|
||||
# If this is a GET HTTP method...
|
||||
if url.get_method() == "GET":
|
||||
# ...then you should check with TUF.
|
||||
updater = __updater_controller.get(url.get_full_url())
|
||||
|
||||
else:
|
||||
# ...otherwise, revert to default behaviour.
|
||||
Logger.warn(NON_GET_HTTP_METHOD_MESSAGE.format(method=url.get_method(),
|
||||
logger.warn(NON_GET_HTTP_METHOD_MESSAGE.format(method=url.get_method(),
|
||||
url=url.get_full_url()))
|
||||
return urllib2.urlopen(url, data=data, timeout=timeout)
|
||||
return six.moves.urllib.request.urlopen(url, data=data, timeout=timeout)
|
||||
|
||||
else:
|
||||
# ...otherwise, we assume this is a string.
|
||||
updater = __updater_controller.get(url)
|
||||
|
||||
if updater is None:
|
||||
return urllib2.urlopen(url, data=data, timeout=timeout)
|
||||
return six.moves.urllib.request.urlopen(url, data=data, timeout=timeout)
|
||||
|
||||
else:
|
||||
response = updater.open(url, data=data)
|
||||
|
||||
# See urllib2.AbstractHTTPHandler.do_open
|
||||
# TODO: let Updater handle this
|
||||
response.msg = ""
|
||||
|
||||
return response
|
||||
|
||||
|
||||
|
|
@ -173,12 +225,13 @@ def __read_configuration(configuration_handler,
|
|||
parent_ssl_certificates_directory=None):
|
||||
"""
|
||||
A generic function to read TUF interposition configurations off a file, and
|
||||
then handle those configurations with a given function. configuration_handler
|
||||
must be a function which accepts a tuf.interposition.Configuration
|
||||
instance.
|
||||
then handle those configurations with a given function.
|
||||
configuration_handler must be a function which accepts a
|
||||
tuf.interposition.Configuration instance.
|
||||
|
||||
Returns the parsed configurations as a dictionary of configurations indexed
|
||||
by hostnames."""
|
||||
by hostnames.
|
||||
"""
|
||||
|
||||
INVALID_TUF_CONFIGURATION = "Invalid configuration for {network_location}!"
|
||||
INVALID_TUF_INTERPOSITION_JSON = "Invalid configuration in {filename}!"
|
||||
|
|
@ -193,34 +246,34 @@ def __read_configuration(configuration_handler,
|
|||
configurations = tuf_interpositions.get("configurations", {})
|
||||
|
||||
if len(configurations) == 0:
|
||||
raise InvalidConfiguration(NO_CONFIGURATIONS.format(filename=filename))
|
||||
raise tuf.InvalidConfigurationError(NO_CONFIGURATIONS.format(filename=filename))
|
||||
|
||||
else:
|
||||
for network_location, configuration in configurations.iteritems():
|
||||
for network_location, configuration in six.iteritems(configurations):
|
||||
try:
|
||||
configuration_parser = ConfigurationParser(network_location,
|
||||
configuration, parent_repository_directory=parent_repository_directory,
|
||||
parent_ssl_certificates_directory=parent_ssl_certificates_directory)
|
||||
|
||||
|
||||
# configuration_parser.parse() returns a
|
||||
# 'tuf.interposition.Configuration' object, which interposition
|
||||
# uses to determine which URLs should be interposed.
|
||||
configuration = configuration_parser.parse()
|
||||
configuration_handler(configuration)
|
||||
parsed_configurations[configuration.hostname] = configuration
|
||||
|
||||
except:
|
||||
Logger.exception(INVALID_TUF_CONFIGURATION.format(network_location=network_location))
|
||||
logger.exception(INVALID_TUF_CONFIGURATION.format(network_location=network_location))
|
||||
raise
|
||||
|
||||
except:
|
||||
Logger.exception(INVALID_TUF_INTERPOSITION_JSON.format(filename=filename))
|
||||
logger.exception(INVALID_TUF_INTERPOSITION_JSON.format(filename=filename))
|
||||
raise
|
||||
|
||||
else:
|
||||
return parsed_configurations
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
# TODO: Is parent_repository_directory a security risk? For example, would it
|
||||
# allow the user to overwrite another TUF repository metadata on the filesystem?
|
||||
# On the other hand, it is beyond TUF's scope to handle filesystem permissions.
|
||||
|
|
@ -230,7 +283,8 @@ def configure(filename="tuf.interposition.json",
|
|||
parent_repository_directory=None,
|
||||
parent_ssl_certificates_directory=None):
|
||||
|
||||
"""The optional parent_repository_directory parameter is used to specify the
|
||||
"""
|
||||
The optional parent_repository_directory parameter is used to specify the
|
||||
containing parent directory of the "repository_directory" specified in a
|
||||
configuration for *all* network locations, because sometimes the absolute
|
||||
location of the "repository_directory" is only known at runtime. If you
|
||||
|
|
@ -272,7 +326,8 @@ def configure(filename="tuf.interposition.json",
|
|||
optional; it must specify certificates bundled as PEM (RFC 1422).
|
||||
|
||||
Returns the parsed configurations as a dictionary of configurations indexed
|
||||
by hostnames."""
|
||||
by hostnames.
|
||||
"""
|
||||
|
||||
configurations = \
|
||||
__read_configuration(__updater_controller.add, filename=filename,
|
||||
|
|
@ -296,7 +351,7 @@ def refresh(configurations):
|
|||
# Although interposition was designed to remain transparent, for software
|
||||
# updaters that require an explicit refresh of top-level metadata, this
|
||||
# method is provided.
|
||||
for configuration in configurations.itervalues():
|
||||
for configuration in six.itervalues(configurations):
|
||||
__updater_controller.refresh(configuration)
|
||||
|
||||
|
||||
|
|
@ -306,7 +361,7 @@ def refresh(configurations):
|
|||
def deconfigure(configurations):
|
||||
"""Remove TUF interposition for previously read configurations."""
|
||||
|
||||
for configuration in configurations.itervalues():
|
||||
for configuration in six.itervalues(configurations):
|
||||
__updater_controller.remove(configuration)
|
||||
|
||||
|
||||
|
|
@ -314,8 +369,10 @@ def deconfigure(configurations):
|
|||
|
||||
|
||||
def open_url(instancemethod):
|
||||
"""Decorate an instance method of the form
|
||||
instancemethod(self, url, ...) with me in order to pass it to TUF."""
|
||||
"""
|
||||
Decorate an instance method of the form
|
||||
instancemethod(self, url, ...) with me in order to pass it to TUF.
|
||||
"""
|
||||
|
||||
@functools.wraps(instancemethod)
|
||||
def wrapper(self, *args, **kwargs):
|
||||
|
|
@ -325,16 +382,18 @@ def wrapper(self, *args, **kwargs):
|
|||
data = kwargs.get("data")
|
||||
|
||||
# If this is a urllib2.Request...
|
||||
if isinstance(url_object, urllib2.Request):
|
||||
if isinstance(url_object, six.moves.request.Request):
|
||||
# If this is a GET HTTP method...
|
||||
if url_object.get_method() == "GET":
|
||||
# ...then you should check with TUF.
|
||||
url = url_object.get_full_url()
|
||||
|
||||
else:
|
||||
# ...otherwise, revert to default behaviour.
|
||||
Logger.warn(NON_GET_HTTP_METHOD_MESSAGE.format(method=url_object.get_method(),
|
||||
logger.warn(NON_GET_HTTP_METHOD_MESSAGE.format(method=url_object.get_method(),
|
||||
url=url_object.get_full_url()))
|
||||
return instancemethod(self, *args, **kwargs)
|
||||
|
||||
# ...otherwise, we assume this is a string.
|
||||
else:
|
||||
url = url_object
|
||||
|
|
@ -345,6 +404,7 @@ def wrapper(self, *args, **kwargs):
|
|||
if updater is None:
|
||||
# ...then revert to default behaviour.
|
||||
return instancemethod(self, *args, **kwargs)
|
||||
|
||||
else:
|
||||
# ...otherwise, use TUF to get this document.
|
||||
return updater.open(url, data=data)
|
||||
|
|
@ -364,7 +424,3 @@ def wrapper(self, *args, **kwargs):
|
|||
# Build and monkey patch public copies of the urllib and urllib2 modules.
|
||||
__monkey_patch()
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
|
|
|
|||
256
tuf/interposition/configuration.py
Normal file → Executable file
256
tuf/interposition/configuration.py
Normal file → Executable file
|
|
@ -1,38 +1,71 @@
|
|||
"""
|
||||
<Program Name>
|
||||
configuration.py
|
||||
|
||||
<Author>
|
||||
Trishank Kuppusamy
|
||||
Pankhuri Goyal <pankhurigoyal02@gmail.com>
|
||||
Vladimir Diaz <vladimir.v.diaz@gmail.com>
|
||||
|
||||
<Started>
|
||||
|
||||
<Copyright>
|
||||
See LICENSE for licensing information.
|
||||
|
||||
<Purpose>
|
||||
|
||||
"""
|
||||
|
||||
# Help with Python 3 compatibility where the print statement is a function, an
|
||||
# implicit relative import is invalid, and the '/' operator performs true
|
||||
# division. Example: print 'hello world' raises a 'SyntaxError' exception.
|
||||
from __future__ import print_function
|
||||
from __future__ import absolute_import
|
||||
from __future__ import division
|
||||
from __future__ import unicode_literals
|
||||
|
||||
import os.path
|
||||
import types
|
||||
import urlparse
|
||||
|
||||
|
||||
# We import them directly into our namespace so that there is no name conflict.
|
||||
from utility import Logger, InterpositionException
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
################################ GLOBAL CLASSES ################################
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class InvalidConfiguration(InterpositionException):
|
||||
"""User configuration is invalid."""
|
||||
pass
|
||||
|
||||
import logging
|
||||
|
||||
import tuf.log
|
||||
import tuf._vendor.six as six
|
||||
|
||||
logger = logging.getLogger('tuf.interposition.configuration')
|
||||
|
||||
|
||||
class Configuration(object):
|
||||
"""Holds TUF interposition configuration information about a network
|
||||
location which is important to an updater for that network location."""
|
||||
|
||||
"""
|
||||
<Purpose>
|
||||
Holds TUF interposition configuration information about a network
|
||||
location which is important to an updater for that network location.
|
||||
"""
|
||||
|
||||
def __init__(self, hostname, port, repository_directory, repository_mirrors,
|
||||
target_paths, ssl_certificates):
|
||||
|
||||
"""Constructor assumes that its parameters are valid."""
|
||||
"""
|
||||
<Purpose>
|
||||
Constructor assumes that its parameters are valid.
|
||||
|
||||
<Arguments>
|
||||
hostname:
|
||||
|
||||
port:
|
||||
|
||||
repository_directory:
|
||||
|
||||
repository_mirrors:
|
||||
|
||||
target_paths:
|
||||
|
||||
ssl_certificates:
|
||||
|
||||
<Exceptions>
|
||||
|
||||
<Side Effects>
|
||||
|
||||
<Returns>
|
||||
"""
|
||||
|
||||
self.hostname = hostname
|
||||
self.port = port
|
||||
|
|
@ -50,8 +83,19 @@ def __repr__(self):
|
|||
|
||||
|
||||
def get_repository_mirror_hostnames(self):
|
||||
"""Get a set of hostnames of every repository mirror of this
|
||||
configuration."""
|
||||
"""
|
||||
<Purpose>
|
||||
Get a set of hostnames of every repository mirror of this configuration.
|
||||
|
||||
<Arguments>
|
||||
None.
|
||||
|
||||
<Exceptions>
|
||||
|
||||
<Side Effects>
|
||||
|
||||
<Returns>
|
||||
"""
|
||||
|
||||
# Parse TUF server repository mirrors.
|
||||
repository_mirrors = self.repository_mirrors
|
||||
|
|
@ -59,10 +103,14 @@ def get_repository_mirror_hostnames(self):
|
|||
|
||||
for repository_mirror in repository_mirrors:
|
||||
mirror_configuration = repository_mirrors[repository_mirror]
|
||||
|
||||
url_prefix = mirror_configuration["url_prefix"]
|
||||
parsed_url = urlparse.urlparse(url_prefix)
|
||||
parsed_url = six.moves.urllib.parse.urlparse(url_prefix)
|
||||
mirror_hostname = parsed_url.hostname
|
||||
repository_mirror_hostnames.add(mirror_hostname)
|
||||
mirror_port = parsed_url.port
|
||||
mirror_network_location = \
|
||||
"{hostname}:{port}".format(hostname=mirror_hostname, port = mirror_port)
|
||||
repository_mirror_hostnames.add(mirror_network_location)
|
||||
|
||||
return repository_mirror_hostnames
|
||||
|
||||
|
|
@ -71,14 +119,36 @@ def get_repository_mirror_hostnames(self):
|
|||
|
||||
|
||||
class ConfigurationParser(object):
|
||||
"""Parses TUF interposition configuration information about a network
|
||||
location, stored as a JSON object, and returns it as a Configuration."""
|
||||
"""
|
||||
<Purpose>
|
||||
Parses TUF interposition configuration information about a network
|
||||
location, stored as a JSON object, and returns it as a Configuration.
|
||||
"""
|
||||
|
||||
|
||||
def __init__(self, network_location, configuration,
|
||||
parent_repository_directory=None,
|
||||
parent_ssl_certificates_directory=None):
|
||||
"""
|
||||
<Purpose>
|
||||
|
||||
<Arguments>
|
||||
network_location:
|
||||
|
||||
configuration:
|
||||
|
||||
parent_repository_directory:
|
||||
|
||||
parent_ssl_certificates_directory:
|
||||
|
||||
<Exceptions>
|
||||
|
||||
<Side Effects>
|
||||
|
||||
<Returns>
|
||||
None.
|
||||
"""
|
||||
|
||||
self.network_location = network_location
|
||||
self.configuration = configuration
|
||||
self.parent_repository_directory = parent_repository_directory
|
||||
|
|
@ -86,7 +156,20 @@ def __init__(self, network_location, configuration,
|
|||
|
||||
|
||||
def get_network_location(self):
|
||||
"""Check network location."""
|
||||
"""
|
||||
<Purpose>
|
||||
Check network location.
|
||||
|
||||
<Arguments>
|
||||
None.
|
||||
|
||||
<Exceptions>
|
||||
|
||||
<Side Effects>
|
||||
|
||||
<Returns>
|
||||
|
||||
"""
|
||||
|
||||
INVALID_NETWORK_LOCATION = "Invalid network location {network_location}!"
|
||||
|
||||
|
|
@ -97,14 +180,27 @@ def get_network_location(self):
|
|||
if len(network_location_tokens) > 1:
|
||||
port = int(network_location_tokens[1], 10)
|
||||
if port <= 0 or port >= 2**16:
|
||||
raise InvalidConfiguration(INVALID_NETWORK_LOCATION.format(
|
||||
raise tuf.InvalidConfigurationError(INVALID_NETWORK_LOCATION.format(
|
||||
network_location=self.network_location))
|
||||
|
||||
return hostname, port
|
||||
|
||||
|
||||
def get_repository_directory(self):
|
||||
"""Locate TUF client metadata repository."""
|
||||
"""
|
||||
<Purpose>
|
||||
Locate TUF client metadata repository.
|
||||
|
||||
<Arguments>
|
||||
None.
|
||||
|
||||
<Exceptions>
|
||||
|
||||
<Side Effects>
|
||||
|
||||
<Returns>
|
||||
|
||||
"""
|
||||
|
||||
INVALID_PARENT_REPOSITORY_DIRECTORY = \
|
||||
"Invalid parent_repository_directory for {network_location}!"
|
||||
|
|
@ -121,14 +217,27 @@ def get_repository_directory(self):
|
|||
# TODO: assert os.path.isdir(repository_directory)
|
||||
|
||||
else:
|
||||
raise InvalidConfiguration(INVALID_PARENT_REPOSITORY_DIRECTORY.format(
|
||||
raise tuf.InvalidConfigurationError(INVALID_PARENT_REPOSITORY_DIRECTORY.format(
|
||||
network_location=self.network_location))
|
||||
|
||||
return repository_directory
|
||||
|
||||
|
||||
def get_ssl_certificates(self):
|
||||
"""Get any PEM certificate bundle."""
|
||||
"""
|
||||
<Purpose>
|
||||
Get any PEM certificate bundle.
|
||||
|
||||
<Arguments>
|
||||
None.
|
||||
|
||||
<Exceptions>
|
||||
|
||||
<Side Effects>
|
||||
|
||||
<Returns>
|
||||
|
||||
"""
|
||||
|
||||
INVALID_SSL_CERTIFICATES = \
|
||||
"Invalid ssl_certificates for {network_location}!"
|
||||
|
|
@ -147,11 +256,11 @@ def get_ssl_certificates(self):
|
|||
ssl_certificates)
|
||||
|
||||
if not os.path.isfile(ssl_certificates):
|
||||
raise InvalidConfiguration(INVALID_SSL_CERTIFICATES.format(
|
||||
raise tuf.InvalidConfigurationError(INVALID_SSL_CERTIFICATES.format(
|
||||
network_location=self.network_location))
|
||||
|
||||
else:
|
||||
raise InvalidConfiguration(
|
||||
raise tuf.InvalidConfigurationError(
|
||||
INVALID_PARENT_SSL_CERTIFICATES_DIRECTORY.format(
|
||||
network_location=self.network_location))
|
||||
|
||||
|
|
@ -159,7 +268,24 @@ def get_ssl_certificates(self):
|
|||
|
||||
|
||||
def get_repository_mirrors(self, hostname, port, ssl_certificates):
|
||||
"""Parse TUF server repository mirrors."""
|
||||
"""
|
||||
<Purpose>
|
||||
Parse TUF server repository mirrors.
|
||||
|
||||
<Arguments>
|
||||
hostname:
|
||||
|
||||
port:
|
||||
|
||||
ssl_certificates:
|
||||
|
||||
<Exceptions>
|
||||
|
||||
<Side Effects>
|
||||
|
||||
<Returns>
|
||||
|
||||
"""
|
||||
|
||||
INVALID_REPOSITORY_MIRROR = "Invalid repository mirror {repository_mirror}!"
|
||||
|
||||
|
|
@ -171,7 +297,7 @@ def get_repository_mirrors(self, hostname, port, ssl_certificates):
|
|||
|
||||
try:
|
||||
url_prefix = mirror_configuration["url_prefix"]
|
||||
parsed_url = urlparse.urlparse(url_prefix)
|
||||
parsed_url = six.moves.urllib.parse.urlparse(url_prefix)
|
||||
mirror_hostname = parsed_url.hostname
|
||||
mirror_port = parsed_url.port or 80
|
||||
mirror_scheme = parsed_url.scheme
|
||||
|
|
@ -199,18 +325,29 @@ def get_repository_mirrors(self, hostname, port, ssl_certificates):
|
|||
except:
|
||||
error_message = \
|
||||
INVALID_REPOSITORY_MIRROR.format(repository_mirror=repository_mirror)
|
||||
Logger.exception(error_message)
|
||||
raise InvalidConfiguration(error_message)
|
||||
logger.exception(error_message)
|
||||
raise tuf.InvalidConfigurationError(error_message)
|
||||
|
||||
return repository_mirrors
|
||||
|
||||
|
||||
def get_target_paths(self):
|
||||
"""
|
||||
Within a network_location, we match URLs with this list of regular
|
||||
expressions, which tell us to map from a source URL to a target URL.
|
||||
If there are multiple regular expressions which match a source URL,
|
||||
the order of appearance will be used to resolve ambiguity.
|
||||
<Purpose>
|
||||
Within a network_location, we match URLs with this list of regular
|
||||
expressions, which tell us to map from a source URL to a target URL.
|
||||
If there are multiple regular expressions which match a source URL,
|
||||
the order of appearance will be used to resolve ambiguity.
|
||||
|
||||
<Arguments>
|
||||
None.
|
||||
|
||||
<Exceptions>
|
||||
|
||||
<Side Effects>
|
||||
|
||||
<Returns>
|
||||
|
||||
"""
|
||||
|
||||
INVALID_TARGET_PATH = "Invalid target path in {network_location}!"
|
||||
|
|
@ -221,27 +358,40 @@ def get_target_paths(self):
|
|||
target_paths = self.configuration.get("target_paths", [WILD_TARGET_PATH])
|
||||
|
||||
# target_paths: [ target_path, ... ]
|
||||
assert isinstance(target_paths, types.ListType)
|
||||
assert isinstance(target_paths, list)
|
||||
|
||||
for target_path in target_paths:
|
||||
try:
|
||||
# target_path: { "regex_with_groups", "target_with_group_captures" }
|
||||
# e.g. { ".*(/some/directory)/$", "{0}/index.html" }
|
||||
assert isinstance(target_path, types.DictType)
|
||||
assert isinstance(target_path, dict)
|
||||
assert len(target_path) == 1
|
||||
|
||||
except:
|
||||
error_message = \
|
||||
INVALID_TARGET_PATH.format(network_location=self.network_location)
|
||||
Logger.exception(error_message)
|
||||
raise InvalidConfiguration(error_message)
|
||||
logger.exception(error_message)
|
||||
raise tuf.InvalidConfigurationError(error_message)
|
||||
|
||||
return target_paths
|
||||
|
||||
|
||||
# TODO: more input sanity checks?
|
||||
def parse(self):
|
||||
"""Parse, check and get the required configuration parameters."""
|
||||
"""
|
||||
<Purpose>
|
||||
Parse, check, and get the required configuration parameters.
|
||||
|
||||
<Arguments>
|
||||
None.
|
||||
|
||||
<Exceptions>
|
||||
|
||||
<Side Effects>
|
||||
|
||||
<Returns>
|
||||
|
||||
"""
|
||||
|
||||
hostname, port = self.get_network_location()
|
||||
ssl_certificates = self.get_ssl_certificates()
|
||||
|
|
@ -252,5 +402,5 @@ def parse(self):
|
|||
self.get_repository_mirrors(hostname, port, ssl_certificates)
|
||||
|
||||
# If everything passes, we return a Configuration.
|
||||
return Configuration(hostname, port, repository_directory, repository_mirrors,
|
||||
target_paths, ssl_certificates)
|
||||
return Configuration(hostname, port, repository_directory,
|
||||
repository_mirrors, target_paths, ssl_certificates)
|
||||
|
|
|
|||
978
tuf/interposition/updater.py
Normal file → Executable file
978
tuf/interposition/updater.py
Normal file → Executable file
File diff suppressed because it is too large
Load diff
|
|
@ -1,43 +0,0 @@
|
|||
import logging
|
||||
|
||||
|
||||
# Import our standard logger for its side effects.
|
||||
import tuf.log
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class InterpositionException(Exception):
|
||||
"""Base exception class."""
|
||||
pass
|
||||
|
||||
|
||||
|
||||
|
||||
|
||||
class Logger(object):
|
||||
"""A static logging object for tuf.interposition."""
|
||||
|
||||
|
||||
__logger = logging.getLogger("tuf.interposition")
|
||||
|
||||
|
||||
@staticmethod
|
||||
def debug(message):
|
||||
Logger.__logger.debug(message)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def exception(message):
|
||||
Logger.__logger.exception(message)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def info(message):
|
||||
Logger.__logger.info(message)
|
||||
|
||||
|
||||
@staticmethod
|
||||
def warn(message):
|
||||
Logger.__logger.warn(message)
|
||||
|
|
@ -164,9 +164,9 @@ def add_key(key_dict, keyid=None):
|
|||
# Raise 'tuf.FormatError' if the check fails.
|
||||
tuf.formats.KEYID_SCHEMA.check_match(keyid)
|
||||
|
||||
# Check if the keyid found in 'rsakey_dict' matches 'keyid'.
|
||||
# Check if the keyid found in 'key_dict' matches 'keyid'.
|
||||
if keyid != key_dict['keyid']:
|
||||
raise tuf.Error('Incorrect keyid '+key_dict['keyid']+' expected '+keyid)
|
||||
raise tuf.Error('Incorrect keyid ' + key_dict['keyid'] + ' expected ' + keyid)
|
||||
|
||||
# Check if the keyid belonging to 'rsakey_dict' is not already
|
||||
# available in the key database before returning.
|
||||
|
|
|
|||
|
|
@ -560,7 +560,7 @@ def find_delegated_role(roles, delegated_role):
|
|||
|
||||
# This role has a different name.
|
||||
else:
|
||||
continue
|
||||
logger.debug('Skipping delegated role: ' + repr(delegated_role))
|
||||
|
||||
return role_index
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue