# Copyright (C) 2012 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.
"""Script for testing ganeti.tools.node_daemon_setup"""
import shutil
import tempfile
import unittest

import OpenSSL

from ganeti import errors
from ganeti import constants
from ganeti import serializer
from ganeti import pathutils
from ganeti import compat
from ganeti import utils
from ganeti.tools import node_daemon_setup

import testutils
# Short alias for the exception class that several tests below pass to
# assertRaises.
_SetupError = node_daemon_setup.SetupError
class TestLoadData(unittest.TestCase):
    """Tests for node_daemon_setup.LoadData."""

    # NOTE(review): the "def" line of this first method was missing from the
    # extracted source; the name is reconstructed from the body -- confirm.
    def testNoJson(self):
        """Strings that are not complete JSON documents raise ParseError."""
        for data in ["", "{", "}"]:
            self.assertRaises(errors.ParseError,
                              node_daemon_setup.LoadData, data)

    def testInvalidDataStructure(self):
        """Well-formed JSON that LoadData does not accept raises ParseError."""
        # A dictionary whose only key is "some other thing"
        raw = serializer.DumpJson({
            "some other thing": False,
        })
        self.assertRaises(errors.ParseError, node_daemon_setup.LoadData, raw)

        # A list rather than a dictionary
        raw = serializer.DumpJson([])
        self.assertRaises(errors.ParseError, node_daemon_setup.LoadData, raw)

    def testValidData(self):
        """A serialized empty dictionary is loaded back as an empty dict."""
        raw = serializer.DumpJson({})
        self.assertEqual(node_daemon_setup.LoadData(raw), {})
class TestVerifyCertificate(testutils.GanetiTestCase):
    """Tests for VerifyCertificate and the _VerifyCertificate helper.

    NOTE(review): the extracted source had lost the "def" lines of setUp,
    tearDown, the first test and _Check, as well as a few continuation
    lines. Those parts are reconstructed from the surviving bodies; the
    names "testNoCert" and "_Check(cert)" should be confirmed.

    """

    def setUp(self):
        """Run the base-class setup, then create a scratch directory."""
        testutils.GanetiTestCase.setUp(self)
        self.tmpdir = tempfile.mkdtemp()

    def tearDown(self):
        """Run the base-class teardown, then remove the scratch directory."""
        testutils.GanetiTestCase.tearDown(self)
        shutil.rmtree(self.tmpdir)

    def testNoCert(self):
        """Empty input data raises SetupError."""
        # NotImplemented is not callable, so this test would error out (not
        # pass) if the verification function were ever invoked.
        self.assertRaises(_SetupError, node_daemon_setup.VerifyCertificate,
                          {}, _verify_fn=NotImplemented)

    def testVerificationSuccessWithCert(self):
        """A certificate entry plus a verifier that accepts must not raise."""
        node_daemon_setup.VerifyCertificate({
            constants.NDS_NODE_DAEMON_CERTIFICATE: "something",
        }, _verify_fn=lambda _: None)

    def testNoPrivateKey(self):
        """The contents of cert1.pem are rejected with X509CertError."""
        # presumably cert1.pem holds a certificate without a private key, as
        # the test name suggests -- verify against the test data
        cert_filename = testutils.TestDataFilename("cert1.pem")
        cert_pem = utils.ReadFile(cert_filename)

        self.assertRaises(errors.X509CertError,
                          node_daemon_setup._VerifyCertificate,
                          cert_pem, _check_fn=NotImplemented)

    def testInvalidCertificate(self):
        """Text that is not PEM data is rejected with X509CertError."""
        self.assertRaises(errors.X509CertError,
                          node_daemon_setup._VerifyCertificate,
                          "Something that's not a certificate",
                          _check_fn=NotImplemented)

    @staticmethod
    def _Check(cert):
        """Check function handed to _VerifyCertificate via _check_fn."""
        assert cert.get_subject()

    def testSuccessfulCheck(self):
        """The result for cert2.pem can be loaded as certificate and key."""
        cert_filename = testutils.TestDataFilename("cert2.pem")
        cert_pem = utils.ReadFile(cert_filename)

        # The surviving lines read "result" without ever assigning it; the
        # call's return value is the only candidate.
        result = node_daemon_setup._VerifyCertificate(cert_pem,
                                                      _check_fn=self._Check)

        cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                               result)
        self.assertTrue(cert)

        key = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
                                             result)
        # Mirrors the certificate check above so "key" is not left unused.
        self.assertTrue(key)

    def testMismatchingKey(self):
        """A certificate combined with another file's key is rejected."""
        cert1_path = testutils.TestDataFilename("cert1.pem")
        cert2_path = testutils.TestDataFilename("cert2.pem")

        # Extract certificate
        cert1 = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                                utils.ReadFile(cert1_path))
        cert1_pem = OpenSSL.crypto.dump_certificate(
            OpenSSL.crypto.FILETYPE_PEM, cert1)

        # Extract mismatching key
        key2 = OpenSSL.crypto.load_privatekey(OpenSSL.crypto.FILETYPE_PEM,
                                              utils.ReadFile(cert2_path))
        key2_pem = OpenSSL.crypto.dump_privatekey(
            OpenSSL.crypto.FILETYPE_PEM, key2)

        try:
            node_daemon_setup._VerifyCertificate(cert1_pem + key2_pem,
                                                 _check_fn=NotImplemented)
        except errors.X509CertError as err:
            # "except X as err" replaces the Python-2-only "except X, err".
            self.assertEqual(err.args,
                             ("(stdin)",
                              "Certificate is not signed with given key"))
        else:
            self.fail("Exception was not raised")
class TestVerifyClusterName(unittest.TestCase):
    """Tests for node_daemon_setup.VerifyClusterName.

    NOTE(review): the "def" lines of setUp and tearDown, the decorator on
    _FailingVerify and the "data = {"/"result =" assignments were missing
    from the extracted source and are reconstructed here.

    """

    def setUp(self):
        """Run the base-class setup, then create a scratch directory."""
        unittest.TestCase.setUp(self)
        self.tmpdir = tempfile.mkdtemp()

    def tearDown(self):
        """Run the base-class teardown, then remove the scratch directory."""
        unittest.TestCase.tearDown(self)
        shutil.rmtree(self.tmpdir)

    def testNoName(self):
        """Empty input data raises SetupError."""
        # NotImplemented is not callable, so this test would error out (not
        # pass) if the verification function were ever invoked.
        self.assertRaises(_SetupError, node_daemon_setup.VerifyClusterName,
                          {}, _verify_fn=NotImplemented)

    # Must be a static method: it takes only "name" but is looked up through
    # "self" below, so a plain function would receive the instance instead.
    @staticmethod
    def _FailingVerify(name):
        """Verifier stub that checks the name it is given, then fails."""
        assert name == "somecluster.example.com"
        raise errors.GenericError()

    def testFailingVerification(self):
        """An error raised by the verifier reaches the caller."""
        data = {
            constants.NDS_CLUSTER_NAME: "somecluster.example.com",
        }

        self.assertRaises(errors.GenericError,
                          node_daemon_setup.VerifyClusterName,
                          data, _verify_fn=self._FailingVerify)

    def testSuccess(self):
        """The cluster name from the input data is returned."""
        data = {
            constants.NDS_CLUSTER_NAME: "cluster.example.com",
        }

        result = node_daemon_setup.VerifyClusterName(
            data, _verify_fn=lambda _: None)

        self.assertEqual(result, "cluster.example.com")
class TestVerifySsconf(unittest.TestCase):
    """Tests for node_daemon_setup.VerifySsconf.

    NOTE(review): several closing brackets, the "ssdata = {" assignment and
    the tail of the last assertRaises call were missing from the extracted
    source and are reconstructed here.

    """

    def testNoSsconf(self):
        """Missing, None or empty ssconf data raises SetupError."""
        # NotImplemented is not callable, so these checks would error out
        # (not pass) if the verification function were ever invoked.
        self.assertRaises(_SetupError, node_daemon_setup.VerifySsconf,
                          {}, NotImplemented, _verify_fn=NotImplemented)

        for items in [None, {}]:
            self.assertRaises(_SetupError, node_daemon_setup.VerifySsconf, {
                constants.NDS_SSCONF: items,
            }, NotImplemented, _verify_fn=NotImplemented)

    def _Check(self, names):
        """Verifier stub asserting exactly the two expected key names."""
        self.assertEqual(frozenset(names), frozenset([
            constants.SS_CLUSTER_NAME,
            constants.SS_INSTANCE_LIST,
        ]))

    def testSuccess(self):
        """Data is returned for a matching cluster name, rejected otherwise."""
        ssdata = {
            constants.SS_CLUSTER_NAME: "cluster.example.com",
            constants.SS_INSTANCE_LIST: [],
        }

        result = node_daemon_setup.VerifySsconf({
            constants.NDS_SSCONF: ssdata,
        }, "cluster.example.com", _verify_fn=self._Check)

        self.assertEqual(result, ssdata)

        # Same data, but a different expected cluster name
        self.assertRaises(_SetupError, node_daemon_setup.VerifySsconf, {
            constants.NDS_SSCONF: ssdata,
        }, "wrong.example.com", _verify_fn=self._Check)

    def testInvalidKey(self):
        """An unrecognized ssconf key raises GenericError."""
        # NOTE(review): the end of this call was cut off in the extracted
        # source. No _verify_fn override is passed: a non-callable stub
        # would raise TypeError rather than the GenericError expected here,
        # so the default verifier presumably has to run -- confirm.
        self.assertRaises(errors.GenericError, node_daemon_setup.VerifySsconf, {
            constants.NDS_SSCONF: {
                "no-valid-ssconf-key": "value",
            },
        }, NotImplemented)
# Run the test program only when this file is executed as a script.
if __name__ == "__main__":
    testutils.GanetiTestProgram()