Statistics
| Branch: | Tag: | Revision:

root / test / py / ganeti.utils.x509_unittest.py @ 14933c17

History | View | Annotate | Download (13.4 kB)

1 c50645c0 Michael Hanselmann
#!/usr/bin/python
2 c50645c0 Michael Hanselmann
#
3 c50645c0 Michael Hanselmann
4 f97a7ada Iustin Pop
# Copyright (C) 2006, 2007, 2010, 2011, 2012 Google Inc.
5 c50645c0 Michael Hanselmann
#
6 c50645c0 Michael Hanselmann
# This program is free software; you can redistribute it and/or modify
7 c50645c0 Michael Hanselmann
# it under the terms of the GNU General Public License as published by
8 c50645c0 Michael Hanselmann
# the Free Software Foundation; either version 2 of the License, or
9 c50645c0 Michael Hanselmann
# (at your option) any later version.
10 c50645c0 Michael Hanselmann
#
11 c50645c0 Michael Hanselmann
# This program is distributed in the hope that it will be useful, but
12 c50645c0 Michael Hanselmann
# WITHOUT ANY WARRANTY; without even the implied warranty of
13 c50645c0 Michael Hanselmann
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 c50645c0 Michael Hanselmann
# General Public License for more details.
15 c50645c0 Michael Hanselmann
#
16 c50645c0 Michael Hanselmann
# You should have received a copy of the GNU General Public License
17 c50645c0 Michael Hanselmann
# along with this program; if not, write to the Free Software
18 c50645c0 Michael Hanselmann
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 c50645c0 Michael Hanselmann
# 02110-1301, USA.
20 c50645c0 Michael Hanselmann
21 c50645c0 Michael Hanselmann
22 c50645c0 Michael Hanselmann
"""Script for testing ganeti.utils.x509"""
23 c50645c0 Michael Hanselmann
24 c50645c0 Michael Hanselmann
import os
25 c50645c0 Michael Hanselmann
import tempfile
26 c50645c0 Michael Hanselmann
import unittest
27 c50645c0 Michael Hanselmann
import shutil
28 c50645c0 Michael Hanselmann
import time
29 c50645c0 Michael Hanselmann
import OpenSSL
30 c50645c0 Michael Hanselmann
import distutils.version
31 c50645c0 Michael Hanselmann
import string
32 c50645c0 Michael Hanselmann
33 c50645c0 Michael Hanselmann
from ganeti import constants
34 c50645c0 Michael Hanselmann
from ganeti import utils
35 c50645c0 Michael Hanselmann
from ganeti import compat
36 c50645c0 Michael Hanselmann
from ganeti import errors
37 c50645c0 Michael Hanselmann
38 c50645c0 Michael Hanselmann
import testutils
39 c50645c0 Michael Hanselmann
40 c50645c0 Michael Hanselmann
41 c50645c0 Michael Hanselmann
class TestParseAsn1Generalizedtime(unittest.TestCase):
42 c50645c0 Michael Hanselmann
  def setUp(self):
43 c50645c0 Michael Hanselmann
    self._Parse = utils.x509._ParseAsn1Generalizedtime
44 c50645c0 Michael Hanselmann
45 c50645c0 Michael Hanselmann
  def test(self):
46 c50645c0 Michael Hanselmann
    # UTC
47 c50645c0 Michael Hanselmann
    self.assertEqual(self._Parse("19700101000000Z"), 0)
48 c50645c0 Michael Hanselmann
    self.assertEqual(self._Parse("20100222174152Z"), 1266860512)
49 c50645c0 Michael Hanselmann
    self.assertEqual(self._Parse("20380119031407Z"), (2**31) - 1)
50 c50645c0 Michael Hanselmann
51 c50645c0 Michael Hanselmann
    # With offset
52 c50645c0 Michael Hanselmann
    self.assertEqual(self._Parse("20100222174152+0000"), 1266860512)
53 c50645c0 Michael Hanselmann
    self.assertEqual(self._Parse("20100223131652+0000"), 1266931012)
54 c50645c0 Michael Hanselmann
    self.assertEqual(self._Parse("20100223051808-0800"), 1266931088)
55 c50645c0 Michael Hanselmann
    self.assertEqual(self._Parse("20100224002135+1100"), 1266931295)
56 c50645c0 Michael Hanselmann
    self.assertEqual(self._Parse("19700101000000-0100"), 3600)
57 c50645c0 Michael Hanselmann
58 c50645c0 Michael Hanselmann
    # Leap seconds are not supported by datetime.datetime
59 c50645c0 Michael Hanselmann
    self.assertRaises(ValueError, self._Parse, "19841231235960+0000")
60 c50645c0 Michael Hanselmann
    self.assertRaises(ValueError, self._Parse, "19920630235960+0000")
61 c50645c0 Michael Hanselmann
62 c50645c0 Michael Hanselmann
    # Errors
63 c50645c0 Michael Hanselmann
    self.assertRaises(ValueError, self._Parse, "")
64 c50645c0 Michael Hanselmann
    self.assertRaises(ValueError, self._Parse, "invalid")
65 c50645c0 Michael Hanselmann
    self.assertRaises(ValueError, self._Parse, "20100222174152")
66 c50645c0 Michael Hanselmann
    self.assertRaises(ValueError, self._Parse, "Mon Feb 22 17:47:02 UTC 2010")
67 c50645c0 Michael Hanselmann
    self.assertRaises(ValueError, self._Parse, "2010-02-22 17:42:02")
68 c50645c0 Michael Hanselmann
69 c50645c0 Michael Hanselmann
70 c50645c0 Michael Hanselmann
class TestGetX509CertValidity(testutils.GanetiTestCase):
71 c50645c0 Michael Hanselmann
  def setUp(self):
72 c50645c0 Michael Hanselmann
    testutils.GanetiTestCase.setUp(self)
73 c50645c0 Michael Hanselmann
74 c50645c0 Michael Hanselmann
    pyopenssl_version = distutils.version.LooseVersion(OpenSSL.__version__)
75 c50645c0 Michael Hanselmann
76 c50645c0 Michael Hanselmann
    # Test whether we have pyOpenSSL 0.7 or above
77 c50645c0 Michael Hanselmann
    self.pyopenssl0_7 = (pyopenssl_version >= "0.7")
78 c50645c0 Michael Hanselmann
79 c50645c0 Michael Hanselmann
    if not self.pyopenssl0_7:
80 c50645c0 Michael Hanselmann
      warnings.warn("This test requires pyOpenSSL 0.7 or above to"
81 c50645c0 Michael Hanselmann
                    " function correctly")
82 c50645c0 Michael Hanselmann
83 c50645c0 Michael Hanselmann
  def _LoadCert(self, name):
84 c50645c0 Michael Hanselmann
    return OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
85 00ef625c Michael Hanselmann
                                           testutils.ReadTestData(name))
86 c50645c0 Michael Hanselmann
87 c50645c0 Michael Hanselmann
  def test(self):
88 c50645c0 Michael Hanselmann
    validity = utils.GetX509CertValidity(self._LoadCert("cert1.pem"))
89 c50645c0 Michael Hanselmann
    if self.pyopenssl0_7:
90 c50645c0 Michael Hanselmann
      self.assertEqual(validity, (1266919967, 1267524767))
91 c50645c0 Michael Hanselmann
    else:
92 c50645c0 Michael Hanselmann
      self.assertEqual(validity, (None, None))
93 c50645c0 Michael Hanselmann
94 c50645c0 Michael Hanselmann
95 c50645c0 Michael Hanselmann
class TestSignX509Certificate(unittest.TestCase):
96 c50645c0 Michael Hanselmann
  KEY = "My private key!"
97 c50645c0 Michael Hanselmann
  KEY_OTHER = "Another key"
98 c50645c0 Michael Hanselmann
99 c50645c0 Michael Hanselmann
  def test(self):
100 c50645c0 Michael Hanselmann
    # Generate certificate valid for 5 minutes
101 c50645c0 Michael Hanselmann
    (_, cert_pem) = utils.GenerateSelfSignedX509Cert(None, 300)
102 c50645c0 Michael Hanselmann
103 c50645c0 Michael Hanselmann
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
104 c50645c0 Michael Hanselmann
                                           cert_pem)
105 c50645c0 Michael Hanselmann
106 c50645c0 Michael Hanselmann
    # No signature at all
107 c50645c0 Michael Hanselmann
    self.assertRaises(errors.GenericError,
108 c50645c0 Michael Hanselmann
                      utils.LoadSignedX509Certificate, cert_pem, self.KEY)
109 c50645c0 Michael Hanselmann
110 c50645c0 Michael Hanselmann
    # Invalid input
111 c50645c0 Michael Hanselmann
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
112 c50645c0 Michael Hanselmann
                      "", self.KEY)
113 c50645c0 Michael Hanselmann
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
114 c50645c0 Michael Hanselmann
                      "X-Ganeti-Signature: \n", self.KEY)
115 c50645c0 Michael Hanselmann
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
116 c50645c0 Michael Hanselmann
                      "X-Ganeti-Sign: $1234$abcdef\n", self.KEY)
117 c50645c0 Michael Hanselmann
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
118 c50645c0 Michael Hanselmann
                      "X-Ganeti-Signature: $1234567890$abcdef\n", self.KEY)
119 c50645c0 Michael Hanselmann
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
120 c50645c0 Michael Hanselmann
                      "X-Ganeti-Signature: $1234$abc\n\n" + cert_pem, self.KEY)
121 c50645c0 Michael Hanselmann
122 c50645c0 Michael Hanselmann
    # Invalid salt
123 c50645c0 Michael Hanselmann
    for salt in list("-_@$,:;/\\ \t\n"):
124 c50645c0 Michael Hanselmann
      self.assertRaises(errors.GenericError, utils.SignX509Certificate,
125 c50645c0 Michael Hanselmann
                        cert_pem, self.KEY, "foo%sbar" % salt)
126 c50645c0 Michael Hanselmann
127 c50645c0 Michael Hanselmann
    for salt in ["HelloWorld", "salt", string.letters, string.digits,
128 c50645c0 Michael Hanselmann
                 utils.GenerateSecret(numbytes=4),
129 c50645c0 Michael Hanselmann
                 utils.GenerateSecret(numbytes=16),
130 c50645c0 Michael Hanselmann
                 "{123:456}".encode("hex")]:
131 c50645c0 Michael Hanselmann
      signed_pem = utils.SignX509Certificate(cert, self.KEY, salt)
132 c50645c0 Michael Hanselmann
133 c50645c0 Michael Hanselmann
      self._Check(cert, salt, signed_pem)
134 c50645c0 Michael Hanselmann
135 c50645c0 Michael Hanselmann
      self._Check(cert, salt, "X-Another-Header: with a value\n" + signed_pem)
136 c50645c0 Michael Hanselmann
      self._Check(cert, salt, (10 * "Hello World!\n") + signed_pem)
137 c50645c0 Michael Hanselmann
      self._Check(cert, salt, (signed_pem + "\n\na few more\n"
138 c50645c0 Michael Hanselmann
                               "lines----\n------ at\nthe end!"))
139 c50645c0 Michael Hanselmann
140 c50645c0 Michael Hanselmann
  def _Check(self, cert, salt, pem):
141 c50645c0 Michael Hanselmann
    (cert2, salt2) = utils.LoadSignedX509Certificate(pem, self.KEY)
142 c50645c0 Michael Hanselmann
    self.assertEqual(salt, salt2)
143 c50645c0 Michael Hanselmann
    self.assertEqual(cert.digest("sha1"), cert2.digest("sha1"))
144 c50645c0 Michael Hanselmann
145 c50645c0 Michael Hanselmann
    # Other key
146 c50645c0 Michael Hanselmann
    self.assertRaises(errors.GenericError, utils.LoadSignedX509Certificate,
147 c50645c0 Michael Hanselmann
                      pem, self.KEY_OTHER)
148 c50645c0 Michael Hanselmann
149 c50645c0 Michael Hanselmann
150 c50645c0 Michael Hanselmann
class TestCertVerification(testutils.GanetiTestCase):
151 c50645c0 Michael Hanselmann
  def setUp(self):
152 c50645c0 Michael Hanselmann
    testutils.GanetiTestCase.setUp(self)
153 c50645c0 Michael Hanselmann
154 c50645c0 Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
155 c50645c0 Michael Hanselmann
156 c50645c0 Michael Hanselmann
  def tearDown(self):
157 c50645c0 Michael Hanselmann
    shutil.rmtree(self.tmpdir)
158 c50645c0 Michael Hanselmann
159 c50645c0 Michael Hanselmann
  def testVerifyCertificate(self):
160 00ef625c Michael Hanselmann
    cert_pem = testutils.ReadTestData("cert1.pem")
161 c50645c0 Michael Hanselmann
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
162 c50645c0 Michael Hanselmann
                                           cert_pem)
163 c50645c0 Michael Hanselmann
164 c50645c0 Michael Hanselmann
    # Not checking return value as this certificate is expired
165 c50645c0 Michael Hanselmann
    utils.VerifyX509Certificate(cert, 30, 7)
166 c50645c0 Michael Hanselmann
167 f97a7ada Iustin Pop
  @staticmethod
168 f97a7ada Iustin Pop
  def _GenCert(key, before, validity):
169 f97a7ada Iustin Pop
    # Urgh... mostly copied from x509.py :(
170 f97a7ada Iustin Pop
171 f97a7ada Iustin Pop
    # Create self-signed certificate
172 f97a7ada Iustin Pop
    cert = OpenSSL.crypto.X509()
173 f97a7ada Iustin Pop
    cert.set_serial_number(1)
174 f97a7ada Iustin Pop
    if before != 0:
175 f97a7ada Iustin Pop
      cert.gmtime_adj_notBefore(int(before))
176 f97a7ada Iustin Pop
    cert.gmtime_adj_notAfter(validity)
177 f97a7ada Iustin Pop
    cert.set_issuer(cert.get_subject())
178 f97a7ada Iustin Pop
    cert.set_pubkey(key)
179 f97a7ada Iustin Pop
    cert.sign(key, constants.X509_CERT_SIGN_DIGEST)
180 f97a7ada Iustin Pop
    return cert
181 f97a7ada Iustin Pop
182 f97a7ada Iustin Pop
  def testClockSkew(self):
183 f97a7ada Iustin Pop
    SKEW = constants.NODE_MAX_CLOCK_SKEW
184 f97a7ada Iustin Pop
    # Create private and public key
185 f97a7ada Iustin Pop
    key = OpenSSL.crypto.PKey()
186 f97a7ada Iustin Pop
    key.generate_key(OpenSSL.crypto.TYPE_RSA, constants.RSA_KEY_BITS)
187 f97a7ada Iustin Pop
188 f97a7ada Iustin Pop
    validity = 7 * 86400
189 f97a7ada Iustin Pop
    # skew small enough, accepting cert; note that this is a timed
190 f97a7ada Iustin Pop
    # test, and could fail if the machine is so loaded that the next
191 f97a7ada Iustin Pop
    # few lines take more than NODE_MAX_CLOCK_SKEW / 2
192 f97a7ada Iustin Pop
    for before in [-1, 0, SKEW / 4, SKEW / 2]:
193 f97a7ada Iustin Pop
      cert = self._GenCert(key, before, validity)
194 f97a7ada Iustin Pop
      result = utils.VerifyX509Certificate(cert, 1, 2)
195 f97a7ada Iustin Pop
      self.assertEqual(result, (None, None))
196 f97a7ada Iustin Pop
197 f97a7ada Iustin Pop
    # skew too great, not accepting certs
198 302424e7 Iustin Pop
    for before in [SKEW * 2, SKEW * 10]:
199 f97a7ada Iustin Pop
      cert = self._GenCert(key, before, validity)
200 f97a7ada Iustin Pop
      (status, msg) = utils.VerifyX509Certificate(cert, 1, 2)
201 f97a7ada Iustin Pop
      self.assertEqual(status, utils.CERT_WARNING)
202 f97a7ada Iustin Pop
      self.assertTrue(msg.startswith("Certificate not yet valid"))
203 f97a7ada Iustin Pop
204 c50645c0 Michael Hanselmann
205 c50645c0 Michael Hanselmann
class TestVerifyCertificateInner(unittest.TestCase):
206 c50645c0 Michael Hanselmann
  def test(self):
207 c50645c0 Michael Hanselmann
    vci = utils.x509._VerifyCertificateInner
208 c50645c0 Michael Hanselmann
209 c50645c0 Michael Hanselmann
    # Valid
210 c50645c0 Michael Hanselmann
    self.assertEqual(vci(False, 1263916313, 1298476313, 1266940313, 30, 7),
211 c50645c0 Michael Hanselmann
                     (None, None))
212 c50645c0 Michael Hanselmann
213 c50645c0 Michael Hanselmann
    # Not yet valid
214 c50645c0 Michael Hanselmann
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266075600, 30, 7)
215 c50645c0 Michael Hanselmann
    self.assertEqual(errcode, utils.CERT_WARNING)
216 c50645c0 Michael Hanselmann
217 c50645c0 Michael Hanselmann
    # Expiring soon
218 c50645c0 Michael Hanselmann
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 7)
219 c50645c0 Michael Hanselmann
    self.assertEqual(errcode, utils.CERT_ERROR)
220 c50645c0 Michael Hanselmann
221 c50645c0 Michael Hanselmann
    (errcode, msg) = vci(False, 1266507600, 1267544400, 1266939600, 30, 1)
222 c50645c0 Michael Hanselmann
    self.assertEqual(errcode, utils.CERT_WARNING)
223 c50645c0 Michael Hanselmann
224 c50645c0 Michael Hanselmann
    (errcode, msg) = vci(False, 1266507600, None, 1266939600, 30, 7)
225 c50645c0 Michael Hanselmann
    self.assertEqual(errcode, None)
226 c50645c0 Michael Hanselmann
227 c50645c0 Michael Hanselmann
    # Expired
228 c50645c0 Michael Hanselmann
    (errcode, msg) = vci(True, 1266507600, 1267544400, 1266939600, 30, 7)
229 c50645c0 Michael Hanselmann
    self.assertEqual(errcode, utils.CERT_ERROR)
230 c50645c0 Michael Hanselmann
231 c50645c0 Michael Hanselmann
    (errcode, msg) = vci(True, None, 1267544400, 1266939600, 30, 7)
232 c50645c0 Michael Hanselmann
    self.assertEqual(errcode, utils.CERT_ERROR)
233 c50645c0 Michael Hanselmann
234 c50645c0 Michael Hanselmann
    (errcode, msg) = vci(True, 1266507600, None, 1266939600, 30, 7)
235 c50645c0 Michael Hanselmann
    self.assertEqual(errcode, utils.CERT_ERROR)
236 c50645c0 Michael Hanselmann
237 c50645c0 Michael Hanselmann
    (errcode, msg) = vci(True, None, None, 1266939600, 30, 7)
238 c50645c0 Michael Hanselmann
    self.assertEqual(errcode, utils.CERT_ERROR)
239 c50645c0 Michael Hanselmann
240 c50645c0 Michael Hanselmann
241 c50645c0 Michael Hanselmann
class TestGenerateSelfSignedX509Cert(unittest.TestCase):
242 c50645c0 Michael Hanselmann
  def setUp(self):
243 c50645c0 Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
244 c50645c0 Michael Hanselmann
245 c50645c0 Michael Hanselmann
  def tearDown(self):
246 c50645c0 Michael Hanselmann
    shutil.rmtree(self.tmpdir)
247 c50645c0 Michael Hanselmann
248 c50645c0 Michael Hanselmann
  def _checkRsaPrivateKey(self, key):
249 c50645c0 Michael Hanselmann
    lines = key.splitlines()
250 ecc760ce Guido Trotter
    return (("-----BEGIN RSA PRIVATE KEY-----" in lines and
251 ecc760ce Guido Trotter
             "-----END RSA PRIVATE KEY-----" in lines) or
252 ecc760ce Guido Trotter
            ("-----BEGIN PRIVATE KEY-----" in lines and
253 ecc760ce Guido Trotter
             "-----END PRIVATE KEY-----" in lines))
254 c50645c0 Michael Hanselmann
255 c50645c0 Michael Hanselmann
  def _checkCertificate(self, cert):
256 c50645c0 Michael Hanselmann
    lines = cert.splitlines()
257 c50645c0 Michael Hanselmann
    return ("-----BEGIN CERTIFICATE-----" in lines and
258 c50645c0 Michael Hanselmann
            "-----END CERTIFICATE-----" in lines)
259 c50645c0 Michael Hanselmann
260 c50645c0 Michael Hanselmann
  def test(self):
261 c50645c0 Michael Hanselmann
    for common_name in [None, ".", "Ganeti", "node1.example.com"]:
262 c50645c0 Michael Hanselmann
      (key_pem, cert_pem) = utils.GenerateSelfSignedX509Cert(common_name, 300)
263 c50645c0 Michael Hanselmann
      self._checkRsaPrivateKey(key_pem)
264 c50645c0 Michael Hanselmann
      self._checkCertificate(cert_pem)
265 c50645c0 Michael Hanselmann
266 c50645c0 Michael Hanselmann
      key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
267 c50645c0 Michael Hanselmann
                                           key_pem)
268 c50645c0 Michael Hanselmann
      self.assert_(key.bits() >= 1024)
269 c50645c0 Michael Hanselmann
      self.assertEqual(key.bits(), constants.RSA_KEY_BITS)
270 c50645c0 Michael Hanselmann
      self.assertEqual(key.type(), OpenSSL.crypto.TYPE_RSA)
271 c50645c0 Michael Hanselmann
272 c50645c0 Michael Hanselmann
      x509 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
273 c50645c0 Michael Hanselmann
                                             cert_pem)
274 c50645c0 Michael Hanselmann
      self.failIf(x509.has_expired())
275 c50645c0 Michael Hanselmann
      self.assertEqual(x509.get_issuer().CN, common_name)
276 c50645c0 Michael Hanselmann
      self.assertEqual(x509.get_subject().CN, common_name)
277 c50645c0 Michael Hanselmann
      self.assertEqual(x509.get_pubkey().bits(), constants.RSA_KEY_BITS)
278 c50645c0 Michael Hanselmann
279 c50645c0 Michael Hanselmann
  def testLegacy(self):
280 c50645c0 Michael Hanselmann
    cert1_filename = os.path.join(self.tmpdir, "cert1.pem")
281 c50645c0 Michael Hanselmann
282 c50645c0 Michael Hanselmann
    utils.GenerateSelfSignedSslCert(cert1_filename, validity=1)
283 c50645c0 Michael Hanselmann
284 c50645c0 Michael Hanselmann
    cert1 = utils.ReadFile(cert1_filename)
285 c50645c0 Michael Hanselmann
286 c50645c0 Michael Hanselmann
    self.assert_(self._checkRsaPrivateKey(cert1))
287 c50645c0 Michael Hanselmann
    self.assert_(self._checkCertificate(cert1))
288 c50645c0 Michael Hanselmann
289 c50645c0 Michael Hanselmann
290 0602cef3 Michael Hanselmann
class TestCheckNodeCertificate(testutils.GanetiTestCase):
291 0602cef3 Michael Hanselmann
  def setUp(self):
292 0602cef3 Michael Hanselmann
    testutils.GanetiTestCase.setUp(self)
293 0602cef3 Michael Hanselmann
    self.tmpdir = tempfile.mkdtemp()
294 0602cef3 Michael Hanselmann
295 0602cef3 Michael Hanselmann
  def tearDown(self):
296 0602cef3 Michael Hanselmann
    testutils.GanetiTestCase.tearDown(self)
297 0602cef3 Michael Hanselmann
    shutil.rmtree(self.tmpdir)
298 0602cef3 Michael Hanselmann
299 0602cef3 Michael Hanselmann
  def testMismatchingKey(self):
300 00ef625c Michael Hanselmann
    other_cert = testutils.TestDataFilename("cert1.pem")
301 00ef625c Michael Hanselmann
    node_cert = testutils.TestDataFilename("cert2.pem")
302 0602cef3 Michael Hanselmann
303 0602cef3 Michael Hanselmann
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
304 0602cef3 Michael Hanselmann
                                           utils.ReadFile(other_cert))
305 0602cef3 Michael Hanselmann
306 0602cef3 Michael Hanselmann
    try:
307 0602cef3 Michael Hanselmann
      utils.CheckNodeCertificate(cert, _noded_cert_file=node_cert)
308 0602cef3 Michael Hanselmann
    except errors.GenericError, err:
309 0602cef3 Michael Hanselmann
      self.assertEqual(str(err),
310 0602cef3 Michael Hanselmann
                       "Given cluster certificate does not match local key")
311 0602cef3 Michael Hanselmann
    else:
312 0602cef3 Michael Hanselmann
      self.fail("Exception was not raised")
313 0602cef3 Michael Hanselmann
314 0602cef3 Michael Hanselmann
  def testMatchingKey(self):
315 00ef625c Michael Hanselmann
    cert_filename = testutils.TestDataFilename("cert2.pem")
316 0602cef3 Michael Hanselmann
317 0602cef3 Michael Hanselmann
    # Extract certificate
318 0602cef3 Michael Hanselmann
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
319 0602cef3 Michael Hanselmann
                                           utils.ReadFile(cert_filename))
320 0602cef3 Michael Hanselmann
    cert_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
321 0602cef3 Michael Hanselmann
                                               cert)
322 0602cef3 Michael Hanselmann
323 0602cef3 Michael Hanselmann
    utils.CheckNodeCertificate(cert, _noded_cert_file=cert_filename)
324 0602cef3 Michael Hanselmann
325 0602cef3 Michael Hanselmann
  def testMissingFile(self):
326 00ef625c Michael Hanselmann
    cert_path = testutils.TestDataFilename("cert1.pem")
327 0602cef3 Michael Hanselmann
    nodecert = utils.PathJoin(self.tmpdir, "does-not-exist")
328 0602cef3 Michael Hanselmann
329 0602cef3 Michael Hanselmann
    utils.CheckNodeCertificate(NotImplemented, _noded_cert_file=nodecert)
330 0602cef3 Michael Hanselmann
331 0602cef3 Michael Hanselmann
    self.assertFalse(os.path.exists(nodecert))
332 0602cef3 Michael Hanselmann
333 0602cef3 Michael Hanselmann
  def testInvalidCertificate(self):
334 0602cef3 Michael Hanselmann
    tmpfile = utils.PathJoin(self.tmpdir, "cert")
335 0602cef3 Michael Hanselmann
    utils.WriteFile(tmpfile, data="not a certificate")
336 0602cef3 Michael Hanselmann
337 0602cef3 Michael Hanselmann
    self.assertRaises(errors.X509CertError, utils.CheckNodeCertificate,
338 0602cef3 Michael Hanselmann
                      NotImplemented, _noded_cert_file=tmpfile)
339 0602cef3 Michael Hanselmann
340 0602cef3 Michael Hanselmann
  def testNoPrivateKey(self):
341 00ef625c Michael Hanselmann
    cert = testutils.TestDataFilename("cert1.pem")
342 0602cef3 Michael Hanselmann
    self.assertRaises(errors.X509CertError, utils.CheckNodeCertificate,
343 0602cef3 Michael Hanselmann
                      NotImplemented, _noded_cert_file=cert)
344 0602cef3 Michael Hanselmann
345 0602cef3 Michael Hanselmann
  def testMismatchInNodeCert(self):
346 00ef625c Michael Hanselmann
    cert1_path = testutils.TestDataFilename("cert1.pem")
347 00ef625c Michael Hanselmann
    cert2_path = testutils.TestDataFilename("cert2.pem")
348 0602cef3 Michael Hanselmann
    tmpfile = utils.PathJoin(self.tmpdir, "cert")
349 0602cef3 Michael Hanselmann
350 0602cef3 Michael Hanselmann
    # Extract certificate
351 0602cef3 Michael Hanselmann
    cert1 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
352 0602cef3 Michael Hanselmann
                                            utils.ReadFile(cert1_path))
353 0602cef3 Michael Hanselmann
    cert1_pem = OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_PEM,
354 0602cef3 Michael Hanselmann
                                                cert1)
355 0602cef3 Michael Hanselmann
356 0602cef3 Michael Hanselmann
    # Extract mismatching key
357 0602cef3 Michael Hanselmann
    key2 = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
358 0602cef3 Michael Hanselmann
                                          utils.ReadFile(cert2_path))
359 0602cef3 Michael Hanselmann
    key2_pem = OpenSSL.crypto.dump_privatekey(OpenSSL.crypto.FILETYPE_PEM,
360 0602cef3 Michael Hanselmann
                                              key2)
361 0602cef3 Michael Hanselmann
362 0602cef3 Michael Hanselmann
    # Write to file
363 0602cef3 Michael Hanselmann
    utils.WriteFile(tmpfile, data=cert1_pem + key2_pem)
364 0602cef3 Michael Hanselmann
365 0602cef3 Michael Hanselmann
    try:
366 0602cef3 Michael Hanselmann
      utils.CheckNodeCertificate(cert1, _noded_cert_file=tmpfile)
367 0602cef3 Michael Hanselmann
    except errors.X509CertError, err:
368 0602cef3 Michael Hanselmann
      self.assertEqual(err.args,
369 0602cef3 Michael Hanselmann
                       (tmpfile, "Certificate does not match with private key"))
370 0602cef3 Michael Hanselmann
    else:
371 0602cef3 Michael Hanselmann
      self.fail("Exception was not raised")
372 0602cef3 Michael Hanselmann
373 0602cef3 Michael Hanselmann
374 c50645c0 Michael Hanselmann
if __name__ == "__main__":
375 c50645c0 Michael Hanselmann
  testutils.GanetiTestProgram()