root / lib / cmdlib / cluster.py @ 809a055b
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with the cluster."""
23

    
24
import copy
25
import itertools
26
import logging
27
import operator
28
import os
29
import re
30
import time
31

    
32
from ganeti import compat
33
from ganeti import constants
34
from ganeti import errors
35
from ganeti import hypervisor
36
from ganeti import locking
37
from ganeti import masterd
38
from ganeti import netutils
39
from ganeti import objects
40
from ganeti import opcodes
41
from ganeti import pathutils
42
from ganeti import query
43
import ganeti.rpc.node as rpc
44
from ganeti import runtime
45
from ganeti import ssh
46
from ganeti import uidpool
47
from ganeti import utils
48
from ganeti import vcluster
49

    
50
from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
51
  ResultWithJobs
52
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
53
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
54
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
55
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
56
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
57
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
58
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
59
  CheckDiskAccessModeConsistency, CreateNewClientCert, \
60
  AddInstanceCommunicationNetworkOp, ConnectInstanceCommunicationNetworkOp, \
61
  CheckImageValidity
62

    
63
import ganeti.masterd.instance
64

    
65

    
66
def _UpdateMasterClientCert(
67
    lu, cfg, master_uuid,
68
    client_cert=pathutils.NODED_CLIENT_CERT_FILE,
69
    client_cert_tmp=pathutils.NODED_CLIENT_CERT_FILE_TMP):
70
  """Renews the master's client certificate and propagates the config.
71

72
  @type lu: C{LogicalUnit}
73
  @param lu: the logical unit holding the config
74
  @type cfg: C{config.ConfigWriter}
75
  @param cfg: the cluster's configuration
76
  @type master_uuid: string
77
  @param master_uuid: the master node's UUID
78
  @type client_cert: string
79
  @param client_cert: the path of the client certificate
80
  @type client_cert_tmp: string
81
  @param client_cert_tmp: the temporary path of the client certificate
82
  @rtype: string
83
  @return: the digest of the newly created client certificate
84

85
  """
86
  client_digest = CreateNewClientCert(lu, master_uuid, filename=client_cert_tmp)
87
  cfg.AddNodeToCandidateCerts(master_uuid, client_digest)
88
  # This triggers an update of the config and distribution of it with the old
89
  # SSL certificate
90

    
91
  utils.RemoveFile(client_cert)
92
  utils.RenameFile(client_cert_tmp, client_cert)
93
  return client_digest
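# A minimal usage sketch: callers register the returned digest as the master
# node's candidate certificate, e.g. (the pattern LUClusterRenewCrypto.Exec
# below follows):
#
#   digest = _UpdateMasterClientCert(self, self.cfg, master_uuid)
#   self.cfg.AddNodeToCandidateCerts(master_uuid, digest)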
94

    
95

    
96
class LUClusterRenewCrypto(NoHooksLU):
97
  """Renew the cluster's crypto tokens.
98

99
  Note that most of this operation is done in gnt_cluster.py; this LU only
100
  takes care of the renewal of the client SSL certificates.
101

102
  """
103
  def Exec(self, feedback_fn):
104
    master_uuid = self.cfg.GetMasterNode()
105

    
106
    server_digest = utils.GetCertificateDigest(
107
      cert_filename=pathutils.NODED_CERT_FILE)
108
    self.cfg.AddNodeToCandidateCerts("%s-SERVER" % master_uuid,
109
                                     server_digest)
110
    try:
111
      old_master_digest = utils.GetCertificateDigest(
112
        cert_filename=pathutils.NODED_CLIENT_CERT_FILE)
113
      self.cfg.AddNodeToCandidateCerts("%s-OLDMASTER" % master_uuid,
114
                                       old_master_digest)
115
    except IOError:
116
      logging.info("No old certificate available.")
117

    
118
    new_master_digest = _UpdateMasterClientCert(self, self.cfg, master_uuid)
119

    
120
    self.cfg.AddNodeToCandidateCerts(master_uuid, new_master_digest)
121
    nodes = self.cfg.GetAllNodesInfo()
122
    for (node_uuid, node_info) in nodes.items():
123
      if node_uuid != master_uuid:
124
        new_digest = CreateNewClientCert(self, node_uuid)
125
        if node_info.master_candidate:
126
          self.cfg.AddNodeToCandidateCerts(node_uuid, new_digest)
127
    self.cfg.RemoveNodeFromCandidateCerts("%s-SERVER" % master_uuid)
128
    self.cfg.RemoveNodeFromCandidateCerts("%s-OLDMASTER" % master_uuid)
129
    # Trigger another update of the config now with the new master cert
130

    
131

    
132
class LUClusterActivateMasterIp(NoHooksLU):
133
  """Activate the master IP on the master node.
134

135
  """
136
  def Exec(self, feedback_fn):
137
    """Activate the master IP.
138

139
    """
140
    master_params = self.cfg.GetMasterNetworkParameters()
141
    ems = self.cfg.GetUseExternalMipScript()
142
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
143
                                                   master_params, ems)
144
    result.Raise("Could not activate the master IP")
145

    
146

    
147
class LUClusterDeactivateMasterIp(NoHooksLU):
148
  """Deactivate the master IP on the master node.
149

150
  """
151
  def Exec(self, feedback_fn):
152
    """Deactivate the master IP.
153

154
    """
155
    master_params = self.cfg.GetMasterNetworkParameters()
156
    ems = self.cfg.GetUseExternalMipScript()
157
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
158
                                                     master_params, ems)
159
    result.Raise("Could not deactivate the master IP")
160

    
161

    
162
class LUClusterConfigQuery(NoHooksLU):
163
  """Return configuration values.
164

165
  """
166
  REQ_BGL = False
167

    
168
  def CheckArguments(self):
169
    self.cq = ClusterQuery(None, self.op.output_fields, False)
170

    
171
  def ExpandNames(self):
172
    self.cq.ExpandNames(self)
173

    
174
  def DeclareLocks(self, level):
175
    self.cq.DeclareLocks(self, level)
176

    
177
  def Exec(self, feedback_fn):
178
    result = self.cq.OldStyleQuery(self)
179

    
180
    assert len(result) == 1
181

    
182
    return result[0]
183

    
184

    
185
class LUClusterDestroy(LogicalUnit):
186
  """Logical unit for destroying the cluster.
187

188
  """
189
  HPATH = "cluster-destroy"
190
  HTYPE = constants.HTYPE_CLUSTER
191

    
192
  def BuildHooksEnv(self):
193
    """Build hooks env.
194

195
    """
196
    return {
197
      "OP_TARGET": self.cfg.GetClusterName(),
198
      }
199

    
200
  def BuildHooksNodes(self):
201
    """Build hooks nodes.
202

203
    """
204
    return ([], [])
205

    
206
  def CheckPrereq(self):
207
    """Check prerequisites.
208

209
    This checks whether the cluster is empty.
210

211
    Any errors are signaled by raising errors.OpPrereqError.
212

213
    """
214
    master = self.cfg.GetMasterNode()
215

    
216
    nodelist = self.cfg.GetNodeList()
217
    if len(nodelist) != 1 or nodelist[0] != master:
218
      raise errors.OpPrereqError("There are still %d node(s) in"
219
                                 " this cluster." % (len(nodelist) - 1),
220
                                 errors.ECODE_INVAL)
221
    instancelist = self.cfg.GetInstanceList()
222
    if instancelist:
223
      raise errors.OpPrereqError("There are still %d instance(s) in"
224
                                 " this cluster." % len(instancelist),
225
                                 errors.ECODE_INVAL)
226

    
227
  def Exec(self, feedback_fn):
228
    """Destroys the cluster.
229

230
    """
231
    master_params = self.cfg.GetMasterNetworkParameters()
232

    
233
    # Run post hooks on master node before it's removed
234
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
235

    
236
    ems = self.cfg.GetUseExternalMipScript()
237
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
238
                                                     master_params, ems)
239
    result.Warn("Error disabling the master IP address", self.LogWarning)
240
    return master_params.uuid
241

    
242

    
243
class LUClusterPostInit(LogicalUnit):
244
  """Logical unit for running hooks after cluster initialization.
245

246
  """
247
  HPATH = "cluster-init"
248
  HTYPE = constants.HTYPE_CLUSTER
249

    
250
  def CheckArguments(self):
251
    self.master_uuid = self.cfg.GetMasterNode()
252
    self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo())
253

    
254
    # TODO: When Issue 584 is solved, and None is properly parsed when used
255
    # as a default value, ndparams.get(.., None) can be changed to
256
    # ndparams[..] to access the values directly
257

    
258
    # OpenvSwitch: Warn user if link is missing
259
    if (self.master_ndparams[constants.ND_OVS] and not
260
        self.master_ndparams.get(constants.ND_OVS_LINK, None)):
261
      self.LogInfo("No physical interface for OpenvSwitch was given."
262
                   " OpenvSwitch will not have an outside connection. This"
263
                   " might not be what you want.")
264

    
265
  def BuildHooksEnv(self):
266
    """Build hooks env.
267

268
    """
269
    return {
270
      "OP_TARGET": self.cfg.GetClusterName(),
271
      }
272

    
273
  def BuildHooksNodes(self):
274
    """Build hooks nodes.
275

276
    """
277
    return ([], [self.cfg.GetMasterNode()])
278

    
279
  def Exec(self, feedback_fn):
280
    """Create and configure Open vSwitch
281

282
    """
283
    if self.master_ndparams[constants.ND_OVS]:
284
      result = self.rpc.call_node_configure_ovs(
285
                 self.master_uuid,
286
                 self.master_ndparams[constants.ND_OVS_NAME],
287
                 self.master_ndparams.get(constants.ND_OVS_LINK, None))
288
      result.Raise("Could not successully configure Open vSwitch")
289

    
290
    _UpdateMasterClientCert(self, self.cfg, self.master_uuid)
291

    
292
    return True
293

    
294

    
295
class ClusterQuery(QueryBase):
296
  FIELDS = query.CLUSTER_FIELDS
297

    
298
  #: Do not sort (there is only one item)
299
  SORT_FIELD = None
300

    
301
  def ExpandNames(self, lu):
302
    lu.needed_locks = {}
303

    
304
    # The following variables interact with _QueryBase._GetNames
305
    self.wanted = locking.ALL_SET
306
    self.do_locking = self.use_locking
307

    
308
    if self.do_locking:
309
      raise errors.OpPrereqError("Can not use locking for cluster queries",
310
                                 errors.ECODE_INVAL)
311

    
312
  def DeclareLocks(self, lu, level):
313
    pass
314

    
315
  def _GetQueryData(self, lu):
316
    """Computes the list of nodes and their attributes.
317

318
    """
319
    if query.CQ_CONFIG in self.requested_data:
320
      cluster = lu.cfg.GetClusterInfo()
321
      nodes = lu.cfg.GetAllNodesInfo()
322
    else:
323
      cluster = NotImplemented
324
      nodes = NotImplemented
325

    
326
    if query.CQ_QUEUE_DRAINED in self.requested_data:
327
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
328
    else:
329
      drain_flag = NotImplemented
330

    
331
    if query.CQ_WATCHER_PAUSE in self.requested_data:
332
      master_node_uuid = lu.cfg.GetMasterNode()
333

    
334
      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
335
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
336
                   lu.cfg.GetMasterNodeName())
337

    
338
      watcher_pause = result.payload
339
    else:
340
      watcher_pause = NotImplemented
341

    
342
    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
343

    
344

    
345
class LUClusterQuery(NoHooksLU):
346
  """Query cluster configuration.
347

348
  """
349
  REQ_BGL = False
350

    
351
  def ExpandNames(self):
352
    self.needed_locks = {}
353

    
354
  def Exec(self, feedback_fn):
355
    """Return cluster config.
356

357
    """
358
    cluster = self.cfg.GetClusterInfo()
359
    os_hvp = {}
360

    
361
    # Filter just for enabled hypervisors
362
    for os_name, hv_dict in cluster.os_hvp.items():
363
      os_hvp[os_name] = {}
364
      for hv_name, hv_params in hv_dict.items():
365
        if hv_name in cluster.enabled_hypervisors:
366
          os_hvp[os_name][hv_name] = hv_params
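    # Illustrative example with hypothetical values: if cluster.os_hvp is
    #   {"debian-image": {"kvm": {...}, "xen-pvm": {...}}}
    # and only "kvm" is in cluster.enabled_hypervisors, the filtered result is
    #   {"debian-image": {"kvm": {...}}}
    # i.e. per-OS settings of disabled hypervisors are omitted from the output.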
367

    
368
    # Convert ip_family to ip_version
369
    primary_ip_version = constants.IP4_VERSION
370
    if cluster.primary_ip_family == netutils.IP6Address.family:
371
      primary_ip_version = constants.IP6_VERSION
372

    
373
    result = {
374
      "software_version": constants.RELEASE_VERSION,
375
      "protocol_version": constants.PROTOCOL_VERSION,
376
      "config_version": constants.CONFIG_VERSION,
377
      "os_api_version": max(constants.OS_API_VERSIONS),
378
      "export_version": constants.EXPORT_VERSION,
379
      "vcs_version": constants.VCS_VERSION,
380
      "architecture": runtime.GetArchInfo(),
381
      "name": cluster.cluster_name,
382
      "master": self.cfg.GetMasterNodeName(),
383
      "default_hypervisor": cluster.primary_hypervisor,
384
      "enabled_hypervisors": cluster.enabled_hypervisors,
385
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
386
                        for hypervisor_name in cluster.enabled_hypervisors]),
387
      "os_hvp": os_hvp,
388
      "beparams": cluster.beparams,
389
      "osparams": cluster.osparams,
390
      "ipolicy": cluster.ipolicy,
391
      "nicparams": cluster.nicparams,
392
      "ndparams": cluster.ndparams,
393
      "diskparams": cluster.diskparams,
394
      "candidate_pool_size": cluster.candidate_pool_size,
395
      "max_running_jobs": cluster.max_running_jobs,
396
      "mac_prefix": cluster.mac_prefix,
397
      "master_netdev": cluster.master_netdev,
398
      "master_netmask": cluster.master_netmask,
399
      "use_external_mip_script": cluster.use_external_mip_script,
400
      "volume_group_name": cluster.volume_group_name,
401
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
402
      "file_storage_dir": cluster.file_storage_dir,
403
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
404
      "maintain_node_health": cluster.maintain_node_health,
405
      "ctime": cluster.ctime,
406
      "mtime": cluster.mtime,
407
      "uuid": cluster.uuid,
408
      "tags": list(cluster.GetTags()),
409
      "uid_pool": cluster.uid_pool,
410
      "default_iallocator": cluster.default_iallocator,
411
      "default_iallocator_params": cluster.default_iallocator_params,
412
      "reserved_lvs": cluster.reserved_lvs,
413
      "primary_ip_version": primary_ip_version,
414
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
415
      "hidden_os": cluster.hidden_os,
416
      "blacklisted_os": cluster.blacklisted_os,
417
      "enabled_disk_templates": cluster.enabled_disk_templates,
418
      "install_image": cluster.install_image,
419
      "instance_communication_network": cluster.instance_communication_network,
420
      "compression_tools": cluster.compression_tools,
421
      }
422

    
423
    return result
424

    
425

    
426
class LUClusterRedistConf(NoHooksLU):
427
  """Force the redistribution of cluster configuration.
428

429
  This is a very simple LU.
430

431
  """
432
  REQ_BGL = False
433

    
434
  def ExpandNames(self):
435
    self.needed_locks = {
436
      locking.LEVEL_NODE: locking.ALL_SET,
437
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
438
    }
439
    self.share_locks = ShareAll()
440

    
441
  def Exec(self, feedback_fn):
442
    """Redistribute the configuration.
443

444
    """
445
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
446
    RedistributeAncillaryFiles(self)
447

    
448

    
449
class LUClusterRename(LogicalUnit):
450
  """Rename the cluster.
451

452
  """
453
  HPATH = "cluster-rename"
454
  HTYPE = constants.HTYPE_CLUSTER
455

    
456
  def BuildHooksEnv(self):
457
    """Build hooks env.
458

459
    """
460
    return {
461
      "OP_TARGET": self.cfg.GetClusterName(),
462
      "NEW_NAME": self.op.name,
463
      }
464

    
465
  def BuildHooksNodes(self):
466
    """Build hooks nodes.
467

468
    """
469
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
470

    
471
  def CheckPrereq(self):
472
    """Verify that the passed name is a valid one.
473

474
    """
475
    hostname = netutils.GetHostname(name=self.op.name,
476
                                    family=self.cfg.GetPrimaryIPFamily())
477

    
478
    new_name = hostname.name
479
    self.ip = new_ip = hostname.ip
480
    old_name = self.cfg.GetClusterName()
481
    old_ip = self.cfg.GetMasterIP()
482
    if new_name == old_name and new_ip == old_ip:
483
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
484
                                 " cluster has changed",
485
                                 errors.ECODE_INVAL)
486
    if new_ip != old_ip:
487
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
488
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
489
                                   " reachable on the network" %
490
                                   new_ip, errors.ECODE_NOTUNIQUE)
491

    
492
    self.op.name = new_name
493

    
494
  def Exec(self, feedback_fn):
495
    """Rename the cluster.
496

497
    """
498
    clustername = self.op.name
499
    new_ip = self.ip
500

    
501
    # shutdown the master IP
502
    master_params = self.cfg.GetMasterNetworkParameters()
503
    ems = self.cfg.GetUseExternalMipScript()
504
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
505
                                                     master_params, ems)
506
    result.Raise("Could not disable the master role")
507

    
508
    try:
509
      cluster = self.cfg.GetClusterInfo()
510
      cluster.cluster_name = clustername
511
      cluster.master_ip = new_ip
512
      self.cfg.Update(cluster, feedback_fn)
513

    
514
      # update the known hosts file
515
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
516
      node_list = self.cfg.GetOnlineNodeList()
517
      try:
518
        node_list.remove(master_params.uuid)
519
      except ValueError:
520
        pass
521
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
522
    finally:
523
      master_params.ip = new_ip
524
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
525
                                                     master_params, ems)
526
      result.Warn("Could not re-enable the master role on the master,"
527
                  " please restart manually", self.LogWarning)
528

    
529
    return clustername
530

    
531

    
532
class LUClusterRepairDiskSizes(NoHooksLU):
533
  """Verifies the cluster disks sizes.
534

535
  """
536
  REQ_BGL = False
537

    
538
  def ExpandNames(self):
539
    if self.op.instances:
540
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
541
      # Not getting the node allocation lock as only a specific set of
542
      # instances (and their nodes) is going to be acquired
543
      self.needed_locks = {
544
        locking.LEVEL_NODE_RES: [],
545
        locking.LEVEL_INSTANCE: self.wanted_names,
546
        }
547
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
548
    else:
549
      self.wanted_names = None
550
      self.needed_locks = {
551
        locking.LEVEL_NODE_RES: locking.ALL_SET,
552
        locking.LEVEL_INSTANCE: locking.ALL_SET,
553

    
554
        # This opcode acquires the node locks for all instances
555
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
556
        }
557

    
558
    self.share_locks = {
559
      locking.LEVEL_NODE_RES: 1,
560
      locking.LEVEL_INSTANCE: 0,
561
      locking.LEVEL_NODE_ALLOC: 1,
562
      }
563

    
564
  def DeclareLocks(self, level):
565
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
566
      self._LockInstancesNodes(primary_only=True, level=level)
567

    
568
  def CheckPrereq(self):
569
    """Check prerequisites.
570

571
    This only checks the optional instance list against the existing names.
572

573
    """
574
    if self.wanted_names is None:
575
      self.wanted_names = \
576
          map(self.cfg.GetInstanceName,
577
              self.owned_locks(locking.LEVEL_INSTANCE))
578

    
579
    self.wanted_instances = \
580
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
581

    
582
  def _EnsureChildSizes(self, disk):
583
    """Ensure children of the disk have the needed disk size.
584

585
    This is valid mainly for DRBD8 and fixes an issue where the
586
    children have smaller disk size.
587

588
    @param disk: an L{ganeti.objects.Disk} object
589

590
    """
591
    if disk.dev_type == constants.DT_DRBD8:
592
      assert disk.children, "Empty children for DRBD8?"
593
      fchild = disk.children[0]
594
      mismatch = fchild.size < disk.size
595
      if mismatch:
596
        self.LogInfo("Child disk has size %d, parent %d, fixing",
597
                     fchild.size, disk.size)
598
        fchild.size = disk.size
599

    
600
      # and we recurse on this child only, not on the metadev
601
      return self._EnsureChildSizes(fchild) or mismatch
602
    else:
603
      return False
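  # Illustrative example: for a DRBD8 disk with size 10240 whose data child
  # has size 10000, the child's recorded size is raised to 10240 and the
  # method returns True; non-DRBD8 disks are left untouched (returns False).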
604

    
605
  def Exec(self, feedback_fn):
606
    """Verify the size of cluster disks.
607

608
    """
609
    # TODO: check child disks too
610
    # TODO: check differences in size between primary/secondary nodes
611
    per_node_disks = {}
612
    for instance in self.wanted_instances:
613
      pnode = instance.primary_node
614
      if pnode not in per_node_disks:
615
        per_node_disks[pnode] = []
616
      for idx, disk in enumerate(self.cfg.GetInstanceDisks(instance.uuid)):
617
        per_node_disks[pnode].append((instance, idx, disk))
618

    
619
    assert not (frozenset(per_node_disks.keys()) -
620
                frozenset(self.owned_locks(locking.LEVEL_NODE_RES))), \
621
      "Not owning correct locks"
622
    assert not self.owned_locks(locking.LEVEL_NODE)
623

    
624
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
625
                                               per_node_disks.keys())
626

    
627
    changed = []
628
    for node_uuid, dskl in per_node_disks.items():
629
      if not dskl:
630
        # no disks on the node
631
        continue
632

    
633
      newl = [([v[2].Copy()], v[0]) for v in dskl]
634
      node_name = self.cfg.GetNodeName(node_uuid)
635
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
636
      if result.fail_msg:
637
        self.LogWarning("Failure in blockdev_getdimensions call to node"
638
                        " %s, ignoring", node_name)
639
        continue
640
      if len(result.payload) != len(dskl):
641
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
642
                        " result.payload=%s", node_name, len(dskl),
643
                        result.payload)
644
        self.LogWarning("Invalid result from node %s, ignoring node results",
645
                        node_name)
646
        continue
647
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
648
        if dimensions is None:
649
          self.LogWarning("Disk %d of instance %s did not return size"
650
                          " information, ignoring", idx, instance.name)
651
          continue
652
        if not isinstance(dimensions, (tuple, list)):
653
          self.LogWarning("Disk %d of instance %s did not return valid"
654
                          " dimension information, ignoring", idx,
655
                          instance.name)
656
          continue
657
        (size, spindles) = dimensions
658
        if not isinstance(size, (int, long)):
659
          self.LogWarning("Disk %d of instance %s did not return valid"
660
                          " size information, ignoring", idx, instance.name)
661
          continue
662
        size = size >> 20
663
        if size != disk.size:
664
          self.LogInfo("Disk %d of instance %s has mismatched size,"
665
                       " correcting: recorded %d, actual %d", idx,
666
                       instance.name, disk.size, size)
667
          disk.size = size
668
          self.cfg.Update(disk, feedback_fn)
669
          changed.append((instance.name, idx, "size", size))
670
        if es_flags[node_uuid]:
671
          if spindles is None:
672
            self.LogWarning("Disk %d of instance %s did not return valid"
673
                            " spindles information, ignoring", idx,
674
                            instance.name)
675
          elif disk.spindles is None or disk.spindles != spindles:
676
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
677
                         " correcting: recorded %s, actual %s",
678
                         idx, instance.name, disk.spindles, spindles)
679
            disk.spindles = spindles
680
            self.cfg.Update(disk, feedback_fn)
681
            changed.append((instance.name, idx, "spindles", disk.spindles))
682
        if self._EnsureChildSizes(disk):
683
          self.cfg.Update(disk, feedback_fn)
684
          changed.append((instance.name, idx, "size", disk.size))
685
    return changed
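  # Illustrative example of the return value (hypothetical instance name):
  #   [("inst1.example.com", 0, "size", 10240),
  #    ("inst1.example.com", 1, "spindles", 2)]
  # i.e. one (instance name, disk index, attribute, new value) tuple per
  # correction applied.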
686

    
687

    
688
def _ValidateNetmask(cfg, netmask):
689
  """Checks if a netmask is valid.
690

691
  @type cfg: L{config.ConfigWriter}
692
  @param cfg: cluster configuration
693
  @type netmask: int
694
  @param netmask: netmask to be verified
695
  @raise errors.OpPrereqError: if the validation fails
696

697
  """
698
  ip_family = cfg.GetPrimaryIPFamily()
699
  try:
700
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
701
  except errors.ProgrammerError:
702
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
703
                               ip_family, errors.ECODE_INVAL)
704
  if not ipcls.ValidateNetmask(netmask):
705
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
706
                               (netmask), errors.ECODE_INVAL)
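# Illustrative usage, assuming the usual prefix-length ranges (1-32 for IPv4,
# 1-128 for IPv6):
#
#   _ValidateNetmask(cfg, 24)   # accepted on an IPv4 cluster
#   _ValidateNetmask(cfg, 33)   # raises errors.OpPrereqError on IPv4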
707

    
708

    
709
def CheckFileBasedStoragePathVsEnabledDiskTemplates(
710
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
711
    file_disk_template):
712
  """Checks whether the given file-based storage directory is acceptable.
713

714
  Note: This function is public, because it is also used in bootstrap.py.
715

716
  @type logging_warn_fn: function
717
  @param logging_warn_fn: function which accepts a string and logs it
718
  @type file_storage_dir: string
719
  @param file_storage_dir: the directory to be used for file-based instances
720
  @type enabled_disk_templates: list of string
721
  @param enabled_disk_templates: the list of enabled disk templates
722
  @type file_disk_template: string
723
  @param file_disk_template: the file-based disk template for which the
724
      path should be checked
725

726
  """
727
  assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
728
            constants.ST_FILE, constants.ST_SHARED_FILE
729
         ))
730
  file_storage_enabled = file_disk_template in enabled_disk_templates
731
  if file_storage_dir is not None:
732
    if file_storage_dir == "":
733
      if file_storage_enabled:
734
        raise errors.OpPrereqError(
735
            "Unsetting the '%s' storage directory while having '%s' storage"
736
            " enabled is not permitted." %
737
            (file_disk_template, file_disk_template))
738
    else:
739
      if not file_storage_enabled:
740
        logging_warn_fn(
741
            "Specified a %s storage directory, although %s storage is not"
742
            " enabled." % (file_disk_template, file_disk_template))
743
  else:
744
    raise errors.ProgrammerError("Received %s storage dir with value"
745
                                 " 'None'." % file_disk_template)
746

    
747

    
748
def CheckFileStoragePathVsEnabledDiskTemplates(
749
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
750
  """Checks whether the given file storage directory is acceptable.
751

752
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
753

754
  """
755
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
756
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
757
      constants.DT_FILE)
758

    
759

    
760
def CheckSharedFileStoragePathVsEnabledDiskTemplates(
761
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
762
  """Checks whether the given shared file storage directory is acceptable.
763

764
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
765

766
  """
767
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
768
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
769
      constants.DT_SHARED_FILE)
770

    
771

    
772
def CheckCompressionTools(tools):
773
  """Check whether the provided compression tools look like executables.
774

775
  @type tools: list of string
776
  @param tools: The tools provided as opcode input
777

778
  """
779
  regex = re.compile('^[-_a-zA-Z0-9]+$')
780
  illegal_tools = [t for t in tools if not regex.match(t)]
781

    
782
  if illegal_tools:
783
    raise errors.OpPrereqError(
784
      "The tools '%s' contain illegal characters: only alphanumeric values,"
785
      " dashes, and underscores are allowed" % ", ".join(illegal_tools)
786
    )
787

    
788
  if constants.IEC_GZIP not in tools:
789
    raise errors.OpPrereqError("For compatibility reasons, the %s utility must"
790
                               " be present among the compression tools" %
791
                               constants.IEC_GZIP)
792

    
793
  if constants.IEC_NONE in tools:
794
    raise errors.OpPrereqError("%s is a reserved value used for no compression,"
795
                               " and cannot be used as the name of a tool" %
796
                               constants.IEC_NONE)
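# Illustrative calls, assuming constants.IEC_GZIP == "gzip" and
# constants.IEC_NONE == "none":
#
#   CheckCompressionTools(["gzip", "lzop", "zstd-19"])  # accepted
#   CheckCompressionTools(["lzop"])            # rejected: gzip missing
#   CheckCompressionTools(["gzip", "none"])    # rejected: "none" is reserved
#   CheckCompressionTools(["gzip", "gzip -9"]) # rejected: illegal characters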
797

    
798

    
799
class LUClusterSetParams(LogicalUnit):
800
  """Change the parameters of the cluster.
801

802
  """
803
  HPATH = "cluster-modify"
804
  HTYPE = constants.HTYPE_CLUSTER
805
  REQ_BGL = False
806

    
807
  def CheckArguments(self):
808
    """Check parameters
809

810
    """
811
    if self.op.uid_pool:
812
      uidpool.CheckUidPool(self.op.uid_pool)
813

    
814
    if self.op.add_uids:
815
      uidpool.CheckUidPool(self.op.add_uids)
816

    
817
    if self.op.remove_uids:
818
      uidpool.CheckUidPool(self.op.remove_uids)
819

    
820
    if self.op.mac_prefix:
821
      self.op.mac_prefix = \
822
          utils.NormalizeAndValidateThreeOctetMacPrefix(self.op.mac_prefix)
823

    
824
    if self.op.master_netmask is not None:
825
      _ValidateNetmask(self.cfg, self.op.master_netmask)
826

    
827
    if self.op.diskparams:
828
      for dt_params in self.op.diskparams.values():
829
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
830
      try:
831
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
832
        CheckDiskAccessModeValidity(self.op.diskparams)
833
      except errors.OpPrereqError, err:
834
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
835
                                   errors.ECODE_INVAL)
836

    
837
    if self.op.install_image is not None:
838
      CheckImageValidity(self.op.install_image,
839
                         "Install image must be an absolute path or a URL")
840

    
841
  def ExpandNames(self):
842
    # FIXME: in the future maybe other cluster params won't require checking on
843
    # all nodes to be modified.
844
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
845
    # resource locks the right thing, shouldn't it be the BGL instead?
846
    self.needed_locks = {
847
      locking.LEVEL_NODE: locking.ALL_SET,
848
      locking.LEVEL_INSTANCE: locking.ALL_SET,
849
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
850
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
851
    }
852
    self.share_locks = ShareAll()
853

    
854
  def BuildHooksEnv(self):
855
    """Build hooks env.
856

857
    """
858
    return {
859
      "OP_TARGET": self.cfg.GetClusterName(),
860
      "NEW_VG_NAME": self.op.vg_name,
861
      }
862

    
863
  def BuildHooksNodes(self):
864
    """Build hooks nodes.
865

866
    """
867
    mn = self.cfg.GetMasterNode()
868
    return ([mn], [mn])
869

    
870
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
871
                   new_enabled_disk_templates):
872
    """Check the consistency of the vg name on all nodes and in case it gets
873
       unset, whether there are instances still using it.
874

875
    """
876
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
877
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
878
                                            new_enabled_disk_templates)
879
    current_vg_name = self.cfg.GetVGName()
880

    
881
    if self.op.vg_name == '':
882
      if lvm_is_enabled:
883
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
884
                                   " disk templates are or get enabled.")
885

    
886
    if self.op.vg_name is None:
887
      if current_vg_name is None and lvm_is_enabled:
888
        raise errors.OpPrereqError("Please specify a volume group when"
889
                                   " enabling lvm-based disk-templates.")
890

    
891
    if self.op.vg_name is not None and not self.op.vg_name:
892
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
893
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
894
                                   " instances exist", errors.ECODE_INVAL)
895

    
896
    if (self.op.vg_name is not None and lvm_is_enabled) or \
897
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
898
      self._CheckVgNameOnNodes(node_uuids)
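  # Illustrative summary of how self.op.vg_name is interpreted above:
  #   ""      -> unset the VG; rejected if lvm-based templates stay/get
  #              enabled or if plain disks still exist
  #   None    -> no change; rejected only if lvm is enabled but no VG is
  #              configured yet
  #   "myvg"  -> (hypothetical name) use this VG; its size is checked on the
  #              vm_capable nodes when lvm is or becomes enabled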
899

    
900
  def _CheckVgNameOnNodes(self, node_uuids):
901
    """Check the status of the volume group on each node.
902

903
    """
904
    vglist = self.rpc.call_vg_list(node_uuids)
905
    for node_uuid in node_uuids:
906
      msg = vglist[node_uuid].fail_msg
907
      if msg:
908
        # ignoring down node
909
        self.LogWarning("Error while gathering data on node %s"
910
                        " (ignoring node): %s",
911
                        self.cfg.GetNodeName(node_uuid), msg)
912
        continue
913
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
914
                                            self.op.vg_name,
915
                                            constants.MIN_VG_SIZE)
916
      if vgstatus:
917
        raise errors.OpPrereqError("Error on node '%s': %s" %
918
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
919
                                   errors.ECODE_ENVIRON)
920

    
921
  @staticmethod
922
  def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
923
                                old_enabled_disk_templates):
924
    """Computes three sets of disk templates.
925

926
    @see: C{_GetDiskTemplateSets} for more details.
927

928
    """
929
    enabled_disk_templates = None
930
    new_enabled_disk_templates = []
931
    disabled_disk_templates = []
932
    if op_enabled_disk_templates:
933
      enabled_disk_templates = op_enabled_disk_templates
934
      new_enabled_disk_templates = \
935
        list(set(enabled_disk_templates)
936
             - set(old_enabled_disk_templates))
937
      disabled_disk_templates = \
938
        list(set(old_enabled_disk_templates)
939
             - set(enabled_disk_templates))
940
    else:
941
      enabled_disk_templates = old_enabled_disk_templates
942
    return (enabled_disk_templates, new_enabled_disk_templates,
943
            disabled_disk_templates)
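  # Illustrative example with hypothetical template lists: for
  #   op_enabled_disk_templates  = ["plain", "drbd"]
  #   old_enabled_disk_templates = ["plain", "file"]
  # the result is (["plain", "drbd"], ["drbd"], ["file"]): "drbd" is newly
  # enabled and "file" gets disabled.  The two derived lists are built from
  # set differences, so their ordering is not guaranteed.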
944

    
945
  def _GetDiskTemplateSets(self, cluster):
946
    """Computes three sets of disk templates.
947

948
    The three sets are:
949
      - disk templates that will be enabled after this operation (no matter if
950
        they were enabled before or not)
951
      - disk templates that get enabled by this operation (thus haven't been
952
        enabled before.)
953
      - disk templates that get disabled by this operation
954

955
    """
956
    return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
957
                                          cluster.enabled_disk_templates)
958

    
959
  def _CheckIpolicy(self, cluster, enabled_disk_templates):
960
    """Checks the ipolicy.
961

962
    @type cluster: C{objects.Cluster}
963
    @param cluster: the cluster's configuration
964
    @type enabled_disk_templates: list of string
965
    @param enabled_disk_templates: list of (possibly newly) enabled disk
966
      templates
967

968
    """
969
    # FIXME: write unit tests for this
970
    if self.op.ipolicy:
971
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
972
                                           group_policy=False)
973

    
974
      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
975
                                  enabled_disk_templates)
976

    
977
      all_instances = self.cfg.GetAllInstancesInfo().values()
978
      violations = set()
979
      for group in self.cfg.GetAllNodeGroupsInfo().values():
980
        instances = frozenset(
981
          [inst for inst in all_instances
982
           if compat.any(nuuid in group.members
983
           for nuuid in self.cfg.GetInstanceNodes(inst.uuid))])
984
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
985
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
986
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
987
                                           self.cfg)
988
        if new:
989
          violations.update(new)
990

    
991
      if violations:
992
        self.LogWarning("After the ipolicy change the following instances"
993
                        " violate them: %s",
994
                        utils.CommaJoin(utils.NiceSort(violations)))
995
    else:
996
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
997
                                  enabled_disk_templates)
998

    
999
  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
1000
    """Checks whether the set DRBD helper actually exists on the nodes.
1001

1002
    @type drbd_helper: string
1003
    @param drbd_helper: path of the drbd usermode helper binary
1004
    @type node_uuids: list of strings
1005
    @param node_uuids: list of node UUIDs to check for the helper
1006

1007
    """
1008
    # checks given drbd helper on all nodes
1009
    helpers = self.rpc.call_drbd_helper(node_uuids)
1010
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
1011
      if ninfo.offline:
1012
        self.LogInfo("Not checking drbd helper on offline node %s",
1013
                     ninfo.name)
1014
        continue
1015
      msg = helpers[ninfo.uuid].fail_msg
1016
      if msg:
1017
        raise errors.OpPrereqError("Error checking drbd helper on node"
1018
                                   " '%s': %s" % (ninfo.name, msg),
1019
                                   errors.ECODE_ENVIRON)
1020
      node_helper = helpers[ninfo.uuid].payload
1021
      if node_helper != drbd_helper:
1022
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
1023
                                   (ninfo.name, node_helper),
1024
                                   errors.ECODE_ENVIRON)
1025

    
1026
  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
1027
    """Check the DRBD usermode helper.
1028

1029
    @type node_uuids: list of strings
1030
    @param node_uuids: a list of nodes' UUIDs
1031
    @type drbd_enabled: boolean
1032
    @param drbd_enabled: whether DRBD will be enabled after this operation
1033
      (no matter if it was disabled before or not)
1034
    @type drbd_gets_enabled: boolen
1035
    @param drbd_gets_enabled: true if DRBD was disabled before this
1036
      operation, but will be enabled afterwards
1037

1038
    """
1039
    if self.op.drbd_helper == '':
1040
      if drbd_enabled:
1041
        raise errors.OpPrereqError("Cannot disable drbd helper while"
1042
                                   " DRBD is enabled.")
1043
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
1044
        raise errors.OpPrereqError("Cannot disable drbd helper while"
1045
                                   " drbd-based instances exist",
1046
                                   errors.ECODE_INVAL)
1047

    
1048
    else:
1049
      if self.op.drbd_helper is not None and drbd_enabled:
1050
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
1051
      else:
1052
        if drbd_gets_enabled:
1053
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
1054
          if current_drbd_helper is not None:
1055
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
1056
          else:
1057
            raise errors.OpPrereqError("Cannot enable DRBD without a"
1058
                                       " DRBD usermode helper set.")
1059

    
1060
  def _CheckInstancesOfDisabledDiskTemplates(
1061
      self, disabled_disk_templates):
1062
    """Check whether we try to disable a disk template that is in use.
1063

1064
    @type disabled_disk_templates: list of string
1065
    @param disabled_disk_templates: list of disk templates that are going to
1066
      be disabled by this operation
1067

1068
    """
1069
    for disk_template in disabled_disk_templates:
1070
      if self.cfg.HasAnyDiskOfType(disk_template):
1071
        raise errors.OpPrereqError(
1072
            "Cannot disable disk template '%s', because there is at least one"
1073
            " instance using it." % disk_template)
1074

    
1075
  @staticmethod
1076
  def _CheckInstanceCommunicationNetwork(network, warning_fn):
1077
    """Check whether an existing network is configured for instance
1078
    communication.
1079

1080
    Checks whether an existing network is configured with the
1081
    parameters that are advisable for instance communication, and
1082
    otherwise issue security warnings.
1083

1084
    @type network: L{ganeti.objects.Network}
1085
    @param network: L{ganeti.objects.Network} object whose
1086
                    configuration is being checked
1087
    @type warning_fn: function
1088
    @param warning_fn: function used to print warnings
1089
    @rtype: None
1090
    @return: None
1091

1092
    """
1093
    def _MaybeWarn(err, val, default):
1094
      if val != default:
1095
        warning_fn("Supplied instance communication network '%s' %s '%s',"
1096
                   " this might pose a security risk (default is '%s').",
1097
                   network.name, err, val, default)
1098

    
1099
    if network.network is None:
1100
      raise errors.OpPrereqError("Supplied instance communication network '%s'"
1101
                                 " must have an IPv4 network address.",
1102
                                 network.name)
1103

    
1104
    _MaybeWarn("has an IPv4 gateway", network.gateway, None)
1105
    _MaybeWarn("has a non-standard IPv4 network address", network.network,
1106
               constants.INSTANCE_COMMUNICATION_NETWORK4)
1107
    _MaybeWarn("has an IPv6 gateway", network.gateway6, None)
1108
    _MaybeWarn("has a non-standard IPv6 network address", network.network6,
1109
               constants.INSTANCE_COMMUNICATION_NETWORK6)
1110
    _MaybeWarn("has a non-standard MAC prefix", network.mac_prefix,
1111
               constants.INSTANCE_COMMUNICATION_MAC_PREFIX)
1112

    
1113
  def CheckPrereq(self):
1114
    """Check prerequisites.
1115

1116
    This checks whether the given params don't conflict and
1117
    if the given volume group is valid.
1118

1119
    """
1120
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
1121
    self.cluster = cluster = self.cfg.GetClusterInfo()
1122

    
1123
    vm_capable_node_uuids = [node.uuid
1124
                             for node in self.cfg.GetAllNodesInfo().values()
1125
                             if node.uuid in node_uuids and node.vm_capable]
1126

    
1127
    (enabled_disk_templates, new_enabled_disk_templates,
1128
      disabled_disk_templates) = self._GetDiskTemplateSets(cluster)
1129
    self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates)
1130

    
1131
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
1132
                      new_enabled_disk_templates)
1133

    
1134
    if self.op.file_storage_dir is not None:
1135
      CheckFileStoragePathVsEnabledDiskTemplates(
1136
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
1137

    
1138
    if self.op.shared_file_storage_dir is not None:
1139
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
1140
          self.LogWarning, self.op.shared_file_storage_dir,
1141
          enabled_disk_templates)
1142

    
1143
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
1144
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
1145
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)
1146

    
1147
    # validate params changes
1148
    if self.op.beparams:
1149
      objects.UpgradeBeParams(self.op.beparams)
1150
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1151
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
1152

    
1153
    if self.op.ndparams:
1154
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
1155
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
1156

    
1157
      # TODO: we need a more general way to handle resetting
1158
      # cluster-level parameters to default values
1159
      if self.new_ndparams["oob_program"] == "":
1160
        self.new_ndparams["oob_program"] = \
1161
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
1162

    
1163
    if self.op.hv_state:
1164
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
1165
                                           self.cluster.hv_state_static)
1166
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
1167
                               for hv, values in new_hv_state.items())
1168

    
1169
    if self.op.disk_state:
1170
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
1171
                                               self.cluster.disk_state_static)
1172
      self.new_disk_state = \
1173
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
1174
                            for name, values in svalues.items()))
1175
             for storage, svalues in new_disk_state.items())
1176

    
1177
    self._CheckIpolicy(cluster, enabled_disk_templates)
1178

    
1179
    if self.op.nicparams:
1180
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1181
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
1182
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1183
      nic_errors = []
1184

    
1185
      # check all instances for consistency
1186
      for instance in self.cfg.GetAllInstancesInfo().values():
1187
        for nic_idx, nic in enumerate(instance.nics):
1188
          params_copy = copy.deepcopy(nic.nicparams)
1189
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
1190

    
1191
          # check parameter syntax
1192
          try:
1193
            objects.NIC.CheckParameterSyntax(params_filled)
1194
          except errors.ConfigurationError, err:
1195
            nic_errors.append("Instance %s, nic/%d: %s" %
1196
                              (instance.name, nic_idx, err))
1197

    
1198
          # if we're moving instances to routed, check that they have an ip
1199
          target_mode = params_filled[constants.NIC_MODE]
1200
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
1201
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
1202
                              " address" % (instance.name, nic_idx))
1203
      if nic_errors:
1204
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
1205
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
1206

    
1207
    # hypervisor list/parameters
1208
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
1209
    if self.op.hvparams:
1210
      for hv_name, hv_dict in self.op.hvparams.items():
1211
        if hv_name not in self.new_hvparams:
1212
          self.new_hvparams[hv_name] = hv_dict
1213
        else:
1214
          self.new_hvparams[hv_name].update(hv_dict)
1215

    
1216
    # disk template parameters
1217
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
1218
    if self.op.diskparams:
1219
      for dt_name, dt_params in self.op.diskparams.items():
1220
        if dt_name not in self.new_diskparams:
1221
          self.new_diskparams[dt_name] = dt_params
1222
        else:
1223
          self.new_diskparams[dt_name].update(dt_params)
1224
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)
1225

    
1226
    # os hypervisor parameters
1227
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
1228
    if self.op.os_hvp:
1229
      for os_name, hvs in self.op.os_hvp.items():
1230
        if os_name not in self.new_os_hvp:
1231
          self.new_os_hvp[os_name] = hvs
1232
        else:
1233
          for hv_name, hv_dict in hvs.items():
1234
            if hv_dict is None:
1235
              # Delete if it exists
1236
              self.new_os_hvp[os_name].pop(hv_name, None)
1237
            elif hv_name not in self.new_os_hvp[os_name]:
1238
              self.new_os_hvp[os_name][hv_name] = hv_dict
1239
            else:
1240
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
1241

    
1242
    # os parameters
1243
    self._BuildOSParams(cluster)
1244

    
1245
    # changes to the hypervisor list
1246
    if self.op.enabled_hypervisors is not None:
1247
      self.hv_list = self.op.enabled_hypervisors
1248
      for hv in self.hv_list:
1249
        # if the hypervisor doesn't already exist in the cluster
1250
        # hvparams, we initialize it to empty, and then (in both
1251
        # cases) we make sure to fill the defaults, as we might not
1252
        # have a complete defaults list if the hypervisor wasn't
1253
        # enabled before
1254
        if hv not in new_hvp:
1255
          new_hvp[hv] = {}
1256
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1257
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1258
    else:
1259
      self.hv_list = cluster.enabled_hypervisors
1260

    
1261
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1262
      # either the enabled list has changed, or the parameters have, validate
1263
      for hv_name, hv_params in self.new_hvparams.items():
1264
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1265
            (self.op.enabled_hypervisors and
1266
             hv_name in self.op.enabled_hypervisors)):
1267
          # either this is a new hypervisor, or its parameters have changed
1268
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1269
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1270
          hv_class.CheckParameterSyntax(hv_params)
1271
          CheckHVParams(self, node_uuids, hv_name, hv_params)
1272

    
1273
    self._CheckDiskTemplateConsistency()
1274

    
1275
    if self.op.os_hvp:
1276
      # no need to check any newly-enabled hypervisors, since the
1277
      # defaults have already been checked in the above code-block
1278
      for os_name, os_hvp in self.new_os_hvp.items():
1279
        for hv_name, hv_params in os_hvp.items():
1280
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1281
          # we need to fill in the new os_hvp on top of the actual hv_p
1282
          cluster_defaults = self.new_hvparams.get(hv_name, {})
1283
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_uuids, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

    if self.op.instance_communication_network:
      network_name = self.op.instance_communication_network

      try:
        network_uuid = self.cfg.LookupNetwork(network_name)
      except errors.OpPrereqError:
        network_uuid = None

      if network_uuid is not None:
        network = self.cfg.GetNetwork(network_uuid)
        self._CheckInstanceCommunicationNetwork(network, self.LogWarning)

    if self.op.compression_tools:
      CheckCompressionTools(self.op.compression_tools)

  def _BuildOSParams(self, cluster):
    "Calculate the new OS parameters for this operation."

    def _GetNewParams(source, new_params):
      "Wrapper around GetUpdatedParams."
      if new_params is None:
        return source
      result = objects.FillDict(source, {}) # deep copy of source
      for os_name in new_params:
        result[os_name] = GetUpdatedParams(result.get(os_name, {}),
                                           new_params[os_name],
                                           use_none=True)
        if not result[os_name]:
          del result[os_name] # we removed all parameters
      return result

    self.new_osp = _GetNewParams(cluster.osparams,
                                 self.op.osparams)
    self.new_osp_private = _GetNewParams(cluster.osparams_private_cluster,
                                         self.op.osparams_private_cluster)

    # Remove os validity check
    changed_oses = (set(self.new_osp.keys()) | set(self.new_osp_private.keys()))
    for os_name in changed_oses:
      os_params = cluster.SimpleFillOS(
        os_name,
        self.new_osp.get(os_name, {}),
        os_params_private=self.new_osp_private.get(os_name, {})
      )
      # check the parameter validity (remote check)
      CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                    os_name, os_params, False)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")

  def _SetFileStorageDir(self, feedback_fn):
    """Set the file storage directory.

    """
    if self.op.file_storage_dir is not None:
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
        feedback_fn("Global file storage dir already set to value '%s'"
                    % self.cluster.file_storage_dir)
      else:
        self.cluster.file_storage_dir = self.op.file_storage_dir

  def _SetSharedFileStorageDir(self, feedback_fn):
    """Set the shared file storage directory.

    """
    if self.op.shared_file_storage_dir is not None:
      if self.cluster.shared_file_storage_dir == \
          self.op.shared_file_storage_dir:
        feedback_fn("Global shared file storage dir already set to value '%s'"
                    % self.cluster.shared_file_storage_dir)
      else:
        self.cluster.shared_file_storage_dir = self.op.shared_file_storage_dir

  def _SetDrbdHelper(self, feedback_fn):
    """Set the DRBD usermode helper.

    """
    if self.op.drbd_helper is not None:
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did not"
                    " enable the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")

  @staticmethod
  def _EnsureInstanceCommunicationNetwork(cfg, network_name):
    """Ensure that the instance communication network exists and is
    connected to all groups.

    The instance communication network given by L{network_name} is
    created, if necessary, via the opcode 'OpNetworkAdd'.  Also, the
    instance communication network is connected to all existing node
    groups, if necessary, via the opcode 'OpNetworkConnect'.

    @type cfg: L{config.ConfigWriter}
    @param cfg: cluster configuration

    @type network_name: string
    @param network_name: instance communication network name

    @rtype: L{ganeti.cmdlib.ResultWithJobs} or L{None}
    @return: L{ganeti.cmdlib.ResultWithJobs} if the instance
             communication network needs to be created or connected to a
             group, otherwise L{None}

    """
    jobs = []

    try:
      network_uuid = cfg.LookupNetwork(network_name)
      network_exists = True
    except errors.OpPrereqError:
      network_exists = False

    if not network_exists:
      jobs.append(AddInstanceCommunicationNetworkOp(network_name))

    for group_uuid in cfg.GetNodeGroupList():
      group = cfg.GetNodeGroup(group_uuid)

      if network_exists:
        network_connected = network_uuid in group.networks
      else:
        # The network was created asynchronously by the previous
        # opcode and, therefore, we don't have access to its
        # network_uuid.  As a result, we assume that the network is
        # not connected to any group yet.
        network_connected = False

      if not network_connected:
        op = ConnectInstanceCommunicationNetworkOp(group_uuid, network_name)
        jobs.append(op)
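
    # All opcodes are submitted as a single job, so the network creation (if
    # needed) is executed before any of the group connect opcodes.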
    if jobs:
      return ResultWithJobs([jobs])
    else:
      return None

  @staticmethod
  def _ModifyInstanceCommunicationNetwork(cfg, network_name, feedback_fn):
    """Update the instance communication network stored in the cluster
    configuration.

    Compares the user-supplied instance communication network against
    the one stored in the Ganeti cluster configuration.  If there is a
    change, the instance communication network may be possibly created
    and connected to all groups (see
    L{LUClusterSetParams._EnsureInstanceCommunicationNetwork}).

    @type cfg: L{config.ConfigWriter}
    @param cfg: cluster configuration

    @type network_name: string
    @param network_name: instance communication network name

    @type feedback_fn: function
    @param feedback_fn: see L{ganeti.cmdlib.base.LogicalUnit}

    @rtype: L{LUClusterSetParams._EnsureInstanceCommunicationNetwork} or L{None}
    @return: see L{LUClusterSetParams._EnsureInstanceCommunicationNetwork}

    """
    config_network_name = cfg.GetInstanceCommunicationNetwork()

    if network_name == config_network_name:
      feedback_fn("Instance communication network already is '%s', nothing to"
                  " do." % network_name)
    else:
      try:
        cfg.LookupNetwork(config_network_name)
        feedback_fn("Previous instance communication network '%s'"
                    " should be removed manually." % config_network_name)
      except errors.OpPrereqError:
        pass

      if network_name:
        feedback_fn("Changing instance communication network to '%s', only new"
                    " instances will be affected."
                    % network_name)
      else:
        feedback_fn("Disabling instance communication network, only new"
                    " instances will be affected.")

      cfg.SetInstanceCommunicationNetwork(network_name)

      if network_name:
        return LUClusterSetParams._EnsureInstanceCommunicationNetwork(
          cfg,
          network_name)
      else:
        return None

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    # re-read the fresh configuration
    self.cluster = self.cfg.GetClusterInfo()
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(self.op.enabled_disk_templates)
    # save the changes
    self.cfg.Update(self.cluster, feedback_fn)

    self._SetVgName(feedback_fn)

    self.cluster = self.cfg.GetClusterInfo()
    self._SetFileStorageDir(feedback_fn)
    self.cfg.Update(self.cluster, feedback_fn)
    self._SetDrbdHelper(feedback_fn)

    # re-read the fresh configuration again
    self.cluster = self.cfg.GetClusterInfo()

    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.osparams_private_cluster:
      self.cluster.osparams_private_cluster = self.new_osp_private
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.max_running_jobs is not None:
      self.cluster.max_running_jobs = self.op.max_running_jobs

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.default_iallocator_params is not None:
      self.cluster.default_iallocator_params = self.op.default_iallocator_params

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
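
    # Helper applying (DDM_ADD/DDM_REMOVE, os_name) modifications in place to
    # one of the cluster's OS lists (hidden or blacklisted).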
    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.mac_prefix:
      self.cluster.mac_prefix = self.op.mac_prefix

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    if self.op.install_image:
      self.cluster.install_image = self.op.install_image

    if self.op.zeroing_image is not None:
      CheckImageValidity(self.op.zeroing_image,
                         "Zeroing image must be an absolute path or a URL")
      self.cluster.zeroing_image = self.op.zeroing_image

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)

    if self.op.compression_tools is not None:
      self.cfg.SetCompressionTools(self.op.compression_tools)

    network_name = self.op.instance_communication_network
    if network_name is not None:
      return self._ModifyInstanceCommunicationNetwork(self.cfg,
                                                      network_name, feedback_fn)
    else:
      return None


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
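      # (entries in 'depends' may use negative, relative job IDs; -len(jobs)
      # points back at the config verification job queued above)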
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = constants.CV_ERROR
  ETYPE_WARNING = constants.CV_WARNING
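
  # _Error emits either a machine-parseable line, e.g. (illustrative values)
  #   "ERROR:ENODESSH:node:node1.example.com:ssh communication failed"
  # when the opcode requested error_codes, or a plain line such as
  #   "ERROR: node node1.example.com: ssh communication failed"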
  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (hv_name, item))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    # Force the configuration to be fully distributed before doing any tests
    self.cfg.FlushConfig()

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = utils.VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(inst.name for
                         inst in dangling_instances.get(node.uuid, [])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(inst.name for
                                         inst in no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        instance = self.cfg.GetInstanceInfoByName(inst_name)
        if instance.disk_template in constants.DTS_INT_MIRROR:
          nodes.update(self.cfg.GetInstanceSecondaryNodes(instance.uuid))

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_inst_uuids = \
        group_inst_uuids.difference(
          [self.cfg.GetInstanceInfoByName(name).uuid
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_inst_uuids:
      raise errors.OpPrereqError(
        "Missing lock for instances: %s" %
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
        errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_uuids = group_inst_uuids
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
                             for inst_uuid in group_inst_uuids)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        inst_nodes = self.cfg.GetInstanceNodes(inst.uuid)
        for nuuid in inst_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
                    "Node returned invalid time")
      return
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
                  "Node time diverges by at least %s from master node time",
                  ntime_diff)

  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                  "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node_verify call.

    """
    node_versions = {}
    for node_uuid, ndata in node_verify_infos.items():
      nresult = ndata.payload
      if nresult:
        version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
        node_versions[node_uuid] = version

    if len(set(node_versions.values())) > 1:
      for node_uuid, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
                    code=self.ETYPE_WARNING)

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping from node to names to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of checks
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
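    # Tuples compare element-wise, so min()/max() pick the smallest/biggest PV
    # size and fall back to the node UUID only as a tie-breaker.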
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "did not return valid bridge information")
    if not test:
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = constants.NV_NODELIST not in nresult
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
                  "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
                        "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
                        "tcp communication with node '%s': %s",
                        anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if ninfo.uuid == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)

  def _VerifyInstance(self, instance, node_image, diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    pnode_uuid = instance.primary_node
    pnode_img = node_image[pnode_uuid]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    self.cfg.GetInstanceLVsByNode(instance.uuid, lvmap=node_vol_should)

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)

    for node_uuid in node_vol_should:
      n_img = node_image[node_uuid]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node_uuid]:
        test = volume not in n_img.volumes
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
                      "volume %s missing on node %s", volume,
                      self.cfg.GetNodeName(node_uuid))

    if instance.admin_state == constants.ADMINST_UP:
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
                    "instance not running on its primary node %s",
                     self.cfg.GetNodeName(pnode_uuid))
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance is marked as running and lives on"
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))
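
    # Flatten the per-node disk status into (node, success, status, disk_index)
    # tuples so the checks below can iterate over individual disks.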
    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      self._ErrorIf(instance.disks_active and
                    not success and not bad_snode,
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
                    "couldn't retrieve status for disk/%s on %s: %s",
                    idx, self.cfg.GetNodeName(nname), bdev_status)

      if instance.disks_active and success and \
         (bdev_status.is_degraded or
          bdev_status.ldisk_status != constants.LDS_OKAY):
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
        if bdev_status.is_degraded:
          msg += " is degraded"
        if bdev_status.ldisk_status != constants.LDS_OKAY:
          msg += "; state is '%s'" % \
                 constants.LDS_NAMES[bdev_status.ldisk_status]

        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)

    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
                  "instance %s, connection to primary node failed",
                  instance.name)

    secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance.uuid)
    self._ErrorIf(len(secondary_nodes) > 1,
                  constants.CV_EINSTANCELAYOUT, instance.name,
                  "instance has multiple secondary nodes: %s",
                  utils.CommaJoin(secondary_nodes),
                  code=self.ETYPE_WARNING)

    inst_nodes = self.cfg.GetInstanceNodes(instance.uuid)
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, inst_nodes)
    if any(es_flags.values()):
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
        # Disk template not compatible with exclusive_storage: no instance
        # node should have the flag set
        es_nodes = [n
                    for (n, es) in es_flags.items()
                    if es]
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
                    "instance has template %s, which is not supported on nodes"
                    " that have exclusive storage set: %s",
                    instance.disk_template,
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
      for (idx, disk) in enumerate(self.cfg.GetInstanceDisks(instance.uuid)):
        self._ErrorIf(disk.spindles is None,
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
                      "number of spindles not configured for disk %s while"
                      " exclusive storage is enabled, try running"
                      " gnt-cluster repair-disk-sizes", idx)

    if instance.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(inst_nodes)
      instance_groups = {}

      for node_uuid in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
                                   []).append(node_uuid)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
                           groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode_uuid in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance.name, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in secondary_nodes:
      s_img = node_image[snode]
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
                    self.cfg.GetNodeName(snode),
                    "instance %s, connection to secondary node failed",
                    instance.name)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
                  instance.name, "instance has offline secondary node(s) %s",
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
    # ... or ghost/non-vm_capable nodes
    for node_uuid in inst_nodes:
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance lives on ghost node %s",
                    self.cfg.GetNodeName(node_uuid))
      self._ErrorIf(not node_image[node_uuid].vm_capable,
                    constants.CV_EINSTANCEBADNODE, instance.name,
                    "instance lives on non-vm_capable node %s",
                    self.cfg.GetNodeName(node_uuid))

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node_uuid, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node_uuid].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node_uuid not in node_vol_should or
                volume not in node_vol_should[node_uuid]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
                      self.cfg.GetNodeName(node_uuid),
                      "volume %s is unknown", volume,
                      code=_VerifyErrors.ETYPE_WARNING)

  def _VerifyNPlusOneMemory(self, node_image, all_insts):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node_uuid, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or \
         self.all_node_info[node_uuid].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
      for prinode, inst_uuids in n_img.sbp.items():
        needed_mem = 0
        for inst_uuid in inst_uuids:
          bep = cluster_info.FillBE(all_insts[inst_uuid])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1,
                      self.cfg.GetNodeName(node_uuid),
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)

  def _VerifyClientCertificates(self, nodes, all_nvinfo):
    """Verifies the consistency of the client certificates.

    This includes several aspects:
      - the individual validation of all nodes' certificates
      - the consistency of the master candidate certificate map
      - the consistency of the master candidate certificate map with the
        certificates that the master candidates are actually using.

    @param nodes: the list of nodes to consider in this verification
    @param all_nvinfo: the map of results of the verify_node call to
      all nodes

    """
    candidate_certs = self.cfg.GetClusterInfo().candidate_certs
    if candidate_certs is None or len(candidate_certs) == 0:
      self._ErrorIf(
        True, constants.CV_ECLUSTERCLIENTCERT, None,
        "The cluster's list of master candidate certificates is empty."
        " If you just updated the cluster, please run"
        " 'gnt-cluster renew-crypto --new-node-certificates'.")
      return

    self._ErrorIf(
      len(candidate_certs) != len(set(candidate_certs.values())),
      constants.CV_ECLUSTERCLIENTCERT, None,
      "There are at least two master candidates configured to use the same"
      " certificate.")

    # collect the client certificate
    for node in nodes:
      if node.offline:
        continue

      nresult = all_nvinfo[node.uuid]
      if nresult.fail_msg or not nresult.payload:
        continue
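
      # The NV_CLIENT_CERT payload is an (errcode, msg) pair; on success
      # errcode is None and msg carries the certificate digest.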
      (errcode, msg) = nresult.payload.get(constants.NV_CLIENT_CERT, None)
2608

    
2609
      self._ErrorIf(
2610
        errcode is not None, constants.CV_ECLUSTERCLIENTCERT, None,
2611
        "Client certificate of node '%s' failed validation: %s (code '%s')",
2612
        node.uuid, msg, errcode)
2613

    
2614
      if not errcode:
2615
        digest = msg
2616
        if node.master_candidate:
2617
          if node.uuid in candidate_certs:
2618
            self._ErrorIf(
2619
              digest != candidate_certs[node.uuid],
2620
              constants.CV_ECLUSTERCLIENTCERT, None,
2621
              "Client certificate digest of master candidate '%s' does not"
2622
              " match its entry in the cluster's map of master candidate"
2623
              " certificates. Expected: %s Got: %s", node.uuid,
2624
              digest, candidate_certs[node.uuid])
2625
          else:
2626
            self._ErrorIf(
2627
              True, constants.CV_ECLUSTERCLIENTCERT, None,
2628
              "The master candidate '%s' does not have an entry in the"
2629
              " map of candidate certificates.", node.uuid)
2630
            self._ErrorIf(
2631
              digest in candidate_certs.values(),
2632
              constants.CV_ECLUSTERCLIENTCERT, None,
2633
              "Master candidate '%s' is using a certificate of another node.",
2634
              node.uuid)
2635
        else:
2636
          self._ErrorIf(
2637
            node.uuid in candidate_certs,
2638
            constants.CV_ECLUSTERCLIENTCERT, None,
2639
            "Node '%s' is not a master candidate, but still listed in the"
2640
            " map of master candidate certificates.", node.uuid)
2641
          self._ErrorIf(
2642
            (node.uuid not in candidate_certs) and
2643
              (digest in candidate_certs.values()),
2644
            constants.CV_ECLUSTERCLIENTCERT, None,
2645
            "Node '%s' is not a master candidate and is incorrectly using a"
2646
            " certificate of another node which is master candidate.",
2647
            node.uuid)
2648

    
2649
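  # Illustrative note: the last argument of _VerifyFiles uses Python 2
  # tuple parameter unpacking; callers pass the whole filemap as one
  # (files_all, files_opt, files_mc, files_vm) tuple of path sets, where
  # files_opt only marks which of the files are optional (all-or-nothing).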
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param nodes: List of L{objects.Node} objects
    @param master_node_uuid: UUID of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.uuid == master_node_uuid)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodes
      else:
        filenodes = filter(fn, nodes)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodes:
      if node.offline:
        ignore_nodes.add(node.uuid)
        continue

      nresult = all_nvinfo[node.uuid]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, {})
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
                    "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.uuid)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_uuid
                            for node_uuids in fileinfo[filename].values()
                            for node_uuid in node_uuids) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        self._ErrorIf(missing_file and missing_file != expected_nodes,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is optional, but it must exist on all or no"
                      " nodes (not found on %s)",
                      filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))
      else:
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is missing from node(s) %s", filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        self._ErrorIf(unexpected,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s should not exist on node(s) %s",
                      filename, utils.CommaJoin(
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1,
                     utils.CommaJoin(utils.NiceSort(
                       map(self.cfg.GetNodeName, node_uuids))))
                    for (idx, (checksum, node_uuids)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
                    "File %s found with %s different checksums (%s)",
                    filename, len(checksums), "; ".join(variants))

  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
    """Verify the drbd helper.

    """
    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                    "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "wrong drbd usermode helper: %s", payload)

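  # Illustrative note: drbd_map, as consumed below, maps each node UUID to
  # a dict of DRBD minor number -> instance UUID, roughly
  # {"<node-uuid>": {0: "<instance-uuid>", ...}, ...}.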
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)

    # compute the DRBD minors
    node_drbd = {}
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
      test = inst_uuid not in instanceinfo
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
      if test:
        node_drbd[minor] = (inst_uuid, False)
      else:
        instance = instanceinfo[inst_uuid]
        node_drbd[minor] = (inst_uuid, instance.disks_active)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                  "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (inst_uuid, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "drbd minor %d of instance %s is not active", minor,
                    self.cfg.GetInstanceName(inst_uuid))
    for minor in used_minors:
      test = minor not in node_drbd
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "unallocated drbd minor %d is in use", minor)

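  # Illustrative note: each NV_OSLIST entry unpacked below is an 8-element
  # list of the form [name, path, status, diagnose, variants, parameters,
  # api_versions, trusted].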
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 8
                           for v in remote_os))

    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                  "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver,
         trusted) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver),
                            trusted))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api, f_trusted = os_data[0]
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
                    "Invalid OS %s (located at %s): %s",
                    os_name, f_path, f_diag)
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
                    "OS '%s' has multiple entries"
                    " (first one shadows the rest): %s",
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                    "Extra OS %s not present on reference node (%s)",
                    os_name, self.cfg.GetNodeName(base.uuid))
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api, b_trusted = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " [%s] vs. [%s]", kind, os_name,
                      self.cfg.GetNodeName(base.uuid),
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
      for kind, a, b in [("trusted", f_trusted, b_trusted)]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " %s vs. %s", kind, os_name,
                      self.cfg.GetNodeName(base.uuid), a, b)

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
                  "OSes present on reference node %s"
                  " but missing on this node: %s",
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))

  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    cluster = self.cfg.GetClusterInfo()
    if (is_master and
        (cluster.IsFileStorageEnabled() or
         cluster.IsSharedFileStorageEnabled())):
      try:
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
                          verify_key, error_key):
    """Verifies (file) storage paths.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type file_disk_template: string
    @param file_disk_template: file-based disk template, whose directory
        is supposed to be verified
    @type verify_key: string
    @param verify_key: key for the verification map of this file
        verification step
    @param error_key: error key to be added to the verification results
        in case something goes wrong in this verification step

    """
    assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
              constants.ST_FILE, constants.ST_SHARED_FILE
           ))

    cluster = self.cfg.GetClusterInfo()
    if cluster.IsDiskTemplateEnabled(file_disk_template):
      self._ErrorIf(
          verify_key in nresult,
          error_key, ninfo.name,
          "The configured %s storage path is unusable: %s" %
          (file_disk_template, nresult.get(verify_key)))

  def _VerifyFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_FILE,
        constants.NV_FILE_STORAGE_PATH,
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)

  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_SHARED_FILE,
        constants.NV_SHARED_FILE_STORAGE_PATH,
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

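  # Illustrative note: the map built below has the rough shape
  # {instance_uuid: {node_uuid: [(success, payload), ...]}}, with one
  # (success, payload) tuple per disk of the instance on that node.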
  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_dev_inst_only = {}
    diskless_instances = set()
    nodisk_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in self.cfg.GetInstanceDisks(inst_uuid)]

      if not disks:
        nodisk_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template != diskless)
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      dev_inst_only = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))

      node_disks_dev_inst_only[nuuid] = dev_inst_only

    assert len(node_disks) == len(node_disks_dev_inst_only)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
               node_disks.keys(), node_disks_dev_inst_only)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}
    # ...and disk-full instances that happen to have no disks
    for inst_uuid in nodisk_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(
                        self.cfg.GetInstanceNodes(instanceinfo[inst].uuid)) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

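  # Illustrative note: _SelectSshCheckNodes below returns a pair of
  # (online_node_names, {node_name: [peer names to contact]}), where the
  # peers cover roughly one node from every other node group.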
  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

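  # Rough outline of Exec below: build the per-node verification request,
  # gather runtime data from all nodes via the node_verify RPC, then check
  # configuration files and client certificates, per-node state,
  # per-instance state, orphan volumes and (optionally) N+1 memory
  # redundancy.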
  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      constants.NV_CLIENT_CERT: None,
      }

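    # The additional NV_* checks below are only requested when the
    # corresponding feature (LVM volume group, DRBD, file or shared-file
    # storage, bridged NICs, out-of-band support, exclusive storage) is
    # actually configured on this cluster.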
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir
      if cluster.IsSharedFileStorageEnabled():
        node_verify_param[constants.NV_SHARED_FILE_STORAGE_PATH] = \
          cluster.shared_file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

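    # Fill the node images with the instances they host: pinst/sinst track
    # primary/secondary instances per node, and sbp maps a primary node to
    # the instances that have their secondary on the node at hand (used
    # later by the N+1 memory check).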
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      inst_nodes = self.cfg.GetInstanceNodes(instance.uuid)
      for nuuid in inst_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      self.cfg.GetInstanceLVsByNode(instance.uuid, lvmap=node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in self.cfg.GetInstanceSecondaryNodes(instance.uuid):
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so
    # if it's True for at least one node, we act as if it were set for all
    # the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    node_group_uuids = dict(map(lambda n: (n.name, n.group),
                                self.cfg.GetAllNodesInfo().values()))
    groups_config = self.cfg.GetAllNodeGroupsInfoDict()

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # NOTE: Here we lock the configuration for the duration of RPC calls,
    # which means that the cluster configuration changes are blocked during
    # this period.
    # This is something that should be done only exceptionally and only for
    # justified cases!
    # In this case, we need the lock as we can verify the integrity of
    # configuration files on MCs only if we know nobody else is modifying it.
    # FIXME: The check for integrity of config.data should be moved to
    # WConfD, which is the only one who can otherwise ensure nobody
    # will modify the configuration during the check.
    with self.cfg.GetConfigManager(shared=True):
      feedback_fn("* Gathering information about nodes (%s nodes)" %
                  len(self.my_node_uuids))
      # Due to the way our RPC system works, exact response times cannot be
      # guaranteed (e.g. a broken node could run into a timeout). By keeping
      # the time before and after executing the request, we can at least have
      # a time window.
      nvinfo_starttime = time.time()
      # Get lock on the configuration so that nobody modifies it concurrently.
      # Otherwise it can be modified by other jobs, failing the consistency
      # test.
      # NOTE: This is an exceptional situation, we should otherwise avoid
      # locking the configuration for anything but very fast, pure operations.
      cluster_name = self.cfg.GetClusterName()
      hvparams = self.cfg.GetClusterInfo().hvparams
      all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                             node_verify_param,
                                             cluster_name,
                                             hvparams,
                                             node_group_uuids,
                                             groups_config)
      nvinfo_endtime = time.time()

      if self.extra_lv_nodes and vg_name is not None:
        feedback_fn("* Gathering information about extra nodes (%s nodes)" %
                    len(self.extra_lv_nodes))
        extra_lv_nvinfo = \
            self.rpc.call_node_verify(self.extra_lv_nodes,
                                      {constants.NV_LVLIST: vg_name},
                                      self.cfg.GetClusterName(),
                                      self.cfg.GetClusterInfo().hvparams,
                                      node_group_uuids,
                                      groups_config)
      else:
        extra_lv_nvinfo = {}

      # If not all nodes are being checked, we need to make sure the master
      # node and a non-checked vm_capable node are in the list.
      absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
      if absent_node_uuids:
        vf_nvinfo = all_nvinfo.copy()
        vf_node_info = list(self.my_node_info.values())
        additional_node_uuids = []
        if master_node_uuid not in self.my_node_info:
          additional_node_uuids.append(master_node_uuid)
          vf_node_info.append(self.all_node_info[master_node_uuid])
        # Add the first vm_capable node we find which is not included,
        # excluding the master node (which we already have)
        for node_uuid in absent_node_uuids:
          nodeinfo = self.all_node_info[node_uuid]
          if (nodeinfo.vm_capable and not nodeinfo.offline and
              node_uuid != master_node_uuid):
            additional_node_uuids.append(node_uuid)
            vf_node_info.append(self.all_node_info[node_uuid])
            break
        key = constants.NV_FILELIST

        feedback_fn("* Gathering information about the master node")
        vf_nvinfo.update(self.rpc.call_node_verify(
           additional_node_uuids, {key: node_verify_param[key]},
           self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams,
           node_group_uuids,
           groups_config))
      else:
        vf_nvinfo = all_nvinfo
        vf_node_info = self.my_node_info.values()

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    self._VerifyClientCertificates(self.my_node_info.values(), all_nvinfo)

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        if constants.DT_DRBD8 in cluster.enabled_disk_templates:
          self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                               all_drbd_map)

        if (constants.DT_PLAIN in cluster.enabled_disk_templates) or \
            (constants.DT_DRBD8 in cluster.enabled_disk_templates):
          self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in self.cfg.GetInstanceSecondaryNodes(instance.uuid):
        if (secondary in self.my_node_info
            and instance.name not in self.my_inst_info):
          self.cfg.GetInstanceLVsByNode(instance.uuid, lvmap=node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

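  # Illustrative note: in HooksCallBack below, each per-node hook result
  # payload is a list of (script, status, output) triples; a status of
  # constants.HKR_FAIL turns the overall result into a failure.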
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if test:
          lu_result = False
          continue
        if res.offline:
          # No need to investigate payload if node is offline
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


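# Note: LUClusterVerifyDisks below does no disk verification itself; it
# just submits one L{opcodes.OpGroupVerifyDisks} job per node group via
# ResultWithJobs.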
class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])