Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils.x509_unittest.py @ 7578ab0a

History | View | Annotate | Download (8.9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.utils.x509"""
23

    
24
import os
25
import tempfile
26
import unittest
27
import shutil
28
import time
29
import OpenSSL
30
import distutils.version
31
import string
32

    
33
from ganeti import constants
34
from ganeti import utils
35
from ganeti import compat
36
from ganeti import errors
37

    
38
import testutils
39

    
40

    
41
class TestParseAsn1Generalizedtime(unittest.TestCase):
42
  def setUp(self):
43
    self._Parse = utils.x509._ParseAsn1Generalizedtime
44

    
45
  def test(self):
46
    # UTC
47
    self.assertEqual(self._Parse("19700101000000Z"), 0)
48
    self.assertEqual(self._Parse("20100222174152Z"), 1266860512)
49
    self.assertEqual(self._Parse("20380119031407Z"), (2**31) - 1)
50

    
51
    # With offset
52
    self.assertEqual(self._Parse("20100222174152+0000"), 1266860512)
53
    self.assertEqual(self._Parse("20100223131652+0000"), 1266931012)
54
    self.assertEqual(self._Parse("20100223051808-0800"), 1266931088)
55
    self.assertEqual(self._Parse("20100224002135+1100"), 1266931295)
56
    self.assertEqual(self._Parse("19700101000000-0100"), 3600)
57

    
58
    # Leap seconds are not supported by datetime.datetime
59
    self.assertRaises(ValueError, self._Parse, "19841231235960+0000")
60
    self.assertRaises(ValueError, self._Parse, "19920630235960+0000")
61

    
62
    # Errors
63
    self.assertRaises(ValueError, self._Parse, "")
64
    self.assertRaises(ValueError, self._Parse, "invalid")
65
    self.assertRaises(ValueError, self._Parse, "20100222174152")
66
    self.assertRaises(ValueError, self._Parse, "Mon Feb 22 17:47:02 UTC 2010")
67
    self.assertRaises(ValueError, self._Parse, "2010-02-22 17:42:02")
68

    
69

    
70
class TestGetX509CertValidity(testutils.GanetiTestCase):
71
  def setUp(self):
72
    testutils.GanetiTestCase.setUp(self)
73

    
74
    pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
75

    
76
    # Test whether we have pyOpenSSL 0.7 or above
77
    self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
78

    
79
    if not self.pyopenssl0_7:
80
      warnings.warn("This test requires pyOpenSSL 0.7 or above to"
81
                    " function correctly")
82

    
83
  def _LoadCert(self, name):
84
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
85
                                           self._ReadTestData(name))
86

    
87
  def test(self):
88
    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
89
    if self.pyopenssl0_7:
90
      self.assertEqual(validity, (1266919967, 1267524767))
91
    else:
92
      self.assertEqual(validity, (None, None))
93

    
94

    
95
class TestSignX509Certificate(unittest.TestCase):
96
  KEY = "My private key!"
97
  KEY_OTHER = "Another key"
98

    
99
  def test(self):
100
    # Generate certificate valid for 5 minutes
101
    (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
102

    
103
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
104
                                           cert_pem)
105

    
106
    # No signature at all
107
    self.assertRaises(errors.GenericError,
108
                      utils.LoadSignedX509Certificate, cert_pem, self.KEY)
109

    
110
    # Invalid input
111
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
112
                      "", self.KEY)
113
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
114
                      "X-Ganeti-Signature: \n", self.KEY)
115
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
116
                      "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
117
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
118
                      "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
119
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
120
                      "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
121

    
122
    # Invalid salt
123
    for salt in list("-_@$,:;/\\ \t\n"):
124
      self.assertRaises(errors.GenericError, utils.SignX509Certificate,
125
                        cert_pem, self.KEY, "foo%sbar" % salt)
126

    
127
    for salt in ["HelloWorld", "salt", string.letters, string.digits,
128
                 utils.GenerateSecret(numbytes=4),
129
                 utils.GenerateSecret(numbytes=16),
130
                 "{123:456}".encode("hex")]:
131
      signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
132

    
133
      self._Check(cert, salt, signed_pem)
134

    
135
      self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
136
      self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
137
      self._Check(cert, salt, (signed_pem + "\n\na few more\n"
138
                               "lines----\n------ at\nthe end!"))
139

    
140
  def _Check(self, cert, salt, pem):
141
    (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
142
    self.assertEqual(salt, salt2)
143
    self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
144

    
145
    # Other key
146
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
147
                      pem, self.KEY_OTHER)
148

    
149

    
150
class TestCertVerification(testutils.GanetiTestCase):
151
  def setUp(self):
152
    testutils.GanetiTestCase.setUp(self)
153

    
154
    self.tmpdir = tempfile.mkdtemp()
155

    
156
  def tearDown(self):
157
    shutil.rmtree(self.tmpdir)
158

    
159
  def testVerifyCertificate(self):
160
    cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
161
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
162
                                           cert_pem)
163

    
164
    # Not checking return value as this certificate is expired
165
    utils.VerifyX509Certificate(cert, 30, 7)
166

    
167

    
168
class TestVerifyCertificateInner(unittest.TestCase):
169
  def test(self):
170
    vci = utils.x509._VerifyCertificateInner
171

    
172
    # Valid
173
    self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
174
                     (None, None))
175

    
176
    # Not yet valid
177
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7)
178
    self.assertEqual(errcode, utils.CERT_WARNING)
179

    
180
    # Expiring soon
181
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7)
182
    self.assertEqual(errcode, utils.CERT_ERROR)
183

    
184
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1)
185
    self.assertEqual(errcode, utils.CERT_WARNING)
186

    
187
    (errcode, msg) = vci(False, 1266507600, None, 1266939600, 30, 7)
188
    self.assertEqual(errcode, None)
189

    
190
    # Expired
191
    (errcode, msg) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7)
192
    self.assertEqual(errcode, utils.CERT_ERROR)
193

    
194
    (errcode, msg) = vci(True, None, 1267544400, 1266939600, 30, 7)
195
    self.assertEqual(errcode, utils.CERT_ERROR)
196

    
197
    (errcode, msg) = vci(True, 1266507600, None, 1266939600, 30, 7)
198
    self.assertEqual(errcode, utils.CERT_ERROR)
199

    
200
    (errcode, msg) = vci(True, None, None, 1266939600, 30, 7)
201
    self.assertEqual(errcode, utils.CERT_ERROR)
202

    
203

    
204
class TestGenerateSelfSignedX509Cert(unittest.TestCase):
205
  def setUp(self):
206
    self.tmpdir = tempfile.mkdtemp()
207

    
208
  def tearDown(self):
209
    shutil.rmtree(self.tmpdir)
210

    
211
  def _checkRsaPrivateKey(self, key):
212
    lines = key.splitlines()
213
    return ("-----BEGIN RSA PRIVATE KEY-----" in lines and
214
            "-----END RSA PRIVATE KEY-----" in lines)
215

    
216
  def _checkCertificate(self, cert):
217
    lines = cert.splitlines()
218
    return ("-----BEGIN CERTIFICATE-----" in lines and
219
            "-----END CERTIFICATE-----" in lines)
220

    
221
  def test(self):
222
    for common_name in [None, ".", "Ganeti", "node1.example.com"]:
223
      (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
224
      self._checkRsaPrivateKey(key_pem)
225
      self._checkCertificate(cert_pem)
226

    
227
      key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
228
                                           key_pem)
229
      self.assert_(key.bits() >= 1024)
230
      self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
231
      self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
232

    
233
      x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
234
                                             cert_pem)
235
      self.failIf(x509.has_expired())
236
      self.assertEqual(x509.get_issuer().CN, common_name)
237
      self.assertEqual(x509.get_subject().CN, common_name)
238
      self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
239

    
240
  def testLegacy(self):
241
    cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
242

    
243
    utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
244

    
245
    cert1 = utils.ReadFile(cert1_filename)
246

    
247
    self.assert_(self._checkRsaPrivateKey(cert1))
248
    self.assert_(self._checkCertificate(cert1))
249

    
250

    
251
if __name__ == "__main__":
252
  testutils.GanetiTestProgram()