gnt-network(8): Re-add editor settings and re-wrap
[ganeti-local] / test / ganeti.utils.x509_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.utils.x509"""
23
24 import os
25 import tempfile
26 import unittest
27 import shutil
28 import time
29 import OpenSSL
30 import distutils.version
31 import string
32
33 from ganeti import constants
34 from ganeti import utils
35 from ganeti import compat
36 from ganeti import errors
37
38 import testutils
39
40
41 class TestParseAsn1Generalizedtime(unittest.TestCase):
42   def setUp(self):
43     self._Parse = utils.x509._ParseAsn1Generalizedtime
44
45   def test(self):
46     # UTC
47     self.assertEqual(self._Parse("19700101000000Z"), 0)
48     self.assertEqual(self._Parse("20100222174152Z"), 1266860512)
49     self.assertEqual(self._Parse("20380119031407Z"), (2**31) - 1)
50
51     # With offset
52     self.assertEqual(self._Parse("20100222174152+0000"), 1266860512)
53     self.assertEqual(self._Parse("20100223131652+0000"), 1266931012)
54     self.assertEqual(self._Parse("20100223051808-0800"), 1266931088)
55     self.assertEqual(self._Parse("20100224002135+1100"), 1266931295)
56     self.assertEqual(self._Parse("19700101000000-0100"), 3600)
57
58     # Leap seconds are not supported by datetime.datetime
59     self.assertRaises(ValueError, self._Parse, "19841231235960+0000")
60     self.assertRaises(ValueError, self._Parse, "19920630235960+0000")
61
62     # Errors
63     self.assertRaises(ValueError, self._Parse, "")
64     self.assertRaises(ValueError, self._Parse, "invalid")
65     self.assertRaises(ValueError, self._Parse, "20100222174152")
66     self.assertRaises(ValueError, self._Parse, "Mon Feb 22 17:47:02 UTC 2010")
67     self.assertRaises(ValueError, self._Parse, "2010-02-22 17:42:02")
68
69
70 class TestGetX509CertValidity(testutils.GanetiTestCase):
71   def setUp(self):
72     testutils.GanetiTestCase.setUp(self)
73
74     pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
75
76     # Test whether we have pyOpenSSL 0.7 or above
77     self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
78
79     if not self.pyopenssl0_7:
80       warnings.warn("This test requires pyOpenSSL 0.7 or above to"
81                     " function correctly")
82
83   def _LoadCert(self, name):
84     return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
85                                            self._ReadTestData(name))
86
87   def test(self):
88     validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
89     if self.pyopenssl0_7:
90       self.assertEqual(validity, (1266919967, 1267524767))
91     else:
92       self.assertEqual(validity, (None, None))
93
94
95 class TestSignX509Certificate(unittest.TestCase):
96   KEY = "My private key!"
97   KEY_OTHER = "Another key"
98
99   def test(self):
100     # Generate certificate valid for 5 minutes
101     (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
102
103     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
104                                            cert_pem)
105
106     # No signature at all
107     self.assertRaises(errors.GenericError,
108                       utils.LoadSignedX509Certificate, cert_pem, self.KEY)
109
110     # Invalid input
111     self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
112                       "", self.KEY)
113     self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
114                       "X-Ganeti-Signature: \n", self.KEY)
115     self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
116                       "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
117     self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
118                       "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
119     self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
120                       "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
121
122     # Invalid salt
123     for salt in list("-_@$,:;/\\ \t\n"):
124       self.assertRaises(errors.GenericError, utils.SignX509Certificate,
125                         cert_pem, self.KEY, "foo%sbar" % salt)
126
127     for salt in ["HelloWorld", "salt", string.letters, string.digits,
128                  utils.GenerateSecret(numbytes=4),
129                  utils.GenerateSecret(numbytes=16),
130                  "{123:456}".encode("hex")]:
131       signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
132
133       self._Check(cert, salt, signed_pem)
134
135       self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
136       self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
137       self._Check(cert, salt, (signed_pem + "\n\na few more\n"
138                                "lines----\n------ at\nthe end!"))
139
140   def _Check(self, cert, salt, pem):
141     (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
142     self.assertEqual(salt, salt2)
143     self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
144
145     # Other key
146     self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
147                       pem, self.KEY_OTHER)
148
149
150 class TestCertVerification(testutils.GanetiTestCase):
151   def setUp(self):
152     testutils.GanetiTestCase.setUp(self)
153
154     self.tmpdir = tempfile.mkdtemp()
155
156   def tearDown(self):
157     shutil.rmtree(self.tmpdir)
158
159   def testVerifyCertificate(self):
160     cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
161     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
162                                            cert_pem)
163
164     # Not checking return value as this certificate is expired
165     utils.VerifyX509Certificate(cert, 30, 7)
166
167   @staticmethod
168   def _GenCert(key, before, validity):
169     # Urgh... mostly copied from x509.py :(
170
171     # Create self-signed certificate
172     cert = OpenSSL.crypto.X509()
173     cert.set_serial_number(1)
174     if before != 0:
175       cert.gmtime_adj_notBefore(int(before))
176     cert.gmtime_adj_notAfter(validity)
177     cert.set_issuer(cert.get_subject())
178     cert.set_pubkey(key)
179     cert.sign(key, constants.X509_CERT_SIGN_DIGEST)
180     return cert
181
182   def testClockSkew(self):
183     SKEW = constants.NODE_MAX_CLOCK_SKEW
184     # Create private and public key
185     key = OpenSSL.crypto.PKey()
186     key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS)
187
188     validity = 7 * 86400
189     # skew small enough, accepting cert; note that this is a timed
190     # test, and could fail if the machine is so loaded that the next
191     # few lines take more than NODE_MAX_CLOCK_SKEW / 2
192     for before in [-1, 0, SKEW / 4, SKEW / 2]:
193       cert = self._GenCert(key, before, validity)
194       result = utils.VerifyX509Certificate(cert, 1, 2)
195       self.assertEqual(result, (None, None))
196
197     # skew too great, not accepting certs
198     for before in [SKEW * 2, SKEW * 10]:
199       cert = self._GenCert(key, before, validity)
200       (status, msg) = utils.VerifyX509Certificate(cert, 1, 2)
201       self.assertEqual(status, utils.CERT_WARNING)
202       self.assertTrue(msg.startswith("Certificate not yet valid"))
203
204
205 class TestVerifyCertificateInner(unittest.TestCase):
206   def test(self):
207     vci = utils.x509._VerifyCertificateInner
208
209     # Valid
210     self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
211                      (None, None))
212
213     # Not yet valid
214     (errcode, msg) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7)
215     self.assertEqual(errcode, utils.CERT_WARNING)
216
217     # Expiring soon
218     (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7)
219     self.assertEqual(errcode, utils.CERT_ERROR)
220
221     (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1)
222     self.assertEqual(errcode, utils.CERT_WARNING)
223
224     (errcode, msg) = vci(False, 1266507600, None, 1266939600, 30, 7)
225     self.assertEqual(errcode, None)
226
227     # Expired
228     (errcode, msg) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7)
229     self.assertEqual(errcode, utils.CERT_ERROR)
230
231     (errcode, msg) = vci(True, None, 1267544400, 1266939600, 30, 7)
232     self.assertEqual(errcode, utils.CERT_ERROR)
233
234     (errcode, msg) = vci(True, 1266507600, None, 1266939600, 30, 7)
235     self.assertEqual(errcode, utils.CERT_ERROR)
236
237     (errcode, msg) = vci(True, None, None, 1266939600, 30, 7)
238     self.assertEqual(errcode, utils.CERT_ERROR)
239
240
241 class TestGenerateSelfSignedX509Cert(unittest.TestCase):
242   def setUp(self):
243     self.tmpdir = tempfile.mkdtemp()
244
245   def tearDown(self):
246     shutil.rmtree(self.tmpdir)
247
248   def _checkRsaPrivateKey(self, key):
249     lines = key.splitlines()
250     return (("-----BEGIN RSA PRIVATE KEY-----" in lines and
251              "-----END RSA PRIVATE KEY-----" in lines) or
252             ("-----BEGIN PRIVATE KEY-----" in lines and
253              "-----END PRIVATE KEY-----" in lines))
254
255   def _checkCertificate(self, cert):
256     lines = cert.splitlines()
257     return ("-----BEGIN CERTIFICATE-----" in lines and
258             "-----END CERTIFICATE-----" in lines)
259
260   def test(self):
261     for common_name in [None, ".", "Ganeti", "node1.example.com"]:
262       (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
263       self._checkRsaPrivateKey(key_pem)
264       self._checkCertificate(cert_pem)
265
266       key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
267                                            key_pem)
268       self.assert_(key.bits() >= 1024)
269       self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
270       self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
271
272       x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
273                                              cert_pem)
274       self.failIf(x509.has_expired())
275       self.assertEqual(x509.get_issuer().CN, common_name)
276       self.assertEqual(x509.get_subject().CN, common_name)
277       self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
278
279   def testLegacy(self):
280     cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
281
282     utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
283
284     cert1 = utils.ReadFile(cert1_filename)
285
286     self.assert_(self._checkRsaPrivateKey(cert1))
287     self.assert_(self._checkCertificate(cert1))
288
289
290 class TestCheckNodeCertificate(testutils.GanetiTestCase):
291   def setUp(self):
292     testutils.GanetiTestCase.setUp(self)
293     self.tmpdir = tempfile.mkdtemp()
294
295   def tearDown(self):
296     testutils.GanetiTestCase.tearDown(self)
297     shutil.rmtree(self.tmpdir)
298
299   def testMismatchingKey(self):
300     other_cert = self._TestDataFilename("cert1.pem")
301     node_cert = self._TestDataFilename("cert2.pem")
302
303     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
304                                            utils.ReadFile(other_cert))
305
306     try:
307       utils.CheckNodeCertificate(cert, _noded_cert_file=node_cert)
308     except errors.GenericError, err:
309       self.assertEqual(str(err),
310                        "Given cluster certificate does not match local key")
311     else:
312       self.fail("Exception was not raised")
313
314   def testMatchingKey(self):
315     cert_filename = self._TestDataFilename("cert2.pem")
316
317     # Extract certificate
318     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
319                                            utils.ReadFile(cert_filename))
320     cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
321                                                cert)
322
323     utils.CheckNodeCertificate(cert, _noded_cert_file=cert_filename)
324
325   def testMissingFile(self):
326     cert_path = self._TestDataFilename("cert1.pem")
327     nodecert = utils.PathJoin(self.tmpdir, "does-not-exist")
328
329     utils.CheckNodeCertificate(NotImplemented, _noded_cert_file=nodecert)
330
331     self.assertFalse(os.path.exists(nodecert))
332
333   def testInvalidCertificate(self):
334     tmpfile = utils.PathJoin(self.tmpdir, "cert")
335     utils.WriteFile(tmpfile, data="not a certificate")
336
337     self.assertRaises(errors.X509CertError, utils.CheckNodeCertificate,
338                       NotImplemented, _noded_cert_file=tmpfile)
339
340   def testNoPrivateKey(self):
341     cert = self._TestDataFilename("cert1.pem")
342     self.assertRaises(errors.X509CertError, utils.CheckNodeCertificate,
343                       NotImplemented, _noded_cert_file=cert)
344
345   def testMismatchInNodeCert(self):
346     cert1_path = self._TestDataFilename("cert1.pem")
347     cert2_path = self._TestDataFilename("cert2.pem")
348     tmpfile = utils.PathJoin(self.tmpdir, "cert")
349
350     # Extract certificate
351     cert1 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
352                                             utils.ReadFile(cert1_path))
353     cert1_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
354                                                 cert1)
355
356     # Extract mismatching key
357     key2 = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
358                                           utils.ReadFile(cert2_path))
359     key2_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM,
360                                               key2)
361
362     # Write to file
363     utils.WriteFile(tmpfile, data=cert1_pem + key2_pem)
364
365     try:
366       utils.CheckNodeCertificate(cert1, _noded_cert_file=tmpfile)
367     except errors.X509CertError, err:
368       self.assertEqual(err.args,
369                        (tmpfile, "Certificate does not match with private key"))
370     else:
371       self.fail("Exception was not raised")
372
373
374 if __name__ == "__main__":
375   testutils.GanetiTestProgram()