Revision c50645c0 test/ganeti.utils_unittest.py

b/test/ganeti.utils_unittest.py
21 21

  
22 22
"""Script for unittesting the utils module"""
23 23

  
24
import distutils.version
25 24
import errno
26 25
import fcntl
27 26
import glob
......
32 31
import signal
33 32
import socket
34 33
import stat
35
import string
36 34
import tempfile
37 35
import time
38 36
import unittest
39 37
import warnings
40
import OpenSSL
41 38
import random
42 39
import operator
43 40

  
......
837 834
                      utils.RunInSeparateProcess, _exc)
838 835

  
839 836

  
840
class TestGenerateSelfSignedX509Cert(unittest.TestCase):
841
  def setUp(self):
842
    self.tmpdir = tempfile.mkdtemp()
843

  
844
  def tearDown(self):
845
    shutil.rmtree(self.tmpdir)
846

  
847
  def _checkRsaPrivateKey(self, key):
848
    lines = key.splitlines()
849
    return ("-----BEGIN RSA PRIVATE KEY-----" in lines and
850
            "-----END RSA PRIVATE KEY-----" in lines)
851

  
852
  def _checkCertificate(self, cert):
853
    lines = cert.splitlines()
854
    return ("-----BEGIN CERTIFICATE-----" in lines and
855
            "-----END CERTIFICATE-----" in lines)
856

  
857
  def test(self):
858
    for common_name in [None, ".", "Ganeti", "node1.example.com"]:
859
      (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
860
      self._checkRsaPrivateKey(key_pem)
861
      self._checkCertificate(cert_pem)
862

  
863
      key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
864
                                           key_pem)
865
      self.assert_(key.bits() >= 1024)
866
      self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
867
      self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
868

  
869
      x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
870
                                             cert_pem)
871
      self.failIf(x509.has_expired())
872
      self.assertEqual(x509.get_issuer().CN, common_name)
873
      self.assertEqual(x509.get_subject().CN, common_name)
874
      self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
875

  
876
  def testLegacy(self):
877
    cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
878

  
879
    utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
880

  
881
    cert1 = utils.ReadFile(cert1_filename)
882

  
883
    self.assert_(self._checkRsaPrivateKey(cert1))
884
    self.assert_(self._checkCertificate(cert1))
885

  
886

  
887 837
class TestValidateServiceName(unittest.TestCase):
888 838
  def testValid(self):
889 839
    testnames = [
......
910 860
      self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name)
911 861

  
912 862

  
913
class TestParseAsn1Generalizedtime(unittest.TestCase):
914
  def test(self):
915
    # UTC
916
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0)
917
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"),
918
                     1266860512)
919
    self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"),
920
                     (2**31) - 1)
921

  
922
    # With offset
923
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"),
924
                     1266860512)
925
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"),
926
                     1266931012)
927
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"),
928
                     1266931088)
929
    self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"),
930
                     1266931295)
931
    self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"),
932
                     3600)
933

  
934
    # Leap seconds are not supported by datetime.datetime
935
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
936
                      "19841231235960+0000")
937
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
938
                      "19920630235960+0000")
939

  
940
    # Errors
941
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "")
942
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid")
943
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
944
                      "20100222174152")
945
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
946
                      "Mon Feb 22 17:47:02 UTC 2010")
947
    self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime,
948
                      "2010-02-22 17:42:02")
949

  
950

  
951
class TestGetX509CertValidity(testutils.GanetiTestCase):
952
  def setUp(self):
953
    testutils.GanetiTestCase.setUp(self)
954

  
955
    pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
956

  
957
    # Test whether we have pyOpenSSL 0.7 or above
958
    self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
959

  
960
    if not self.pyopenssl0_7:
961
      warnings.warn("This test requires pyOpenSSL 0.7 or above to"
962
                    " function correctly")
963

  
964
  def _LoadCert(self, name):
965
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
966
                                           self._ReadTestData(name))
967

  
968
  def test(self):
969
    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
970
    if self.pyopenssl0_7:
971
      self.assertEqual(validity, (1266919967, 1267524767))
972
    else:
973
      self.assertEqual(validity, (None, None))
974

  
975

  
976
class TestSignX509Certificate(unittest.TestCase):
977
  KEY = "My private key!"
978
  KEY_OTHER = "Another key"
979

  
980
  def test(self):
981
    # Generate certificate valid for 5 minutes
982
    (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
983

  
984
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
985
                                           cert_pem)
986

  
987
    # No signature at all
988
    self.assertRaises(errors.GenericError,
989
                      utils.LoadSignedX509Certificate, cert_pem, self.KEY)
990

  
991
    # Invalid input
992
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
993
                      "", self.KEY)
994
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
995
                      "X-Ganeti-Signature: \n", self.KEY)
996
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
997
                      "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
998
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
999
                      "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
1000
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1001
                      "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
1002

  
1003
    # Invalid salt
1004
    for salt in list("-_@$,:;/\\ \t\n"):
1005
      self.assertRaises(errors.GenericError, utils.SignX509Certificate,
1006
                        cert_pem, self.KEY, "foo%sbar" % salt)
1007

  
1008
    for salt in ["HelloWorld", "salt", string.letters, string.digits,
1009
                 utils.GenerateSecret(numbytes=4),
1010
                 utils.GenerateSecret(numbytes=16),
1011
                 "{123:456}".encode("hex")]:
1012
      signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
1013

  
1014
      self._Check(cert, salt, signed_pem)
1015

  
1016
      self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
1017
      self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
1018
      self._Check(cert, salt, (signed_pem + "\n\na few more\n"
1019
                               "lines----\n------ at\nthe end!"))
1020

  
1021
  def _Check(self, cert, salt, pem):
1022
    (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
1023
    self.assertEqual(salt, salt2)
1024
    self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
1025

  
1026
    # Other key
1027
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
1028
                      pem, self.KEY_OTHER)
1029

  
1030

  
1031 863
class TestReadLockedPidFile(unittest.TestCase):
1032 864
  def setUp(self):
1033 865
    self.tmpdir = tempfile.mkdtemp()
......
1065 897
    self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path)
1066 898

  
1067 899

  
1068
class TestCertVerification(testutils.GanetiTestCase):
1069
  def setUp(self):
1070
    testutils.GanetiTestCase.setUp(self)
1071

  
1072
    self.tmpdir = tempfile.mkdtemp()
1073

  
1074
  def tearDown(self):
1075
    shutil.rmtree(self.tmpdir)
1076

  
1077
  def testVerifyCertificate(self):
1078
    cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
1079
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1080
                                           cert_pem)
1081

  
1082
    # Not checking return value as this certificate is expired
1083
    utils.VerifyX509Certificate(cert, 30, 7)
1084

  
1085

  
1086
class TestVerifyCertificateInner(unittest.TestCase):
1087
  def test(self):
1088
    vci = utils._VerifyCertificateInner
1089

  
1090
    # Valid
1091
    self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
1092
                     (None, None))
1093

  
1094
    # Not yet valid
1095
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7)
1096
    self.assertEqual(errcode, utils.CERT_WARNING)
1097

  
1098
    # Expiring soon
1099
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7)
1100
    self.assertEqual(errcode, utils.CERT_ERROR)
1101

  
1102
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1)
1103
    self.assertEqual(errcode, utils.CERT_WARNING)
1104

  
1105
    (errcode, msg) = vci(False, 1266507600, None, 1266939600, 30, 7)
1106
    self.assertEqual(errcode, None)
1107

  
1108
    # Expired
1109
    (errcode, msg) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7)
1110
    self.assertEqual(errcode, utils.CERT_ERROR)
1111

  
1112
    (errcode, msg) = vci(True, None, 1267544400, 1266939600, 30, 7)
1113
    self.assertEqual(errcode, utils.CERT_ERROR)
1114

  
1115
    (errcode, msg) = vci(True, 1266507600, None, 1266939600, 30, 7)
1116
    self.assertEqual(errcode, utils.CERT_ERROR)
1117

  
1118
    (errcode, msg) = vci(True, None, None, 1266939600, 30, 7)
1119
    self.assertEqual(errcode, utils.CERT_ERROR)
1120

  
1121

  
1122 900
class TestFindMatch(unittest.TestCase):
1123 901
  def test(self):
1124 902
    data = {

Also available in: Unified diff