Add exclusive_storage node parameter
[ganeti-local] / test / ganeti.utils.x509_unittest.py
index 007dca8..d799c01 100755 (executable)
@@ -287,5 +287,89 @@ class TestGenerateSelfSignedX509Cert(unittest.TestCase):
     self.assert_(self._checkCertificate(cert1))
 
 
+class TestCheckNodeCertificate(testutils.GanetiTestCase):
+  def setUp(self):
+    testutils.GanetiTestCase.setUp(self)
+    self.tmpdir = tempfile.mkdtemp()
+
+  def tearDown(self):
+    testutils.GanetiTestCase.tearDown(self)
+    shutil.rmtree(self.tmpdir)
+
+  def testMismatchingKey(self):
+    other_cert = self._TestDataFilename("cert1.pem")
+    node_cert = self._TestDataFilename("cert2.pem")
+
+    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                           utils.ReadFile(other_cert))
+
+    try:
+      utils.CheckNodeCertificate(cert, _noded_cert_file=node_cert)
+    except errors.GenericError, err:
+      self.assertEqual(str(err),
+                       "Given cluster certificate does not match local key")
+    else:
+      self.fail("Exception was not raised")
+
+  def testMatchingKey(self):
+    cert_filename = self._TestDataFilename("cert2.pem")
+
+    # Extract certificate
+    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                           utils.ReadFile(cert_filename))
+    cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                               cert)
+
+    utils.CheckNodeCertificate(cert, _noded_cert_file=cert_filename)
+
+  def testMissingFile(self):
+    cert_path = self._TestDataFilename("cert1.pem")
+    nodecert = utils.PathJoin(self.tmpdir, "does-not-exist")
+
+    utils.CheckNodeCertificate(NotImplemented, _noded_cert_file=nodecert)
+
+    self.assertFalse(os.path.exists(nodecert))
+
+  def testInvalidCertificate(self):
+    tmpfile = utils.PathJoin(self.tmpdir, "cert")
+    utils.WriteFile(tmpfile, data="not a certificate")
+
+    self.assertRaises(errors.X509CertError, utils.CheckNodeCertificate,
+                      NotImplemented, _noded_cert_file=tmpfile)
+
+  def testNoPrivateKey(self):
+    cert = self._TestDataFilename("cert1.pem")
+    self.assertRaises(errors.X509CertError, utils.CheckNodeCertificate,
+                      NotImplemented, _noded_cert_file=cert)
+
+  def testMismatchInNodeCert(self):
+    cert1_path = self._TestDataFilename("cert1.pem")
+    cert2_path = self._TestDataFilename("cert2.pem")
+    tmpfile = utils.PathJoin(self.tmpdir, "cert")
+
+    # Extract certificate
+    cert1 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                            utils.ReadFile(cert1_path))
+    cert1_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
+                                                cert1)
+
+    # Extract mismatching key
+    key2 = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
+                                          utils.ReadFile(cert2_path))
+    key2_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM,
+                                              key2)
+
+    # Write to file
+    utils.WriteFile(tmpfile, data=cert1_pem + key2_pem)
+
+    try:
+      utils.CheckNodeCertificate(cert1, _noded_cert_file=tmpfile)
+    except errors.X509CertError, err:
+      self.assertEqual(err.args,
+                       (tmpfile, "Certificate does not match with private key"))
+    else:
+      self.fail("Exception was not raised")
+
+
 if __name__ == "__main__":
   testutils.GanetiTestProgram()