Statistics
| Branch: | Tag: | Revision:

root / test / ganeti.utils.x509_unittest.py @ e85444d0

History | View | Annotate | Download (9 kB)

1
#!/usr/bin/python
2
#
3

    
4
# Copyright (C) 2006, 2007, 2010, 2011 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Script for testing ganeti.utils.x509"""
23

    
24
import os
25
import tempfile
26
import unittest
27
import shutil
28
import time
29
import OpenSSL
30
import distutils.version
31
import string
32

    
33
from ganeti import constants
34
from ganeti import utils
35
from ganeti import compat
36
from ganeti import errors
37

    
38
import testutils
39

    
40

    
41
class TestParseAsn1Generalizedtime(unittest.TestCase):
42
  def setUp(self):
43
    self._Parse = utils.x509._ParseAsn1Generalizedtime
44

    
45
  def test(self):
46
    # UTC
47
    self.assertEqual(self._Parse("19700101000000Z"), 0)
48
    self.assertEqual(self._Parse("20100222174152Z"), 1266860512)
49
    self.assertEqual(self._Parse("20380119031407Z"), (2**31) - 1)
50

    
51
    # With offset
52
    self.assertEqual(self._Parse("20100222174152+0000"), 1266860512)
53
    self.assertEqual(self._Parse("20100223131652+0000"), 1266931012)
54
    self.assertEqual(self._Parse("20100223051808-0800"), 1266931088)
55
    self.assertEqual(self._Parse("20100224002135+1100"), 1266931295)
56
    self.assertEqual(self._Parse("19700101000000-0100"), 3600)
57

    
58
    # Leap seconds are not supported by datetime.datetime
59
    self.assertRaises(ValueError, self._Parse, "19841231235960+0000")
60
    self.assertRaises(ValueError, self._Parse, "19920630235960+0000")
61

    
62
    # Errors
63
    self.assertRaises(ValueError, self._Parse, "")
64
    self.assertRaises(ValueError, self._Parse, "invalid")
65
    self.assertRaises(ValueError, self._Parse, "20100222174152")
66
    self.assertRaises(ValueError, self._Parse, "Mon Feb 22 17:47:02 UTC 2010")
67
    self.assertRaises(ValueError, self._Parse, "2010-02-22 17:42:02")
68

    
69

    
70
class TestGetX509CertValidity(testutils.GanetiTestCase):
71
  def setUp(self):
72
    testutils.GanetiTestCase.setUp(self)
73

    
74
    pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
75

    
76
    # Test whether we have pyOpenSSL 0.7 or above
77
    self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
78

    
79
    if not self.pyopenssl0_7:
80
      warnings.warn("This test requires pyOpenSSL 0.7 or above to"
81
                    " function correctly")
82

    
83
  def _LoadCert(self, name):
84
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
85
                                           self._ReadTestData(name))
86

    
87
  def test(self):
88
    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
89
    if self.pyopenssl0_7:
90
      self.assertEqual(validity, (1266919967, 1267524767))
91
    else:
92
      self.assertEqual(validity, (None, None))
93

    
94

    
95
class TestSignX509Certificate(unittest.TestCase):
96
  KEY = "My private key!"
97
  KEY_OTHER = "Another key"
98

    
99
  def test(self):
100
    # Generate certificate valid for 5 minutes
101
    (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
102

    
103
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
104
                                           cert_pem)
105

    
106
    # No signature at all
107
    self.assertRaises(errors.GenericError,
108
                      utils.LoadSignedX509Certificate, cert_pem, self.KEY)
109

    
110
    # Invalid input
111
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
112
                      "", self.KEY)
113
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
114
                      "X-Ganeti-Signature: \n", self.KEY)
115
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
116
                      "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
117
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
118
                      "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
119
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
120
                      "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
121

    
122
    # Invalid salt
123
    for salt in list("-_@$,:;/\\ \t\n"):
124
      self.assertRaises(errors.GenericError, utils.SignX509Certificate,
125
                        cert_pem, self.KEY, "foo%sbar" % salt)
126

    
127
    for salt in ["HelloWorld", "salt", string.letters, string.digits,
128
                 utils.GenerateSecret(numbytes=4),
129
                 utils.GenerateSecret(numbytes=16),
130
                 "{123:456}".encode("hex")]:
131
      signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
132

    
133
      self._Check(cert, salt, signed_pem)
134

    
135
      self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
136
      self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
137
      self._Check(cert, salt, (signed_pem + "\n\na few more\n"
138
                               "lines----\n------ at\nthe end!"))
139

    
140
  def _Check(self, cert, salt, pem):
141
    (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
142
    self.assertEqual(salt, salt2)
143
    self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
144

    
145
    # Other key
146
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
147
                      pem, self.KEY_OTHER)
148

    
149

    
150
class TestCertVerification(testutils.GanetiTestCase):
151
  def setUp(self):
152
    testutils.GanetiTestCase.setUp(self)
153

    
154
    self.tmpdir = tempfile.mkdtemp()
155

    
156
  def tearDown(self):
157
    shutil.rmtree(self.tmpdir)
158

    
159
  def testVerifyCertificate(self):
160
    cert_pem = utils.ReadFile(self._TestDataFilename("cert1.pem"))
161
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
162
                                           cert_pem)
163

    
164
    # Not checking return value as this certificate is expired
165
    utils.VerifyX509Certificate(cert, 30, 7)
166

    
167

    
168
class TestVerifyCertificateInner(unittest.TestCase):
169
  def test(self):
170
    vci = utils.x509._VerifyCertificateInner
171

    
172
    # Valid
173
    self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
174
                     (None, None))
175

    
176
    # Not yet valid
177
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7)
178
    self.assertEqual(errcode, utils.CERT_WARNING)
179

    
180
    # Expiring soon
181
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7)
182
    self.assertEqual(errcode, utils.CERT_ERROR)
183

    
184
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1)
185
    self.assertEqual(errcode, utils.CERT_WARNING)
186

    
187
    (errcode, msg) = vci(False, 1266507600, None, 1266939600, 30, 7)
188
    self.assertEqual(errcode, None)
189

    
190
    # Expired
191
    (errcode, msg) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7)
192
    self.assertEqual(errcode, utils.CERT_ERROR)
193

    
194
    (errcode, msg) = vci(True, None, 1267544400, 1266939600, 30, 7)
195
    self.assertEqual(errcode, utils.CERT_ERROR)
196

    
197
    (errcode, msg) = vci(True, 1266507600, None, 1266939600, 30, 7)
198
    self.assertEqual(errcode, utils.CERT_ERROR)
199

    
200
    (errcode, msg) = vci(True, None, None, 1266939600, 30, 7)
201
    self.assertEqual(errcode, utils.CERT_ERROR)
202

    
203

    
204
class TestGenerateSelfSignedX509Cert(unittest.TestCase):
205
  def setUp(self):
206
    self.tmpdir = tempfile.mkdtemp()
207

    
208
  def tearDown(self):
209
    shutil.rmtree(self.tmpdir)
210

    
211
  def _checkRsaPrivateKey(self, key):
212
    lines = key.splitlines()
213
    return (("-----BEGIN RSA PRIVATE KEY-----" in lines and
214
             "-----END RSA PRIVATE KEY-----" in lines) or
215
            ("-----BEGIN PRIVATE KEY-----" in lines and
216
             "-----END PRIVATE KEY-----" in lines))
217

    
218
  def _checkCertificate(self, cert):
219
    lines = cert.splitlines()
220
    return ("-----BEGIN CERTIFICATE-----" in lines and
221
            "-----END CERTIFICATE-----" in lines)
222

    
223
  def test(self):
224
    for common_name in [None, ".", "Ganeti", "node1.example.com"]:
225
      (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
226
      self._checkRsaPrivateKey(key_pem)
227
      self._checkCertificate(cert_pem)
228

    
229
      key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
230
                                           key_pem)
231
      self.assert_(key.bits() >= 1024)
232
      self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
233
      self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
234

    
235
      x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
236
                                             cert_pem)
237
      self.failIf(x509.has_expired())
238
      self.assertEqual(x509.get_issuer().CN, common_name)
239
      self.assertEqual(x509.get_subject().CN, common_name)
240
      self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
241

    
242
  def testLegacy(self):
243
    cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
244

    
245
    utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
246

    
247
    cert1 = utils.ReadFile(cert1_filename)
248

    
249
    self.assert_(self._checkRsaPrivateKey(cert1))
250
    self.assert_(self._checkCertificate(cert1))
251

    
252

    
253
if __name__ == "__main__":
254
  testutils.GanetiTestProgram()