python-tuf/tests/test_download.py

327 lines
12 KiB
Python
Raw Permalink Normal View History

#!/usr/bin/env python
# Copyright 2014 - 2017, New York University and the TUF contributors
# SPDX-License-Identifier: MIT OR Apache-2.0
2013-01-31 18:54:15 +00:00
"""
<Program>
test_download.py
2013-01-31 18:54:15 +00:00
<Author>
Konstantin Andrianov.
2013-01-31 18:54:15 +00:00
<Started>
March 26, 2012.
2013-01-31 18:54:15 +00:00
<Copyright>
See LICENSE-MIT OR LICENSE for licensing information.
2013-01-31 18:54:15 +00:00
<Purpose>
Unit test for 'download.py'.
NOTE: Make sure test_download.py is ran in 'tuf/tests/' directory.
Otherwise, module that launches simple server would not be found.
2013-01-31 18:54:15 +00:00
"""
# Help with Python 3 compatibility, where the print statement is a function, an
# implicit relative import is invalid, and the '/' operator performs true
# division. Example: print 'hello world' raises a 'SyntaxError' exception.
from __future__ import print_function
from __future__ import absolute_import
from __future__ import division
from __future__ import unicode_literals
2013-01-31 18:54:15 +00:00
import hashlib
import logging
2013-08-06 17:40:24 +00:00
import os
import random
import subprocess
2013-08-06 17:40:24 +00:00
import time
import unittest
import tuf
import tuf.download as download
import tuf.log
import tuf.unittest_toolbox as unittest_toolbox
import tuf.exceptions
import requests.exceptions
import securesystemslib
import six
logger = logging.getLogger('tuf.test_download')
2013-01-31 21:35:32 +00:00
class TestDownload(unittest_toolbox.Modified_TestCase):
def setUp(self):
"""
2013-01-31 21:35:32 +00:00
Create a temporary file and launch a simple server in the
current working directory.
"""
unittest_toolbox.Modified_TestCase.setUp(self)
# Making a temporary file.
current_dir = os.getcwd()
target_filepath = self.make_temp_data_file(directory=current_dir)
2013-01-31 21:35:32 +00:00
self.target_fileobj = open(target_filepath, 'r')
self.target_data = self.target_fileobj.read()
self.target_data_length = len(self.target_data)
2013-01-31 21:35:32 +00:00
2014-06-17 14:28:56 +00:00
# Launch a SimpleHTTPServer (serves files in the current dir).
2013-02-01 20:39:27 +00:00
self.PORT = random.randint(30000, 45000)
command = ['python', 'simple_server.py', str(self.PORT)]
self.server_proc = subprocess.Popen(command, stderr=subprocess.PIPE)
logger.info('\n\tServer process started.')
logger.info('\tServer process id: '+str(self.server_proc.pid))
logger.info('\tServing on port: '+str(self.PORT))
junk, rel_target_filepath = os.path.split(target_filepath)
2013-02-01 20:39:27 +00:00
self.url = 'http://localhost:'+str(self.PORT)+'/'+rel_target_filepath
2013-01-31 21:35:32 +00:00
# NOTE: Following error is raised if delay is not applied:
# <urlopen error [Errno 111] Connection refused>
time.sleep(1)
2013-01-31 21:35:32 +00:00
2013-02-01 20:39:27 +00:00
# Computing hash of target file data.
m = hashlib.md5()
m.update(self.target_data.encode('utf-8'))
digest = m.hexdigest()
self.target_hash = {'md5':digest}
2013-02-01 20:39:27 +00:00
2013-01-31 21:35:32 +00:00
# Stop server process and perform clean up.
2013-01-31 21:35:32 +00:00
def tearDown(self):
unittest_toolbox.Modified_TestCase.tearDown(self)
if self.server_proc.returncode is None:
logger.info('\tServer process '+str(self.server_proc.pid)+' terminated.')
self.server_proc.kill()
self.target_fileobj.close()
2013-01-31 21:35:32 +00:00
2013-08-06 17:40:24 +00:00
# Test: Normal case.
2013-01-31 21:35:32 +00:00
def test_download_url_to_tempfileobj(self):
download_file = download.safe_download
2013-01-31 18:54:15 +00:00
2013-08-06 17:40:24 +00:00
temp_fileobj = download_file(self.url, self.target_data_length)
self.assertEqual(self.target_data, temp_fileobj.read().decode('utf-8'))
self.assertEqual(self.target_data_length, len(temp_fileobj.read()))
2013-08-06 17:40:24 +00:00
temp_fileobj.close_temp_file()
2013-02-01 20:39:27 +00:00
2013-08-06 17:40:24 +00:00
2013-08-06 17:40:24 +00:00
# Test: Incorrect lengths.
def test_download_url_to_tempfileobj_and_lengths(self):
# We do *not* catch
# 'securesystemslib.exceptions.DownloadLengthMismatchError' in the
# following two calls because the file at 'self.url' contains enough bytes
# to satisfy the smaller number of required bytes requested.
# safe_download() and unsafe_download() will only log a warning when the
# the server-reported length of the file does not match the
# required_length. 'updater.py' *does* verify the hashes of downloaded
# content.
download.safe_download(self.url, self.target_data_length - 4)
download.unsafe_download(self.url, self.target_data_length - 4)
# We catch 'tuf.exceptions.DownloadLengthMismatchError' for safe_download()
# because it will not download more bytes than requested (in this case, a
# length greater than the size of the target file).
self.assertRaises(tuf.exceptions.DownloadLengthMismatchError,
download.safe_download, self.url, self.target_data_length + 1)
# Calling unsafe_download() with a mismatched length should not raise an
# exception.
download.unsafe_download(self.url, self.target_data_length + 1)
2013-02-01 20:39:27 +00:00
2013-01-31 18:54:15 +00:00
2013-08-06 17:40:24 +00:00
def test_download_url_to_tempfileobj_and_performance(self):
2013-01-31 21:35:32 +00:00
"""
2013-02-01 20:39:27 +00:00
# Measuring performance of 'auto_flush = False' vs. 'auto_flush = True'
# in download._download_file() during write. No change was observed.
2013-02-01 20:39:27 +00:00
star_cpu = time.clock()
star_real = time.time()
2013-01-31 18:54:15 +00:00
temp_fileobj = download_file(self.url,
self.target_data_length)
2013-01-31 18:54:15 +00:00
2013-02-01 20:39:27 +00:00
end_cpu = time.clock()
end_real = time.time()
self.assertEqual(self.target_data, temp_fileobj.read())
self.assertEqual(self.target_data_length, len(temp_fileobj.read()))
2013-02-01 20:39:27 +00:00
temp_fileobj.close_temp_file()
2013-01-31 21:35:32 +00:00
2013-01-31 18:54:15 +00:00
print "Performance cpu time: "+str(end_cpu - star_cpu)
print "Performance real time: "+str(end_real - star_real)
2013-02-01 20:39:27 +00:00
# TODO: [Not urgent] Show the difference by setting write(auto_flush=False)
"""
2013-01-31 18:54:15 +00:00
2013-08-06 17:40:24 +00:00
# Test: Incorrect/Unreachable URLs.
def test_download_url_to_tempfileobj_and_urls(self):
download_file = download.safe_download
unsafe_download_file = download.unsafe_download
2013-08-06 17:40:24 +00:00
self.assertRaises(securesystemslib.exceptions.FormatError,
download_file, None, self.target_data_length)
2013-08-06 17:40:24 +00:00
self.assertRaises(tuf.exceptions.URLParsingError,
2013-08-06 17:40:24 +00:00
download_file,
self.random_string(), self.target_data_length)
2013-08-06 17:40:24 +00:00
self.assertRaises(requests.exceptions.HTTPError,
2013-08-06 17:40:24 +00:00
download_file,
'http://localhost:' + str(self.PORT) + '/' + self.random_string(),
self.target_data_length)
2013-08-06 17:40:24 +00:00
self.assertRaises(requests.exceptions.ConnectionError,
2013-08-06 17:40:24 +00:00
download_file,
'http://localhost:' + str(self.PORT+1) + '/' + self.random_string(),
self.target_data_length)
# Specify an unsupported URI scheme.
url_with_unsupported_uri = self.url.replace('http', 'file')
self.assertRaises(requests.exceptions.InvalidSchema, download_file, url_with_unsupported_uri,
self.target_data_length)
self.assertRaises(requests.exceptions.InvalidSchema, unsafe_download_file,
url_with_unsupported_uri, self.target_data_length)
# This test uses sites on the internet, requiring a net connection to succeed.
# Since this is the only such test in TUF, I'm not going to enable it... but
# it's here in case it's useful for diagnosis.
# def test_https_validation(self):
# """
# Use some known URLs on the net to ensure that TUF download checks SSL
# certificates appropriately.
# """
# # We should never get as far as the target file download itself, so the
# # length we pass to safe_download and unsafe_download shouldn't matter.
# irrelevant_length = 10
#
# for bad_url in [
# 'https://expired.badssl.com/', # expired certificate
# 'https://wrong.host.badssl.com/', ]: # hostname verification fail
#
# with self.assertRaises(requests.exceptions.SSLError):
# download.safe_download(bad_url, irrelevant_length)
#
# with self.assertRaises(requests.exceptions.SSLError):
# download.unsafe_download(bad_url, irrelevant_length)
2014-06-17 14:28:56 +00:00
def test_https_connection(self):
"""
Try various HTTPS downloads using trusted and untrusted certificates with
and without the correct hostname listed in the SSL certificate.
"""
2014-06-17 14:28:56 +00:00
# Make a temporary file to be served to the client.
current_directory = os.getcwd()
target_filepath = self.make_temp_data_file(directory=current_directory)
2014-06-17 14:28:56 +00:00
with open(target_filepath, 'r') as target_file_object:
target_data_length = len(target_file_object.read())
# These cert files should both be valid, but "good" lists localhost and
# "bad" lists another hostname. We'll be trying to download from localhost,
# so we expect
good_cert_fname = os.path.join('ssl_certs', 'ssl_cert.crt')
bad_cert_fname = os.path.join('ssl_certs', 'ssl_cert_wronghost.crt')
# Launch two https servers (serves files in the current dir).
# The first we expect to operate correctly, and the second we run with an
# HTTPS certificate with an unexpected hostname.
port1 = str(random.randint(30000, 45000))
port2 = str(int(port1) + 1)
command1 = ['python', 'simple_https_server.py', port1, good_cert_fname]
command2 = ['python', 'simple_https_server.py', port2, bad_cert_fname]
good_https_server_proc = subprocess.Popen(command1, stderr=subprocess.PIPE)
bad_https_server_proc = subprocess.Popen(command2, stderr=subprocess.PIPE)
2014-06-17 14:28:56 +00:00
# NOTE: Following error is raised if delay is not applied:
# <urlopen error [Errno 111] Connection refused>
time.sleep(0.2)
relative_target_fpath = os.path.basename(target_filepath)
good_https_url = 'https://localhost:' + port1 + '/' + relative_target_fpath
bad_https_url = good_https_url.replace(':' + port1, ':' + port2)
2014-06-17 14:28:56 +00:00
# Download the target file using an https connection.
# Use try-finally solely to ensure that the server processes are killed.
try:
# Trust the certfile that happens to use a different hostname than we
# will expect.
os.environ['REQUESTS_CA_BUNDLE'] = bad_cert_fname
# Try connecting to the server process with the bad cert while trusting
# the bad cert. Expect failure because even though we trust it, the
# hostname we're connecting to does not match the hostname in the cert.
logger.info('Trying https download of target file: ' + bad_https_url)
with self.assertRaises(requests.exceptions.SSLError):
download.safe_download(bad_https_url, target_data_length)
with self.assertRaises(requests.exceptions.SSLError):
download.unsafe_download(bad_https_url, target_data_length)
# Try connecting to the server process with the good cert while not
# trusting the good cert (trusting the bad cert instead). Expect failure
# because even though the server's cert file is otherwise OK, we don't
# trust it.
print('Trying https download of target file: ' + good_https_url)
with self.assertRaises(requests.exceptions.SSLError):
download.safe_download(good_https_url, target_data_length)
with self.assertRaises(requests.exceptions.SSLError):
download.unsafe_download(good_https_url, target_data_length)
# Configure environment to trust the good cert file now instead.
os.environ['REQUESTS_CA_BUNDLE'] = good_cert_fname
# Try connecting to the server process with the good cert while trusting
# that cert. Expect success.
# Note: running these OK downloads at the top of this try section causes
# a failure in a previous assertion: retrieving the same good URL
# again after no longer "trusting" the good certfile still succeeds
# if we had previously succeeded in retrieving that same URL while
# still trusting the good cert. Perhaps it's a caching issue....?
# I'm not especially concerned yet, but take note for later....
logger.info('Trying https download of target file: ' + good_https_url)
download.safe_download(good_https_url, target_data_length)
download.unsafe_download(good_https_url, target_data_length)
2014-06-17 14:28:56 +00:00
finally:
if good_https_server_proc.returncode is None:
logger.info(
'Terminating server process ' + str(good_https_server_proc.pid))
good_https_server_proc.kill()
if bad_https_server_proc.returncode is None:
logger.info(
'Terminating server process ' + str(bad_https_server_proc.pid))
bad_https_server_proc.kill()
2014-06-17 14:28:56 +00:00
2013-08-06 17:40:24 +00:00
def test__get_content_length(self):
content_length = \
tuf.download._get_content_length({'bad_connection_object': 8})
self.assertEqual(content_length, None)
2013-02-01 20:39:27 +00:00
# Run unit test.
if __name__ == '__main__':
unittest.main()