Update default instance kernel version
[ganeti-local] / test / ganeti.utils.x509_unittest.py
1 #!/usr/bin/python
2 #
3
4 # Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Script for testing ganeti.utils.x509"""
23
24 import os
25 import tempfile
26 import unittest
27 import shutil
28 import time
29 import OpenSSL
30 import distutils.version
31 import string
32
33 from ganeti import constants
34 from ganeti import utils
35 from ganeti import compat
36 from ganeti import errors
37
38 import testutils
39
40
41 class TestParseAsn1Generalizedtime(unittest.TestCase):
42   def setUp(self):
43     self._Parse = utils.x509._ParseAsn1Generalizedtime
44
45   def test(self):
46     # UTC
47     self.assertEqual(self._Parse("19700101000000Z"), 0)
48     self.assertEqual(self._Parse("20100222174152Z"), 1266860512)
49     self.assertEqual(self._Parse("20380119031407Z"), (2**31) - 1)
50
51     # With offset
52     self.assertEqual(self._Parse("20100222174152+0000"), 1266860512)
53     self.assertEqual(self._Parse("20100223131652+0000"), 1266931012)
54     self.assertEqual(self._Parse("20100223051808-0800"), 1266931088)
55     self.assertEqual(self._Parse("20100224002135+1100"), 1266931295)
56     self.assertEqual(self._Parse("19700101000000-0100"), 3600)
57
58     # Leap seconds are not supported by datetime.datetime
59     self.assertRaises(ValueError, self._Parse, "19841231235960+0000")
60     self.assertRaises(ValueError, self._Parse, "19920630235960+0000")
61
62     # Errors
63     self.assertRaises(ValueError, self._Parse, "")
64     self.assertRaises(ValueError, self._Parse, "invalid")
65     self.assertRaises(ValueError, self._Parse, "20100222174152")
66     self.assertRaises(ValueError, self._Parse, "Mon Feb 22 17:47:02 UTC 2010")
67     self.assertRaises(ValueError, self._Parse, "2010-02-22 17:42:02")
68
69
70 class TestGetX509CertValidity(testutils.GanetiTestCase):
71   def setUp(self):
72     testutils.GanetiTestCase.setUp(self)
73
74     pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
75
76     # Test whether we have pyOpenSSL 0.7 or above
77     self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
78
79     if not self.pyopenssl0_7:
80       warnings.warn("This test requires pyOpenSSL 0.7 or above to"
81                     " function correctly")
82
83   def _LoadCert(self, name):
84     return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
85                                            self._ReadTestData(name))
86
87   def test(self):
88     validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
89     if self.pyopenssl0_7:
90       self.assertEqual(validity, (1266919967, 1267524767))
91     else:
92       self.assertEqual(validity, (None, None))
93
94
95 class TestSignX509Certificate(unittest.TestCase):
96   KEY = "My private key!"
97   KEY_OTHER = "Another key"
98
99   def test(self):
100     # Generate certificate valid for 5 minutes
101     (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
102
103     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
104                                            cert_pem)
105
106     # No signature at all
107     self.assertRaises(errors.GenericError,
108                       utils.LoadSignedX509Certificate, cert_pem, self.KEY)
109
110     # Invalid input
111     self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
112                       "", self.KEY)
113     self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
114                       "X-Ganeti-Signature: \n", self.KEY)
115     self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
116                       "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
117     self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
118                       "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
119     self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
120                       "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
121
122     # Invalid salt
123     for salt in list("-_@$,:;/\\ \t\n"):
124       self.assertRaises(errors.GenericError, utils.SignX509Certificate,
125                         cert_pem, self.KEY, "foo%sbar" % salt)
126
127     for salt in ["HelloWorld", "salt", string.letters, string.digits,
128                  utils.GenerateSecret(numbytes=4),
129                  utils.GenerateSecret(numbytes=16),
130                  "{123:456}".encode("hex")]:
131       signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
132
133       self._Check(cert, salt, signed_pem)
134
135       self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
136       self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
137       self._Check(cert, salt, (signed_pem + "\n\na few more\n"
138                                "lines----\n------ at\nthe end!"))
139
140   def _Check(self, cert, salt, pem):
141     (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
142     self.assertEqual(salt, salt2)
143     self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
144
145     # Other key
146     self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
147                       pem, self.KEY_OTHER)
148
149
150 class TestCertVerification(testutils.GanetiTestCase):
151   def setUp(self):
152     testutils.GanetiTestCase.setUp(self)
153
154     self.tmpdir = tempfile.mkdtemp()
155
156   def tearDown(self):
157     shutil.rmtree(self.tmpdir)
158
159   def testVerifyCertificate(self):
160     cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
161     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
162                                            cert_pem)
163
164     # Not checking return value as this certificate is expired
165     utils.VerifyX509Certificate(cert, 30, 7)
166
167
168 class TestVerifyCertificateInner(unittest.TestCase):
169   def test(self):
170     vci = utils.x509._VerifyCertificateInner
171
172     # Valid
173     self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
174                      (None, None))
175
176     # Not yet valid
177     (errcode, msg) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7)
178     self.assertEqual(errcode, utils.CERT_WARNING)
179
180     # Expiring soon
181     (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7)
182     self.assertEqual(errcode, utils.CERT_ERROR)
183
184     (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1)
185     self.assertEqual(errcode, utils.CERT_WARNING)
186
187     (errcode, msg) = vci(False, 1266507600, None, 1266939600, 30, 7)
188     self.assertEqual(errcode, None)
189
190     # Expired
191     (errcode, msg) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7)
192     self.assertEqual(errcode, utils.CERT_ERROR)
193
194     (errcode, msg) = vci(True, None, 1267544400, 1266939600, 30, 7)
195     self.assertEqual(errcode, utils.CERT_ERROR)
196
197     (errcode, msg) = vci(True, 1266507600, None, 1266939600, 30, 7)
198     self.assertEqual(errcode, utils.CERT_ERROR)
199
200     (errcode, msg) = vci(True, None, None, 1266939600, 30, 7)
201     self.assertEqual(errcode, utils.CERT_ERROR)
202
203
204 class TestGenerateSelfSignedX509Cert(unittest.TestCase):
205   def setUp(self):
206     self.tmpdir = tempfile.mkdtemp()
207
208   def tearDown(self):
209     shutil.rmtree(self.tmpdir)
210
211   def _checkRsaPrivateKey(self, key):
212     lines = key.splitlines()
213     return (("-----BEGIN RSA PRIVATE KEY-----" in lines and
214              "-----END RSA PRIVATE KEY-----" in lines) or
215             ("-----BEGIN PRIVATE KEY-----" in lines and
216              "-----END PRIVATE KEY-----" in lines))
217
218   def _checkCertificate(self, cert):
219     lines = cert.splitlines()
220     return ("-----BEGIN CERTIFICATE-----" in lines and
221             "-----END CERTIFICATE-----" in lines)
222
223   def test(self):
224     for common_name in [None, ".", "Ganeti", "node1.example.com"]:
225       (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
226       self._checkRsaPrivateKey(key_pem)
227       self._checkCertificate(cert_pem)
228
229       key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
230                                            key_pem)
231       self.assert_(key.bits() >= 1024)
232       self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
233       self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
234
235       x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
236                                              cert_pem)
237       self.failIf(x509.has_expired())
238       self.assertEqual(x509.get_issuer().CN, common_name)
239       self.assertEqual(x509.get_subject().CN, common_name)
240       self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
241
242   def testLegacy(self):
243     cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
244
245     utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
246
247     cert1 = utils.ReadFile(cert1_filename)
248
249     self.assert_(self._checkRsaPrivateKey(cert1))
250     self.assert_(self._checkCertificate(cert1))
251
252
253 if __name__ == "__main__":
254   testutils.GanetiTestProgram()