Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.utils.x509_unittest.py @ b4726cd7

History | View | Annotate | Download (13.5 kB)

1 c50645c0 Michael Hanselmann
#!/usr/bin/python
2 c50645c0 Michael Hanselmann
#
3 c50645c0 Michael Hanselmann
4 f97a7ada Iustin Pop
# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5 c50645c0 Michael Hanselmann
#
6 c50645c0 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 c50645c0 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 c50645c0 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 c50645c0 Michael Hanselmann
# (at your option) any later version.
10 c50645c0 Michael Hanselmann
#
11 c50645c0 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 c50645c0 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 c50645c0 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 c50645c0 Michael Hanselmann
# General Public License for more details.
15 c50645c0 Michael Hanselmann
#
16 c50645c0 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 c50645c0 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 c50645c0 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 c50645c0 Michael Hanselmann
# 02110-1301, USA.
20 c50645c0 Michael Hanselmann
21 c50645c0 Michael Hanselmann
22 c50645c0 Michael Hanselmann
"""Script for testing ganeti.utils.x509"""
23 c50645c0 Michael Hanselmann
24 c50645c0 Michael Hanselmann
import os
25 c50645c0 Michael Hanselmann
import tempfile
26 c50645c0 Michael Hanselmann
import unittest
27 c50645c0 Michael Hanselmann
import shutil
28 c50645c0 Michael Hanselmann
import OpenSSL
29 c50645c0 Michael Hanselmann
import distutils.version
30 c50645c0 Michael Hanselmann
import string
31 c50645c0 Michael Hanselmann
32 c50645c0 Michael Hanselmann
from ganeti import constants
33 c50645c0 Michael Hanselmann
from ganeti import utils
34 c50645c0 Michael Hanselmann
from ganeti import errors
35 c50645c0 Michael Hanselmann
36 c50645c0 Michael Hanselmann
import testutils
37 c50645c0 Michael Hanselmann
38 c50645c0 Michael Hanselmann
39 c50645c0 Michael Hanselmann
class TestParseAsn1Generalizedtime(unittest.TestCase):
40 c50645c0 Michael Hanselmann
  def setUp(self):
41 c50645c0 Michael Hanselmann
    self._Parse = utils.x509._ParseAsn1Generalizedtime
42 c50645c0 Michael Hanselmann
43 c50645c0 Michael Hanselmann
  def test(self):
44 c50645c0 Michael Hanselmann
    # UTC
45 c50645c0 Michael Hanselmann
    self.assertEqual(self._Parse("19700101000000Z"), 0)
46 c50645c0 Michael Hanselmann
    self.assertEqual(self._Parse("20100222174152Z"), 1266860512)
47 c50645c0 Michael Hanselmann
    self.assertEqual(self._Parse("20380119031407Z"), (2**31) - 1)
48 c50645c0 Michael Hanselmann
49 c50645c0 Michael Hanselmann
    # With offset
50 c50645c0 Michael Hanselmann
    self.assertEqual(self._Parse("20100222174152+0000"), 1266860512)
51 c50645c0 Michael Hanselmann
    self.assertEqual(self._Parse("20100223131652+0000"), 1266931012)
52 c50645c0 Michael Hanselmann
    self.assertEqual(self._Parse("20100223051808-0800"), 1266931088)
53 c50645c0 Michael Hanselmann
    self.assertEqual(self._Parse("20100224002135+1100"), 1266931295)
54 c50645c0 Michael Hanselmann
    self.assertEqual(self._Parse("19700101000000-0100"), 3600)
55 c50645c0 Michael Hanselmann
56 c50645c0 Michael Hanselmann
    # Leap seconds are not supported by datetime.datetime
57 c50645c0 Michael Hanselmann
    self.assertRaises(ValueError, self._Parse, "19841231235960+0000")
58 c50645c0 Michael Hanselmann
    self.assertRaises(ValueError, self._Parse, "19920630235960+0000")
59 c50645c0 Michael Hanselmann
60 c50645c0 Michael Hanselmann
    # Errors
61 c50645c0 Michael Hanselmann
    self.assertRaises(ValueError, self._Parse, "")
62 c50645c0 Michael Hanselmann
    self.assertRaises(ValueError, self._Parse, "invalid")
63 c50645c0 Michael Hanselmann
    self.assertRaises(ValueError, self._Parse, "20100222174152")
64 c50645c0 Michael Hanselmann
    self.assertRaises(ValueError, self._Parse, "Mon Feb 22 17:47:02 UTC 2010")
65 c50645c0 Michael Hanselmann
    self.assertRaises(ValueError, self._Parse, "2010-02-22 17:42:02")
66 c50645c0 Michael Hanselmann
67 c50645c0 Michael Hanselmann
68 c50645c0 Michael Hanselmann
class TestGetX509CertValidity(testutils.GanetiTestCase):
69 c50645c0 Michael Hanselmann
  def setUp(self):
70 c50645c0 Michael Hanselmann
    testutils.GanetiTestCase.setUp(self)
71 c50645c0 Michael Hanselmann
72 c50645c0 Michael Hanselmann
    pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
73 c50645c0 Michael Hanselmann
74 c50645c0 Michael Hanselmann
    # Test whether we have pyOpenSSL 0.7 or above
75 c50645c0 Michael Hanselmann
    self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
76 c50645c0 Michael Hanselmann
77 c50645c0 Michael Hanselmann
    if not self.pyopenssl0_7:
78 c50645c0 Michael Hanselmann
      warnings.warn("This test requires pyOpenSSL 0.7 or above to"
79 c50645c0 Michael Hanselmann
                    " function correctly")
80 c50645c0 Michael Hanselmann
81 c50645c0 Michael Hanselmann
  def _LoadCert(self, name):
82 c50645c0 Michael Hanselmann
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
83 00ef625c Michael Hanselmann
                                           testutils.ReadTestData(name))
84 c50645c0 Michael Hanselmann
85 c50645c0 Michael Hanselmann
  def test(self):
86 c50645c0 Michael Hanselmann
    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
87 c50645c0 Michael Hanselmann
    if self.pyopenssl0_7:
88 c50645c0 Michael Hanselmann
      self.assertEqual(validity, (1266919967, 1267524767))
89 c50645c0 Michael Hanselmann
    else:
90 c50645c0 Michael Hanselmann
      self.assertEqual(validity, (None, None))
91 c50645c0 Michael Hanselmann
92 c50645c0 Michael Hanselmann
93 c50645c0 Michael Hanselmann
class TestSignX509Certificate(unittest.TestCase):
94 c50645c0 Michael Hanselmann
  KEY = "My private key!"
95 c50645c0 Michael Hanselmann
  KEY_OTHER = "Another key"
96 c50645c0 Michael Hanselmann
97 c50645c0 Michael Hanselmann
  def test(self):
98 c50645c0 Michael Hanselmann
    # Generate certificate valid for 5 minutes
99 ab4b1cf2 Helga Velroyen
    (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300, 1)
100 c50645c0 Michael Hanselmann
101 c50645c0 Michael Hanselmann
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
102 c50645c0 Michael Hanselmann
                                           cert_pem)
103 c50645c0 Michael Hanselmann
104 c50645c0 Michael Hanselmann
    # No signature at all
105 c50645c0 Michael Hanselmann
    self.assertRaises(errors.GenericError,
106 c50645c0 Michael Hanselmann
                      utils.LoadSignedX509Certificate, cert_pem, self.KEY)
107 c50645c0 Michael Hanselmann
108 c50645c0 Michael Hanselmann
    # Invalid input
109 c50645c0 Michael Hanselmann
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
110 c50645c0 Michael Hanselmann
                      "", self.KEY)
111 c50645c0 Michael Hanselmann
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
112 c50645c0 Michael Hanselmann
                      "X-Ganeti-Signature: \n", self.KEY)
113 c50645c0 Michael Hanselmann
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
114 c50645c0 Michael Hanselmann
                      "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
115 c50645c0 Michael Hanselmann
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
116 c50645c0 Michael Hanselmann
                      "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
117 c50645c0 Michael Hanselmann
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
118 c50645c0 Michael Hanselmann
                      "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
119 c50645c0 Michael Hanselmann
120 c50645c0 Michael Hanselmann
    # Invalid salt
121 c50645c0 Michael Hanselmann
    for salt in list("-_@$,:;/\\ \t\n"):
122 c50645c0 Michael Hanselmann
      self.assertRaises(errors.GenericError, utils.SignX509Certificate,
123 c50645c0 Michael Hanselmann
                        cert_pem, self.KEY, "foo%sbar" % salt)
124 c50645c0 Michael Hanselmann
125 c50645c0 Michael Hanselmann
    for salt in ["HelloWorld", "salt", string.letters, string.digits,
126 c50645c0 Michael Hanselmann
                 utils.GenerateSecret(numbytes=4),
127 c50645c0 Michael Hanselmann
                 utils.GenerateSecret(numbytes=16),
128 c50645c0 Michael Hanselmann
                 "{123:456}".encode("hex")]:
129 c50645c0 Michael Hanselmann
      signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
130 c50645c0 Michael Hanselmann
131 c50645c0 Michael Hanselmann
      self._Check(cert, salt, signed_pem)
132 c50645c0 Michael Hanselmann
133 c50645c0 Michael Hanselmann
      self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
134 c50645c0 Michael Hanselmann
      self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
135 c50645c0 Michael Hanselmann
      self._Check(cert, salt, (signed_pem + "\n\na few more\n"
136 c50645c0 Michael Hanselmann
                               "lines----\n------ at\nthe end!"))
137 c50645c0 Michael Hanselmann
138 c50645c0 Michael Hanselmann
  def _Check(self, cert, salt, pem):
139 c50645c0 Michael Hanselmann
    (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
140 c50645c0 Michael Hanselmann
    self.assertEqual(salt, salt2)
141 c50645c0 Michael Hanselmann
    self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
142 c50645c0 Michael Hanselmann
143 c50645c0 Michael Hanselmann
    # Other key
144 c50645c0 Michael Hanselmann
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
145 c50645c0 Michael Hanselmann
                      pem, self.KEY_OTHER)
146 c50645c0 Michael Hanselmann
147 c50645c0 Michael Hanselmann
148 c50645c0 Michael Hanselmann
class TestCertVerification(testutils.GanetiTestCase):
149 c50645c0 Michael Hanselmann
  def setUp(self):
150 c50645c0 Michael Hanselmann
    testutils.GanetiTestCase.setUp(self)
151 c50645c0 Michael Hanselmann
152 c50645c0 Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
153 c50645c0 Michael Hanselmann
154 c50645c0 Michael Hanselmann
  def tearDown(self):
155 c50645c0 Michael Hanselmann
    shutil.rmtree(self.tmpdir)
156 c50645c0 Michael Hanselmann
157 c50645c0 Michael Hanselmann
  def testVerifyCertificate(self):
158 00ef625c Michael Hanselmann
    cert_pem = testutils.ReadTestData("cert1.pem")
159 c50645c0 Michael Hanselmann
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
160 c50645c0 Michael Hanselmann
                                           cert_pem)
161 c50645c0 Michael Hanselmann
162 c50645c0 Michael Hanselmann
    # Not checking return value as this certificate is expired
163 c50645c0 Michael Hanselmann
    utils.VerifyX509Certificate(cert, 30, 7)
164 c50645c0 Michael Hanselmann
165 f97a7ada Iustin Pop
  @staticmethod
166 f97a7ada Iustin Pop
  def _GenCert(key, before, validity):
167 f97a7ada Iustin Pop
    # Urgh... mostly copied from x509.py :(
168 f97a7ada Iustin Pop
169 f97a7ada Iustin Pop
    # Create self-signed certificate
170 f97a7ada Iustin Pop
    cert = OpenSSL.crypto.X509()
171 f97a7ada Iustin Pop
    cert.set_serial_number(1)
172 f97a7ada Iustin Pop
    if before != 0:
173 f97a7ada Iustin Pop
      cert.gmtime_adj_notBefore(int(before))
174 f97a7ada Iustin Pop
    cert.gmtime_adj_notAfter(validity)
175 f97a7ada Iustin Pop
    cert.set_issuer(cert.get_subject())
176 f97a7ada Iustin Pop
    cert.set_pubkey(key)
177 f97a7ada Iustin Pop
    cert.sign(key, constants.X509_CERT_SIGN_DIGEST)
178 f97a7ada Iustin Pop
    return cert
179 f97a7ada Iustin Pop
180 f97a7ada Iustin Pop
  def testClockSkew(self):
181 f97a7ada Iustin Pop
    SKEW = constants.NODE_MAX_CLOCK_SKEW
182 f97a7ada Iustin Pop
    # Create private and public key
183 f97a7ada Iustin Pop
    key = OpenSSL.crypto.PKey()
184 f97a7ada Iustin Pop
    key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS)
185 f97a7ada Iustin Pop
186 f97a7ada Iustin Pop
    validity = 7 * 86400
187 f97a7ada Iustin Pop
    # skew small enough, accepting cert; note that this is a timed
188 f97a7ada Iustin Pop
    # test, and could fail if the machine is so loaded that the next
189 f97a7ada Iustin Pop
    # few lines take more than NODE_MAX_CLOCK_SKEW / 2
190 f97a7ada Iustin Pop
    for before in [-1, 0, SKEW / 4, SKEW / 2]:
191 f97a7ada Iustin Pop
      cert = self._GenCert(key, before, validity)
192 f97a7ada Iustin Pop
      result = utils.VerifyX509Certificate(cert, 1, 2)
193 f97a7ada Iustin Pop
      self.assertEqual(result, (None, None))
194 f97a7ada Iustin Pop
195 f97a7ada Iustin Pop
    # skew too great, not accepting certs
196 302424e7 Iustin Pop
    for before in [SKEW * 2, SKEW * 10]:
197 f97a7ada Iustin Pop
      cert = self._GenCert(key, before, validity)
198 f97a7ada Iustin Pop
      (status, msg) = utils.VerifyX509Certificate(cert, 1, 2)
199 f97a7ada Iustin Pop
      self.assertEqual(status, utils.CERT_WARNING)
200 f97a7ada Iustin Pop
      self.assertTrue(msg.startswith("Certificate not yet valid"))
201 f97a7ada Iustin Pop
202 c50645c0 Michael Hanselmann
203 c50645c0 Michael Hanselmann
class TestVerifyCertificateInner(unittest.TestCase):
204 c50645c0 Michael Hanselmann
  def test(self):
205 c50645c0 Michael Hanselmann
    vci = utils.x509._VerifyCertificateInner
206 c50645c0 Michael Hanselmann
207 c50645c0 Michael Hanselmann
    # Valid
208 c50645c0 Michael Hanselmann
    self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
209 c50645c0 Michael Hanselmann
                     (None, None))
210 c50645c0 Michael Hanselmann
211 c50645c0 Michael Hanselmann
    # Not yet valid
212 c50645c0 Michael Hanselmann
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7)
213 c50645c0 Michael Hanselmann
    self.assertEqual(errcode, utils.CERT_WARNING)
214 c50645c0 Michael Hanselmann
215 c50645c0 Michael Hanselmann
    # Expiring soon
216 c50645c0 Michael Hanselmann
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7)
217 c50645c0 Michael Hanselmann
    self.assertEqual(errcode, utils.CERT_ERROR)
218 c50645c0 Michael Hanselmann
219 c50645c0 Michael Hanselmann
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1)
220 c50645c0 Michael Hanselmann
    self.assertEqual(errcode, utils.CERT_WARNING)
221 c50645c0 Michael Hanselmann
222 c50645c0 Michael Hanselmann
    (errcode, msg) = vci(False, 1266507600, None, 1266939600, 30, 7)
223 c50645c0 Michael Hanselmann
    self.assertEqual(errcode, None)
224 c50645c0 Michael Hanselmann
225 c50645c0 Michael Hanselmann
    # Expired
226 c50645c0 Michael Hanselmann
    (errcode, msg) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7)
227 c50645c0 Michael Hanselmann
    self.assertEqual(errcode, utils.CERT_ERROR)
228 c50645c0 Michael Hanselmann
229 c50645c0 Michael Hanselmann
    (errcode, msg) = vci(True, None, 1267544400, 1266939600, 30, 7)
230 c50645c0 Michael Hanselmann
    self.assertEqual(errcode, utils.CERT_ERROR)
231 c50645c0 Michael Hanselmann
232 c50645c0 Michael Hanselmann
    (errcode, msg) = vci(True, 1266507600, None, 1266939600, 30, 7)
233 c50645c0 Michael Hanselmann
    self.assertEqual(errcode, utils.CERT_ERROR)
234 c50645c0 Michael Hanselmann
235 c50645c0 Michael Hanselmann
    (errcode, msg) = vci(True, None, None, 1266939600, 30, 7)
236 c50645c0 Michael Hanselmann
    self.assertEqual(errcode, utils.CERT_ERROR)
237 c50645c0 Michael Hanselmann
238 c50645c0 Michael Hanselmann
239 c50645c0 Michael Hanselmann
class TestGenerateSelfSignedX509Cert(unittest.TestCase):
240 c50645c0 Michael Hanselmann
  def setUp(self):
241 c50645c0 Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
242 c50645c0 Michael Hanselmann
243 c50645c0 Michael Hanselmann
  def tearDown(self):
244 c50645c0 Michael Hanselmann
    shutil.rmtree(self.tmpdir)
245 c50645c0 Michael Hanselmann
246 c50645c0 Michael Hanselmann
  def _checkRsaPrivateKey(self, key):
247 c50645c0 Michael Hanselmann
    lines = key.splitlines()
248 ecc760ce Guido Trotter
    return (("-----BEGIN RSA PRIVATE KEY-----" in lines and
249 ecc760ce Guido Trotter
             "-----END RSA PRIVATE KEY-----" in lines) or
250 ecc760ce Guido Trotter
            ("-----BEGIN PRIVATE KEY-----" in lines and
251 ecc760ce Guido Trotter
             "-----END PRIVATE KEY-----" in lines))
252 c50645c0 Michael Hanselmann
253 c50645c0 Michael Hanselmann
  def _checkCertificate(self, cert):
254 c50645c0 Michael Hanselmann
    lines = cert.splitlines()
255 c50645c0 Michael Hanselmann
    return ("-----BEGIN CERTIFICATE-----" in lines and
256 c50645c0 Michael Hanselmann
            "-----END CERTIFICATE-----" in lines)
257 c50645c0 Michael Hanselmann
258 c50645c0 Michael Hanselmann
  def test(self):
259 c50645c0 Michael Hanselmann
    for common_name in [None, ".", "Ganeti", "node1.example.com"]:
260 ab4b1cf2 Helga Velroyen
      (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300,
261 ab4b1cf2 Helga Velroyen
                                                             1)
262 c50645c0 Michael Hanselmann
      self._checkRsaPrivateKey(key_pem)
263 c50645c0 Michael Hanselmann
      self._checkCertificate(cert_pem)
264 c50645c0 Michael Hanselmann
265 c50645c0 Michael Hanselmann
      key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
266 c50645c0 Michael Hanselmann
                                           key_pem)
267 c50645c0 Michael Hanselmann
      self.assert_(key.bits() >= 1024)
268 c50645c0 Michael Hanselmann
      self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
269 c50645c0 Michael Hanselmann
      self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
270 c50645c0 Michael Hanselmann
271 c50645c0 Michael Hanselmann
      x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
272 c50645c0 Michael Hanselmann
                                             cert_pem)
273 c50645c0 Michael Hanselmann
      self.failIf(x509.has_expired())
274 c50645c0 Michael Hanselmann
      self.assertEqual(x509.get_issuer().CN, common_name)
275 c50645c0 Michael Hanselmann
      self.assertEqual(x509.get_subject().CN, common_name)
276 c50645c0 Michael Hanselmann
      self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
277 c50645c0 Michael Hanselmann
278 c50645c0 Michael Hanselmann
  def testLegacy(self):
279 c50645c0 Michael Hanselmann
    cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
280 c50645c0 Michael Hanselmann
281 ab4b1cf2 Helga Velroyen
    utils.GenerateSelfSignedSslCert(cert1_filename, 1, validity=1)
282 c50645c0 Michael Hanselmann
283 c50645c0 Michael Hanselmann
    cert1 = utils.ReadFile(cert1_filename)
284 c50645c0 Michael Hanselmann
285 c50645c0 Michael Hanselmann
    self.assert_(self._checkRsaPrivateKey(cert1))
286 c50645c0 Michael Hanselmann
    self.assert_(self._checkCertificate(cert1))
287 c50645c0 Michael Hanselmann
288 c50645c0 Michael Hanselmann
289 0602cef3 Michael Hanselmann
class TestCheckNodeCertificate(testutils.GanetiTestCase):
290 0602cef3 Michael Hanselmann
  def setUp(self):
291 0602cef3 Michael Hanselmann
    testutils.GanetiTestCase.setUp(self)
292 0602cef3 Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
293 0602cef3 Michael Hanselmann
294 0602cef3 Michael Hanselmann
  def tearDown(self):
295 0602cef3 Michael Hanselmann
    testutils.GanetiTestCase.tearDown(self)
296 0602cef3 Michael Hanselmann
    shutil.rmtree(self.tmpdir)
297 0602cef3 Michael Hanselmann
298 0602cef3 Michael Hanselmann
  def testMismatchingKey(self):
299 00ef625c Michael Hanselmann
    other_cert = testutils.TestDataFilename("cert1.pem")
300 00ef625c Michael Hanselmann
    node_cert = testutils.TestDataFilename("cert2.pem")
301 0602cef3 Michael Hanselmann
302 0602cef3 Michael Hanselmann
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
303 0602cef3 Michael Hanselmann
                                           utils.ReadFile(other_cert))
304 0602cef3 Michael Hanselmann
305 0602cef3 Michael Hanselmann
    try:
306 0602cef3 Michael Hanselmann
      utils.CheckNodeCertificate(cert, _noded_cert_file=node_cert)
307 0602cef3 Michael Hanselmann
    except errors.GenericError, err:
308 0602cef3 Michael Hanselmann
      self.assertEqual(str(err),
309 0602cef3 Michael Hanselmann
                       "Given cluster certificate does not match local key")
310 0602cef3 Michael Hanselmann
    else:
311 0602cef3 Michael Hanselmann
      self.fail("Exception was not raised")
312 0602cef3 Michael Hanselmann
313 0602cef3 Michael Hanselmann
  def testMatchingKey(self):
314 00ef625c Michael Hanselmann
    cert_filename = testutils.TestDataFilename("cert2.pem")
315 0602cef3 Michael Hanselmann
316 0602cef3 Michael Hanselmann
    # Extract certificate
317 0602cef3 Michael Hanselmann
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
318 0602cef3 Michael Hanselmann
                                           utils.ReadFile(cert_filename))
319 0602cef3 Michael Hanselmann
    cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
320 0602cef3 Michael Hanselmann
                                               cert)
321 0602cef3 Michael Hanselmann
322 0602cef3 Michael Hanselmann
    utils.CheckNodeCertificate(cert, _noded_cert_file=cert_filename)
323 0602cef3 Michael Hanselmann
324 0602cef3 Michael Hanselmann
  def testMissingFile(self):
325 00ef625c Michael Hanselmann
    cert_path = testutils.TestDataFilename("cert1.pem")
326 0602cef3 Michael Hanselmann
    nodecert = utils.PathJoin(self.tmpdir, "does-not-exist")
327 0602cef3 Michael Hanselmann
328 0602cef3 Michael Hanselmann
    utils.CheckNodeCertificate(NotImplemented, _noded_cert_file=nodecert)
329 0602cef3 Michael Hanselmann
330 0602cef3 Michael Hanselmann
    self.assertFalse(os.path.exists(nodecert))
331 0602cef3 Michael Hanselmann
332 0602cef3 Michael Hanselmann
  def testInvalidCertificate(self):
333 0602cef3 Michael Hanselmann
    tmpfile = utils.PathJoin(self.tmpdir, "cert")
334 0602cef3 Michael Hanselmann
    utils.WriteFile(tmpfile, data="not a certificate")
335 0602cef3 Michael Hanselmann
336 0602cef3 Michael Hanselmann
    self.assertRaises(errors.X509CertError, utils.CheckNodeCertificate,
337 0602cef3 Michael Hanselmann
                      NotImplemented, _noded_cert_file=tmpfile)
338 0602cef3 Michael Hanselmann
339 0602cef3 Michael Hanselmann
  def testNoPrivateKey(self):
340 00ef625c Michael Hanselmann
    cert = testutils.TestDataFilename("cert1.pem")
341 0602cef3 Michael Hanselmann
    self.assertRaises(errors.X509CertError, utils.CheckNodeCertificate,
342 0602cef3 Michael Hanselmann
                      NotImplemented, _noded_cert_file=cert)
343 0602cef3 Michael Hanselmann
344 0602cef3 Michael Hanselmann
  def testMismatchInNodeCert(self):
345 00ef625c Michael Hanselmann
    cert1_path = testutils.TestDataFilename("cert1.pem")
346 00ef625c Michael Hanselmann
    cert2_path = testutils.TestDataFilename("cert2.pem")
347 0602cef3 Michael Hanselmann
    tmpfile = utils.PathJoin(self.tmpdir, "cert")
348 0602cef3 Michael Hanselmann
349 0602cef3 Michael Hanselmann
    # Extract certificate
350 0602cef3 Michael Hanselmann
    cert1 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
351 0602cef3 Michael Hanselmann
                                            utils.ReadFile(cert1_path))
352 0602cef3 Michael Hanselmann
    cert1_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
353 0602cef3 Michael Hanselmann
                                                cert1)
354 0602cef3 Michael Hanselmann
355 0602cef3 Michael Hanselmann
    # Extract mismatching key
356 0602cef3 Michael Hanselmann
    key2 = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
357 0602cef3 Michael Hanselmann
                                          utils.ReadFile(cert2_path))
358 0602cef3 Michael Hanselmann
    key2_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM,
359 0602cef3 Michael Hanselmann
                                              key2)
360 0602cef3 Michael Hanselmann
361 0602cef3 Michael Hanselmann
    # Write to file
362 0602cef3 Michael Hanselmann
    utils.WriteFile(tmpfile, data=cert1_pem + key2_pem)
363 0602cef3 Michael Hanselmann
364 0602cef3 Michael Hanselmann
    try:
365 0602cef3 Michael Hanselmann
      utils.CheckNodeCertificate(cert1, _noded_cert_file=tmpfile)
366 0602cef3 Michael Hanselmann
    except errors.X509CertError, err:
367 0602cef3 Michael Hanselmann
      self.assertEqual(err.args,
368 0602cef3 Michael Hanselmann
                       (tmpfile, "Certificate does not match with private key"))
369 0602cef3 Michael Hanselmann
    else:
370 0602cef3 Michael Hanselmann
      self.fail("Exception was not raised")
371 0602cef3 Michael Hanselmann
372 0602cef3 Michael Hanselmann
373 c50645c0 Michael Hanselmann
if __name__ == "__main__":
374 c50645c0 Michael Hanselmann
  testutils.GanetiTestProgram()