4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.utils.x509"""
30 import distutils.version
33 from ganeti import constants
34 from ganeti import utils
35 from ganeti import compat
36 from ganeti import errors
class TestParseAsn1Generalizedtime(unittest.TestCase):
    """Tests for ``utils.x509._ParseAsn1Generalizedtime``.

    NOTE(review): the ``def`` lines of both methods were missing from the
    text under review; ``setUp`` and ``test`` are reconstructed names and
    should be confirmed against the upstream file.

    """

    def setUp(self):
        # Short alias for the private parser used by every assertion below.
        self._Parse = utils.x509._ParseAsn1Generalizedtime

    def test(self):
        # Timestamps carrying a "Z" suffix
        for (text, expected) in [
            ("19700101000000Z", 0),
            ("20100222174152Z", 1266860512),
            ("20380119031407Z", (2**31) - 1),
        ]:
            self.assertEqual(self._Parse(text), expected)

        # Timestamps carrying an explicit "+hhmm"/"-hhmm" offset
        for (text, expected) in [
            ("20100222174152+0000", 1266860512),
            ("20100223131652+0000", 1266931012),
            ("20100223051808-0800", 1266931088),
            ("20100224002135+1100", 1266931295),
            ("19700101000000-0100", 3600),
        ]:
            self.assertEqual(self._Parse(text), expected)

        # Leap seconds are not supported by datetime.datetime
        for text in ["19841231235960+0000", "19920630235960+0000"]:
            self.assertRaises(ValueError, self._Parse, text)

        # Inputs that must be rejected: empty, free text, a timestamp
        # without suffix/offset, and two other date formats
        for text in [
            "",
            "invalid",
            "20100222174152",
            "Mon Feb 22 17:47:02 UTC 2010",
            "2010-02-22 17:42:02",
        ]:
            self.assertRaises(ValueError, self._Parse, text)
class TestGetX509CertValidity(testutils.GanetiTestCase):
    """Tests for ``utils.GetX509CertValidity`` using the ``cert1.pem`` fixture.

    NOTE(review): the ``def`` lines of ``setUp``/``test`` and the
    ``if``/``else`` selecting between the two expected values were missing
    from the text under review and are reconstructed here; confirm against
    the upstream file.

    """

    def setUp(self):
        testutils.GanetiTestCase.setUp(self)

        pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)

        # Test whether we have pyOpenSSL 0.7 or above
        self.pyopenssl0_7 = (pyopenssl_version >= "0.7")

        if not self.pyopenssl0_7:
            warnings.warn("This test requires pyOpenSSL 0.7 or above to"
                          " function correctly")

    def _LoadCert(self, name):
        """Load the PEM test-data file ``name`` as a pyOpenSSL certificate."""
        pem = self._ReadTestData(name)
        return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                               pem)

    def test(self):
        validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))

        # The expected value depends on the pyOpenSSL version detected in
        # setUp: (None, None) is expected when it is older than 0.7.
        if self.pyopenssl0_7:
            self.assertEqual(validity, (1266919967, 1267524767))
        else:
            self.assertEqual(validity, (None, None))
95 class TestSignX509Certificate(unittest.TestCase):
96 KEY = "My private key!"
97 KEY_OTHER = "Another key"
100 # Generate certificate valid for 5 minutes
101 (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
103 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
106 # No signature at all
107 self.assertRaises(errors.GenericError,
108 utils.LoadSignedX509Certificate, cert_pem, self.KEY)
111 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
113 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
114 "X-Ganeti-Signature: \n", self.KEY)
115 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
116 "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
117 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
118 "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
119 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
120 "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
123 for salt in list("-_@$,:;/\\ \t\n"):
124 self.assertRaises(errors.GenericError, utils.SignX509Certificate,
125 cert_pem, self.KEY, "foo%sbar" % salt)
127 for salt in ["HelloWorld", "salt", string.letters, string.digits,
128 utils.GenerateSecret(numbytes=4),
129 utils.GenerateSecret(numbytes=16),
130 "{123:456}".encode("hex")]:
131 signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
133 self._Check(cert, salt, signed_pem)
135 self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
136 self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
137 self._Check(cert, salt, (signed_pem + "\n\na few more\n"
138 "lines----\n------ at\nthe end!"))
140 def _Check(self, cert, salt, pem):
141 (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
142 self.assertEqual(salt, salt2)
143 self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
146 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
class TestCertVerification(testutils.GanetiTestCase):
    """Smoke test for ``utils.VerifyX509Certificate`` on ``cert1.pem``.

    NOTE(review): the ``def`` lines of ``setUp``/``tearDown`` and the second
    argument of ``load_certificate`` were missing from the text under
    review and are reconstructed; confirm against the upstream file.

    """

    def setUp(self):
        testutils.GanetiTestCase.setUp(self)

        self.tmpdir = tempfile.mkdtemp()

    def tearDown(self):
        # Remove the scratch directory created in setUp.
        shutil.rmtree(self.tmpdir)

    def testVerifyCertificate(self):
        cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
        cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                               cert_pem)

        # Not checking return value as this certificate is expired
        utils.VerifyX509Certificate(cert, 30, 7)
class TestVerifyCertificateInner(unittest.TestCase):
    """Tests for ``utils.x509._VerifyCertificateInner``.

    NOTE(review): the ``def test`` line and the expected value of the first
    assertion were missing from the text under review and are reconstructed;
    confirm against the upstream file. The positional arguments are
    presumably (expired, not_before, not_after, now, warn_days, error_days);
    verify against the implementation.

    """

    def test(self):
        vci = utils.x509._VerifyCertificateInner

        # Fourth argument lies between the second and third, a full year
        # before the third.
        # NOTE(review): expected value reconstructed as (None, None)
        self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
                         (None, None))

        # Fourth argument is earlier than the second
        (errcode, _) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7)
        self.assertEqual(errcode, utils.CERT_WARNING)

        # Fourth argument is exactly 7 days (604800 s) before the third;
        # with a last argument of 7 this is reported as an error ...
        (errcode, _) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7)
        self.assertEqual(errcode, utils.CERT_ERROR)

        # ... and with a last argument of 1 only as a warning
        (errcode, _) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1)
        self.assertEqual(errcode, utils.CERT_WARNING)

        # Third argument unset: no error code
        (errcode, _) = vci(False, 1266507600, None, 1266939600, 30, 7)
        self.assertEqual(errcode, None)

        # First argument True: always an error, whichever of the second and
        # third arguments are set
        for (second, third) in [
            (1266507600, 1267544400),
            (None, 1267544400),
            (1266507600, None),
            (None, None),
        ]:
            (errcode, _) = vci(True, second, third, 1266939600, 30, 7)
            self.assertEqual(errcode, utils.CERT_ERROR)
204 class TestGenerateSelfSignedX509Cert(unittest.TestCase):
206 self.tmpdir = tempfile.mkdtemp()
209 shutil.rmtree(self.tmpdir)
211 def _checkRsaPrivateKey(self, key):
212 lines = key.splitlines()
213 return (("-----BEGIN RSA PRIVATE KEY-----" in lines and
214 "-----END RSA PRIVATE KEY-----" in lines) or
215 ("-----BEGIN PRIVATE KEY-----" in lines and
216 "-----END PRIVATE KEY-----" in lines))
218 def _checkCertificate(self, cert):
219 lines = cert.splitlines()
220 return ("-----BEGIN CERTIFICATE-----" in lines and
221 "-----END CERTIFICATE-----" in lines)
224 for common_name in [None, ".", "Ganeti", "node1.example.com"]:
225 (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
226 self._checkRsaPrivateKey(key_pem)
227 self._checkCertificate(cert_pem)
229 key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
231 self.assert_(key.bits() >= 1024)
232 self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
233 self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
235 x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
237 self.failIf(x509.has_expired())
238 self.assertEqual(x509.get_issuer().CN, common_name)
239 self.assertEqual(x509.get_subject().CN, common_name)
240 self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
242 def testLegacy(self):
243 cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
245 utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
247 cert1 = utils.ReadFile(cert1_filename)
249 self.assert_(self._checkRsaPrivateKey(cert1))
250 self.assert_(self._checkCertificate(cert1))
# Script entry point: hand control to the Ganeti test program when this
# file is executed directly rather than imported.
if __name__ == "__main__":
    testutils.GanetiTestProgram()