git » kxd » commit e21f639

tests: Adjust run_test to the pep8 coding style

author Alberto Bertogli
2014-04-29 20:59:42 UTC
committer Alberto Bertogli
2014-04-29 20:59:42 UTC
parent c85c7509066cf377c2ea7f2e60b9473ff2b56391

tests: Adjust run_test to the pep8 coding style

This patch makes some style changes to run_tests to adjust it to the pep8
coding style, based on autopep8's suggestions.

Signed-off-by: Alberto Bertogli <albertito@blitiri.com.ar>

tests/run_tests +73 -71

diff --git a/tests/run_tests b/tests/run_tests
index e02ae6c..149380a 100755
--- a/tests/run_tests
+++ b/tests/run_tests
@@ -12,6 +12,8 @@ client under various conditions, to make sure they behave as intended.
 
 # NOTE: Please run "pylint --rcfile=.pylintrc run_tests" after making changes,
 # to make sure the file has a reasonably uniform coding style.
+# You can also use "autopep8 -d --ignore=E301,E26 run_tests" to help with
+# this, but make sure the output looks sane.
 
 
 import contextlib
@@ -36,11 +38,11 @@ import unittest
 # Path to our built binaries; used to run the server and client for testing
 # purposes.
 BINS = os.path.abspath(
-        os.path.dirname(os.path.realpath(__file__)) + "/../out")
+    os.path.dirname(os.path.realpath(__file__)) + "/../out")
 
 # Path to the test OpenSSL configuration.
 OPENSSL_CONF = os.path.abspath(
-        os.path.dirname(os.path.realpath(__file__)) + "/openssl.cnf")
+    os.path.dirname(os.path.realpath(__file__)) + "/openssl.cnf")
 
 DEVNULL = open("/dev/null", "w")
 
@@ -86,11 +88,10 @@ class Config(object):
             stdout=DEVNULL, stderr=DEVNULL)
 
         req_args = ["openssl", "req", "-new", "-batch",
-             "-subj",
-                "/commonName=*" +
-                "/organizationalUnitName=kxd-tests-%s:%s@%s" % (
-                    self.name, os.getlogin(), platform.node()),
-             "-key", "%s/key.pem" % self.path]
+                    "-subj", ("/commonName=*" +
+                              "/organizationalUnitName=kxd-tests-%s:%s@%s" % (
+                                  self.name, os.getlogin(), platform.node())),
+                    "-key", "%s/key.pem" % self.path]
         if self_sign:
             req_args.extend(["-x509", "-out", "%s/cert.pem" % self.path])
         else:
@@ -124,16 +125,15 @@ class CA(object):
                 open("kxd-ca/index.txt", "w")
                 open("kxd-ca/serial", "w").write("1000\n")
                 subprocess.check_output(
-                        ["openssl", "req", "-new", "-x509", "-batch",
-                            "-config", OPENSSL_CONF,
-                            "-subj",
-                              "/commonName=*" +
-                              "/organizationalUnitName=kxd-tests-ca:%s@%s" % (
-                                  os.getlogin(), platform.node()),
-                            "-extensions", "v3_ca", "-nodes",
-                            "-keyout", "cakey.pem",
-                            "-out", "cacert.pem"],
-                        stderr=subprocess.STDOUT)
+                    ["openssl", "req", "-new", "-x509", "-batch",
+                     "-config", OPENSSL_CONF,
+                     "-subj", ("/commonName=*" +
+                               "/organizationalUnitName=kxd-tests-ca:%s@%s" % (
+                                   os.getlogin(), platform.node())),
+                     "-extensions", "v3_ca", "-nodes",
+                     "-keyout", "cakey.pem",
+                     "-out", "cacert.pem"],
+                    stderr=subprocess.STDOUT)
         except subprocess.CalledProcessError as err:
             print "openssl call failed, output: %r" % err.output
             raise
@@ -142,11 +142,10 @@ class CA(object):
         try:
             with pushd(self.path):
                 subprocess.check_output(
-                        ["openssl", "ca", "-batch", "-config", OPENSSL_CONF,
-                            "-keyfile", "cakey.pem", "-cert", "cacert.pem",
-                            "-in", csr,
-                            "-out", "%s.pem" % os.path.splitext(csr)[0]],
-                        stderr=subprocess.STDOUT)
+                    ["openssl", "ca", "-batch", "-config", OPENSSL_CONF,
+                     "-keyfile", "cakey.pem", "-cert", "cacert.pem",
+                     "-in", csr, "-out", "%s.pem" % os.path.splitext(csr)[0]],
+                    stderr=subprocess.STDOUT)
         except subprocess.CalledProcessError as err:
             print "openssl call failed, output: %r" % err.output
             raise
@@ -266,8 +265,8 @@ class Simple(TestCase):
 
         # Normal successful case.
         self.server.new_key("k1",
-                allowed_clients=[self.client.cert()],
-                allowed_hosts=["localhost"])
+                            allowed_clients=[self.client.cert()],
+                            allowed_hosts=["localhost"])
         key = self.client.call(self.server.cert_path(), "kxd://localhost/k1")
         self.assertEquals(key, self.server.keys["k1"])
 
@@ -277,14 +276,14 @@ class Simple(TestCase):
         # No certificates allowed -> 403.
         self.server.new_key("k3", allowed_hosts=["localhost"])
         self.assertClientFails("kxd://localhost/k3",
-                "403 Forbidden.*No allowed certificate found")
+                               "403 Forbidden.*No allowed certificate found")
 
         # Host not allowed -> 403.
         self.server.new_key("k4",
-                allowed_clients=[self.client.cert()],
-                allowed_hosts=[])
+                            allowed_clients=[self.client.cert()],
+                            allowed_hosts=[])
         self.assertClientFails("kxd://localhost/k4",
-                "403 Forbidden.*Host not allowed")
+                               "403 Forbidden.*Host not allowed")
 
         # Nothing allowed -> 403.
         # We don't restrict the reason of failure, that's not defined in this
@@ -296,8 +295,8 @@ class Simple(TestCase):
         # We tell the client to expect the server certificate to be the client
         # one, which is never going to work.
         self.assertClientFails("kxd://localhost/k1",
-                "certificate signed by unknown authority",
-                cert_path=self.client.cert_path())
+                               "certificate signed by unknown authority",
+                               cert_path=self.client.cert_path())
 
 
 class Multiples(TestCase):
@@ -309,8 +308,9 @@ class Multiples(TestCase):
 
     def test_two_clients(self):
         self.server.new_key("k1",
-                allowed_clients=[self.client.cert(), self.client2.cert()],
-                allowed_hosts=["localhost"])
+                            allowed_clients=[
+                                self.client.cert(), self.client2.cert()],
+                            allowed_hosts=["localhost"])
         key = self.client.call(self.server.cert_path(), "kxd://localhost/k1")
         self.assertEquals(key, self.server.keys["k1"])
 
@@ -319,29 +319,30 @@ class Multiples(TestCase):
 
         # Only one client allowed.
         self.server.new_key("k2",
-                allowed_clients=[self.client.cert()],
-                allowed_hosts=["localhost"])
+                            allowed_clients=[self.client.cert()],
+                            allowed_hosts=["localhost"])
         key = self.client.call(self.server.cert_path(), "kxd://localhost/k2")
         self.assertEquals(key, self.server.keys["k2"])
 
         self.assertClientFails("kxd://localhost/k2",
-                "403 Forbidden.*No allowed certificate found",
-                client=self.client2)
+                               "403 Forbidden.*No allowed certificate found",
+                               client=self.client2)
 
     def test_many_keys(self):
         keys = ["a", "d/e", "a/b/c", "d/"]
         for key in keys:
-            self.server.new_key(key,
-                    allowed_clients=[self.client.cert(), self.client2.cert()],
-                    allowed_hosts=["localhost"])
+            self.server.new_key(
+                key,
+                allowed_clients=[self.client.cert(), self.client2.cert()],
+                allowed_hosts=["localhost"])
 
         for key in keys:
             data = self.client.call(self.server.cert_path(),
-                    "kxd://localhost/%s" % key)
+                                    "kxd://localhost/%s" % key)
             self.assertEquals(data, self.server.keys[key])
 
             data = self.client2.call(self.server.cert_path(),
-                    "kxd://localhost/%s" % key)
+                                     "kxd://localhost/%s" % key)
             self.assertEquals(data, self.server.keys[key])
 
         self.assertClientFails("kxd://localhost/a/b", "404 Not Found")
@@ -385,8 +386,8 @@ class TrickyRequests(TestCase):
 
         # Requests with '..'.
         conn = httplib.HTTPSConnection("localhost", 19840,
-                key_file=self.client.key_path(),
-                cert_file=self.client.cert_path())
+                                       key_file=self.client.key_path(),
+                                       cert_file=self.client.cert_path())
         conn.request("GET", "/v1/a/../b")
         response = conn.getresponse()
 
@@ -397,15 +398,15 @@ class TrickyRequests(TestCase):
     def test_server_cert(self):
         rawsock = socket.create_connection(("localhost", 19840))
         sock = ssl.wrap_socket(rawsock,
-                keyfile=self.client.key_path(),
-                certfile=self.client.cert_path())
+                               keyfile=self.client.key_path(),
+                               certfile=self.client.cert_path())
 
         # We don't check the cipher itself, as it depends on the environment,
         # but we should be using > 128 bit secrets.
         self.assertTrue(sock.cipher()[2] > 128)
 
         server_cert = ssl.DER_cert_to_PEM_cert(
-                sock.getpeercert(binary_form=True))
+            sock.getpeercert(binary_form=True))
         self.assertEquals(server_cert, self.server.cert())
 
 
@@ -414,8 +415,8 @@ class BrokenServerConfig(TestCase):
 
     def test_broken_client_certs(self):
         self.server.new_key("k1",
-                allowed_clients=[self.client.cert()],
-                allowed_hosts=["localhost"])
+                            allowed_clients=[self.client.cert()],
+                            allowed_hosts=["localhost"])
 
         # Corrupt the client certificate.
         cfd = open(self.server.path + "/data/k1/allowed_clients", "r+")
@@ -424,13 +425,14 @@ class BrokenServerConfig(TestCase):
         cfd.write('+/+BROKEN+/+')
         cfd.close()
 
-        self.assertClientFails("kxd://localhost/k1",
-                "500 Internal Server Error.*Error loading certs")
+        self.assertClientFails(
+            "kxd://localhost/k1",
+            "500 Internal Server Error.*Error loading certs")
 
     def test_missing_key(self):
         self.server.new_key("k1",
-                allowed_clients=[self.client.cert()],
-                allowed_hosts=["localhost"])
+                            allowed_clients=[self.client.cert()],
+                            allowed_hosts=["localhost"])
 
         os.unlink(self.server.path + "/data/k1/key")
         self.assertClientFails("kxd://localhost/k1", "404 Not Found")
@@ -444,7 +446,7 @@ class Delegation(TestCase):
         pass
 
     def prepare(self, server_self_sign=True, client_self_sign=True,
-            ca_sign_server=None, ca_sign_client=None):
+                ca_sign_server=None, ca_sign_client=None):
         self.server = ServerConfig(self_sign=server_self_sign)
         self.client = ClientConfig(self_sign=client_self_sign)
 
@@ -465,8 +467,8 @@ class Delegation(TestCase):
         self.prepare(server_self_sign=False)
 
         self.server.new_key("k1",
-                allowed_clients=[self.client.cert()],
-                allowed_hosts=["localhost"])
+                            allowed_clients=[self.client.cert()],
+                            allowed_hosts=["localhost"])
 
         # Successful request.
         key = self.client.call(self.ca.cert_path(), "kxd://localhost/k1")
@@ -475,48 +477,48 @@ class Delegation(TestCase):
         # The server is signed by the CA, but the CA is unknown to the client,
         # so it can't validate it, even if it knows the server directly.
         self.assertClientFails("kxd://localhost/k1",
-                "certificate signed by unknown authority",
-                cert_path=self.server.cert_path())
+                               "certificate signed by unknown authority",
+                               cert_path=self.server.cert_path())
 
         # Same as above, but give the wrong CA.
         ca2 = CA()
         self.assertClientFails("kxd://localhost/k1",
-                "certificate signed by unknown authority",
-                cert_path=ca2.cert_path())
+                               "certificate signed by unknown authority",
+                               cert_path=ca2.cert_path())
 
     def test_client_delegated(self):
         self.prepare(client_self_sign=False)
 
         # Successful request.
         self.server.new_key("k1",
-                allowed_clients=[self.ca.cert()],
-                allowed_hosts=["localhost"])
+                            allowed_clients=[self.ca.cert()],
+                            allowed_hosts=["localhost"])
         key = self.client.call(self.server.cert_path(), "kxd://localhost/k1")
         self.assertEquals(key, self.server.keys["k1"])
 
         # The CA signing the client is unknown to the server.
         ca2 = CA()
         self.server.new_key("k2",
-                allowed_clients=[ca2.cert()],
-                allowed_hosts=["localhost"])
+                            allowed_clients=[ca2.cert()],
+                            allowed_hosts=["localhost"])
         self.assertClientFails("kxd://localhost/k2",
-                "403 Forbidden.*No allowed certificate found",
-                cert_path=self.server.cert_path())
+                               "403 Forbidden.*No allowed certificate found",
+                               cert_path=self.server.cert_path())
 
         # The client is signed by the CA, but the CA is unknown to the server,
         # so it can't validate it, even if it knows the client directly.
         self.server.new_key("k3",
-                allowed_clients=[self.client.cert()],
-                allowed_hosts=["localhost"])
+                            allowed_clients=[self.client.cert()],
+                            allowed_hosts=["localhost"])
         self.assertClientFails("kxd://localhost/k3",
-                "403 Forbidden.*No allowed certificate found",
-                cert_path=self.server.cert_path())
+                               "403 Forbidden.*No allowed certificate found",
+                               cert_path=self.server.cert_path())
 
     def test_both_delegated(self):
         self.prepare(server_self_sign=False, client_self_sign=False)
         self.server.new_key("k1",
-                allowed_clients=[self.ca.cert()],
-                allowed_hosts=["localhost"])
+                            allowed_clients=[self.ca.cert()],
+                            allowed_hosts=["localhost"])
 
         key = self.client.call(self.ca.cert_path(), "kxd://localhost/k1")
         self.assertEquals(key, self.server.keys["k1"])