Revision a6c43c02 test/py/cmdlib/cluster_unittest.py

b/test/py/cmdlib/cluster_unittest.py
25 25

  
26 26
import OpenSSL
27 27

  
28
import copy
28 29
import unittest
29 30
import operator
30
import os
31
import tempfile
32
import shutil
33

  
34
from collections import defaultdict
35 31

  
36 32
from ganeti.cmdlib import cluster
37 33
from ganeti import constants
......
49 45
import testutils
50 46

  
51 47

  
52
class TestCertVerification(testutils.GanetiTestCase):
53
  def setUp(self):
54
    testutils.GanetiTestCase.setUp(self)
55

  
56
    self.tmpdir = tempfile.mkdtemp()
57

  
58
  def tearDown(self):
59
    shutil.rmtree(self.tmpdir)
60

  
61
  def testVerifyCertificate(self):
62
    cluster._VerifyCertificate(testutils.TestDataFilename("cert1.pem"))
63

  
64
    nonexist_filename = os.path.join(self.tmpdir, "does-not-exist")
65

  
66
    (errcode, msg) = cluster._VerifyCertificate(nonexist_filename)
67
    self.assertEqual(errcode, cluster.LUClusterVerifyConfig.ETYPE_ERROR)
68

  
69
    # Try to load non-certificate file
70
    invalid_cert = testutils.TestDataFilename("bdev-net.txt")
71
    (errcode, msg) = cluster._VerifyCertificate(invalid_cert)
72
    self.assertEqual(errcode, cluster.LUClusterVerifyConfig.ETYPE_ERROR)
73

  
74

  
75 48
class TestClusterVerifySsh(unittest.TestCase):
76 49
  def testMultipleGroups(self):
77 50
    fn = cluster.LUClusterVerifyGroup._SelectSshCheckNodes
......
1013 986
      .patch_object(OpenSSL.crypto, "load_certificate")
1014 987
    self._load_cert_mock = self._load_cert_patcher.start()
1015 988
    self._verify_cert_patcher = testutils \
1016
      .patch_object(utils, "VerifyX509Certificate")
989
      .patch_object(utils, "VerifyCertificate")
1017 990
    self._verify_cert_mock = self._verify_cert_patcher.start()
1018 991
    self._read_file_patcher = testutils.patch_object(utils, "ReadFile")
1019 992
    self._read_file_mock = self._read_file_patcher.start()
......
1037 1010
    self.cfg.AddNewInstance()
1038 1011
    op = opcodes.OpClusterVerifyConfig()
1039 1012
    result = self.ExecOpCode(op)
1040

  
1041 1013
    self.assertTrue(result)
1042 1014

  
1043 1015
  def testDanglingNode(self):
......
1113 1085
    self.ExecOpCode(op)
1114 1086

  
1115 1087

  
1088
class TestLUClusterVerifyClientCerts(CmdlibTestCase):
1089

  
1090
  def _AddNormalNode(self):
1091
    self.normalnode = copy.deepcopy(self.master)
1092
    self.normalnode.master_candidate = False
1093
    self.normalnode.uuid = "normal-node-uuid"
1094
    self.cfg.AddNode(self.normalnode, None)
1095

  
1096
  def testVerifyMasterCandidate(self):
1097
    client_cert = "client-cert-digest"
1098
    self.cluster.candidate_certs = {self.master.uuid: client_cert}
1099
    self.rpc.call_node_verify.return_value = \
1100
      RpcResultsBuilder() \
1101
        .AddSuccessfulNode(self.master,
1102
          {constants.NV_CLIENT_CERT: (None, client_cert)}) \
1103
        .Build()
1104
    op = opcodes.OpClusterVerifyGroup(group_name="default", verbose=True)
1105
    self.ExecOpCode(op)
1106

  
1107
  def testVerifyMasterCandidateInvalid(self):
1108
    client_cert = "client-cert-digest"
1109
    self.cluster.candidate_certs = {self.master.uuid: client_cert}
1110
    self.rpc.call_node_verify.return_value = \
1111
      RpcResultsBuilder() \
1112
        .AddSuccessfulNode(self.master,
1113
          {constants.NV_CLIENT_CERT: (666, "Invalid Certificate")}) \
1114
        .Build()
1115
    op = opcodes.OpClusterVerifyGroup(group_name="default", verbose=True)
1116
    self.ExecOpCode(op)
1117
    self.mcpu.assertLogContainsRegex("Client certificate")
1118
    self.mcpu.assertLogContainsRegex("failed validation")
1119

  
1120
  def testVerifyNoMasterCandidateMap(self):
1121
    client_cert = "client-cert-digest"
1122
    self.cluster.candidate_certs = {}
1123
    self.rpc.call_node_verify.return_value = \
1124
      RpcResultsBuilder() \
1125
        .AddSuccessfulNode(self.master,
1126
          {constants.NV_CLIENT_CERT: (None, client_cert)}) \
1127
        .Build()
1128
    op = opcodes.OpClusterVerifyGroup(group_name="default", verbose=True)
1129
    self.ExecOpCode(op)
1130
    self.mcpu.assertLogContainsRegex(
1131
      "list of master candidate certificates is empty")
1132

  
1133
  def testVerifyNoSharingMasterCandidates(self):
1134
    client_cert = "client-cert-digest"
1135
    self.cluster.candidate_certs = {
1136
      self.master.uuid: client_cert,
1137
      "some-other-master-candidate-uuid": client_cert}
1138
    self.rpc.call_node_verify.return_value = \
1139
      RpcResultsBuilder() \
1140
        .AddSuccessfulNode(self.master,
1141
          {constants.NV_CLIENT_CERT: (None, client_cert)}) \
1142
        .Build()
1143
    op = opcodes.OpClusterVerifyGroup(group_name="default", verbose=True)
1144
    self.ExecOpCode(op)
1145
    self.mcpu.assertLogContainsRegex(
1146
      "two master candidates configured to use the same")
1147

  
1148
  def testVerifyMasterCandidateCertMismatch(self):
1149
    client_cert = "client-cert-digest"
1150
    self.cluster.candidate_certs = {self.master.uuid: "different-cert-digest"}
1151
    self.rpc.call_node_verify.return_value = \
1152
      RpcResultsBuilder() \
1153
        .AddSuccessfulNode(self.master,
1154
          {constants.NV_CLIENT_CERT: (None, client_cert)}) \
1155
        .Build()
1156
    op = opcodes.OpClusterVerifyGroup(group_name="default", verbose=True)
1157
    self.ExecOpCode(op)
1158
    self.mcpu.assertLogContainsRegex("does not match its entry")
1159

  
1160
  def testVerifyMasterCandidateUnregistered(self):
1161
    client_cert = "client-cert-digest"
1162
    self.cluster.candidate_certs = {"other-node-uuid": "different-cert-digest"}
1163
    self.rpc.call_node_verify.return_value = \
1164
      RpcResultsBuilder() \
1165
        .AddSuccessfulNode(self.master,
1166
          {constants.NV_CLIENT_CERT: (None, client_cert)}) \
1167
        .Build()
1168
    op = opcodes.OpClusterVerifyGroup(group_name="default", verbose=True)
1169
    self.ExecOpCode(op)
1170
    self.mcpu.assertLogContainsRegex("does not have an entry")
1171

  
1172
  def testVerifyMasterCandidateOtherNodesCert(self):
1173
    client_cert = "client-cert-digest"
1174
    self.cluster.candidate_certs = {"other-node-uuid": client_cert}
1175
    self.rpc.call_node_verify.return_value = \
1176
      RpcResultsBuilder() \
1177
        .AddSuccessfulNode(self.master,
1178
          {constants.NV_CLIENT_CERT: (None, client_cert)}) \
1179
        .Build()
1180
    op = opcodes.OpClusterVerifyGroup(group_name="default", verbose=True)
1181
    self.ExecOpCode(op)
1182
    self.mcpu.assertLogContainsRegex("using a certificate of another node")
1183

  
1184
  def testNormalNodeStillInList(self):
1185
    self._AddNormalNode()
1186
    client_cert_master = "client-cert-digest-master"
1187
    client_cert_normal = "client-cert-digest-normal"
1188
    self.cluster.candidate_certs = {
1189
      self.normalnode.uuid: client_cert_normal,
1190
      self.master.uuid: client_cert_master}
1191
    self.rpc.call_node_verify.return_value = \
1192
      RpcResultsBuilder() \
1193
        .AddSuccessfulNode(self.normalnode,
1194
          {constants.NV_CLIENT_CERT: (None, client_cert_normal)}) \
1195
        .AddSuccessfulNode(self.master,
1196
          {constants.NV_CLIENT_CERT: (None, client_cert_master)}) \
1197
        .Build()
1198
    op = opcodes.OpClusterVerifyGroup(group_name="default", verbose=True)
1199
    self.ExecOpCode(op)
1200
    self.mcpu.assertLogContainsRegex("not a master candidate")
1201
    self.mcpu.assertLogContainsRegex("still listed")
1202

  
1203
  def testNormalNodeStealingMasterCandidateCert(self):
1204
    self._AddNormalNode()
1205
    client_cert_master = "client-cert-digest-master"
1206
    self.cluster.candidate_certs = {
1207
      self.master.uuid: client_cert_master}
1208
    self.rpc.call_node_verify.return_value = \
1209
      RpcResultsBuilder() \
1210
        .AddSuccessfulNode(self.normalnode,
1211
          {constants.NV_CLIENT_CERT: (None, client_cert_master)}) \
1212
        .AddSuccessfulNode(self.master,
1213
          {constants.NV_CLIENT_CERT: (None, client_cert_master)}) \
1214
        .Build()
1215
    op = opcodes.OpClusterVerifyGroup(group_name="default", verbose=True)
1216
    self.ExecOpCode(op)
1217
    self.mcpu.assertLogContainsRegex("not a master candidate")
1218
    self.mcpu.assertLogContainsRegex(
1219
      "certificate of another node which is master candidate")
1220

  
1221

  
1116 1222
class TestLUClusterVerifyGroupMethods(CmdlibTestCase):
1117 1223
  """Base class for testing individual methods in LUClusterVerifyGroup.
1118 1224

  

Also available in: Unified diff