Revision c50645c0 test/ganeti.utils_unittest.py
b/test/ganeti.utils_unittest.py | ||
---|---|---|
21 | 21 |
|
22 | 22 |
"""Script for unittesting the utils module""" |
23 | 23 |
|
24 |
import distutils.version |
|
25 | 24 |
import errno |
26 | 25 |
import fcntl |
27 | 26 |
import glob |
... | ... | |
32 | 31 |
import signal |
33 | 32 |
import socket |
34 | 33 |
import stat |
35 |
import string |
|
36 | 34 |
import tempfile |
37 | 35 |
import time |
38 | 36 |
import unittest |
39 | 37 |
import warnings |
40 |
import OpenSSL |
|
41 | 38 |
import random |
42 | 39 |
import operator |
43 | 40 |
|
... | ... | |
837 | 834 |
utils.RunInSeparateProcess, _exc) |
838 | 835 |
|
839 | 836 |
|
840 |
class TestGenerateSelfSignedX509Cert(unittest.TestCase): |
|
841 |
def setUp(self): |
|
842 |
self.tmpdir = tempfile.mkdtemp() |
|
843 |
|
|
844 |
def tearDown(self): |
|
845 |
shutil.rmtree(self.tmpdir) |
|
846 |
|
|
847 |
def _checkRsaPrivateKey(self, key): |
|
848 |
lines = key.splitlines() |
|
849 |
return ("-----BEGIN RSA PRIVATE KEY-----" in lines and |
|
850 |
"-----END RSA PRIVATE KEY-----" in lines) |
|
851 |
|
|
852 |
def _checkCertificate(self, cert): |
|
853 |
lines = cert.splitlines() |
|
854 |
return ("-----BEGIN CERTIFICATE-----" in lines and |
|
855 |
"-----END CERTIFICATE-----" in lines) |
|
856 |
|
|
857 |
def test(self): |
|
858 |
for common_name in [None, ".", "Ganeti", "node1.example.com"]: |
|
859 |
(key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300) |
|
860 |
self._checkRsaPrivateKey(key_pem) |
|
861 |
self._checkCertificate(cert_pem) |
|
862 |
|
|
863 |
key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM, |
|
864 |
key_pem) |
|
865 |
self.assert_(key.bits() >= 1024) |
|
866 |
self.assertEqual(key.bits(), constants.RSA_KEY_BITS) |
|
867 |
self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA) |
|
868 |
|
|
869 |
x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, |
|
870 |
cert_pem) |
|
871 |
self.failIf(x509.has_expired()) |
|
872 |
self.assertEqual(x509.get_issuer().CN, common_name) |
|
873 |
self.assertEqual(x509.get_subject().CN, common_name) |
|
874 |
self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS) |
|
875 |
|
|
876 |
def testLegacy(self): |
|
877 |
cert1_filename = os.path.join(self.tmpdir, "cert1.pem") |
|
878 |
|
|
879 |
utils.GenerateSelfSignedSslCert(cert1_filename, validity=1) |
|
880 |
|
|
881 |
cert1 = utils.ReadFile(cert1_filename) |
|
882 |
|
|
883 |
self.assert_(self._checkRsaPrivateKey(cert1)) |
|
884 |
self.assert_(self._checkCertificate(cert1)) |
|
885 |
|
|
886 |
|
|
887 | 837 |
class TestValidateServiceName(unittest.TestCase): |
888 | 838 |
def testValid(self): |
889 | 839 |
testnames = [ |
... | ... | |
910 | 860 |
self.assertRaises(errors.OpPrereqError, utils.ValidateServiceName, name) |
911 | 861 |
|
912 | 862 |
|
913 |
class TestParseAsn1Generalizedtime(unittest.TestCase): |
|
914 |
def test(self): |
|
915 |
# UTC |
|
916 |
self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000Z"), 0) |
|
917 |
self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152Z"), |
|
918 |
1266860512) |
|
919 |
self.assertEqual(utils._ParseAsn1Generalizedtime("20380119031407Z"), |
|
920 |
(2**31) - 1) |
|
921 |
|
|
922 |
# With offset |
|
923 |
self.assertEqual(utils._ParseAsn1Generalizedtime("20100222174152+0000"), |
|
924 |
1266860512) |
|
925 |
self.assertEqual(utils._ParseAsn1Generalizedtime("20100223131652+0000"), |
|
926 |
1266931012) |
|
927 |
self.assertEqual(utils._ParseAsn1Generalizedtime("20100223051808-0800"), |
|
928 |
1266931088) |
|
929 |
self.assertEqual(utils._ParseAsn1Generalizedtime("20100224002135+1100"), |
|
930 |
1266931295) |
|
931 |
self.assertEqual(utils._ParseAsn1Generalizedtime("19700101000000-0100"), |
|
932 |
3600) |
|
933 |
|
|
934 |
# Leap seconds are not supported by datetime.datetime |
|
935 |
self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, |
|
936 |
"19841231235960+0000") |
|
937 |
self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, |
|
938 |
"19920630235960+0000") |
|
939 |
|
|
940 |
# Errors |
|
941 |
self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "") |
|
942 |
self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, "invalid") |
|
943 |
self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, |
|
944 |
"20100222174152") |
|
945 |
self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, |
|
946 |
"Mon Feb 22 17:47:02 UTC 2010") |
|
947 |
self.assertRaises(ValueError, utils._ParseAsn1Generalizedtime, |
|
948 |
"2010-02-22 17:42:02") |
|
949 |
|
|
950 |
|
|
951 |
class TestGetX509CertValidity(testutils.GanetiTestCase): |
|
952 |
def setUp(self): |
|
953 |
testutils.GanetiTestCase.setUp(self) |
|
954 |
|
|
955 |
pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__) |
|
956 |
|
|
957 |
# Test whether we have pyOpenSSL 0.7 or above |
|
958 |
self.pyopenssl0_7 = (pyopenssl_version >= "0.7") |
|
959 |
|
|
960 |
if not self.pyopenssl0_7: |
|
961 |
warnings.warn("This test requires pyOpenSSL 0.7 or above to" |
|
962 |
" function correctly") |
|
963 |
|
|
964 |
def _LoadCert(self, name): |
|
965 |
return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, |
|
966 |
self._ReadTestData(name)) |
|
967 |
|
|
968 |
def test(self): |
|
969 |
validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem")) |
|
970 |
if self.pyopenssl0_7: |
|
971 |
self.assertEqual(validity, (1266919967, 1267524767)) |
|
972 |
else: |
|
973 |
self.assertEqual(validity, (None, None)) |
|
974 |
|
|
975 |
|
|
976 |
class TestSignX509Certificate(unittest.TestCase): |
|
977 |
KEY = "My private key!" |
|
978 |
KEY_OTHER = "Another key" |
|
979 |
|
|
980 |
def test(self): |
|
981 |
# Generate certificate valid for 5 minutes |
|
982 |
(_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300) |
|
983 |
|
|
984 |
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, |
|
985 |
cert_pem) |
|
986 |
|
|
987 |
# No signature at all |
|
988 |
self.assertRaises(errors.GenericError, |
|
989 |
utils.LoadSignedX509Certificate, cert_pem, self.KEY) |
|
990 |
|
|
991 |
# Invalid input |
|
992 |
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, |
|
993 |
"", self.KEY) |
|
994 |
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, |
|
995 |
"X-Ganeti-Signature: \n", self.KEY) |
|
996 |
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, |
|
997 |
"X-Ganeti-Sign: $1234$abcdef\n", self.KEY) |
|
998 |
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, |
|
999 |
"X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY) |
|
1000 |
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, |
|
1001 |
"X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY) |
|
1002 |
|
|
1003 |
# Invalid salt |
|
1004 |
for salt in list("-_@$,:;/\\ \t\n"): |
|
1005 |
self.assertRaises(errors.GenericError, utils.SignX509Certificate, |
|
1006 |
cert_pem, self.KEY, "foo%sbar" % salt) |
|
1007 |
|
|
1008 |
for salt in ["HelloWorld", "salt", string.letters, string.digits, |
|
1009 |
utils.GenerateSecret(numbytes=4), |
|
1010 |
utils.GenerateSecret(numbytes=16), |
|
1011 |
"{123:456}".encode("hex")]: |
|
1012 |
signed_pem = utils.SignX509Certificate(cert, self.KEY, salt) |
|
1013 |
|
|
1014 |
self._Check(cert, salt, signed_pem) |
|
1015 |
|
|
1016 |
self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem) |
|
1017 |
self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem) |
|
1018 |
self._Check(cert, salt, (signed_pem + "\n\na few more\n" |
|
1019 |
"lines----\n------ at\nthe end!")) |
|
1020 |
|
|
1021 |
def _Check(self, cert, salt, pem): |
|
1022 |
(cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY) |
|
1023 |
self.assertEqual(salt, salt2) |
|
1024 |
self.assertEqual(cert.digest("sha1"), cert2.digest("sha1")) |
|
1025 |
|
|
1026 |
# Other key |
|
1027 |
self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate, |
|
1028 |
pem, self.KEY_OTHER) |
|
1029 |
|
|
1030 |
|
|
1031 | 863 |
class TestReadLockedPidFile(unittest.TestCase): |
1032 | 864 |
def setUp(self): |
1033 | 865 |
self.tmpdir = tempfile.mkdtemp() |
... | ... | |
1065 | 897 |
self.assertRaises(EnvironmentError, utils.ReadLockedPidFile, path) |
1066 | 898 |
|
1067 | 899 |
|
1068 |
class TestCertVerification(testutils.GanetiTestCase): |
|
1069 |
def setUp(self): |
|
1070 |
testutils.GanetiTestCase.setUp(self) |
|
1071 |
|
|
1072 |
self.tmpdir = tempfile.mkdtemp() |
|
1073 |
|
|
1074 |
def tearDown(self): |
|
1075 |
shutil.rmtree(self.tmpdir) |
|
1076 |
|
|
1077 |
def testVerifyCertificate(self): |
|
1078 |
cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem")) |
|
1079 |
cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, |
|
1080 |
cert_pem) |
|
1081 |
|
|
1082 |
# Not checking return value as this certificate is expired |
|
1083 |
utils.VerifyX509Certificate(cert, 30, 7) |
|
1084 |
|
|
1085 |
|
|
1086 |
class TestVerifyCertificateInner(unittest.TestCase): |
|
1087 |
def test(self): |
|
1088 |
vci = utils._VerifyCertificateInner |
|
1089 |
|
|
1090 |
# Valid |
|
1091 |
self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7), |
|
1092 |
(None, None)) |
|
1093 |
|
|
1094 |
# Not yet valid |
|
1095 |
(errcode, msg) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7) |
|
1096 |
self.assertEqual(errcode, utils.CERT_WARNING) |
|
1097 |
|
|
1098 |
# Expiring soon |
|
1099 |
(errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7) |
|
1100 |
self.assertEqual(errcode, utils.CERT_ERROR) |
|
1101 |
|
|
1102 |
(errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1) |
|
1103 |
self.assertEqual(errcode, utils.CERT_WARNING) |
|
1104 |
|
|
1105 |
(errcode, msg) = vci(False, 1266507600, None, 1266939600, 30, 7) |
|
1106 |
self.assertEqual(errcode, None) |
|
1107 |
|
|
1108 |
# Expired |
|
1109 |
(errcode, msg) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7) |
|
1110 |
self.assertEqual(errcode, utils.CERT_ERROR) |
|
1111 |
|
|
1112 |
(errcode, msg) = vci(True, None, 1267544400, 1266939600, 30, 7) |
|
1113 |
self.assertEqual(errcode, utils.CERT_ERROR) |
|
1114 |
|
|
1115 |
(errcode, msg) = vci(True, 1266507600, None, 1266939600, 30, 7) |
|
1116 |
self.assertEqual(errcode, utils.CERT_ERROR) |
|
1117 |
|
|
1118 |
(errcode, msg) = vci(True, None, None, 1266939600, 30, 7) |
|
1119 |
self.assertEqual(errcode, utils.CERT_ERROR) |
|
1120 |
|
|
1121 |
|
|
1122 | 900 |
class TestFindMatch(unittest.TestCase): |
1123 | 901 |
def test(self): |
1124 | 902 |
data = { |
Also available in: Unified diff