Revision a6c43c02

b/lib/backend.py
930 930
    result[constants.NV_HVINFO] = hyper.GetNodeInfo(hvparams=hvparams)
931 931

  
932 932

  
933
def _VerifyClientCertificate(cert_file=pathutils.NODED_CLIENT_CERT_FILE):
934
  """Verify the existance and validity of the client SSL certificate.
935

  
936
  """
937
  create_cert_cmd = "gnt-cluster renew-crypto --new-node-certificates"
938
  if not os.path.exists(cert_file):
939
    return (constants.CV_ERROR,
940
            "The client certificate does not exist. Run '%s' to create"
941
            "client certificates for all nodes." % create_cert_cmd)
942

  
943
  (errcode, msg) = utils.VerifyCertificate(cert_file)
944
  if errcode is not None:
945
    return (errcode, msg)
946
  else:
947
    # if everything is fine, we return the digest to be compared to the config
948
    return (None, utils.GetCertificateDigest(cert_filename=cert_file))
949

  
950

  
933 951
def VerifyNode(what, cluster_name, all_hvparams, node_groups, groups_cfg):
934 952
  """Verify the status of the local node.
935 953

  
......
983 1001
      dict((vcluster.MakeVirtualPath(key), value)
984 1002
           for (key, value) in fingerprints.items())
985 1003

  
1004
  if constants.NV_CLIENT_CERT in what:
1005
    result[constants.NV_CLIENT_CERT] = _VerifyClientCertificate()
1006

  
986 1007
  if constants.NV_NODELIST in what:
987 1008
    (nodes, bynode) = what[constants.NV_NODELIST]
988 1009

  
b/lib/cmdlib/cluster.py
21 21

  
22 22
"""Logical units dealing with the cluster."""
23 23

  
24
import OpenSSL
25

  
26 24
import copy
27 25
import itertools
28 26
import logging
......
1484 1482
  """
1485 1483

  
1486 1484
  ETYPE_FIELD = "code"
1487
  ETYPE_ERROR = "ERROR"
1488
  ETYPE_WARNING = "WARNING"
1485
  ETYPE_ERROR = constants.CV_ERROR
1486
  ETYPE_WARNING = constants.CV_WARNING
1489 1487

  
1490 1488
  def _Error(self, ecode, item, msg, *args, **kwargs):
1491 1489
    """Format an error message.
......
1529 1527
      self._Error(*args, **kwargs)
1530 1528

  
1531 1529

  
1532
def _VerifyCertificate(filename):
1533
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1534

  
1535
  @type filename: string
1536
  @param filename: Path to PEM file
1537

  
1538
  """
1539
  try:
1540
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1541
                                           utils.ReadFile(filename))
1542
  except Exception, err: # pylint: disable=W0703
1543
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1544
            "Failed to load X509 certificate %s: %s" % (filename, err))
1545

  
1546
  (errcode, msg) = \
1547
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1548
                                constants.SSL_CERT_EXPIRATION_ERROR)
1549

  
1550
  if msg:
1551
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1552
  else:
1553
    fnamemsg = None
1554

  
1555
  if errcode is None:
1556
    return (None, fnamemsg)
1557
  elif errcode == utils.CERT_WARNING:
1558
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1559
  elif errcode == utils.CERT_ERROR:
1560
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1561

  
1562
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1563

  
1564

  
1565 1530
def _GetAllHypervisorParameters(cluster, instances):
1566 1531
  """Compute the set of all hypervisor parameters.
1567 1532

  
......
1642 1607
    feedback_fn("* Verifying cluster certificate files")
1643 1608

  
1644 1609
    for cert_filename in pathutils.ALL_CERT_FILES:
1645
      (errcode, msg) = _VerifyCertificate(cert_filename)
1610
      (errcode, msg) = utils.VerifyCertificate(cert_filename)
1646 1611
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1647 1612

  
1648 1613
    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
......
2328 2293
                      " should node %s fail (%dMiB needed, %dMiB available)",
2329 2294
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2330 2295

  
2296
  def _VerifyClientCertificates(self, nodes, all_nvinfo):
2297
    """Verifies the consistency of the client certificates.
2298

  
2299
    This includes several aspects:
2300
      - the individual validation of all nodes' certificates
2301
      - the consistency of the master candidate certificate map
2302
      - the consistency of the master candidate certificate map with the
2303
        certificates that the master candidates are actually using.
2304

  
2305
    @param nodes: the list of nodes to consider in this verification
2306
    @param all_nvinfo: the map of results of the verify_node call to
2307
      all nodes
2308

  
2309
    """
2310
    candidate_certs = self.cfg.GetClusterInfo().candidate_certs
2311
    if candidate_certs is None or len(candidate_certs) == 0:
2312
      self._ErrorIf(
2313
        True, constants.CV_ECLUSTERCLIENTCERT, None,
2314
        "The cluster's list of master candidate certificates is empty."
2315
        "If you just updated the cluster, please run"
2316
        " 'gnt-cluster renew-crypto --new-node-certificates'.")
2317
      return
2318

  
2319
    self._ErrorIf(
2320
      len(candidate_certs) != len(set(candidate_certs.values())),
2321
      constants.CV_ECLUSTERCLIENTCERT, None,
2322
      "There are at least two master candidates configured to use the same"
2323
      " certificate.")
2324

  
2325
    # collect the client certificate
2326
    for node in nodes:
2327
      if node.offline:
2328
        continue
2329

  
2330
      nresult = all_nvinfo[node.uuid]
2331
      if nresult.fail_msg or not nresult.payload:
2332
        continue
2333

  
2334
      (errcode, msg) = nresult.payload.get(constants.NV_CLIENT_CERT, None)
2335

  
2336
      self._ErrorIf(
2337
        errcode is not None, constants.CV_ECLUSTERCLIENTCERT, None,
2338
        "Client certificate of node '%s' failed validation: %s (code '%s')",
2339
        node.uuid, msg, errcode)
2340

  
2341
      if not errcode:
2342
        digest = msg
2343
        if node.master_candidate:
2344
          if node.uuid in candidate_certs:
2345
            self._ErrorIf(
2346
              digest != candidate_certs[node.uuid],
2347
              constants.CV_ECLUSTERCLIENTCERT, None,
2348
              "Client certificate digest of master candidate '%s' does not"
2349
              " match its entry in the cluster's map of master candidate"
2350
              " certificates. Expected: %s Got: %s", node.uuid,
2351
              digest, candidate_certs[node.uuid])
2352
          else:
2353
            self._ErrorIf(
2354
              True, constants.CV_ECLUSTERCLIENTCERT, None,
2355
              "The master candidate '%s' does not have an entry in the"
2356
              " map of candidate certificates.", node.uuid)
2357
            self._ErrorIf(
2358
              digest in candidate_certs.values(),
2359
              constants.CV_ECLUSTERCLIENTCERT, None,
2360
              "Master candidate '%s' is using a certificate of another node.",
2361
              node.uuid)
2362
        else:
2363
          self._ErrorIf(
2364
            node.uuid in candidate_certs,
2365
            constants.CV_ECLUSTERCLIENTCERT, None,
2366
            "Node '%s' is not a master candidate, but still listed in the"
2367
            " map of master candidate certificates.", node.uuid)
2368
          self._ErrorIf(
2369
            (node.uuid not in candidate_certs) and
2370
              (digest in candidate_certs.values()),
2371
            constants.CV_ECLUSTERCLIENTCERT, None,
2372
            "Node '%s' is not a master candidate and is incorrectly using a"
2373
            " certificate of another node which is master candidate.",
2374
            node.uuid)
2375

  
2331 2376
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2332 2377
                   (files_all, files_opt, files_mc, files_vm)):
2333 2378
    """Verifies file checksums collected from all nodes.
......
2371 2416
      if nresult.fail_msg or not nresult.payload:
2372 2417
        node_files = None
2373 2418
      else:
2374
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2419
        fingerprints = nresult.payload.get(constants.NV_FILELIST, {})
2375 2420
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2376 2421
                          for (key, value) in fingerprints.items())
2377 2422
        del fingerprints
......
3008 3053
      constants.NV_OSLIST: None,
3009 3054
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
3010 3055
      constants.NV_USERSCRIPTS: user_scripts,
3056
      constants.NV_CLIENT_CERT: None,
3011 3057
      }
3012 3058

  
3013 3059
    if vg_name is not None:
......
3132 3178

  
3133 3179
    feedback_fn("* Verifying configuration file consistency")
3134 3180

  
3181
    self._VerifyClientCertificates(self.my_node_info.values(), all_nvinfo)
3135 3182
    # If not all nodes are being checked, we need to make sure the master node
3136 3183
    # and a non-checked vm_capable node are in the list.
3137 3184
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
b/lib/utils/security.py
28 28

  
29 29
from ganeti.utils import io
30 30
from ganeti.utils import x509
31
from ganeti import constants
32
from ganeti import errors
31 33
from ganeti import pathutils
32 34

  
33 35

  
......
110 112

  
111 113
    logging.debug(log_msg)
112 114
    x509.GenerateSelfSignedSslCert(cert_filename)
115

  
116

  
117
def VerifyCertificate(filename):
118
  """Verifies a SSL certificate.
119

  
120
  @type filename: string
121
  @param filename: Path to PEM file
122

  
123
  """
124
  try:
125
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
126
                                           io.ReadFile(filename))
127
  except Exception, err: # pylint: disable=W0703
128
    return (constants.CV_ERROR,
129
            "Failed to load X509 certificate %s: %s" % (filename, err))
130

  
131
  (errcode, msg) = \
132
    x509.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
133
                                constants.SSL_CERT_EXPIRATION_ERROR)
134

  
135
  if msg:
136
    fnamemsg = "While verifying %s: %s" % (filename, msg)
137
  else:
138
    fnamemsg = None
139

  
140
  if errcode is None:
141
    return (None, fnamemsg)
142
  elif errcode == x509.CERT_WARNING:
143
    return (constants.CV_WARNING, fnamemsg)
144
  elif errcode == x509.CERT_ERROR:
145
    return (constants.CV_ERROR, fnamemsg)
146

  
147
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
b/src/Ganeti/Constants.hs
2684 2684
cvTinstance :: String
2685 2685
cvTinstance = "instance"
2686 2686

  
2687
-- * Cluster Verify error levels
2688

  
2689
cvWarning :: String
2690
cvWarning = "WARNING"
2691

  
2692
cvError :: String
2693
cvError = "ERROR"
2694

  
2687 2695
-- * Cluster Verify error codes and documentation
2688 2696

  
2689 2697
cvEclustercert :: (String, String, String)
......
2692 2700
   Types.cVErrorCodeToRaw CvECLUSTERCERT,
2693 2701
   "Cluster certificate files verification failure")
2694 2702

  
2703
cvEclusterclientcert :: (String, String, String)
2704
cvEclusterclientcert =
2705
  ("cluster",
2706
   Types.cVErrorCodeToRaw CvECLUSTERCLIENTCERT,
2707
   "Cluster client certificate files verification failure")
2708

  
2695 2709
cvEclustercfg :: (String, String, String)
2696 2710
cvEclustercfg =
2697 2711
  ("cluster",
......
2965 2979
nvBridges :: String
2966 2980
nvBridges = "bridges"
2967 2981

  
2982
nvClientCert :: String
2983
nvClientCert = "client-cert"
2984

  
2968 2985
nvDrbdhelper :: String
2969 2986
nvDrbdhelper = "drbd-helper"
2970 2987

  
b/src/Ganeti/Types.hs
362 362
$(THH.declareLADT ''String "CVErrorCode"
363 363
  [ ("CvECLUSTERCFG",                  "ECLUSTERCFG")
364 364
  , ("CvECLUSTERCERT",                 "ECLUSTERCERT")
365
  , ("CvECLUSTERCLIENTCERT",           "ECLUSTERCLIENTCERT")
365 366
  , ("CvECLUSTERFILECHECK",            "ECLUSTERFILECHECK")
366 367
  , ("CvECLUSTERDANGLINGNODES",        "ECLUSTERDANGLINGNODES")
367 368
  , ("CvECLUSTERDANGLINGINST",         "ECLUSTERDANGLINGINST")
b/test/py/cmdlib/cluster_unittest.py
25 25

  
26 26
import OpenSSL
27 27

  
28
import copy
28 29
import unittest
29 30
import operator
30
import os
31
import tempfile
32
import shutil
33

  
34
from collections import defaultdict
35 31

  
36 32
from ganeti.cmdlib import cluster
37 33
from ganeti import constants
......
49 45
import testutils
50 46

  
51 47

  
52
class TestCertVerification(testutils.GanetiTestCase):
53
  def setUp(self):
54
    testutils.GanetiTestCase.setUp(self)
55

  
56
    self.tmpdir = tempfile.mkdtemp()
57

  
58
  def tearDown(self):
59
    shutil.rmtree(self.tmpdir)
60

  
61
  def testVerifyCertificate(self):
62
    cluster._VerifyCertificate(testutils.TestDataFilename("cert1.pem"))
63

  
64
    nonexist_filename = os.path.join(self.tmpdir, "does-not-exist")
65

  
66
    (errcode, msg) = cluster._VerifyCertificate(nonexist_filename)
67
    self.assertEqual(errcode, cluster.LUClusterVerifyConfig.ETYPE_ERROR)
68

  
69
    # Try to load non-certificate file
70
    invalid_cert = testutils.TestDataFilename("bdev-net.txt")
71
    (errcode, msg) = cluster._VerifyCertificate(invalid_cert)
72
    self.assertEqual(errcode, cluster.LUClusterVerifyConfig.ETYPE_ERROR)
73

  
74

  
75 48
class TestClusterVerifySsh(unittest.TestCase):
76 49
  def testMultipleGroups(self):
77 50
    fn = cluster.LUClusterVerifyGroup._SelectSshCheckNodes
......
1013 986
      .patch_object(OpenSSL.crypto, "load_certificate")
1014 987
    self._load_cert_mock = self._load_cert_patcher.start()
1015 988
    self._verify_cert_patcher = testutils \
1016
      .patch_object(utils, "VerifyX509Certificate")
989
      .patch_object(utils, "VerifyCertificate")
1017 990
    self._verify_cert_mock = self._verify_cert_patcher.start()
1018 991
    self._read_file_patcher = testutils.patch_object(utils, "ReadFile")
1019 992
    self._read_file_mock = self._read_file_patcher.start()
......
1037 1010
    self.cfg.AddNewInstance()
1038 1011
    op = opcodes.OpClusterVerifyConfig()
1039 1012
    result = self.ExecOpCode(op)
1040

  
1041 1013
    self.assertTrue(result)
1042 1014

  
1043 1015
  def testDanglingNode(self):
......
1113 1085
    self.ExecOpCode(op)
1114 1086

  
1115 1087

  
1088
class TestLUClusterVerifyClientCerts(CmdlibTestCase):
1089

  
1090
  def _AddNormalNode(self):
1091
    self.normalnode = copy.deepcopy(self.master)
1092
    self.normalnode.master_candidate = False
1093
    self.normalnode.uuid = "normal-node-uuid"
1094
    self.cfg.AddNode(self.normalnode, None)
1095

  
1096
  def testVerifyMasterCandidate(self):
1097
    client_cert = "client-cert-digest"
1098
    self.cluster.candidate_certs = {self.master.uuid: client_cert}
1099
    self.rpc.call_node_verify.return_value = \
1100
      RpcResultsBuilder() \
1101
        .AddSuccessfulNode(self.master,
1102
          {constants.NV_CLIENT_CERT: (None, client_cert)}) \
1103
        .Build()
1104
    op = opcodes.OpClusterVerifyGroup(group_name="default", verbose=True)
1105
    self.ExecOpCode(op)
1106

  
1107
  def testVerifyMasterCandidateInvalid(self):
1108
    client_cert = "client-cert-digest"
1109
    self.cluster.candidate_certs = {self.master.uuid: client_cert}
1110
    self.rpc.call_node_verify.return_value = \
1111
      RpcResultsBuilder() \
1112
        .AddSuccessfulNode(self.master,
1113
          {constants.NV_CLIENT_CERT: (666, "Invalid Certificate")}) \
1114
        .Build()
1115
    op = opcodes.OpClusterVerifyGroup(group_name="default", verbose=True)
1116
    self.ExecOpCode(op)
1117
    self.mcpu.assertLogContainsRegex("Client certificate")
1118
    self.mcpu.assertLogContainsRegex("failed validation")
1119

  
1120
  def testVerifyNoMasterCandidateMap(self):
1121
    client_cert = "client-cert-digest"
1122
    self.cluster.candidate_certs = {}
1123
    self.rpc.call_node_verify.return_value = \
1124
      RpcResultsBuilder() \
1125
        .AddSuccessfulNode(self.master,
1126
          {constants.NV_CLIENT_CERT: (None, client_cert)}) \
1127
        .Build()
1128
    op = opcodes.OpClusterVerifyGroup(group_name="default", verbose=True)
1129
    self.ExecOpCode(op)
1130
    self.mcpu.assertLogContainsRegex(
1131
      "list of master candidate certificates is empty")
1132

  
1133
  def testVerifyNoSharingMasterCandidates(self):
1134
    client_cert = "client-cert-digest"
1135
    self.cluster.candidate_certs = {
1136
      self.master.uuid: client_cert,
1137
      "some-other-master-candidate-uuid": client_cert}
1138
    self.rpc.call_node_verify.return_value = \
1139
      RpcResultsBuilder() \
1140
        .AddSuccessfulNode(self.master,
1141
          {constants.NV_CLIENT_CERT: (None, client_cert)}) \
1142
        .Build()
1143
    op = opcodes.OpClusterVerifyGroup(group_name="default", verbose=True)
1144
    self.ExecOpCode(op)
1145
    self.mcpu.assertLogContainsRegex(
1146
      "two master candidates configured to use the same")
1147

  
1148
  def testVerifyMasterCandidateCertMismatch(self):
1149
    client_cert = "client-cert-digest"
1150
    self.cluster.candidate_certs = {self.master.uuid: "different-cert-digest"}
1151
    self.rpc.call_node_verify.return_value = \
1152
      RpcResultsBuilder() \
1153
        .AddSuccessfulNode(self.master,
1154
          {constants.NV_CLIENT_CERT: (None, client_cert)}) \
1155
        .Build()
1156
    op = opcodes.OpClusterVerifyGroup(group_name="default", verbose=True)
1157
    self.ExecOpCode(op)
1158
    self.mcpu.assertLogContainsRegex("does not match its entry")
1159

  
1160
  def testVerifyMasterCandidateUnregistered(self):
1161
    client_cert = "client-cert-digest"
1162
    self.cluster.candidate_certs = {"other-node-uuid": "different-cert-digest"}
1163
    self.rpc.call_node_verify.return_value = \
1164
      RpcResultsBuilder() \
1165
        .AddSuccessfulNode(self.master,
1166
          {constants.NV_CLIENT_CERT: (None, client_cert)}) \
1167
        .Build()
1168
    op = opcodes.OpClusterVerifyGroup(group_name="default", verbose=True)
1169
    self.ExecOpCode(op)
1170
    self.mcpu.assertLogContainsRegex("does not have an entry")
1171

  
1172
  def testVerifyMasterCandidateOtherNodesCert(self):
1173
    client_cert = "client-cert-digest"
1174
    self.cluster.candidate_certs = {"other-node-uuid": client_cert}
1175
    self.rpc.call_node_verify.return_value = \
1176
      RpcResultsBuilder() \
1177
        .AddSuccessfulNode(self.master,
1178
          {constants.NV_CLIENT_CERT: (None, client_cert)}) \
1179
        .Build()
1180
    op = opcodes.OpClusterVerifyGroup(group_name="default", verbose=True)
1181
    self.ExecOpCode(op)
1182
    self.mcpu.assertLogContainsRegex("using a certificate of another node")
1183

  
1184
  def testNormalNodeStillInList(self):
1185
    self._AddNormalNode()
1186
    client_cert_master = "client-cert-digest-master"
1187
    client_cert_normal = "client-cert-digest-normal"
1188
    self.cluster.candidate_certs = {
1189
      self.normalnode.uuid: client_cert_normal,
1190
      self.master.uuid: client_cert_master}
1191
    self.rpc.call_node_verify.return_value = \
1192
      RpcResultsBuilder() \
1193
        .AddSuccessfulNode(self.normalnode,
1194
          {constants.NV_CLIENT_CERT: (None, client_cert_normal)}) \
1195
        .AddSuccessfulNode(self.master,
1196
          {constants.NV_CLIENT_CERT: (None, client_cert_master)}) \
1197
        .Build()
1198
    op = opcodes.OpClusterVerifyGroup(group_name="default", verbose=True)
1199
    self.ExecOpCode(op)
1200
    self.mcpu.assertLogContainsRegex("not a master candidate")
1201
    self.mcpu.assertLogContainsRegex("still listed")
1202

  
1203
  def testNormalNodeStealingMasterCandidateCert(self):
1204
    self._AddNormalNode()
1205
    client_cert_master = "client-cert-digest-master"
1206
    self.cluster.candidate_certs = {
1207
      self.master.uuid: client_cert_master}
1208
    self.rpc.call_node_verify.return_value = \
1209
      RpcResultsBuilder() \
1210
        .AddSuccessfulNode(self.normalnode,
1211
          {constants.NV_CLIENT_CERT: (None, client_cert_master)}) \
1212
        .AddSuccessfulNode(self.master,
1213
          {constants.NV_CLIENT_CERT: (None, client_cert_master)}) \
1214
        .Build()
1215
    op = opcodes.OpClusterVerifyGroup(group_name="default", verbose=True)
1216
    self.ExecOpCode(op)
1217
    self.mcpu.assertLogContainsRegex("not a master candidate")
1218
    self.mcpu.assertLogContainsRegex(
1219
      "certificate of another node which is master candidate")
1220

  
1221

  
1116 1222
class TestLUClusterVerifyGroupMethods(CmdlibTestCase):
1117 1223
  """Base class for testing individual methods in LUClusterVerifyGroup.
1118 1224

  
b/test/py/ganeti.backend_unittest.py
177 177
        get_hv_fn=self._GetHypervisor)
178 178
    self._mock_hv.Verify.assert_called_with(hvparams=hvparams)
179 179

  
180
  @testutils.patch_object(utils, "VerifyCertificate")
181
  def testVerifyClientCertificateSuccess(self, verif_cert):
182
    # mock the underlying x509 verification because the test cert is expired
183
    verif_cert.return_value = (None, None)
184
    cert_file = testutils.TestDataFilename("cert2.pem")
185
    (errcode, digest) = backend._VerifyClientCertificate(cert_file=cert_file)
186
    self.assertEqual(None, errcode)
187
    self.assertTrue(isinstance(digest, str))
188

  
189
  @testutils.patch_object(utils, "VerifyCertificate")
190
  def testVerifyClientCertificateFailed(self, verif_cert):
191
    expected_errcode = 666
192
    verif_cert.return_value = (expected_errcode,
193
                               "The devil created this certificate.")
194
    cert_file = testutils.TestDataFilename("cert2.pem")
195
    (errcode, digest) = backend._VerifyClientCertificate(cert_file=cert_file)
196
    self.assertEqual(expected_errcode, errcode)
197

  
198
  def testVerifyClientCertificateNoCert(self):
199
    cert_file = testutils.TestDataFilename("cert-that-does-not-exist.pem")
200
    (errcode, digest) = backend._VerifyClientCertificate(cert_file=cert_file)
201
    self.assertEqual(constants.CV_ERROR, errcode)
202

  
180 203

  
181 204
def _DefRestrictedCmdOwner():
182 205
  return (os.getuid(), os.getgid())
b/test/py/ganeti.utils.security_unittest.py
22 22
"""Script for unittesting the ganeti.utils.storage module"""
23 23

  
24 24
import mock
25
import os
26
import shutil
27
import tempfile
25 28
import unittest
26 29

  
30
from ganeti import constants
27 31
from ganeti.utils import security
28 32

  
29 33
import testutils
......
88 92
    self.assertFalse(digest1 == digest2)
89 93

  
90 94

  
95
class TestCertVerification(testutils.GanetiTestCase):
96
  def setUp(self):
97
    testutils.GanetiTestCase.setUp(self)
98

  
99
    self.tmpdir = tempfile.mkdtemp()
100

  
101
  def tearDown(self):
102
    shutil.rmtree(self.tmpdir)
103

  
104
  def testVerifyCertificate(self):
105
    security.VerifyCertificate(testutils.TestDataFilename("cert1.pem"))
106

  
107
    nonexist_filename = os.path.join(self.tmpdir, "does-not-exist")
108

  
109
    (errcode, msg) = security.VerifyCertificate(nonexist_filename)
110
    self.assertEqual(errcode, constants.CV_ERROR)
111

  
112
    # Try to load non-certificate file
113
    invalid_cert = testutils.TestDataFilename("bdev-net.txt")
114
    (errcode, msg) = security.VerifyCertificate(invalid_cert)
115
    self.assertEqual(errcode, constants.CV_ERROR)
116

  
117

  
91 118
if __name__ == "__main__":
92 119
  testutils.GanetiTestProgram()

Also available in: Unified diff