verify-disks: Explicitly state nothing has to be done
[ganeti-local] / test / ganeti.utils.x509_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.utils.x509"""
23
24 import os
25 import tempfile
26 import unittest
27 import shutil
28 import time
29 import OpenSSL
30 import distutils.version
31 import string
32
33 from ganeti import constants
34 from ganeti import utils
35 from ganeti import compat
36 from ganeti import errors
37
38 import testutils
39
40
class TestParseAsn1Generalizedtime(unittest.TestCase):
  """Tests for utils.x509._ParseAsn1Generalizedtime."""

  def setUp(self):
    self._Parse = utils.x509._ParseAsn1Generalizedtime

  def test(self):
    """Checks accepted timestamps and the inputs that must be rejected."""
    parse = self._Parse

    # Pairs of (input string, expected parser result)
    accepted = [
      # UTC
      ("19700101000000Z", 0),
      ("20100222174152Z", 1266860512),
      ("20380119031407Z", (2**31) - 1),

      # With offset
      ("20100222174152+0000", 1266860512),
      ("20100223131652+0000", 1266931012),
      ("20100223051808-0800", 1266931088),
      ("20100224002135+1100", 1266931295),
      ("19700101000000-0100", 3600),
      ]

    for (text, expected) in accepted:
      self.assertEqual(parse(text), expected)

    # Inputs for which the parser has to raise ValueError
    rejected = [
      # Leap seconds are not supported by datetime.datetime
      "19841231235960+0000",
      "19920630235960+0000",

      # Errors
      "",
      "invalid",
      "20100222174152",
      "Mon Feb 22 17:47:02 UTC 2010",
      "2010-02-22 17:42:02",
      ]

    for text in rejected:
      self.assertRaises(ValueError, parse, text)
68
69
class TestGetX509CertValidity(testutils.GanetiTestCase):
  """Tests utils.GetX509CertValidity using the "cert1.pem" test data."""

  def setUp(self):
    """Records whether the installed pyOpenSSL is version 0.7 or newer.

    A warning is emitted for older versions, for which L{test} expects a
    different result.

    """
    # Imported locally as the module-level imports do not provide "warnings";
    # without it the warning below would fail with a NameError
    import warnings

    testutils.GanetiTestCase.setUp(self)

    pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)

    # Test whether we have pyOpenSSL 0.7 or above
    self.pyopenssl0_7 = (pyopenssl_version >= "0.7")

    if not self.pyopenssl0_7:
      warnings.warn("This test requires pyOpenSSL 0.7 or above to"
                    " function correctly")

  def _LoadCert(self, name):
    """Loads the PEM certificate stored in test data file C{name}.

    @return: certificate object as returned by
      C{OpenSSL.crypto.load_certificate}

    """
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           self._ReadTestData(name))

  def test(self):
    """Compares the validity of "cert1.pem" with the expected value.

    With pyOpenSSL older than 0.7 the expected value is C{(None, None)}.

    """
    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
    if self.pyopenssl0_7:
      self.assertEqual(validity, (1266919967, 1267524767))
    else:
      self.assertEqual(validity, (None, None))
93
94
class TestSignX509Certificate(unittest.TestCase):
  """Tests utils.SignX509Certificate and utils.LoadSignedX509Certificate."""

  KEY = "My private key!"
  KEY_OTHER = "Another key"

  def test(self):
    """Checks rejected inputs and salts and round-trips signed certificates."""
    # Generate certificate valid for 5 minutes
    (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)

    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           cert_pem)

    # No signature at all
    self.assertRaises(errors.GenericError,
                      utils.LoadSignedX509Certificate, cert_pem, self.KEY)

    # Invalid input
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
                      "", self.KEY)
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
                      "X-Ganeti-Signature: \n", self.KEY)
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
                      "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
                      "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
                      "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)

    # Invalid salt; the certificate object is passed (as in the valid-salt
    # calls below) so that the salt is the only invalid argument
    for salt in "-_@$,:;/\\ \t\n":
      self.assertRaises(errors.GenericError, utils.SignX509Certificate,
                        cert, self.KEY, "foo%sbar" % salt)

    # Valid salts
    for salt in ["HelloWorld", "salt", string.letters, string.digits,
                 utils.GenerateSecret(numbytes=4),
                 utils.GenerateSecret(numbytes=16),
                 "{123:456}".encode("hex")]:
      signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)

      self._Check(cert, salt, signed_pem)

      # Additional text before or after the signed certificate must not
      # prevent it from being loaded
      self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
      self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
      self._Check(cert, salt, (signed_pem + "\n\na few more\n"
                               "lines----\n------ at\nthe end!"))

  def _Check(self, cert, salt, pem):
    """Verifies that C{pem} loads back to C{cert} and C{salt}.

    Loading with L{KEY} has to return the given salt and a certificate with
    the same SHA1 digest as C{cert}; loading with L{KEY_OTHER} has to fail.

    """
    (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
    self.assertEqual(salt, salt2)
    self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))

    # Other key
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
                      pem, self.KEY_OTHER)
148
149
class TestCertVerification(testutils.GanetiTestCase):
  """Tests utils.VerifyX509Certificate."""

  def setUp(self):
    testutils.GanetiTestCase.setUp(self)

    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    """Removes the temporary directory and runs the base class cleanup."""
    try:
      shutil.rmtree(self.tmpdir)
    finally:
      # setUp chains to the base class, hence tearDown has to do the same;
      # done in "finally" so it also happens if removing the directory fails
      testutils.GanetiTestCase.tearDown(self)

  def testVerifyCertificate(self):
    """Runs the verification on the "cert1.pem" test data file."""
    cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           cert_pem)

    # Not checking return value as this certificate is expired
    utils.VerifyX509Certificate(cert, 30, 7)

  @staticmethod
  def _GenCert(key, before, validity):
    """Creates a self-signed certificate for the given key.

    @param key: key used both as the public key and for signing
    @param before: value passed to C{gmtime_adj_notBefore} (converted to
      an integer); if zero, the call is skipped altogether
    @param validity: value passed to C{gmtime_adj_notAfter}
    @return: the signed C{OpenSSL.crypto.X509} object

    """
    # Urgh... mostly copied from x509.py :(

    # Create self-signed certificate
    cert = OpenSSL.crypto.X509()
    cert.set_serial_number(1)
    if before != 0:
      cert.gmtime_adj_notBefore(int(before))
    cert.gmtime_adj_notAfter(validity)
    cert.set_issuer(cert.get_subject())
    cert.set_pubkey(key)
    cert.sign(key, constants.X509_CERT_SIGN_DIGEST)
    return cert

  def testClockSkew(self):
    """Checks handling of certificates whose validity starts in the future.

    Start offsets of up to half of C{constants.NODE_MAX_CLOCK_SKEW} have to
    be accepted, offsets of twice that value or more have to yield a warning.

    """
    SKEW = constants.NODE_MAX_CLOCK_SKEW
    # Create private and public key
    key = OpenSSL.crypto.PKey()
    key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS)

    validity = 7 * 86400
    # skew small enough, accepting cert; note that this is a timed
    # test, and could fail if the machine is so loaded that the next
    # few lines take more than NODE_MAX_CLOCK_SKEW / 2
    for before in [-1, 0, SKEW / 4, SKEW / 2]:
      cert = self._GenCert(key, before, validity)
      result = utils.VerifyX509Certificate(cert, 1, 2)
      self.assertEqual(result, (None, None))

    # skew too great, not accepting certs
    for before in [SKEW * 2, SKEW * 10]:
      cert = self._GenCert(key, before, validity)
      (status, msg) = utils.VerifyX509Certificate(cert, 1, 2)
      self.assertEqual(status, utils.CERT_WARNING)
      self.assertTrue(msg.startswith("Certificate not yet valid"))
203
204
class TestVerifyCertificateInner(unittest.TestCase):
  """Tests for utils.x509._VerifyCertificateInner."""

  def test(self):
    """Checks the status code returned for a number of argument sets."""
    verify = utils.x509._VerifyCertificateInner

    # Valid; here the complete result is compared
    self.assertEqual(verify(False, 1263916313, 1298476313, 1266940313, 30, 7),
                     (None, None))

    # Pairs of (arguments, expected status code); the message returned
    # alongside the code is not checked.
    # NOTE(review): arguments are presumably (expired, not_before, not_after,
    # now, warn_days, error_days) -- confirm against utils.x509
    cases = [
      # Not yet valid
      ((False, 1266507600, 1267544400, 1266075600, 30, 7), utils.CERT_WARNING),

      # Expiring soon
      ((False, 1266507600, 1267544400, 1266939600, 30, 7), utils.CERT_ERROR),
      ((False, 1266507600, 1267544400, 1266939600, 30, 1), utils.CERT_WARNING),
      ((False, 1266507600, None, 1266939600, 30, 7), None),

      # Expired
      ((True, 1266507600, 1267544400, 1266939600, 30, 7), utils.CERT_ERROR),
      ((True, None, 1267544400, 1266939600, 30, 7), utils.CERT_ERROR),
      ((True, 1266507600, None, 1266939600, 30, 7), utils.CERT_ERROR),
      ((True, None, None, 1266939600, 30, 7), utils.CERT_ERROR),
      ]

    for (args, expected) in cases:
      (code, _) = verify(*args)
      self.assertEqual(code, expected)
239
240
class TestGenerateSelfSignedX509Cert(unittest.TestCase):
  """Tests generation of self-signed X509 certificates."""

  def setUp(self):
    self.tmpdir = tempfile.mkdtemp()

  def tearDown(self):
    shutil.rmtree(self.tmpdir)

  def _checkRsaPrivateKey(self, key):
    """Returns whether C{key} contains PEM private key delimiters.

    Both the "RSA PRIVATE KEY" and the "PRIVATE KEY" delimiters are accepted;
    the begin and end marker each have to be on a line of their own.

    """
    lines = key.splitlines()
    return (("-----BEGIN RSA PRIVATE KEY-----" in lines and
             "-----END RSA PRIVATE KEY-----" in lines) or
            ("-----BEGIN PRIVATE KEY-----" in lines and
             "-----END PRIVATE KEY-----" in lines))

  def _checkCertificate(self, cert):
    """Returns whether C{cert} contains PEM certificate delimiters.

    The begin and end marker each have to be on a line of their own.

    """
    lines = cert.splitlines()
    return ("-----BEGIN CERTIFICATE-----" in lines and
            "-----END CERTIFICATE-----" in lines)

  def test(self):
    """Generates certificates for several common names and inspects them."""
    for common_name in [None, ".", "Ganeti", "node1.example.com"]:
      (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
      # The helpers only return a boolean, which must be asserted for the
      # check to have any effect
      self.assertTrue(self._checkRsaPrivateKey(key_pem))
      self.assertTrue(self._checkCertificate(cert_pem))

      key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
                                           key_pem)
      self.assertTrue(key.bits() >= 1024)
      self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
      self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)

      x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                             cert_pem)
      self.assertFalse(x509.has_expired())
      self.assertEqual(x509.get_issuer().CN, common_name)
      self.assertEqual(x509.get_subject().CN, common_name)
      self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)

  def testLegacy(self):
    """Checks the file written by utils.GenerateSelfSignedSslCert.

    The single file has to contain both the private key and the certificate.

    """
    cert1_filename = os.path.join(self.tmpdir, "cert1.pem")

    utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)

    cert1 = utils.ReadFile(cert1_filename)

    self.assertTrue(self._checkRsaPrivateKey(cert1))
    self.assertTrue(self._checkCertificate(cert1))
287     self.assert_(self._checkCertificate(cert1))
288
289
# When executed as a script (rather than imported), start the tests by
# calling testutils.GanetiTestProgram()
if __name__ == "__main__":
  testutils.GanetiTestProgram()