author | Alberto Bertogli
<albertito@blitiri.com.ar> 2019-08-31 16:18:19 UTC |
committer | Alberto Bertogli
<albertito@blitiri.com.ar> 2019-08-31 16:28:15 UTC |
parent | 41deb810fe0deaee3d8de79a6d9bd12db6902b48 |
.travis.yml | +1 | -1 |
Makefile | +1 | -1 |
README.md | +1 | -1 |
tests/run_tests | +83 | -78 |
diff --git a/.travis.yml b/.travis.yml index 6b7fb2f..1a4ad77 100644 --- a/.travis.yml +++ b/.travis.yml @@ -10,7 +10,7 @@ go: addons: apt: packages: - - python + - python3 - openssl # Nothing to do for install, the tests will build the binaries anyway. diff --git a/Makefile b/Makefile index 5186d6c..6f429b8 100644 --- a/Makefile +++ b/Makefile @@ -20,7 +20,7 @@ vet: $(GO) tool vet . test: kxd kxc - python tests/run_tests -b + tests/run_tests -b tests: test diff --git a/README.md b/README.md index 5b32001..70ce4a1 100644 --- a/README.md +++ b/README.md @@ -78,7 +78,7 @@ The configuration helper scripts (`create-kxd-config`, `kxc-add-key`, etc.) depend on: `bash`, `openssl` (the binary), and core utilities (`mkdir`, `dd`, etc.). -Testing needs Python 2.7, and openssl (the binary). +Testing needs Python 3, and openssl (the binary). ## Bugs and contact diff --git a/tests/run_tests b/tests/run_tests index 0bd3a34..d6d4ba2 100755 --- a/tests/run_tests +++ b/tests/run_tests @@ -1,4 +1,4 @@ -#!/usr/bin/env python +#!/usr/bin/env python3 """ Tests for kxd and kxc @@ -10,26 +10,26 @@ It will create different test configurations and run the compiled server and client under various conditions, to make sure they behave as intended. """ -# NOTE: Please run "pylint --rcfile=.pylintrc run_tests" after making changes, +# NOTE: Please run "pylint3 --rcfile=.pylintrc run_tests" after making changes, # to make sure the file has a reasonably uniform coding style. # You can also use "autopep8 -d --ignore=E301,E26 run_tests" to help with # this, but make sure the output looks sane. import contextlib -import httplib +import http.client import os -import platform import shutil import socket import ssl import subprocess -import sys import tempfile import textwrap import time +import tracemalloc import unittest +tracemalloc.start() ############################################################ # Test infrastructure. @@ -81,7 +81,7 @@ def pushd(path): os.chdir(prev) -class Config(object): +class Config: def __init__(self, name): self.path = tempfile.mkdtemp(prefix="config-%s-" % name, dir=TEMPDIR) self.name = name @@ -92,7 +92,7 @@ class Config(object): "-out", "%s/key.pem" % self.path, "2048"] subprocess.check_output(cmd, stderr=subprocess.STDOUT) except subprocess.CalledProcessError as err: - print "openssl call failed, output: %r" % err.output + print("openssl call failed, output: %r" % err.output) raise ouname = "kxd-tests-%s" % self.name @@ -108,7 +108,7 @@ class Config(object): try: subprocess.check_output(req_args, stderr=subprocess.STDOUT) except subprocess.CalledProcessError as err: - print "openssl call failed, output: %r" % err.output + print("openssl call failed, output: %r" % err.output) raise def cert_path(self): @@ -121,10 +121,10 @@ class Config(object): return self.path + "/cert.csr" def cert(self): - return open(self.path + "/cert.pem").read() + return read_all(self.path + "/cert.pem") -class CA(object): +class CA: def __init__(self): self.path = tempfile.mkdtemp(prefix="config-ca-", dir=TEMPDIR) os.makedirs(self.path + "/kxd-ca/newcerts/") @@ -132,8 +132,9 @@ class CA(object): try: # We need to run the CA commands from within the path. with pushd(self.path): - open("kxd-ca/index.txt", "w") - open("kxd-ca/serial", "w").write("1000\n") + open("kxd-ca/index.txt", "w").close() + with open("kxd-ca/serial", "w") as serial: + serial.write("1000\n") subprocess.check_output( ["openssl", "req", "-new", "-x509", "-batch", "-config", OPENSSL_CONF, @@ -144,7 +145,7 @@ class CA(object): "-out", "cacert.pem"], stderr=subprocess.STDOUT) except subprocess.CalledProcessError as err: - print "openssl call failed, output: %r" % err.output + print("openssl call failed, output: %r" % err.output) raise def sign(self, csr): @@ -156,14 +157,14 @@ class CA(object): "-in", csr, "-out", "%s.pem" % os.path.splitext(csr)[0]], stderr=subprocess.STDOUT) except subprocess.CalledProcessError as err: - print "openssl call failed, output: %r" % err.output + print("openssl call failed, output: %r" % err.output) raise def cert_path(self): return self.path + "/cacert.pem" def cert(self): - return open(self.path + "/cacert.pem").read() + return read_all(self.path + "/cacert.pem") class ServerConfig(Config): @@ -177,17 +178,18 @@ class ServerConfig(Config): key_path = self.path + "/data/" + name + "/" if not os.path.isdir(key_path): os.makedirs(key_path) - open(key_path + "key", "w").write(self.keys[name]) + with open(key_path + "key", "bw") as key: + key.write(self.keys[name]) if allowed_clients is not None: - cfd = open(key_path + "/allowed_clients", "a") - for cli in allowed_clients: - cfd.write(cli) + with open(key_path + "/allowed_clients", "a") as cfd: + for cli in allowed_clients: + cfd.write(cli) if allowed_hosts is not None: - hfd = open(key_path + "/allowed_hosts", "a") - for host in allowed_hosts: - hfd.write(host + "\n") + with open(key_path + "/allowed_hosts", "a") as hfd: + for host in allowed_hosts: + hfd.write(host + "\n") class ClientConfig(Config): @@ -202,10 +204,10 @@ class ClientConfig(Config): "--server_cert=%s" % server_cert, url] try: - print "Running client:", " ".join(args) + print("Running client:", " ".join(args)) return subprocess.check_output(args, stderr=subprocess.STDOUT) except subprocess.CalledProcessError as err: - print "Client call failed, output: %r" % err.output + print("Client call failed, output: %r" % err.output) raise @@ -215,12 +217,16 @@ def launch_daemon(cfg): "--key=%s/key.pem" % cfg, "--cert=%s/cert.pem" % cfg, "--logfile=%s/log" % cfg, - "--hook=%s/hook" % cfg - ] - print "Launching server: ", " ".join(args) + "--hook=%s/hook" % cfg] + print("Launching server: ", " ".join(args)) return subprocess.Popen(args) +def read_all(fname): + with open(fname) as fd: # pylint: disable=invalid-name + return fd.read() + + class TestCase(unittest.TestCase): def setUp(self): self.server = ServerConfig() @@ -228,11 +234,11 @@ class TestCase(unittest.TestCase): self.daemon = None self.ca = None # pylint: disable=invalid-name self.launch_server(self.server) - self.longMessage = True def tearDown(self): if self.daemon: self.daemon.kill() + self.daemon.wait() def launch_server(self, server): self.daemon = launch_daemon(server.path) @@ -241,8 +247,9 @@ class TestCase(unittest.TestCase): deadline = time.time() + 5 while time.time() < deadline: try: - socket.create_connection(("localhost", 19840), timeout=5) - break + with socket.create_connection( + ("localhost", 19840), timeout=5): + break except socket.error: continue else: @@ -258,7 +265,7 @@ class TestCase(unittest.TestCase): try: client.call(cert_path, url) except subprocess.CalledProcessError as err: - self.assertRegexpMatches(err.output, regexp) + self.assertRegex(err.output.decode(), regexp) else: self.fail("Client call did not fail as expected") @@ -280,7 +287,7 @@ class Simple(TestCase): allowed_clients=[self.client.cert()], allowed_hosts=["localhost"]) key = self.client.call(self.server.cert_path(), "kxd://localhost/k1") - self.assertEquals(key, self.server.keys["k1"]) + self.assertEqual(key, self.server.keys["k1"]) # Unknown key -> 404. self.assertClientFails("kxd://localhost/k2", "404 Not Found") @@ -324,17 +331,17 @@ class Multiples(TestCase): self.client.cert(), self.client2.cert()], allowed_hosts=["localhost"]) key = self.client.call(self.server.cert_path(), "kxd://localhost/k1") - self.assertEquals(key, self.server.keys["k1"]) + self.assertEqual(key, self.server.keys["k1"]) key = self.client2.call(self.server.cert_path(), "kxd://localhost/k1") - self.assertEquals(key, self.server.keys["k1"]) + self.assertEqual(key, self.server.keys["k1"]) # Only one client allowed. self.server.new_key("k2", allowed_clients=[self.client.cert()], allowed_hosts=["localhost"]) key = self.client.call(self.server.cert_path(), "kxd://localhost/k2") - self.assertEquals(key, self.server.keys["k2"]) + self.assertEqual(key, self.server.keys["k2"]) self.assertClientFails("kxd://localhost/k2", "403 Forbidden.*No allowed certificate found", @@ -351,11 +358,11 @@ class Multiples(TestCase): for key in keys: data = self.client.call(self.server.cert_path(), "kxd://localhost/%s" % key) - self.assertEquals(data, self.server.keys[key]) + self.assertEqual(data, self.server.keys[key]) data = self.client2.call(self.server.cert_path(), "kxd://localhost/%s" % key) - self.assertEquals(data, self.server.keys[key]) + self.assertEqual(data, self.server.keys[key]) self.assertClientFails("kxd://localhost/a/b", "404 Not Found") @@ -368,58 +375,56 @@ class Multiples(TestCase): # Write a file containing the certs of both servers. server_certs_path = self.client.path + "/server_certs.pem" server_certs = open(server_certs_path, "w") - server_certs.write(open(server1.cert_path()).read()) - server_certs.write(open(server2.cert_path()).read()) + server_certs.write(read_all(server1.cert_path())) + server_certs.write(read_all(server2.cert_path())) server_certs.close() key = self.client.call(server_certs_path, "kxd://localhost/k1") - self.assertEquals(key, server1.keys["k1"]) + self.assertEqual(key, server1.keys["k1"]) self.daemon.kill() + self.daemon.wait() time.sleep(0.5) self.launch_server(server2) key = self.client.call(server_certs_path, "kxd://localhost/k1") - self.assertEquals(key, server2.keys["k1"]) + self.assertEqual(key, server2.keys["k1"]) class TrickyRequests(TestCase): """Tests for tricky requests.""" - def HTTPSConnection(self, host, port, key_file=None, cert_file=None): - # httplib.HTTPSConnection() wrapper that works with versions before - # and after Python 2.7.9, which introduced default server validation - # with no backwards-compatible way of turning it off. - if sys.hexversion < 0x2070900: - return httplib.HTTPSConnection( - host, port, key_file=key_file, cert_file=cert_file) - + def https_connection(self, host, port, key_file=None, cert_file=None): # Get an SSL context that can validate our server certificate. context = ssl.create_default_context(cafile=self.server.cert_path()) - return httplib.HTTPSConnection( - host, port, key_file=key_file, cert_file=cert_file, - context=context) - - def test_tricky(self): - # No local certificate. - conn = self.HTTPSConnection("localhost", 19840) + context.check_hostname = False + if cert_file: + context.load_cert_chain(cert_file, key_file) + return http.client.HTTPSConnection(host, port, context=context) + + def test_no_local_cert(self): + """No local certificate.""" + conn = self.https_connection("localhost", 19840) try: conn.request("GET", "/v1/") + conn.close() except ssl.SSLError as err: - self.assertRegexpMatches(str(err), "alert bad certificate") + self.assertEqual(err.reason, "SSLV3_ALERT_BAD_CERTIFICATE") else: self.fail("Client call did not fail as expected") - # Requests with '..'. - conn = self.HTTPSConnection("localhost", 19840, - key_file=self.client.key_path(), - cert_file=self.client.cert_path()) + def test_path_with_dotdot(self): + """Requests with '..'.""" + conn = self.https_connection("localhost", 19840, + key_file=self.client.key_path(), + cert_file=self.client.cert_path()) conn.request("GET", "/v1/a/../b") response = conn.getresponse() + conn.close() # Go's http server intercepts these and gives us a 301 Moved # Permanently. - self.assertEquals(response.status, 301) + self.assertEqual(response.status, 301) def test_server_cert(self): rawsock = socket.create_connection(("localhost", 19840)) @@ -433,7 +438,8 @@ class TrickyRequests(TestCase): server_cert = ssl.DER_cert_to_PEM_cert( sock.getpeercert(binary_form=True)) - self.assertEquals(server_cert, self.server.cert()) + self.assertEqual(server_cert, self.server.cert()) + sock.close() class BrokenServerConfig(TestCase): @@ -445,11 +451,9 @@ class BrokenServerConfig(TestCase): allowed_hosts=["localhost"]) # Corrupt the client certificate. - cfd = open(self.server.path + "/data/k1/allowed_clients", "r+") - for _ in range(5): - cfd.readline() - cfd.write('+/+BROKEN+/+') - cfd.close() + with open(self.server.path + "/data/k1/allowed_clients", "tr+") as cfd: + cfd.seek(30) + cfd.write('+/+BROKEN+/+') self.assertClientFails( "kxd://localhost/k1", @@ -498,12 +502,12 @@ class Delegation(TestCase): # Successful request. key = self.client.call(self.ca.cert_path(), "kxd://localhost/k1") - self.assertEquals(key, self.server.keys["k1"]) + self.assertEqual(key, self.server.keys["k1"]) # The server is signed by the CA, but the CA is unknown to the client. # But the client knows the server directly, so it's allowed. key = self.client.call(self.server.cert_path(), "kxd://localhost/k1") - self.assertEquals(key, self.server.keys["k1"]) + self.assertEqual(key, self.server.keys["k1"]) # Same as above, but give the wrong CA. ca2 = CA() @@ -519,7 +523,7 @@ class Delegation(TestCase): allowed_clients=[self.ca.cert()], allowed_hosts=["localhost"]) key = self.client.call(self.server.cert_path(), "kxd://localhost/k1") - self.assertEquals(key, self.server.keys["k1"]) + self.assertEqual(key, self.server.keys["k1"]) # The CA signing the client is unknown to the server. ca2 = CA() @@ -536,7 +540,7 @@ class Delegation(TestCase): allowed_clients=[self.client.cert()], allowed_hosts=["localhost"]) key = self.client.call(self.server.cert_path(), "kxd://localhost/k3") - self.assertEquals(key, self.server.keys["k3"]) + self.assertEqual(key, self.server.keys["k3"]) def test_both_delegated(self): self.prepare(server_self_sign=False, client_self_sign=False) @@ -545,7 +549,7 @@ class Delegation(TestCase): allowed_hosts=["localhost"]) key = self.client.call(self.ca.cert_path(), "kxd://localhost/k1") - self.assertEquals(key, self.server.keys["k1"]) + self.assertEqual(key, self.server.keys["k1"]) class Hook(TestCase): @@ -562,8 +566,9 @@ class Hook(TestCase): path = self.server.path + "/hook" script = self.HOOK_SCRIPT_TMPL.format(exit_code=exit_code) - open(path, "w").write(script) - os.chmod(path, 0770) + with open(path, "w") as hook: + hook.write(script) + os.chmod(path, 0o770) def test_simple(self): self.write_hook(exit_code=0) @@ -573,9 +578,9 @@ class Hook(TestCase): allowed_clients=[self.client.cert()], allowed_hosts=["localhost"]) key = self.client.call(self.server.cert_path(), "kxd://localhost/k1") - self.assertEquals(key, self.server.keys["k1"]) + self.assertEqual(key, self.server.keys["k1"]) - hook_out = open(self.server.path + "/data/hook-output").read() + hook_out = read_all(self.server.path + "/data/hook-output") self.assertIn("CLIENT_CERT_SUBJECT=OU=kxd-tests-client", hook_out) # Failure caused by the hook exiting with error. @@ -584,7 +589,7 @@ class Hook(TestCase): # Failure caused by the hook not being executable. self.write_hook(exit_code=0) - os.chmod(self.server.path + "/hook", 0660) + os.chmod(self.server.path + "/hook", 0o660) self.assertClientFails("kxd://localhost/k1", "Prevented by hook")