lib/cmdlib/cluster.py @ 7352d33b
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, _QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import _ShareAll, _RunPostHook, \
  _ComputeAncillaryFiles, _RedistributeAncillaryFiles, _UploadHelper, \
  _GetWantedInstances, _MergeAndVerifyHvState, _MergeAndVerifyDiskState, \
  _GetUpdatedIPolicy, _ComputeNewInstanceViolations, _GetUpdatedParams, \
  _CheckOSParams, _CheckHVParams, _AdjustCandidatePool, _CheckNodePVs, \
  _ComputeIPolicyInstanceViolation, _AnnotateDiskParams, \
  _SupportsOob

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = _ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    _RunPostHook(self, master_params.name)

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    if result.fail_msg:
      self.LogWarning("Error disabling the master IP address: %s",
                      result.fail_msg)

    return master_params.name


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class _ClusterQuery(_QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
    else:
      cluster = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_name = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_name)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   master_name)

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = _ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    _RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.name)
      except ValueError:
        pass
      _UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                     master_params, ems)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      self.wanted_names = _GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
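      # 1 = acquire the lock in shared mode, 0 = exclusive mode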
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsize(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsize call to node"
                        " %s, ignoring", node)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node, len(dskl), result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
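        # convert the size reported by the node to MiB, the unit used for
        # disk.size in the configuration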
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))
    return changed


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" % err,
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = _ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_list = self.owned_locks(locking.LEVEL_NODE)

    vm_capable_nodes = [node.name
                        for node in self.cfg.GetAllNodesInfo().values()
                        if node.name in node_list and node.vm_capable]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(vm_capable_nodes)
      for node in vm_capable_nodes:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_list)
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s", node)
          continue
        msg = helpers[node].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (node, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[node].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (node, node_helper), errors.ECODE_ENVIRON)

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = _MergeAndVerifyHvState(self.op.hv_state,
                                            self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = _MergeAndVerifyDiskState(self.op.disk_state,
                                                self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    if self.op.ipolicy:
      self.new_ipolicy = _GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                            group_policy=False)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(node in group.members
                                             for node in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = _ComputeNewInstanceViolations(ipol,
                                            new_ipolicy, instances, self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = _GetUpdatedParams(self.new_osp[os_name], osp,
                                                  use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          _CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                         os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          _CheckHVParams(self, node_list, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          _CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.drbd_helper is not None:
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(set(self.op.enabled_disk_templates))
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      _AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
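      # mods is a list of (DDM_ADD|DDM_REMOVE, os_name) pairs from the opcode;
      # aname names the cluster attribute (an OS list) being modified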
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                       master_params, ems)
      result.Raise("Could not disable the master ip")
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(master_params.name,
                                                        master_params.netmask,
                                                        self.op.master_netmask,
                                                        master_params.ip,
                                                        master_params.netdev)
      if result.fail_msg:
        msg = "Could not change the master IP netmask: %s" % result.fail_msg
        feedback_fn(msg)

      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                     master_params, ems)
      if result.fail_msg:
        self.LogWarning("Could not re-enable the master ip on"
                        " the master, please restart manually: %s",
                        result.fail_msg)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
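      # (negative entries in "depends" refer to jobs submitted together with
      # this one, so -len(jobs) points at the config-verify job added above)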
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = _ShareAll()
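    # everything is locked, but only in shared mode: config verification only
    # reads cluster state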

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node.name for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in dangling_nodes:
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst.name)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(dangling_instances.get(node.name,
                                                ["no instances"])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type name: string
    @ivar name: the node name to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1344
        not whether the individual keys were correct) (runtime)
1345
    @type lvm_fail: boolean
1346
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1347
    @type hyp_fail: boolean
1348
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1349
    @type ghost: boolean
1350
    @ivar ghost: whether this is a known node or not (config)
1351
    @type os_fail: boolean
1352
    @ivar os_fail: whether the RPC call didn't return valid OS data
1353
    @type oslist: list
1354
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1355
    @type vm_capable: boolean
1356
    @ivar vm_capable: whether the node can host instances
1357
    @type pv_min: float
1358
    @ivar pv_min: size in MiB of the smallest PVs
1359
    @type pv_max: float
1360
    @ivar pv_max: size in MiB of the biggest PVs
1361

1362
    """
1363
    def __init__(self, offline=False, name=None, vm_capable=True):
1364
      self.name = name
1365
      self.volumes = {}
1366
      self.instances = []
1367
      self.pinst = []
1368
      self.sinst = []
1369
      self.sbp = {}
1370
      self.mfree = 0
1371
      self.dfree = 0
1372
      self.offline = offline
1373
      self.vm_capable = vm_capable
1374
      self.rpc_fail = False
1375
      self.lvm_fail = False
1376
      self.hyp_fail = False
1377
      self.ghost = False
1378
      self.os_fail = False
1379
      self.oslist = {}
1380
      self.pv_min = None
1381
      self.pv_max = None
1382

    
1383
  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_names = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: inst_names,
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = _ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      all_inst_info = self.cfg.GetAllInstancesInfo()

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
          nodes.update(all_inst_info[inst].secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

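  # Summary of the locking flow above: ExpandNames acquires the instance,
  # node-group and node-allocation locks and leaves LEVEL_NODE empty;
  # DeclareLocks then expands LEVEL_NODE to the group members plus the
  # secondary nodes of owned mirrored instances that live partly outside
  # the group, so that their LVs can be verified as well.
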
  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_nodes = set(self.group_info.members)
    group_instances = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_nodes = \
        group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_instances = \
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))

    if unlocked_nodes:
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
                                 utils.CommaJoin(unlocked_nodes),
                                 errors.ECODE_STATE)

    if unlocked_instances:
      raise errors.OpPrereqError("Missing lock for instances: %s" %
                                 utils.CommaJoin(unlocked_instances),
                                 errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_names = utils.NiceSort(group_nodes)
    self.my_inst_names = utils.NiceSort(group_instances)

    self.my_node_info = dict((name, self.all_node_info[name])
                             for name in self.my_node_names)

    self.my_inst_info = dict((name, self.all_inst_info[name])
                             for name in self.my_inst_names)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nname in inst.all_nodes:
          if self.all_node_info[nname].group != self.group_uuid:
            extra_lv_nodes.add(nname)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    _ErrorIf(test, constants.CV_ENODERPC, node,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, constants.CV_ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    _ErrorIf(test, constants.CV_ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, constants.CV_ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        _ErrorIf(True, constants.CV_ENODEHV, node,
                 "hypervisor %s parameter verify failure (source %s): %s",
                 hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
             "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
             "Node time diverges by at least %s from master node time",
             ntime_diff)

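  # Note on the check above: the accepted window for the node's clock is
  # [nvinfo_starttime - NODE_MAX_CLOCK_SKEW,
  #  nvinfo_endtime + NODE_MAX_CLOCK_SKEW]. For example (illustrative
  # numbers), with a skew allowance of 150 seconds and an RPC window of
  # 12:00:00-12:00:30, any node time between 11:57:30 and 12:03:00 passes;
  # anything outside is reported as CV_ENODETIME with the divergence.
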
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = _CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, node, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping from node names to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of checks
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode) = min((ni.pv_min, ni.name) for ni in vals)
    (pvmax, maxnode) = max((ni.pv_max, ni.name) for ni in vals)
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, minnode, pvmax, maxnode)

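  # Note on the min()/max() calls above: comparing (pv_size, node_name)
  # tuples selects the smallest and the biggest PV in the group and, on
  # ties, a deterministic node name; the names are only used to make the
  # CV_EGROUPDIFFERENTPVSIZE message point at concrete nodes.
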
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    _ErrorIf(test, constants.CV_ENODENET, node,
             "did not return valid bridge information")
    if not test:
      _ErrorIf(bool(missing), constants.CV_ENODENET, node,
               "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name

    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    test = constants.NV_NODELIST not in nresult
    _ErrorIf(test, constants.CV_ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          _ErrorIf(True, constants.CV_ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    _ErrorIf(test, constants.CV_ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, constants.CV_ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    _ErrorIf(test, constants.CV_ENODENET, node,
             "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if node == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        _ErrorIf(True, constants.CV_ENODENET, node, msg)

  def _VerifyInstance(self, instance, inst_config, node_image,
                      diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
    pnode = inst_config.primary_node
    pnode_img = node_image[pnode]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    inst_config.MapLVsByNode(node_vol_should)

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = _ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
    _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err),
             code=self.ETYPE_WARNING)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if inst_config.admin_state == constants.ADMINST_UP:
      test = instance not in pnode_img.instances and not pnode_img.offline
      _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               pnode)
      _ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
               "instance is marked as running and lives on offline node %s",
               pnode)

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      _ErrorIf(inst_config.admin_state == constants.ADMINST_UP and
               not success and not bad_snode,
               constants.CV_EINSTANCEFAULTYDISK, instance,
               "couldn't retrieve status for disk/%s on %s: %s",
               idx, nname, bdev_status)
      _ErrorIf((inst_config.admin_state == constants.ADMINST_UP and
                success and bdev_status.ldisk_status == constants.LDS_FAULTY),
               constants.CV_EINSTANCEFAULTYDISK, instance,
               "disk/%s on %s is faulty", idx, nname)

    _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
             constants.CV_ENODERPC, pnode, "instance %s, connection to"
             " primary node failed", instance)

    _ErrorIf(len(inst_config.secondary_nodes) > 1,
             constants.CV_EINSTANCELAYOUT,
             instance, "instance has multiple secondary nodes: %s",
             utils.CommaJoin(inst_config.secondary_nodes),
             code=self.ETYPE_WARNING)

    if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
      # Disk template not compatible with exclusive_storage: no instance
      # node should have the flag set
      es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
                                                     inst_config.all_nodes)
      es_nodes = [n for (n, es) in es_flags.items()
                  if es]
      _ErrorIf(es_nodes, constants.CV_EINSTANCEUNSUITABLENODE, instance,
               "instance has template %s, which is not supported on nodes"
               " that have exclusive storage set: %s",
               inst_config.disk_template, utils.CommaJoin(es_nodes))

    if inst_config.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(inst_config.all_nodes)
      instance_groups = {}

      for node in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node].group,
                                   []).append(node)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in inst_config.secondary_nodes:
      s_img = node_image[snode]
      _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
               snode, "instance %s, connection to secondary node failed",
               instance)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
             "instance has offline secondary node(s) %s",
             utils.CommaJoin(inst_nodes_offline))
    # ... or ghost/non-vm_capable nodes
    for node in inst_config.all_nodes:
      _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
               instance, "instance lives on ghost node %s", node)
      _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
               instance, "instance lives on non-vm_capable node %s", node)

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node not in node_vol_should or
                volume not in node_vol_should[node]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or self.all_node_info[node].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = cluster_info.FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1, node,
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      prinode, needed_mem, n_img.mfree)

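  # Illustrative example for the N+1 check above (hypothetical values): if
  # node A is secondary for instances inst1 (minmem 1024 MiB) and inst2
  # (minmem 2048 MiB) whose primary is node B, and both are auto-balanced,
  # then A must report at least 3072 MiB of free memory; otherwise
  # CV_ENODEN1 is raised for A, naming B as the primary that could fail.
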
  @classmethod
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param errorif: Callback for reporting errors
    @param nodeinfo: List of L{objects.Node} objects
    @param master_node: Name of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.name == master_node)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodeinfo
      else:
        filenodes = filter(fn, nodeinfo)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("name"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodeinfo:
      if node.offline:
        ignore_nodes.add(node.name)
        continue

      nresult = all_nvinfo[node.name]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      errorif(test, constants.CV_ENODEFILECHECK, node.name,
              "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.name)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.name)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_name
                            for nodes in fileinfo[filename].values()
                            for node_name in nodes) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        errorif(missing_file and missing_file != expected_nodes,
                constants.CV_ECLUSTERFILECHECK, None,
                "File %s is optional, but it must exist on all or no"
                " nodes (not found on %s)",
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
      else:
        errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                "File %s is missing from node(s) %s", filename,
                utils.CommaJoin(utils.NiceSort(missing_file)))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        errorif(unexpected,
                constants.CV_ECLUSTERFILECHECK, None,
                "File %s should not exist on node(s) %s",
                filename, utils.CommaJoin(utils.NiceSort(unexpected)))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
                    for (idx, (checksum, nodes)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      errorif(test, constants.CV_ECLUSTERFILECHECK, None,
              "File %s found with %s different checksums (%s)",
              filename, len(checksums), "; ".join(variants))

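  # Shape of the fileinfo mapping built above (checksums and node names are
  # illustrative): for every distributed file, each checksum maps to the set
  # of nodes reporting it, e.g.
  #   fileinfo["/var/lib/ganeti/known_hosts"] = {
  #     "1234abcdef...": set(["node1", "node2"]),
  #     "89efcdab01...": set(["node3"]),
  #   }
  # More than one checksum key for the same file triggers
  # CV_ECLUSTERFILECHECK with the per-variant node lists.
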
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
               "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
                 "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
                 "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
    node_drbd = {}
    for minor, instance in drbd_map[node].items():
      test = instance not in instanceinfo
      _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
               "ghost instance '%s' in temporary DRBD map", instance)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
      if test:
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name,
                            instance.admin_state == constants.ADMINST_UP)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    _ErrorIf(test, constants.CV_ENODEDRBD, node,
             "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (iname, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
               "drbd minor %d of instance %s is not active", minor, iname)
    for minor in used_minors:
      test = minor not in node_drbd
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
               "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    _ErrorIf(test, constants.CV_ENODEOS, node,
             "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      _ErrorIf(not f_status, constants.CV_ENODEOS, node,
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
      _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
               "OS '%s' has multiple entries (first one shadows the rest): %s",
               os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      _ErrorIf(test, constants.CV_ENODEOS, node,
               "Extra OS %s not present on reference node (%s)",
               os_name, base.name)
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        _ErrorIf(a != b, constants.CV_ENODEOS, node,
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
                 kind, os_name, base.name,
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    _ErrorIf(missing, constants.CV_ENODEOS, node,
             "OSes present on reference node %s but missing on this node: %s",
             base.name, utils.CommaJoin(missing))

  def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    node = ninfo.name

    if (is_master and
        (constants.ENABLE_FILE_STORAGE or
         constants.ENABLE_SHARED_FILE_STORAGE)):
      try:
        fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, node,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
               utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      _ErrorIf(True, constants.CV_ENODELVM, node,
               "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = idata

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    _ErrorIf(test, constants.CV_ENODEHV, node,
             "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        _ErrorIf(True, constants.CV_ENODERPC, node,
                 "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      _ErrorIf(test, constants.CV_ENODELVM, node,
               "node didn't return data for the volume group '%s'"
               " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          _ErrorIf(True, constants.CV_ENODERPC, node,
                   "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type nodelist: list of strings
    @param nodelist: Node names
    @type node_image: dict of (name, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (name, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    node_disks = {}
    node_disks_devonly = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nname in nodelist:
      node_instances = list(itertools.chain(node_image[nname].pinst,
                                            node_image[nname].sinst))
      diskless_instances.update(inst for inst in node_instances
                                if instanceinfo[inst].disk_template == diskless)
      disks = [(inst, disk)
               for inst in node_instances
               for disk in instanceinfo[inst].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nname] = disks

      # _AnnotateDiskParams makes already copies of the disks
      devonly = []
      for (inst, dev) in disks:
        (anno_disk,) = _AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
        self.cfg.SetDiskID(anno_disk, nname)
        devonly.append(anno_disk)

      node_disks_devonly[nname] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nname, nres) in result.items():
      disks = node_disks[nname]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        _ErrorIf(msg, constants.CV_ENODERPC, nname,
                 "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              nname, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst, _), status) in zip(disks, data):
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)

    # Add empty entries for diskless instances.
    for inst in diskless_instances:
      assert inst not in instdisk
      instdisk[inst] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nnames in instdisk.items()
                      for nname, statuses in nnames.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

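  # Shape of the returned mapping (instance/node names and statuses are
  # illustrative placeholders):
  #   instdisk = {
  #     "inst1": {"node1": [(True, status_disk0), (True, status_disk1)]},
  #     "inst2": {},  # diskless instance
  #   }
  # where each (success, payload) pair comes from the mirror-status RPC
  # issued above, in the same order as the instance's disks on that node.
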
  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

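  # Illustrative example for _SelectSshCheckNodes (hypothetical node names):
  # for a group with online nodes [n1, n2] and two other groups whose
  # selectors cycle over [n3] and [n4, n5], the result is
  # (["n1", "n2"], {"n1": ["n3", "n4"], "n2": ["n3", "n5"]}), i.e. every
  # online node of the group contacts one node from each other group.
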
  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks run only in the post phase; their failure causes
    their output to be logged in the verify output and the verification
    to fail.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], self.my_node_names)

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_names:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = _ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (master_node, master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if drbd_helper:
      node_verify_param[constants.NV_DRBDLIST] = None
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
      # Load file storage paths only from master node
      node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
                                                 name=node.name,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = _SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for instance in self.my_inst_names:
      inst_config = self.my_inst_info[instance]
      if inst_config.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nname in inst_config.all_nodes:
        if nname not in node_image:
          gnode = self.NodeImage(name=nname)
          gnode.ghost = (nname not in self.all_node_info)
          node_image[nname] = gnode

      inst_config.MapLVsByNode(node_vol_should)

      pnode = inst_config.primary_node
      node_image[pnode].pinst.append(instance)

      for snode in inst_config.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance)

    es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg, self.my_node_names)
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least a node, we act as if it were set for all the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
                                           node_verify_param,
                                           self.cfg.GetClusterName())
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName())
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_names))
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
    if absent_nodes:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_nodes = []
      if master_node not in self.my_node_info:
        additional_nodes.append(master_node)
        vf_node_info.append(self.all_node_info[master_node])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node in absent_nodes:
        nodeinfo = self.all_node_info[node]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node != master_node):
          additional_nodes.append(node)
          vf_node_info.append(self.all_node_info[node])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
                                                 {key: node_verify_param[key]},
                                                 self.cfg.GetClusterName()))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
2684
      node = node_i.name
2685
      nimg = node_image[node]
2686

    
2687
      if node_i.offline:
2688
        if verbose:
2689
          feedback_fn("* Skipping offline node %s" % (node,))
2690
        n_offline += 1
2691
        continue
2692

    
2693
      if node == master_node:
2694
        ntype = "master"
2695
      elif node_i.master_candidate:
2696
        ntype = "master candidate"
2697
      elif node_i.drained:
2698
        ntype = "drained"
2699
        n_drained += 1
2700
      else:
2701
        ntype = "regular"
2702
      if verbose:
2703
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))
2704

    
2705
      msg = all_nvinfo[node].fail_msg
2706
      _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
2707
               msg)
2708
      if msg:
2709
        nimg.rpc_fail = True
2710
        continue
2711

    
2712
      nresult = all_nvinfo[node].payload
2713

    
2714
      nimg.call_ok = self._VerifyNode(node_i, nresult)
2715
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2716
      self._VerifyNodeNetwork(node_i, nresult)
2717
      self._VerifyNodeUserScripts(node_i, nresult)
2718
      self._VerifyOob(node_i, nresult)
2719
      self._VerifyFileStoragePaths(node_i, nresult,
2720
                                   node == master_node)
2721

    
2722
      if nimg.vm_capable:
2723
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
2724
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2725
                             all_drbd_map)
2726

    
2727
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2728
        self._UpdateNodeInstances(node_i, nresult, nimg)
2729
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2730
        self._UpdateNodeOS(node_i, nresult, nimg)
2731

    
2732
        if not nimg.os_fail:
2733
          if refos_img is None:
2734
            refos_img = nimg
2735
          self._VerifyNodeOS(node_i, nimg, refos_img)
2736
        self._VerifyNodeBridges(node_i, nresult, bridges)
2737

    
        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)
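        # An instance known to the configuration is simply running on the
        # wrong node; one that is not known at all is an orphan left behind
        # on this node.  The two cases get different error codes below.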

        for inst in non_primary_inst:
          test = inst in self.all_inst_info
          _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
                   "instance should not run on node %s", node_i.name)
          _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                   "node is running unknown instance %s", inst)

    self._VerifyGroupLVM(node_image, vg_name)

    for node, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
                              node_image[node], vg_name)

    feedback_fn("* Verifying instance status")
    for instance in self.my_inst_names:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.my_inst_info[instance]
      self._VerifyInstance(instance, inst_config, node_image,
                           instdisk[instance])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if inst_config.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for inst in self.all_inst_info.values():
      for secondary in inst.secondary_nodes:
        if (secondary in self.my_node_info
            and inst.name not in self.my_inst_info):
          inst.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

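    # N+1 memory redundancy: every node should have enough free memory to
    # start the instances that would fail over to it if one of its peers
    # went down; the check can be disabled through the opcode's skip_checks.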
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_names:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
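        # Each payload entry is a (script, status, output) tuple describing
        # one hook script run on the node; HKR_FAIL marks a failed script.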
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = _ShareAll()
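    # All node groups are locked in shared mode only: this LU just reads the
    # group list and delegates the actual disk verification to one job per
    # group (see Exec), so no exclusive locks are required.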
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])