Merge pull request #1198 from MVrachev/threads-port-generation

Tests: Use Queue for process communication which replaces tmp files and use OS for port creation
This commit is contained in:
Joshua Lock 2020-11-23 15:40:19 +00:00 committed by GitHub
commit 2aae0bad69
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
22 changed files with 466 additions and 321 deletions

25
tests/fast_server_exit.py Normal file
View file

@ -0,0 +1,25 @@
#!/usr/bin/env python
# Copyright 2020, TUF contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
"""
<Program Name>
fast_server_exit.py
<Author>
Martin Vrachev.
<Started>
October 29, 2020.
<Copyright>
See LICENSE-MIT OR LICENSE for licensing information.
<Purpose>
Used for tests in tests/test_utils.py.
"""
import sys
sys.exit(0)

View file

@ -462,25 +462,21 @@ def test(HandlerClass=ProxyRequestHandler, ServerClass=ThreadingHTTPServer, prot
global INTERCEPT
global TARGET_SERVER_CA_FILEPATH
if sys.argv[1:]:
port = int(sys.argv[1])
else:
port = 8080
server_address = ('localhost', port)
server_address = ('localhost', 0)
# MODIFIED: Argument added, conditional below added to control INTERCEPT
# setting.
if len(sys.argv) > 2:
if sys.argv[2].lower() == 'intercept':
if len(sys.argv) > 1:
if sys.argv[1].lower() == 'intercept':
INTERCEPT = True
# MODIFIED: Argument added to control certificate(s) the proxy expects of
# the target server(s), and added default value.
if len(sys.argv) > 3:
if os.path.exists(sys.argv[3]):
TARGET_SERVER_CA_FILEPATH = sys.argv[3]
if len(sys.argv) > 2:
if os.path.exists(sys.argv[2]):
TARGET_SERVER_CA_FILEPATH = sys.argv[2]
else:
raise Exception('Target server cert file not found: ' + sys.argv[3])
raise Exception('Target server cert file not found: ' + sys.argv[2])
# MODIFIED: Create the target-host-specific proxy certificates directory if
# it doesn't already exist.
@ -490,11 +486,13 @@ def test(HandlerClass=ProxyRequestHandler, ServerClass=ThreadingHTTPServer, prot
HandlerClass.protocol_version = protocol
httpd = ServerClass(server_address, HandlerClass)
sa = httpd.socket.getsockname()
print "Serving HTTP Proxy on", sa[0], "port", sa[1], "..."
port_message = 'bind succeeded, server port is: ' + str(sa[1])
print(port_message)
print("Serving HTTP Proxy on", sa[0], "port", sa[1], "...")
httpd.serve_forever()
if __name__ == '__main__':
test()

Binary file not shown.

View file

@ -38,35 +38,29 @@
from __future__ import unicode_literals
import sys
import random
import ssl
import os
import six
PORT = 0
keyfile = os.path.join('ssl_certs', 'ssl_cert.key')
certfile = os.path.join('ssl_certs', 'ssl_cert.crt')
if len(sys.argv) > 1:
PORT = int(sys.argv[1])
else:
PORT = random.randint(30000, 45000)
if len(sys.argv) > 1 and os.path.exists(sys.argv[1]):
certfile = sys.argv[1]
if len(sys.argv) > 2:
if os.path.exists(sys.argv[2]):
certfile = sys.argv[2]
else:
print('simple_https_server: cert file not found: ' + sys.argv[2] +
'; using default: ' + certfile)
httpd = six.moves.BaseHTTPServer.HTTPServer(('localhost', PORT),
six.moves.SimpleHTTPServer.SimpleHTTPRequestHandler)
httpd = six.moves.BaseHTTPServer.HTTPServer(('localhost', 0),
six.moves.SimpleHTTPServer.SimpleHTTPRequestHandler)
httpd.socket = ssl.wrap_socket(
httpd.socket, keyfile=keyfile, certfile=certfile, server_side=True)
#print('Starting https server on port: ' + str(PORT))
port_message = 'bind succeeded, server port is: ' \
+ str(httpd.server_address[1])
print(port_message)
if len(sys.argv) > 1 and certfile != sys.argv[1]:
print('simple_https_server: cert file was not found: ' + sys.argv[1] +
'; using default: ' + certfile + " certfile")
httpd.serve_forever()

View file

@ -39,14 +39,6 @@
import six
from six.moves.SimpleHTTPServer import SimpleHTTPRequestHandler
PORT = 0
if len(sys.argv) > 1:
PORT = int(sys.argv[1])
else:
PORT = random.randint(30000, 45000)
class QuietHTTPRequestHandler(SimpleHTTPRequestHandler):
"""A SimpleHTTPRequestHandler that does not write incoming requests to
@ -73,6 +65,9 @@ def log_request(self, code='-', size='-'):
# Allow re-use so you can re-run tests as often as you want even if the
# tests re-use ports. Otherwise TCP TIME-WAIT prevents reuse for ~1 minute
six.moves.socketserver.TCPServer.allow_reuse_address = True
httpd = six.moves.socketserver.TCPServer(('', PORT), handler)
httpd = six.moves.socketserver.TCPServer(('localhost', 0), handler)
port_message = 'bind succeeded, server port is: ' \
+ str(httpd.server_address[1])
print(port_message)
httpd.serve_forever()

View file

@ -32,19 +32,10 @@
import os
import sys
import time
import random
import six
# Modify the HTTPServer class to pass the 'test_mode' argument to
# do_GET() function.
class HTTPServer_Test(six.moves.BaseHTTPServer.HTTPServer):
def __init__(self, server_address, Handler, test_mode):
six.moves.BaseHTTPServer.HTTPServer.__init__(self, server_address, Handler)
self.test_mode = test_mode
# HTTP request handler.
class Handler(six.moves.BaseHTTPServer.BaseHTTPRequestHandler):
@ -62,38 +53,21 @@ def do_GET(self):
self.send_header('Content-length', str(len(data)))
self.end_headers()
if self.server.test_mode == 'mode_1':
# Before sending any data, the server does nothing for a long time.
DELAY = 40
time.sleep(DELAY)
self.wfile.write(data)
return
# 'mode_2'
else:
DELAY = 1
# Throttle the file by sending a character every DELAY seconds.
for i in range(len(data)):
self.wfile.write(data[i].encode('utf-8'))
time.sleep(DELAY)
return
# Before sending any data, the server does nothing for a long time.
DELAY = 40
time.sleep(DELAY)
self.wfile.write((data.encode('utf-8')))
except IOError as e:
self.send_error(404, 'File Not Found!')
def run(port, test_mode):
server_address = ('localhost', port)
httpd = HTTPServer_Test(server_address, Handler, test_mode)
httpd.handle_request()
if __name__ == '__main__':
port = int(sys.argv[1])
test_mode = sys.argv[2]
assert test_mode in ('mode_1', 'mode_2')
run(port, test_mode)
server_address = ('localhost', 0)
httpd = six.moves.BaseHTTPServer.HTTPServer(server_address, Handler)
port_message = 'bind succeeded, server port is: ' \
+ str(httpd.server_address[1])
print(port_message)
httpd.serve_forever()

View file

@ -82,7 +82,7 @@ def setUpClass(cls):
@classmethod
def tearDownClass(cls):
# Kills the server subprocess and closes the temp file used for logging.
# Cleans the resources and flush the logged lines (if any).
cls.server_process_handler.clean()
# Remove the temporary repository directory, which should contain all the

View file

@ -90,8 +90,7 @@ def setUp(self):
def tearDown(self):
unittest_toolbox.Modified_TestCase.tearDown(self)
# Logs stdout and stderr from the server subprocess and then it
# kills it and closes the temp file used for logging.
# Cleans the resources and flush the logged lines (if any).
self.server_process_handler.clean()
self.target_fileobj.close()
@ -258,29 +257,29 @@ def test_https_connection(self):
# 4: run with an HTTPS certificate that is expired
# Be sure to offset from the port used in setUp to avoid collision.
port1 = self.server_process_handler.port + 1
port2 = self.server_process_handler.port + 2
port3 = self.server_process_handler.port + 3
port4 = self.server_process_handler.port + 4
good_https_server_handler = utils.TestServerProcess(log=logger,
server='simple_https_server.py', port=port1,
server='simple_https_server.py',
extra_cmd_args=[good_cert_fname])
good2_https_server_handler = utils.TestServerProcess(log=logger,
server='simple_https_server.py', port=port2,
server='simple_https_server.py',
extra_cmd_args=[good2_cert_fname])
bad_https_server_handler = utils.TestServerProcess(log=logger,
server='simple_https_server.py', port=port3,
server='simple_https_server.py',
extra_cmd_args=[bad_cert_fname])
expd_https_server_handler = utils.TestServerProcess(log=logger,
server='simple_https_server.py', port=port4,
server='simple_https_server.py',
extra_cmd_args=[expired_cert_fname])
suffix = '/' + os.path.basename(target_filepath)
good_https_url = 'https://localhost:' + str(port1) + suffix
good2_https_url = 'https://localhost:' + str(port2) + suffix
bad_https_url = 'https://localhost:' + str(port3) + suffix
expired_https_url = 'https://localhost:' + str(port4) + suffix
good_https_url = 'https://localhost:' \
+ str(good_https_server_handler.port) + suffix
good2_https_url = 'https://localhost:' \
+ str(good2_https_server_handler.port) + suffix
bad_https_url = 'https://localhost:' \
+ str(bad_https_server_handler.port) + suffix
expired_https_url = 'https://localhost:' \
+ str(expd_https_server_handler.port) + suffix
# Download the target file using an HTTPS connection.
@ -367,8 +366,7 @@ def test_https_connection(self):
bad_https_server_handler,
expd_https_server_handler]:
# Logs stdout and stderr from the server subprocess and then it
# kills it and closes the temp file used for logging.
# Cleans the resources and flush the logged lines (if any).
proc_handler.clean()

View file

@ -84,7 +84,7 @@ def setUpClass(cls):
@classmethod
def tearDownClass(cls):
# Kills the server subprocess and closes the temp file used for logging.
# Cleans the resources and flush the logged lines (if any).
cls.server_process_handler.clean()
# Remove the temporary repository directory, which should contain all the

View file

@ -88,7 +88,7 @@ def setUpClass(cls):
@classmethod
def tearDownClass(cls):
# Kills the server subprocess and closes the temp file used for logging.
# Cleans the resources and flush the logged lines (if any).
cls.server_process_handler.clean()
# Remove the temporary repository directory, which should contain all the

View file

@ -102,7 +102,7 @@ def setUpClass(cls):
@classmethod
def tearDownClass(cls):
# Kills the server subprocess and closes the temp file used for logging.
# Cleans the resources and flush the logged lines (if any).
cls.server_process_handler.clean()
# Remove the temporary repository directory, which should contain all the

View file

@ -83,7 +83,7 @@ def setUpClass(cls):
@classmethod
def tearDownClass(cls):
# Kills the server subprocess and closes the temp file used for logging.
# Cleans the resources and flush the logged lines (if any).
cls.server_process_handler.clean()
# Remove the temporary repository directory, which should contain all the

View file

@ -38,7 +38,6 @@
import securesystemslib
import securesystemslib.util
import six
class TestMirrors(unittest_toolbox.Modified_TestCase):

View file

@ -87,7 +87,7 @@ def setUpClass(cls):
@classmethod
def tearDownClass(cls):
# Kills the server subprocess and closes the temp file used for logging.
# Cleans the resources and flush the logged lines (if any).
cls.server_process_handler.clean()
# Remove the temporary repository directory, which should contain all the

View file

@ -31,7 +31,6 @@
import os
import tempfile
import random
import logging
import shutil
import unittest
@ -119,35 +118,26 @@ def setUp(self):
# the pre-generated metadata files have a specific structure, such
# as a delegated role 'targets/role1', three target files, five key files,
# etc.
self.SERVER_PORT = random.SystemRandom().randint(30000, 45000)
self.SERVER_PORT2 = random.SystemRandom().randint(30000, 45000)
# Avoid duplicate port numbers, to prevent multiple localhosts from
# listening on the same port.
while self.SERVER_PORT == self.SERVER_PORT2:
self.SERVER_PORT2 = random.SystemRandom().randint(30000, 45000)
# Needed because in some tests simple_server.py cannot be found.
# The reason is that the current working directory
# has been changed when executing a subprocess.
SIMPLE_SERVER_PATH = os.path.join(os.getcwd(), 'simple_server.py')
# Creates a subprocess running server and uses temp file for logging.
# Creates a subprocess running a server.
self.server_process_handler = utils.TestServerProcess(log=logger,
port=self.SERVER_PORT, server=SIMPLE_SERVER_PATH,
popen_cwd=self.repository_directory)
server=SIMPLE_SERVER_PATH, popen_cwd=self.repository_directory)
logger.debug('Server process started.')
# Creates a subprocess running server and uses temp file for logging.
# Creates a subprocess running a server.
self.server_process_handler2 = utils.TestServerProcess(log=logger,
port=self.SERVER_PORT2, server=SIMPLE_SERVER_PATH,
popen_cwd=self.repository_directory2)
server=SIMPLE_SERVER_PATH, popen_cwd=self.repository_directory2)
logger.debug('Server process 2 started.')
url_prefix = 'http://localhost:' + str(self.SERVER_PORT)
url_prefix2 = 'http://localhost:' + str(self.SERVER_PORT2)
url_prefix = 'http://localhost:' + str(self.server_process_handler.port)
url_prefix2 = 'http://localhost:' + str(self.server_process_handler2.port)
self.repository_mirrors = {'mirror1': {'url_prefix': url_prefix,
'metadata_path': 'metadata',
@ -170,8 +160,7 @@ def tearDown(self):
# directories that may have been created during each test case.
unittest_toolbox.Modified_TestCase.tearDown(self)
# Logs stdout and stderr from the server subprocesses and then it
# kills them and closes the temp files used for logging.
# Cleans the resources and flush the logged lines (if any).
self.server_process_handler.clean()
self.server_process_handler2.clean()
@ -265,8 +254,10 @@ def test_repository_tool(self):
# Test the behavior of the multi-repository updater.
map_file = securesystemslib.util.load_json_file(self.map_file)
map_file['repositories'][self.repository_name] = ['http://localhost:' + str(self.SERVER_PORT)]
map_file['repositories'][self.repository_name2] = ['http://localhost:' + str(self.SERVER_PORT2)]
map_file['repositories'][self.repository_name] = ['http://localhost:' \
+ str(self.server_process_handler.port)]
map_file['repositories'][self.repository_name2] = ['http://localhost:' \
+ str(self.server_process_handler2.port)]
with open(self.map_file, 'w') as file_object:
file_object.write(json.dumps(map_file))

View file

@ -87,15 +87,13 @@ def setUpClass(cls):
# Launch an HTTPS server (serves files in the current dir).
cls.https_server_handler = utils.TestServerProcess(log=logger,
server='simple_https_server.py',
port=cls.http_server_handler.port + 1)
server='simple_https_server.py')
# Launch an HTTP proxy server derived from inaz2/proxy2.
# This one is able to handle HTTP CONNECT requests, and so can pass HTTPS
# requests on to the target server.
cls.http_proxy_handler = utils.TestServerProcess(log=logger,
server='proxy_server.py',
port=cls.http_server_handler.port + 2)
server='proxy_server.py')
# Note that the HTTP proxy server's address uses http://, regardless of the
# type of connection used with the target server.
@ -116,9 +114,8 @@ def setUpClass(cls):
# This is only relevant if the proxy is in intercept mode.
good_cert_fpath = os.path.join('ssl_certs', 'ssl_cert.crt')
cls.https_proxy_handler = utils.TestServerProcess(log=logger,
server='proxy_server.py',
port=cls.http_server_handler.port + 3,
extra_cmd_args=['intercept', good_cert_fpath])
server='proxy_server.py', extra_cmd_args=['intercept',
good_cert_fpath])
# Note that the HTTPS proxy server's address uses https://, regardless of
# the type of connection used with the target server.

View file

@ -87,7 +87,7 @@ def setUpClass(cls):
@classmethod
def tearDownClass(cls):
# Kills the server subprocess and closes the temp file used for logging.
# Cleans the resources and flush the logged lines (if any).
cls.server_process_handler.clean()
# Remove the temporary repository directory, which should contain all the

View file

@ -46,8 +46,6 @@
import os
import tempfile
import random
import time
import shutil
import logging
import unittest
@ -68,55 +66,8 @@
repo_tool.disable_console_log_messages()
class TestSlowRetrievalAttack(unittest_toolbox.Modified_TestCase):
@classmethod
def setUpClass(cls):
# Create a temporary directory to store the repository, metadata, and target
# files. 'temporary_directory' must be deleted in TearDownModule() so that
# temporary files are always removed, even when exceptions occur.
cls.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
cls.SERVER_PORT = random.randint(30000, 45000)
@classmethod
def tearDownClass(cls):
# Remove the temporary repository directory, which should contain all the
# metadata, targets, and key files generated of all the test cases.
shutil.rmtree(cls.temporary_directory)
def _start_slow_server(self, mode):
# Launch a SimpleHTTPServer (serves files in the current directory).
# Test cases will request metadata and target files that have been
# pre-generated in 'tuf/tests/repository_data', which will be served by the
# SimpleHTTPServer launched here. The test cases of this unit test assume
# the pre-generated metadata files have a specific structure, such
# as a delegated role 'targets/role1', three target files, five key files,
# etc.
self.server_process_handler = utils.TestServerProcess(log=logger,
server='slow_retrieval_server.py', port=self.SERVER_PORT,
timeout=0, extra_cmd_args=[mode])
logger.info('Slow Retrieval Server process started.')
# NOTE: Following error is raised if a delay is not long enough:
# <urlopen error [Errno 111] Connection refused>
# or, on Windows:
# Failed to establish a new connection: [Errno 111] Connection refused'
# 1s led to occasional failures in automated builds on AppVeyor, so
# increasing this to 3s, sadly.
time.sleep(3)
def _stop_slow_server(self):
# Logs stdout and stderr from the server subprocess and then it
# kills it and closes the temp file used for logging.
self.server_process_handler.clean()
class TestSlowRetrieval(unittest_toolbox.Modified_TestCase):
def setUp(self):
# We are inheriting from custom class.
@ -124,6 +75,11 @@ def setUp(self):
self.repository_name = 'test_repository1'
# Create a temporary directory to store the repository, metadata, and target
# files. 'temporary_directory' must be deleted in TearDownModule() so that
# temporary files are always removed, even when exceptions occur.
self.temporary_directory = tempfile.mkdtemp(dir=os.getcwd())
# Copy the original repository files provided in the test folder so that
# any modifications made to repository files are restricted to the copies.
# The 'repository_data' directory is expected to exist in 'tuf/tests/'.
@ -209,8 +165,14 @@ def setUp(self):
# Set the url prefix required by the 'tuf/client/updater.py' updater.
# 'path/to/tmp/repository' -> 'localhost:8001/tmp/repository'.
repository_basepath = self.repository_directory[len(os.getcwd()):]
url_prefix = \
'http://localhost:' + str(self.SERVER_PORT) + repository_basepath
self.server_process_handler = utils.TestServerProcess(log=logger,
server='slow_retrieval_server.py')
logger.info('Slow Retrieval Server process started.')
url_prefix = 'http://localhost:' \
+ str(self.server_process_handler.port) + repository_basepath
# Setting 'tuf.settings.repository_directory' with the temporary client
# directory copied from the original repository files.
@ -233,19 +195,22 @@ def tearDown(self):
tuf.roledb.clear_roledb(clear_all=True)
tuf.keydb.clear_keydb(clear_all=True)
# Cleans the resources and flush the logged lines (if any).
self.server_process_handler.clean()
# Remove the temporary repository directory, which should contain all the
# metadata, targets, and key files generated of all the test cases.
shutil.rmtree(self.temporary_directory)
def test_with_tuf_mode_1(self):
def test_delay_before_send(self):
# Simulate a slow retrieval attack.
# 'mode_1': When download begins,the server blocks the download for a long
# When download begins,the server blocks the download for a long
# time by doing nothing before it sends the first byte of data.
self._start_slow_server('mode_1')
# Verify that the TUF client detects replayed metadata and refuses to
# continue the update process.
client_filepath = os.path.join(self.client_directory, 'file1.txt')
try:
file1_target = self.repository_updater.get_one_valid_targetinfo('file1.txt')
self.repository_updater.download_target(file1_target, self.client_directory)
@ -264,53 +229,6 @@ def test_with_tuf_mode_1(self):
else:
self.fail('TUF did not prevent a slow retrieval attack.')
finally:
self._stop_slow_server()
# The following test fails as a result of a change to TUF's download code.
# Rather than constructing urllib2 requests, we now use the requests library.
# This solves an HTTPS proxy issue, but has for the moment deprived us of a
# way to prevent certain this kind of slow retrieval attack.
# See conversation in PR: https://github.com/theupdateframework/tuf/pull/781
# TODO: Update download code to resolve the slow retrieval vulnerability.
@unittest.expectedFailure
def test_with_tuf_mode_2(self):
# Simulate a slow retrieval attack.
# 'mode_2': During the download process, the server blocks the download
# by sending just several characters every few seconds.
self._start_slow_server('mode_2')
client_filepath = os.path.join(self.client_directory, 'file1.txt')
original_average_download_speed = tuf.settings.MIN_AVERAGE_DOWNLOAD_SPEED
tuf.settings.MIN_AVERAGE_DOWNLOAD_SPEED = 3
try:
file1_target = self.repository_updater.get_one_valid_targetinfo('file1.txt')
self.repository_updater.download_target(file1_target, self.client_directory)
# Verify that the specific 'tuf.exceptions.SlowRetrievalError' exception is
# raised by each mirror. 'file1.txt' should be large enough to trigger a
# slow retrieval attack, otherwise the expected exception may not be
# consistently raised.
except tuf.exceptions.NoWorkingMirrorError as exception:
for mirror_url, mirror_error in six.iteritems(exception.mirror_errors):
url_prefix = self.repository_mirrors['mirror1']['url_prefix']
url_file = os.path.join(url_prefix, 'targets', 'file1.txt')
# Verify that 'file1.txt' is the culprit.
self.assertEqual(url_file.replace('\\', '/'), mirror_url)
self.assertTrue(isinstance(mirror_error, tuf.exceptions.SlowRetrievalError))
else:
# Another possibility is to check for a successfully downloaded
# 'file1.txt' at this point.
self.fail('TUF did not prevent a slow retrieval attack.')
finally:
self._stop_slow_server()
tuf.settings.MIN_AVERAGE_DOWNLOAD_SPEED = original_average_download_speed
if __name__ == '__main__':

View file

@ -58,6 +58,7 @@
import errno
import sys
import unittest
import json
import tuf
import tuf.exceptions
@ -107,7 +108,7 @@ def setUpClass(cls):
@classmethod
def tearDownClass(cls):
# Kills the server subprocess and closes the temp file used for logging.
# Cleans the resources and flush the logged lines (if any).
cls.server_process_handler.clean()
# Remove the temporary repository directory, which should contain all the
@ -1081,7 +1082,7 @@ def test_6_get_one_valid_targetinfo(self):
# The SimpleHTTPServer started in the setupclass has a tendency to
# timeout in Windows after a few tests.
# Creates a subprocess running server and uses temp file for logging.
# Creates a subprocess running a server.
server_process_handler = utils.TestServerProcess(log=logger,
server=self.SIMPLE_SERVER_PATH)
@ -1205,7 +1206,7 @@ def test_6_get_one_valid_targetinfo(self):
self.repository_updater.get_one_valid_targetinfo,
'/foo/foo1.1.tar.gz')
# Kills the server subprocess and closes the temp file used for logging.
# Cleans the resources and flush the logged lines (if any).
server_process_handler.clean()
@ -1375,7 +1376,7 @@ def test_7_updated_targets(self):
# The SimpleHTTPServer started in the setupclass has a tendency to
# timeout in Windows after a few tests.
# Creates a subprocess running server and uses temp file for logging.
# Creates a subprocess running a server.
server_process_handler = utils.TestServerProcess(log=logger,
server=self.SIMPLE_SERVER_PATH)
@ -1490,7 +1491,7 @@ def test_7_updated_targets(self):
self.repository_updater.updated_targets(all_targets, destination_directory)
self.assertEqual(len(updated_targets), 1)
# Kills the server subprocess and closes the temp file used for logging.
# Cleans the resources and flush the logged lines (if any).
server_process_handler.clean()
@ -1502,7 +1503,7 @@ def test_8_remove_obsolete_targets(self):
# The SimpleHTTPServer started in the setupclass has a tendency to
# timeout in Windows after a few tests.
# Creates a subprocess running server and uses temp file for logging.
# Creates a subprocess running a server.
server_process_handler = utils.TestServerProcess(log=logger,
server=self.SIMPLE_SERVER_PATH)
@ -1593,7 +1594,7 @@ def test_8_remove_obsolete_targets(self):
del self.repository_updater.metadata['previous']['targets']
self.repository_updater.remove_obsolete_targets(destination_directory)
# Kills the server subprocess and closes the temp file used for logging.
# Cleans the resources and flush the logged lines (if any).
server_process_handler.clean()
@ -1863,27 +1864,35 @@ def setUp(self):
# as a delegated role 'targets/role1', three target files, five key files,
# etc.
# The ports are harcoded because the urls to the repositories are harcoded
# in map.json.
self.SERVER_PORT = 30001
self.SERVER_PORT2 = 30002
# Creates a subprocess running server and uses temp file for logging.
# Creates a subprocess running a server.
self.server_process_handler = utils.TestServerProcess(log=logger,
server=self.SIMPLE_SERVER_PATH, port=self.SERVER_PORT,
popen_cwd=self.repository_directory)
server=self.SIMPLE_SERVER_PATH, popen_cwd=self.repository_directory)
logger.debug('Server process started.')
# Creates a subprocess running server and uses temp file for logging.
# Creates a subprocess running a server.
self.server_process_handler2 = utils.TestServerProcess(log=logger,
server=self.SIMPLE_SERVER_PATH, port=self.SERVER_PORT2,
popen_cwd=self.repository_directory2)
server=self.SIMPLE_SERVER_PATH, popen_cwd=self.repository_directory2)
logger.debug('Server process 2 started.')
url_prefix = 'http://localhost:' + str(self.SERVER_PORT)
url_prefix2 = 'http://localhost:' + str(self.SERVER_PORT2)
url_prefix = 'http://localhost:' + str(self.server_process_handler.port)
url_prefix2 = 'http://localhost:' + str(self.server_process_handler2.port)
# We have all of the necessary information for two repository mirrors
# in map.json, except for url prefixes.
# For the url prefixes, we create subprocesses that run a server script.
# In server scripts we get a free port from the OS which is sent
# back to the parent process.
# That's why we dynamically add the ports to the url prefixes
# and changing the content of map.json.
self.map_file_path = os.path.join(self.client_directory, 'map.json')
data = securesystemslib.util.load_json_file(self.map_file_path)
data['repositories']['test_repository1'] = [url_prefix]
data['repositories']['test_repository2'] = [url_prefix2]
with open(self.map_file_path, 'w') as f:
json.dump(data, f)
self.repository_mirrors = {'mirror1': {'url_prefix': url_prefix,
'metadata_path': 'metadata', 'targets_path': 'targets'}}
@ -1914,8 +1923,7 @@ def tearDown(self):
# directories that may have been created during each test case.
unittest_toolbox.Modified_TestCase.tearDown(self)
# Logs stdout and stderr from the server subprocesses and then it
# kills them and closes the temp files used for logging.
# Cleans the resources and flush the logged lines (if any).
self.server_process_handler.clean()
self.server_process_handler2.clean()
@ -1957,14 +1965,12 @@ def test__init__(self):
updater.MultiRepoUpdater, root_filepath)
# Test for a valid instantiation.
map_file = os.path.join(self.client_directory, 'map.json')
multi_repo_updater = updater.MultiRepoUpdater(map_file)
multi_repo_updater = updater.MultiRepoUpdater(self.map_file_path)
def test__target_matches_path_pattern(self):
map_file = os.path.join(self.client_directory, 'map.json')
multi_repo_updater = updater.MultiRepoUpdater(map_file)
multi_repo_updater = updater.MultiRepoUpdater(self.map_file_path)
paths = ['foo*.tgz', 'bar*.tgz', 'file1.txt']
self.assertTrue(
multi_repo_updater._target_matches_path_pattern('bar-1.0.tgz', paths))
@ -1976,8 +1982,7 @@ def test__target_matches_path_pattern(self):
def test_get_valid_targetinfo(self):
map_file = os.path.join(self.client_directory, 'map.json')
multi_repo_updater = updater.MultiRepoUpdater(map_file)
multi_repo_updater = updater.MultiRepoUpdater(self.map_file_path)
# Verify the multi repo updater refuses to save targetinfo if
# required local repositories are missing.
@ -2084,8 +2089,7 @@ def test_get_valid_targetinfo(self):
def test_get_updater(self):
map_file = os.path.join(self.client_directory, 'map.json')
multi_repo_updater = updater.MultiRepoUpdater(map_file)
multi_repo_updater = updater.MultiRepoUpdater(self.map_file_path)
# Test for a non-existent repository name.
self.assertEqual(None, multi_repo_updater.get_updater('bad_repo_name'))

View file

@ -91,7 +91,7 @@ def setUpClass(cls):
@classmethod
def tearDownClass(cls):
# Kills the server subprocess and closes the temp file used for logging.
# Cleans the resources and flush the logged lines (if any).
cls.server_process_handler.clean()
# Remove the temporary repository directory, which should contain all the

153
tests/test_utils.py Normal file
View file

@ -0,0 +1,153 @@
#!/usr/bin/env python
# Copyright 2020, TUF contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
"""
<Program Name>
test_utils.py
<Author>
Martin Vrachev.
<Started>
October 21, 2020.
<Copyright>
See LICENSE-MIT OR LICENSE for licensing information.
<Purpose>
Provide tests for some of the functions in utils.py module.
"""
import os
import logging
import unittest
import socket
import sys
import tuf.unittest_toolbox as unittest_toolbox
import utils
logger = logging.getLogger(__name__)
class TestServerProcess(unittest_toolbox.Modified_TestCase):
def tearDown(self):
# Make sure we are calling clean on existing attribute.
if hasattr(self, 'server_process_handler'):
self.server_process_handler.clean()
def can_connect(self):
try:
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(('localhost', self.server_process_handler.port))
return True
except:
return False
finally:
# The process will always enter in finally even we return.
if sock:
sock.close()
def test_simple_server_startup(self):
# Test normal case
self.server_process_handler = utils.TestServerProcess(log=logger)
# Make sure we can connect to the server
self.assertTrue(self.can_connect())
def test_simple_https_server_startup(self):
# Test normal case
good_cert_path = os.path.join('ssl_certs', 'ssl_cert.crt')
self.server_process_handler = utils.TestServerProcess(log=logger,
server='simple_https_server.py', extra_cmd_args=[good_cert_path])
# Make sure we can connect to the server
self.assertTrue(self.can_connect())
self.server_process_handler.clean()
# Test when no cert file is provided
self.server_process_handler = utils.TestServerProcess(log=logger,
server='simple_https_server.py')
# Make sure we can connect to the server
self.assertTrue(self.can_connect())
self.server_process_handler.clean()
# Test with a non existing cert file.
non_existing_cert_path = os.path.join('ssl_certs', 'non_existing.crt')
self.server_process_handler = utils.TestServerProcess(log=logger,
server='simple_https_server.py',
extra_cmd_args=[non_existing_cert_path])
# Make sure we can connect to the server
self.assertTrue(self.can_connect())
@unittest.skipIf(sys.version_info.major != 2, "Test for Python 2.X")
def test_proxy_server_startup(self):
# Test normal case
self.server_process_handler = utils.TestServerProcess(log=logger,
server='proxy_server.py')
# Make sure we can connect to the server.
self.assertTrue(self.can_connect())
self.server_process_handler.clean()
# Test start proxy_server using certificate files.
good_cert_fpath = os.path.join('ssl_certs', 'ssl_cert.crt')
self.server_process_handler = utils.TestServerProcess(log=logger,
server='proxy_server.py', extra_cmd_args=['intercept',
good_cert_fpath])
# Make sure we can connect to the server.
self.assertTrue(self.can_connect())
self.server_process_handler.clean()
# Test with a non existing cert file.
non_existing_cert_path = os.path.join('ssl_certs', 'non_existing.crt')
self.server_process_handler = utils.TestServerProcess(log=logger,
server='proxy_server.py', extra_cmd_args=[non_existing_cert_path])
# Make sure we can connect to the server.
self.assertTrue(self.can_connect())
def test_slow_retrieval_server_startup(self):
# Test normal case
self.server_process_handler = utils.TestServerProcess(log=logger,
server='slow_retrieval_server.py')
# Make sure we can connect to the server
self.assertTrue(self.can_connect())
def test_cleanup(self):
# Test normal case
self.server_process_handler = utils.TestServerProcess(log=logger,
server='simple_server.py')
self.server_process_handler.clean()
# Check if the process has successfully been killed.
self.assertFalse(self.server_process_handler.is_process_running())
def test_server_exit_before_timeout(self):
self.assertRaises(utils.TestServerProcessError, utils.TestServerProcess,
logger, server='non_existing_server.py')
# Test starting a server which immediately exits."
self.assertRaises(utils.TestServerProcessError, utils.TestServerProcess,
logger, server='fast_server_exit.py')
if __name__ == '__main__':
utils.configure_test_logging(sys.argv)
unittest.main()

View file

@ -27,10 +27,14 @@
import socket
import time
import subprocess
import tempfile
import random
import threading
import warnings
try:
import queue
except ImportError:
import Queue as queue # python2
import tuf.log
logger = logging.getLogger(__name__)
@ -49,6 +53,15 @@ def __str__(self):
return repr(self.value)
class TestServerProcessError(Exception):
def __init__(self, value="TestServerProcess"):
self.value = value
def __str__(self):
return repr(self.value)
@contextmanager
def ignore_deprecation_warnings(module):
with warnings.catch_warnings():
@ -90,7 +103,7 @@ def wait_for_server(host, server, port, timeout=10):
if not succeeded:
raise TimeoutError("Could not connect to the " + server \
+ " on port " + str(port) + " !")
+ " on port " + str(port) + "!")
def configure_test_logging(argv):
@ -119,7 +132,7 @@ class TestServerProcess():
"""
<Purpose>
Creates a child process with the subprocess.Popen object and
TempFile object used for logging.
uses a thread-safe Queue structure for logging.
<Arguments>
log:
@ -129,15 +142,9 @@ class TestServerProcess():
Path to the server to run in the subprocess.
Default is "simpler_server.py".
port:
The port used to access the server. If none is provided,
then one will be generated.
Default is None.
timeout:
Time in seconds in which the server should start or otherwise
TimeoutError error will be raised.
If 0 is given, no check if the server has started will be done.
Default is 10.
popen_cwd:
@ -154,69 +161,161 @@ class TestServerProcess():
def __init__(self, log, server='simple_server.py',
port=None, timeout=10, popen_cwd=".",
extra_cmd_args=[]):
# Create temporary log file used for logging stdout and stderr
# of the subprocess. In the mode "r+"" stands for reading and writing
# and "t" stands for text mode.
self.__temp_log_file = tempfile.TemporaryFile(mode='r+t')
timeout=10, popen_cwd=".", extra_cmd_args=[]):
self.server = server
self.port = port or random.randint(30000, 45000)
self.__logger = log
# Stores popped messages from the queue.
self.__logged_messages = []
try:
self._start_server(timeout, extra_cmd_args, popen_cwd)
wait_for_server('localhost', self.server, self.port, timeout)
except Exception as e:
# Clean the resources and log the server errors if any exists.
self.clean()
raise e
def _start_server(self, timeout, extra_cmd_args, popen_cwd):
"""
Start the server subprocess and a thread
responsible to redirect stdout/stderr to the Queue.
Waits for the port message maximum timeout seconds.
"""
self._start_process(extra_cmd_args, popen_cwd)
self._start_redirect_thread()
self._wait_for_port(timeout)
self.__logger.info(self.server + ' serving on ' + str(self.port))
def _start_process(self, extra_cmd_args, popen_cwd):
"""Starts the process running the server."""
# The "-u" option forces stdin, stdout and stderr to be unbuffered.
command = ['python', '-u', server, str(self.port)] + extra_cmd_args
command = ['python', '-u', self.server] + extra_cmd_args
# We are reusing one server subprocess in multiple unit tests, but we are
# collecting the logs per test.
# Reusing one subprocess in multiple tests, but split up the logs for each.
self.__server_process = subprocess.Popen(command,
stdout=self.__temp_log_file, stderr=subprocess.STDOUT, cwd=popen_cwd)
stdout=subprocess.PIPE, stderr=subprocess.STDOUT, cwd=popen_cwd)
self.__logger.info('Server process with process id ' \
+ str(self.__server_process.pid) + " serving on port " \
+ str(self.port) + ' started.')
if timeout > 0:
try:
wait_for_server('localhost', self.server, self.port, timeout)
except Exception as e:
# Make sure that errors from the server side will be logged.
def _start_redirect_thread(self):
"""Starts a thread responsible to redirect stdout/stderr to the Queue."""
# Run log_queue_worker() in a thread.
# The thread will exit when the child process dies.
self._log_queue = queue.Queue()
log_thread = threading.Thread(target=self._log_queue_worker,
args=(self.__server_process.stdout, self._log_queue))
# "daemon = True" means the thread won't interfere with the process exit.
log_thread.daemon = True
log_thread.start()
@staticmethod
def _log_queue_worker(stream, line_queue):
"""
Worker function to run in a seprate thread.
Reads from 'stream', puts lines in a Queue (Queue is thread-safe).
"""
while True:
# readline() is a blocking operation.
# decode to push a string in the queue instead of 8-bit bytes.
log_line = stream.readline().decode('utf-8')
line_queue.put(log_line)
if len(log_line) == 0:
# This is the end of the stream meaning the server process has exited.
stream.close()
break
def _wait_for_port(self, timeout):
"""
Validates the first item from the Queue against the port message.
If validation is successful, self.port is set.
Raises TestServerProcessError if the process has exited or
TimeoutError if no message was found within timeout seconds.
"""
# We have hardcoded the message we expect on a successful server startup.
# This message should be the first message sent by the server!
expected_msg = 'bind succeeded, server port is: '
try:
line = self._log_queue.get(timeout=timeout)
if len(line) == 0:
# The process has exited.
raise TestServerProcessError(self.server + ' exited unexpectedly ' \
+ 'with code ' + str(self.__server_process.poll()) + '!')
elif line.startswith(expected_msg):
self.port = int(line[len(expected_msg):])
else:
# An exception or some other message is printed from the server.
self.__logged_messages.append(line)
# Check if more lines are logged.
self.flush_log()
raise e
raise TestServerProcessError(self.server + ' did not print port ' \
+ 'message as first stdout line as expected!')
except queue.Empty:
raise TimeoutError('Failure during ' + self.server + ' startup!')
def flush_log(self):
"""Logs contents from TempFile, truncates buffer"""
def _kill_server_process(self):
"""Kills the server subprocess if it's running."""
# Seek is needed to move the pointer to the beginning of the file, because
# the subprocess could have read and/or write and thus moved the pointer.
self.__temp_log_file.seek(0)
log_message = self.__temp_log_file.read()
if len(log_message) > 0:
title = "Test server (" + self.server + ") output:"
message = [title] + log_message.splitlines()
self.__logger.info('\n| '.join(message))
# Make sure the file is empty before the next test logs new information.
self.__temp_log_file.truncate(0)
def clean(self):
"""Kills the subprocess and closes the TempFile.
Calls flush_log to check for logged information, but not yet flushed."""
# If there is anything logged, flush it before closing the resourses.
self.flush_log()
self.__temp_log_file.close()
if self.__server_process.returncode is None:
if self.is_process_running():
self.__logger.info('Server process ' + str(self.__server_process.pid) +
' terminated.')
self.__server_process.kill()
self.__server_process.wait()
def flush_log(self):
"""Flushes the log lines from the logging queue."""
while True:
# Get lines from log_queue
try:
line = self._log_queue.get(block=False)
if len(line) > 0:
self.__logged_messages.append(line)
except queue.Empty:
# No more lines are logged in the queue.
break
if len(self.__logged_messages) > 0:
title = "Test server (" + self.server + ") output:"
message = [title] + self.__logged_messages
self.__logger.info('| '.join(message))
self.__logged_messages = []
def clean(self):
"""
Kills the subprocess and closes the TempFile.
Calls flush_log to check for logged information, but not yet flushed.
"""
# If there is anything logged, flush it before closing the resourses.
self.flush_log()
self._kill_server_process()
def is_process_running(self):
return True if self.__server_process.poll() is None else False