Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils.x509_unittest.py @ 1a2eb2dc

History | View | Annotate | Download (10.3 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.utils.x509"""
23

    
24
import os
25
import tempfile
26
import unittest
27
import shutil
28
import time
29
import OpenSSL
30
import distutils.version
31
import string
32

    
33
from ganeti import constants
34
from ganeti import utils
35
from ganeti import compat
36
from ganeti import errors
37

    
38
import testutils
39

    
40

    
41
class TestParseAsn1Generalizedtime(unittest.TestCase):
42
  def setUp(self):
43
    self._Parse = utils.x509._ParseAsn1Generalizedtime
44

    
45
  def test(self):
46
    # UTC
47
    self.assertEqual(self._Parse("19700101000000Z"), 0)
48
    self.assertEqual(self._Parse("20100222174152Z"), 1266860512)
49
    self.assertEqual(self._Parse("20380119031407Z"), (2**31) - 1)
50

    
51
    # With offset
52
    self.assertEqual(self._Parse("20100222174152+0000"), 1266860512)
53
    self.assertEqual(self._Parse("20100223131652+0000"), 1266931012)
54
    self.assertEqual(self._Parse("20100223051808-0800"), 1266931088)
55
    self.assertEqual(self._Parse("20100224002135+1100"), 1266931295)
56
    self.assertEqual(self._Parse("19700101000000-0100"), 3600)
57

    
58
    # Leap seconds are not supported by datetime.datetime
59
    self.assertRaises(ValueError, self._Parse, "19841231235960+0000")
60
    self.assertRaises(ValueError, self._Parse, "19920630235960+0000")
61

    
62
    # Errors
63
    self.assertRaises(ValueError, self._Parse, "")
64
    self.assertRaises(ValueError, self._Parse, "invalid")
65
    self.assertRaises(ValueError, self._Parse, "20100222174152")
66
    self.assertRaises(ValueError, self._Parse, "Mon Feb 22 17:47:02 UTC 2010")
67
    self.assertRaises(ValueError, self._Parse, "2010-02-22 17:42:02")
68

    
69

    
70
class TestGetX509CertValidity(testutils.GanetiTestCase):
71
  def setUp(self):
72
    testutils.GanetiTestCase.setUp(self)
73

    
74
    pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
75

    
76
    # Test whether we have pyOpenSSL 0.7 or above
77
    self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
78

    
79
    if not self.pyopenssl0_7:
80
      warnings.warn("This test requires pyOpenSSL 0.7 or above to"
81
                    " function correctly")
82

    
83
  def _LoadCert(self, name):
84
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
85
                                           self._ReadTestData(name))
86

    
87
  def test(self):
88
    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
89
    if self.pyopenssl0_7:
90
      self.assertEqual(validity, (1266919967, 1267524767))
91
    else:
92
      self.assertEqual(validity, (None, None))
93

    
94

    
95
class TestSignX509Certificate(unittest.TestCase):
96
  KEY = "My private key!"
97
  KEY_OTHER = "Another key"
98

    
99
  def test(self):
100
    # Generate certificate valid for 5 minutes
101
    (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
102

    
103
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
104
                                           cert_pem)
105

    
106
    # No signature at all
107
    self.assertRaises(errors.GenericError,
108
                      utils.LoadSignedX509Certificate, cert_pem, self.KEY)
109

    
110
    # Invalid input
111
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
112
                      "", self.KEY)
113
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
114
                      "X-Ganeti-Signature: \n", self.KEY)
115
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
116
                      "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
117
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
118
                      "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
119
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
120
                      "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
121

    
122
    # Invalid salt
123
    for salt in list("-_@$,:;/\\ \t\n"):
124
      self.assertRaises(errors.GenericError, utils.SignX509Certificate,
125
                        cert_pem, self.KEY, "foo%sbar" % salt)
126

    
127
    for salt in ["HelloWorld", "salt", string.letters, string.digits,
128
                 utils.GenerateSecret(numbytes=4),
129
                 utils.GenerateSecret(numbytes=16),
130
                 "{123:456}".encode("hex")]:
131
      signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
132

    
133
      self._Check(cert, salt, signed_pem)
134

    
135
      self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
136
      self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
137
      self._Check(cert, salt, (signed_pem + "\n\na few more\n"
138
                               "lines----\n------ at\nthe end!"))
139

    
140
  def _Check(self, cert, salt, pem):
141
    (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
142
    self.assertEqual(salt, salt2)
143
    self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
144

    
145
    # Other key
146
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
147
                      pem, self.KEY_OTHER)
148

    
149

    
150
class TestCertVerification(testutils.GanetiTestCase):
151
  def setUp(self):
152
    testutils.GanetiTestCase.setUp(self)
153

    
154
    self.tmpdir = tempfile.mkdtemp()
155

    
156
  def tearDown(self):
157
    shutil.rmtree(self.tmpdir)
158

    
159
  def testVerifyCertificate(self):
160
    cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
161
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
162
                                           cert_pem)
163

    
164
    # Not checking return value as this certificate is expired
165
    utils.VerifyX509Certificate(cert, 30, 7)
166

    
167
  @staticmethod
168
  def _GenCert(key, before, validity):
169
    # Urgh... mostly copied from x509.py :(
170

    
171
    # Create self-signed certificate
172
    cert = OpenSSL.crypto.X509()
173
    cert.set_serial_number(1)
174
    if before != 0:
175
      cert.gmtime_adj_notBefore(int(before))
176
    cert.gmtime_adj_notAfter(validity)
177
    cert.set_issuer(cert.get_subject())
178
    cert.set_pubkey(key)
179
    cert.sign(key, constants.X509_CERT_SIGN_DIGEST)
180
    return cert
181

    
182
  def testClockSkew(self):
183
    SKEW = constants.NODE_MAX_CLOCK_SKEW
184
    # Create private and public key
185
    key = OpenSSL.crypto.PKey()
186
    key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS)
187

    
188
    validity = 7 * 86400
189
    # skew small enough, accepting cert; note that this is a timed
190
    # test, and could fail if the machine is so loaded that the next
191
    # few lines take more than NODE_MAX_CLOCK_SKEW / 2
192
    for before in [-1, 0, SKEW / 4, SKEW / 2]:
193
      cert = self._GenCert(key, before, validity)
194
      result = utils.VerifyX509Certificate(cert, 1, 2)
195
      self.assertEqual(result, (None, None))
196

    
197
    # skew too great, not accepting certs
198
    for before in [SKEW * 2, SKEW * 10]:
199
      cert = self._GenCert(key, before, validity)
200
      (status, msg) = utils.VerifyX509Certificate(cert, 1, 2)
201
      self.assertEqual(status, utils.CERT_WARNING)
202
      self.assertTrue(msg.startswith("Certificate not yet valid"))
203

    
204

    
205
class TestVerifyCertificateInner(unittest.TestCase):
206
  def test(self):
207
    vci = utils.x509._VerifyCertificateInner
208

    
209
    # Valid
210
    self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
211
                     (None, None))
212

    
213
    # Not yet valid
214
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7)
215
    self.assertEqual(errcode, utils.CERT_WARNING)
216

    
217
    # Expiring soon
218
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7)
219
    self.assertEqual(errcode, utils.CERT_ERROR)
220

    
221
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1)
222
    self.assertEqual(errcode, utils.CERT_WARNING)
223

    
224
    (errcode, msg) = vci(False, 1266507600, None, 1266939600, 30, 7)
225
    self.assertEqual(errcode, None)
226

    
227
    # Expired
228
    (errcode, msg) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7)
229
    self.assertEqual(errcode, utils.CERT_ERROR)
230

    
231
    (errcode, msg) = vci(True, None, 1267544400, 1266939600, 30, 7)
232
    self.assertEqual(errcode, utils.CERT_ERROR)
233

    
234
    (errcode, msg) = vci(True, 1266507600, None, 1266939600, 30, 7)
235
    self.assertEqual(errcode, utils.CERT_ERROR)
236

    
237
    (errcode, msg) = vci(True, None, None, 1266939600, 30, 7)
238
    self.assertEqual(errcode, utils.CERT_ERROR)
239

    
240

    
241
class TestGenerateSelfSignedX509Cert(unittest.TestCase):
242
  def setUp(self):
243
    self.tmpdir = tempfile.mkdtemp()
244

    
245
  def tearDown(self):
246
    shutil.rmtree(self.tmpdir)
247

    
248
  def _checkRsaPrivateKey(self, key):
249
    lines = key.splitlines()
250
    return (("-----BEGIN RSA PRIVATE KEY-----" in lines and
251
             "-----END RSA PRIVATE KEY-----" in lines) or
252
            ("-----BEGIN PRIVATE KEY-----" in lines and
253
             "-----END PRIVATE KEY-----" in lines))
254

    
255
  def _checkCertificate(self, cert):
256
    lines = cert.splitlines()
257
    return ("-----BEGIN CERTIFICATE-----" in lines and
258
            "-----END CERTIFICATE-----" in lines)
259

    
260
  def test(self):
261
    for common_name in [None, ".", "Ganeti", "node1.example.com"]:
262
      (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
263
      self._checkRsaPrivateKey(key_pem)
264
      self._checkCertificate(cert_pem)
265

    
266
      key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
267
                                           key_pem)
268
      self.assert_(key.bits() >= 1024)
269
      self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
270
      self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
271

    
272
      x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
273
                                             cert_pem)
274
      self.failIf(x509.has_expired())
275
      self.assertEqual(x509.get_issuer().CN, common_name)
276
      self.assertEqual(x509.get_subject().CN, common_name)
277
      self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
278

    
279
  def testLegacy(self):
280
    cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
281

    
282
    utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
283

    
284
    cert1 = utils.ReadFile(cert1_filename)
285

    
286
    self.assert_(self._checkRsaPrivateKey(cert1))
287
    self.assert_(self._checkCertificate(cert1))
288

    
289

    
290
if __name__ == "__main__":
291
  testutils.GanetiTestProgram()