author | Alberto Bertogli
<albertito@blitiri.com.ar> 2014-04-29 20:59:42 UTC |
committer | Alberto Bertogli
<albertito@blitiri.com.ar> 2014-04-29 20:59:42 UTC |
parent | c85c7509066cf377c2ea7f2e60b9473ff2b56391 |
tests/run_tests | +73 | -71 |
diff --git a/tests/run_tests b/tests/run_tests index e02ae6c..149380a 100755 --- a/tests/run_tests +++ b/tests/run_tests @@ -12,6 +12,8 @@ client under various conditions, to make sure they behave as intended. # NOTE: Please run "pylint --rcfile=.pylintrc run_tests" after making changes, # to make sure the file has a reasonably uniform coding style. +# You can also use "autopep8 -d --ignore=E301,E26 run_tests" to help with +# this, but make sure the output looks sane. import contextlib @@ -36,11 +38,11 @@ import unittest # Path to our built binaries; used to run the server and client for testing # purposes. BINS = os.path.abspath( - os.path.dirname(os.path.realpath(__file__)) + "/../out") + os.path.dirname(os.path.realpath(__file__)) + "/../out") # Path to the test OpenSSL configuration. OPENSSL_CONF = os.path.abspath( - os.path.dirname(os.path.realpath(__file__)) + "/openssl.cnf") + os.path.dirname(os.path.realpath(__file__)) + "/openssl.cnf") DEVNULL = open("/dev/null", "w") @@ -86,11 +88,10 @@ class Config(object): stdout=DEVNULL, stderr=DEVNULL) req_args = ["openssl", "req", "-new", "-batch", - "-subj", - "/commonName=*" + - "/organizationalUnitName=kxd-tests-%s:%s@%s" % ( - self.name, os.getlogin(), platform.node()), - "-key", "%s/key.pem" % self.path] + "-subj", ("/commonName=*" + + "/organizationalUnitName=kxd-tests-%s:%s@%s" % ( + self.name, os.getlogin(), platform.node())), + "-key", "%s/key.pem" % self.path] if self_sign: req_args.extend(["-x509", "-out", "%s/cert.pem" % self.path]) else: @@ -124,16 +125,15 @@ class CA(object): open("kxd-ca/index.txt", "w") open("kxd-ca/serial", "w").write("1000\n") subprocess.check_output( - ["openssl", "req", "-new", "-x509", "-batch", - "-config", OPENSSL_CONF, - "-subj", - "/commonName=*" + - "/organizationalUnitName=kxd-tests-ca:%s@%s" % ( - os.getlogin(), platform.node()), - "-extensions", "v3_ca", "-nodes", - "-keyout", "cakey.pem", - "-out", "cacert.pem"], - stderr=subprocess.STDOUT) + ["openssl", "req", "-new", "-x509", "-batch", + "-config", OPENSSL_CONF, + "-subj", ("/commonName=*" + + "/organizationalUnitName=kxd-tests-ca:%s@%s" % ( + os.getlogin(), platform.node())), + "-extensions", "v3_ca", "-nodes", + "-keyout", "cakey.pem", + "-out", "cacert.pem"], + stderr=subprocess.STDOUT) except subprocess.CalledProcessError as err: print "openssl call failed, output: %r" % err.output raise @@ -142,11 +142,10 @@ class CA(object): try: with pushd(self.path): subprocess.check_output( - ["openssl", "ca", "-batch", "-config", OPENSSL_CONF, - "-keyfile", "cakey.pem", "-cert", "cacert.pem", - "-in", csr, - "-out", "%s.pem" % os.path.splitext(csr)[0]], - stderr=subprocess.STDOUT) + ["openssl", "ca", "-batch", "-config", OPENSSL_CONF, + "-keyfile", "cakey.pem", "-cert", "cacert.pem", + "-in", csr, "-out", "%s.pem" % os.path.splitext(csr)[0]], + stderr=subprocess.STDOUT) except subprocess.CalledProcessError as err: print "openssl call failed, output: %r" % err.output raise @@ -266,8 +265,8 @@ class Simple(TestCase): # Normal successful case. self.server.new_key("k1", - allowed_clients=[self.client.cert()], - allowed_hosts=["localhost"]) + allowed_clients=[self.client.cert()], + allowed_hosts=["localhost"]) key = self.client.call(self.server.cert_path(), "kxd://localhost/k1") self.assertEquals(key, self.server.keys["k1"]) @@ -277,14 +276,14 @@ class Simple(TestCase): # No certificates allowed -> 403. self.server.new_key("k3", allowed_hosts=["localhost"]) self.assertClientFails("kxd://localhost/k3", - "403 Forbidden.*No allowed certificate found") + "403 Forbidden.*No allowed certificate found") # Host not allowed -> 403. self.server.new_key("k4", - allowed_clients=[self.client.cert()], - allowed_hosts=[]) + allowed_clients=[self.client.cert()], + allowed_hosts=[]) self.assertClientFails("kxd://localhost/k4", - "403 Forbidden.*Host not allowed") + "403 Forbidden.*Host not allowed") # Nothing allowed -> 403. # We don't restrict the reason of failure, that's not defined in this @@ -296,8 +295,8 @@ class Simple(TestCase): # We tell the client to expect the server certificate to be the client # one, which is never going to work. self.assertClientFails("kxd://localhost/k1", - "certificate signed by unknown authority", - cert_path=self.client.cert_path()) + "certificate signed by unknown authority", + cert_path=self.client.cert_path()) class Multiples(TestCase): @@ -309,8 +308,9 @@ class Multiples(TestCase): def test_two_clients(self): self.server.new_key("k1", - allowed_clients=[self.client.cert(), self.client2.cert()], - allowed_hosts=["localhost"]) + allowed_clients=[ + self.client.cert(), self.client2.cert()], + allowed_hosts=["localhost"]) key = self.client.call(self.server.cert_path(), "kxd://localhost/k1") self.assertEquals(key, self.server.keys["k1"]) @@ -319,29 +319,30 @@ class Multiples(TestCase): # Only one client allowed. self.server.new_key("k2", - allowed_clients=[self.client.cert()], - allowed_hosts=["localhost"]) + allowed_clients=[self.client.cert()], + allowed_hosts=["localhost"]) key = self.client.call(self.server.cert_path(), "kxd://localhost/k2") self.assertEquals(key, self.server.keys["k2"]) self.assertClientFails("kxd://localhost/k2", - "403 Forbidden.*No allowed certificate found", - client=self.client2) + "403 Forbidden.*No allowed certificate found", + client=self.client2) def test_many_keys(self): keys = ["a", "d/e", "a/b/c", "d/"] for key in keys: - self.server.new_key(key, - allowed_clients=[self.client.cert(), self.client2.cert()], - allowed_hosts=["localhost"]) + self.server.new_key( + key, + allowed_clients=[self.client.cert(), self.client2.cert()], + allowed_hosts=["localhost"]) for key in keys: data = self.client.call(self.server.cert_path(), - "kxd://localhost/%s" % key) + "kxd://localhost/%s" % key) self.assertEquals(data, self.server.keys[key]) data = self.client2.call(self.server.cert_path(), - "kxd://localhost/%s" % key) + "kxd://localhost/%s" % key) self.assertEquals(data, self.server.keys[key]) self.assertClientFails("kxd://localhost/a/b", "404 Not Found") @@ -385,8 +386,8 @@ class TrickyRequests(TestCase): # Requests with '..'. conn = httplib.HTTPSConnection("localhost", 19840, - key_file=self.client.key_path(), - cert_file=self.client.cert_path()) + key_file=self.client.key_path(), + cert_file=self.client.cert_path()) conn.request("GET", "/v1/a/../b") response = conn.getresponse() @@ -397,15 +398,15 @@ class TrickyRequests(TestCase): def test_server_cert(self): rawsock = socket.create_connection(("localhost", 19840)) sock = ssl.wrap_socket(rawsock, - keyfile=self.client.key_path(), - certfile=self.client.cert_path()) + keyfile=self.client.key_path(), + certfile=self.client.cert_path()) # We don't check the cipher itself, as it depends on the environment, # but we should be using > 128 bit secrets. self.assertTrue(sock.cipher()[2] > 128) server_cert = ssl.DER_cert_to_PEM_cert( - sock.getpeercert(binary_form=True)) + sock.getpeercert(binary_form=True)) self.assertEquals(server_cert, self.server.cert()) @@ -414,8 +415,8 @@ class BrokenServerConfig(TestCase): def test_broken_client_certs(self): self.server.new_key("k1", - allowed_clients=[self.client.cert()], - allowed_hosts=["localhost"]) + allowed_clients=[self.client.cert()], + allowed_hosts=["localhost"]) # Corrupt the client certificate. cfd = open(self.server.path + "/data/k1/allowed_clients", "r+") @@ -424,13 +425,14 @@ class BrokenServerConfig(TestCase): cfd.write('+/+BROKEN+/+') cfd.close() - self.assertClientFails("kxd://localhost/k1", - "500 Internal Server Error.*Error loading certs") + self.assertClientFails( + "kxd://localhost/k1", + "500 Internal Server Error.*Error loading certs") def test_missing_key(self): self.server.new_key("k1", - allowed_clients=[self.client.cert()], - allowed_hosts=["localhost"]) + allowed_clients=[self.client.cert()], + allowed_hosts=["localhost"]) os.unlink(self.server.path + "/data/k1/key") self.assertClientFails("kxd://localhost/k1", "404 Not Found") @@ -444,7 +446,7 @@ class Delegation(TestCase): pass def prepare(self, server_self_sign=True, client_self_sign=True, - ca_sign_server=None, ca_sign_client=None): + ca_sign_server=None, ca_sign_client=None): self.server = ServerConfig(self_sign=server_self_sign) self.client = ClientConfig(self_sign=client_self_sign) @@ -465,8 +467,8 @@ class Delegation(TestCase): self.prepare(server_self_sign=False) self.server.new_key("k1", - allowed_clients=[self.client.cert()], - allowed_hosts=["localhost"]) + allowed_clients=[self.client.cert()], + allowed_hosts=["localhost"]) # Successful request. key = self.client.call(self.ca.cert_path(), "kxd://localhost/k1") @@ -475,48 +477,48 @@ class Delegation(TestCase): # The server is signed by the CA, but the CA is unknown to the client, # so it can't validate it, even if it knows the server directly. self.assertClientFails("kxd://localhost/k1", - "certificate signed by unknown authority", - cert_path=self.server.cert_path()) + "certificate signed by unknown authority", + cert_path=self.server.cert_path()) # Same as above, but give the wrong CA. ca2 = CA() self.assertClientFails("kxd://localhost/k1", - "certificate signed by unknown authority", - cert_path=ca2.cert_path()) + "certificate signed by unknown authority", + cert_path=ca2.cert_path()) def test_client_delegated(self): self.prepare(client_self_sign=False) # Successful request. self.server.new_key("k1", - allowed_clients=[self.ca.cert()], - allowed_hosts=["localhost"]) + allowed_clients=[self.ca.cert()], + allowed_hosts=["localhost"]) key = self.client.call(self.server.cert_path(), "kxd://localhost/k1") self.assertEquals(key, self.server.keys["k1"]) # The CA signing the client is unknown to the server. ca2 = CA() self.server.new_key("k2", - allowed_clients=[ca2.cert()], - allowed_hosts=["localhost"]) + allowed_clients=[ca2.cert()], + allowed_hosts=["localhost"]) self.assertClientFails("kxd://localhost/k2", - "403 Forbidden.*No allowed certificate found", - cert_path=self.server.cert_path()) + "403 Forbidden.*No allowed certificate found", + cert_path=self.server.cert_path()) # The client is signed by the CA, but the CA is unknown to the server, # so it can't validate it, even if it knows the client directly. self.server.new_key("k3", - allowed_clients=[self.client.cert()], - allowed_hosts=["localhost"]) + allowed_clients=[self.client.cert()], + allowed_hosts=["localhost"]) self.assertClientFails("kxd://localhost/k3", - "403 Forbidden.*No allowed certificate found", - cert_path=self.server.cert_path()) + "403 Forbidden.*No allowed certificate found", + cert_path=self.server.cert_path()) def test_both_delegated(self): self.prepare(server_self_sign=False, client_self_sign=False) self.server.new_key("k1", - allowed_clients=[self.ca.cert()], - allowed_hosts=["localhost"]) + allowed_clients=[self.ca.cert()], + allowed_hosts=["localhost"]) key = self.client.call(self.ca.cert_path(), "kxd://localhost/k1") self.assertEquals(key, self.server.keys["k1"])