4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
14 # General Public License for more details.
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
22 """Script for testing ganeti.utils.x509"""
30 import distutils.version
33 from ganeti import constants
34 from ganeti import utils
35 from ganeti import compat
36 from ganeti import errors
# Test case for utils.x509._ParseAsn1Generalizedtime. Going by the expected
# values below, the parser maps "YYYYMMDDHHMMSS" strings that end in "Z" or in
# a "+HHMM"/"-HHMM" offset to integer Unix timestamps, and raises ValueError
# for everything else.
# NOTE(review): this listing looks lossy -- the embedded numbering skips lines
# (42, 44-46, 50-51, 57, 61-62) and no `def` header is visible for any of the
# statements in this class. Confirm the method boundaries against the original
# file before changing code here.
41 class TestParseAsn1Generalizedtime(unittest.TestCase):
# Keep a reference to the private parser so the assertions below stay short.
43 self._Parse = utils.x509._ParseAsn1Generalizedtime
# "Z"-suffixed input: the epoch itself, an arbitrary 2010 date, and 2**31 - 1.
47 self.assertEqual(self._Parse("19700101000000Z"), 0)
48 self.assertEqual(self._Parse("20100222174152Z"), 1266860512)
49 self.assertEqual(self._Parse("20380119031407Z"), (2**31) - 1)
# Explicit numeric offsets: "+0000" yields the same value as "Z" for the same
# digits, and "-0100" applied to the epoch yields 3600.
52 self.assertEqual(self._Parse("20100222174152+0000"), 1266860512)
53 self.assertEqual(self._Parse("20100223131652+0000"), 1266931012)
54 self.assertEqual(self._Parse("20100223051808-0800"), 1266931088)
55 self.assertEqual(self._Parse("20100224002135+1100"), 1266931295)
56 self.assertEqual(self._Parse("19700101000000-0100"), 3600)
58 # Leap seconds are not supported by datetime.datetime
59 self.assertRaises(ValueError, self._Parse, "19841231235960+0000")
60 self.assertRaises(ValueError, self._Parse, "19920630235960+0000")
# Empty, non-numeric, suffix-less and free-form date strings must be rejected.
63 self.assertRaises(ValueError, self._Parse, "")
64 self.assertRaises(ValueError, self._Parse, "invalid")
65 self.assertRaises(ValueError, self._Parse, "20100222174152")
66 self.assertRaises(ValueError, self._Parse, "Mon Feb 22 17:47:02 UTC 2010")
67 self.assertRaises(ValueError, self._Parse, "2010-02-22 17:42:02")
# Test case for utils.GetX509CertValidity, built on testutils.GanetiTestCase.
# NOTE(review): this listing looks lossy -- the embedded numbering skips lines
# (71, 73, 75, 78) and the `def` header owning the statements below is not
# shown; they read like a setUp body -- confirm against the original file.
70 class TestGetX509CertValidity(testutils.GanetiTestCase):
# Chain to the base-class fixture first.
72 testutils.GanetiTestCase.setUp(self)
# Wrap the installed pyOpenSSL version string so it can be compared below.
74 pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
76 # Test whether we have pyOpenSSL 0.7 or above
# The LooseVersion object is compared directly against the string "0.7"; the
# result is stored on the instance as a boolean flag.
77 self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
# An old pyOpenSSL only triggers a warning here; nothing is raised or skipped.
# NOTE(review): no `import warnings` is visible in this listing (short lines
# appear to have been dropped) -- confirm the module is actually imported.
79 if not self.pyopenssl0_7:
80 warnings.warn("This test requires pyOpenSSL 0.7 or above to"
81 " function correctly")
def _LoadCert(self, name):
  """Return the certificate object parsed from a named test-data item.

  The data is obtained through C{self._ReadTestData(name)} and handed to
  pyOpenSSL's C{load_certificate} as PEM.

  """
  pem_data = self._ReadTestData(name)
  return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM, pem_data)
# Query the validity 2-tuple of the bundled "cert1.pem" and compare it with the
# expected result.
# NOTE(review): the `def` header and the branch lines separating the two
# assertions are absent from this listing (the embedded numbering skips 86-87,
# 89 and 91). As shown, the two assertEqual calls contradict each other, so
# they presumably sit in the two arms of a conditional on self.pyopenssl0_7 --
# confirm against the original file.
88 validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
90 self.assertEqual(validity, (1266919967, 1267524767))
92 self.assertEqual(validity, (None, None))
# Test case for utils.SignX509Certificate and utils.LoadSignedX509Certificate.
# NOTE(review): this listing looks lossy -- the `def` header of the main test
# method and several continuation lines are not shown (the embedded numbering
# skips e.g. 98-99, 104, 112 and 147-149). Calls flagged below are cut off in
# the middle of their argument list.
95 class TestSignX509Certificate(unittest.TestCase):
# Key handed to the sign/load helpers throughout this class.
96 KEY = "My private key!"
# NOTE(review): no use of KEY_OTHER is visible in this listing.
97 KEY_OTHER = "Another key"
100 # Generate certificate valid for 5 minutes
# Only the second element of the returned pair is kept, as cert_pem.
101 (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
# NOTE(review): truncated in this listing -- the second argument is missing.
103 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
106 # No signature at all
107 self.assertRaises(errors.GenericError,
108 utils.LoadSignedX509Certificate, cert_pem, self.KEY)
# Malformed or incomplete "X-Ganeti-Signature" headers must all be rejected
# with GenericError.
# NOTE(review): the first call of this group is truncated in this listing.
111 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
113 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
114 "X-Ganeti-Signature: \n", self.KEY)
115 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
116 "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
117 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
118 "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
119 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
120 "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
# Each of these characters, embedded in "foo%sbar", must be refused as a salt.
# NOTE(review): cert_pem (the PEM text) is passed here, whereas the loop
# further down passes the loaded `cert` object -- confirm this is intended.
123 for salt in list("-_@$,:;/\\ \t\n"):
124 self.assertRaises(errors.GenericError, utils.SignX509Certificate,
125 cert_pem, self.KEY, "foo%sbar" % salt)
# Salts that are expected to be accepted.
# NOTE: string.letters and str.encode("hex") exist on Python 2 only.
127 for salt in ["HelloWorld", "salt", string.letters, string.digits,
128 utils.GenerateSecret(numbytes=4),
129 utils.GenerateSecret(numbytes=16),
130 "{123:456}".encode("hex")]:
131 signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
# The signed PEM must pass _Check as-is, and also when preceded by unrelated
# header/text lines or followed by trailing text.
133 self._Check(cert, salt, signed_pem)
135 self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
136 self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
137 self._Check(cert, salt, (signed_pem + "\n\na few more\n"
138 "lines----\n------ at\nthe end!"))
# Helper: load `pem` with self.KEY and require that the recovered salt and the
# certificate's SHA-1 digest equal the ones passed in.
140 def _Check(self, cert, salt, pem):
141 (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
142 self.assertEqual(salt, salt2)
143 self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
# NOTE(review): truncated in this listing -- the remaining arguments of this
# assertRaises call are missing.
146 self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
# Test case for utils.VerifyX509Certificate, built on testutils.GanetiTestCase.
# NOTE(review): this listing looks lossy -- the headers of the fixture methods
# and a few short statements are not shown (the embedded numbering skips e.g.
# 151, 155-156, 162, 167, 174, 178, 180 and 187-188).
150 class TestCertVerification(testutils.GanetiTestCase):
# Fixture (header not shown): chain to the base-class setUp, then create a
# scratch directory ...
152 testutils.GanetiTestCase.setUp(self)
154 self.tmpdir = tempfile.mkdtemp()
# ... which is removed again here (presumably in tearDown -- header not shown).
157 shutil.rmtree(self.tmpdir)
# Smoke test: verifying the bundled "cert1.pem" with the arguments (30, 7)
# must not raise; the result is deliberately ignored.
159 def testVerifyCertificate(self):
160 cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
# NOTE(review): truncated in this listing -- the second argument is missing.
161 cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
164 # Not checking return value as this certificate is expired
165 utils.VerifyX509Certificate(cert, 30, 7)
# Helper: build an X509 object with serial number 1, apply `before` (converted
# to int) via gmtime_adj_notBefore and `validity` via gmtime_adj_notAfter,
# make it its own issuer and sign it with `key`.
# NOTE(review): the signature has no `self` although callers use
# `self._GenCert(key, ...)`, so a @staticmethod decorator is presumably on the
# missing line 167. No `return` is visible either, although callers use the
# result. Lines 174 and 178 are also missing. Confirm all of these against the
# original file.
168 def _GenCert(key, before, validity):
169 # Urgh... mostly copied from x509.py :(
171 # Create self-signed certificate
172 cert = OpenSSL.crypto.X509()
173 cert.set_serial_number(1)
175 cert.gmtime_adj_notBefore(int(before))
176 cert.gmtime_adj_notAfter(validity)
177 cert.set_issuer(cert.get_subject())
179 cert.sign(key, constants.X509_CERT_SIGN_DIGEST)
# Certificates whose notBefore adjustment is a fraction of
# constants.NODE_MAX_CLOCK_SKEW must verify cleanly; multiples of it must
# produce a warning.
182 def testClockSkew(self):
183 SKEW = constants.NODE_MAX_CLOCK_SKEW
184 # Create private and public key
185 key = OpenSSL.crypto.PKey()
186 key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS)
# NOTE(review): `validity` is used below, but its assignment is not visible in
# this listing (the embedded numbering skips 187-188).
189 # skew small enough, accepting cert; note that this is a timed
190 # test, and could fail if the machine is so loaded that the next
191 # few lines take more than NODE_MAX_CLOCK_SKEW / 2
# Adjustments of -1, 0, SKEW / 4 and SKEW / 2 must yield (None, None).
192 for before in [-1, 0, SKEW / 4, SKEW / 2]:
193 cert = self._GenCert(key, before, validity)
194 result = utils.VerifyX509Certificate(cert, 1, 2)
195 self.assertEqual(result, (None, None))
197 # skew too great, not accepting certs
# Adjustments of 2x and 10x SKEW must yield utils.CERT_WARNING together with a
# message starting with "Certificate not yet valid".
198 for before in [SKEW * 2, SKEW * 10]:
199 cert = self._GenCert(key, before, validity)
200 (status, msg) = utils.VerifyX509Certificate(cert, 1, 2)
201 self.assertEqual(status, utils.CERT_WARNING)
202 self.assertTrue(msg.startswith("Certificate not yet valid"))
# Test case for the private helper utils.x509._VerifyCertificateInner.
# NOTE(review): this listing looks lossy -- the `def` header of the test method
# is not shown and the first assertion is cut off (the embedded numbering
# skips e.g. 206 and 211). The roles of the six positional arguments are not
# visible here; presumably (expired, not_before, not_after, now, warn_days,
# error_days) -- confirm against utils.x509.
205 class TestVerifyCertificateInner(unittest.TestCase):
# Short local alias for the function under test.
207 vci = utils.x509._VerifyCertificateInner
# NOTE(review): truncated in this listing -- the expected value is missing.
210 self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
# Fourth argument smaller than the second one: a warning is expected.
214 (errcode, msg) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7)
215 self.assertEqual(errcode, utils.CERT_WARNING)
# Fourth argument lies exactly 604800 (7 * 86400) below the third one: with
# 30/7 as the last two arguments an error is expected ...
218 (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7)
219 self.assertEqual(errcode, utils.CERT_ERROR)
# ... but only a warning when the last argument is 1 instead of 7.
221 (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1)
222 self.assertEqual(errcode, utils.CERT_WARNING)
# Third argument None: no error code at all.
224 (errcode, msg) = vci(False, 1266507600, None, 1266939600, 30, 7)
225 self.assertEqual(errcode, None)
# First argument True: CERT_ERROR is expected whether both, one or none of the
# second and third arguments are given.
228 (errcode, msg) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7)
229 self.assertEqual(errcode, utils.CERT_ERROR)
231 (errcode, msg) = vci(True, None, 1267544400, 1266939600, 30, 7)
232 self.assertEqual(errcode, utils.CERT_ERROR)
234 (errcode, msg) = vci(True, 1266507600, None, 1266939600, 30, 7)
235 self.assertEqual(errcode, utils.CERT_ERROR)
237 (errcode, msg) = vci(True, None, None, 1266939600, 30, 7)
238 self.assertEqual(errcode, utils.CERT_ERROR)
# Test case for utils.GenerateSelfSignedX509Cert and for
# utils.GenerateSelfSignedSslCert (exercised by testLegacy).
# NOTE(review): this listing looks lossy -- the `def` headers owning the two
# statements below are not shown (the embedded numbering skips 242 and
# 244-245); they read like a setUp/tearDown pair -- confirm.
241 class TestGenerateSelfSignedX509Cert(unittest.TestCase):
# Scratch directory; testLegacy writes its certificate file into it.
243 self.tmpdir = tempfile.mkdtemp()
# Remove the scratch directory together with everything inside it.
246 shutil.rmtree(self.tmpdir)
248 def _checkRsaPrivateKey(self, key):
249 lines = key.splitlines()
250 return (("-----BEGIN RSA PRIVATE KEY-----" in lines and
251 "-----END RSA PRIVATE KEY-----" in lines) or
252 ("-----BEGIN PRIVATE KEY-----" in lines and
253 "-----END PRIVATE KEY-----" in lines))
255 def _checkCertificate(self, cert):
256 lines = cert.splitlines()
257 return ("-----BEGIN CERTIFICATE-----" in lines and
258 "-----END CERTIFICATE-----" in lines)
# For each common name (including None): generate a key/certificate pair,
# passing 300 as the second argument, then check the key's size and type and
# the certificate's expiry state, issuer/subject CN and public-key size.
# NOTE(review): this listing looks lossy -- the enclosing `def` header and the
# second argument of both load_* calls are not shown (the embedded numbering
# skips 260, 267 and 273).
261 for common_name in [None, ".", "Ganeti", "node1.example.com"]:
262 (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
# NOTE(review): the boolean results of the two helper calls below are
# discarded, so they can never fail the test; testLegacy wraps the same
# helpers in self.assert_(...). Consider self.assertTrue(...) here as well.
263 self._checkRsaPrivateKey(key_pem)
264 self._checkCertificate(cert_pem)
# NOTE(review): truncated in this listing -- the second argument is missing.
266 key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
# NOTE: assert_ and failIf are deprecated unittest aliases of assertTrue and
# assertFalse.
268 self.assert_(key.bits() >= 1024)
269 self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
270 self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
# NOTE(review): truncated in this listing -- the second argument is missing.
272 x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
274 self.failIf(x509.has_expired())
# Issuer and subject must both carry the requested common name.
275 self.assertEqual(x509.get_issuer().CN, common_name)
276 self.assertEqual(x509.get_subject().CN, common_name)
277 self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
def testLegacy(self):
  """Check the file written by C{utils.GenerateSelfSignedSslCert}.

  The helper is asked to write to "cert1.pem" inside C{self.tmpdir} with
  C{validity=1}; the file is then read back once and must contain both the
  private-key and the certificate PEM markers.

  """
  cert1_filename = os.path.join(self.tmpdir, "cert1.pem")

  utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)

  cert1 = utils.ReadFile(cert1_filename)

  # assertTrue replaces the deprecated assert_ alias (gone in Python 3.12);
  # the rest of this file already uses assertTrue.
  self.assertTrue(self._checkRsaPrivateKey(cert1))
  self.assertTrue(self._checkCertificate(cert1))
# Script entry point: call testutils.GanetiTestProgram() only when this file
# is executed directly, not when it is imported as a module.
if __name__ == "__main__":
  testutils.GanetiTestProgram()