Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / cluster.py @ 1d4a4b26

History | View | Annotate | Download (103.9 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with the cluster."""
23

    
24
import OpenSSL
25

    
26
import copy
27
import itertools
28
import logging
29
import operator
30
import os
31
import re
32
import time
33

    
34
from ganeti import compat
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import hypervisor
38
from ganeti import locking
39
from ganeti import masterd
40
from ganeti import netutils
41
from ganeti import objects
42
from ganeti import opcodes
43
from ganeti import pathutils
44
from ganeti import query
45
from ganeti import rpc
46
from ganeti import runtime
47
from ganeti import ssh
48
from ganeti import uidpool
49
from ganeti import utils
50
from ganeti import vcluster
51

    
52
from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53
  ResultWithJobs
54
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob
60

    
61
import ganeti.masterd.instance
62

    
63

    
64
class LUClusterActivateMasterIp(NoHooksLU):
65
  """Activate the master IP on the master node.
66

67
  """
68
  def Exec(self, feedback_fn):
69
    """Activate the master IP.
70

71
    """
72
    master_params = self.cfg.GetMasterNetworkParameters()
73
    ems = self.cfg.GetUseExternalMipScript()
74
    result = self.rpc.call_node_activate_master_ip(master_params.name,
75
                                                   master_params, ems)
76
    result.Raise("Could not activate the master IP")
77

    
78

    
79
class LUClusterDeactivateMasterIp(NoHooksLU):
80
  """Deactivate the master IP on the master node.
81

82
  """
83
  def Exec(self, feedback_fn):
84
    """Deactivate the master IP.
85

86
    """
87
    master_params = self.cfg.GetMasterNetworkParameters()
88
    ems = self.cfg.GetUseExternalMipScript()
89
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
90
                                                     master_params, ems)
91
    result.Raise("Could not deactivate the master IP")
92

    
93

    
94
class LUClusterConfigQuery(NoHooksLU):
95
  """Return configuration values.
96

97
  """
98
  REQ_BGL = False
99

    
100
  def CheckArguments(self):
101
    self.cq = ClusterQuery(None, self.op.output_fields, False)
102

    
103
  def ExpandNames(self):
104
    self.cq.ExpandNames(self)
105

    
106
  def DeclareLocks(self, level):
107
    self.cq.DeclareLocks(self, level)
108

    
109
  def Exec(self, feedback_fn):
110
    result = self.cq.OldStyleQuery(self)
111

    
112
    assert len(result) == 1
113

    
114
    return result[0]
115

    
116

    
117
class LUClusterDestroy(LogicalUnit):
118
  """Logical unit for destroying the cluster.
119

120
  """
121
  HPATH = "cluster-destroy"
122
  HTYPE = constants.HTYPE_CLUSTER
123

    
124
  def BuildHooksEnv(self):
125
    """Build hooks env.
126

127
    """
128
    return {
129
      "OP_TARGET": self.cfg.GetClusterName(),
130
      }
131

    
132
  def BuildHooksNodes(self):
133
    """Build hooks nodes.
134

135
    """
136
    return ([], [])
137

    
138
  def CheckPrereq(self):
139
    """Check prerequisites.
140

141
    This checks whether the cluster is empty.
142

143
    Any errors are signaled by raising errors.OpPrereqError.
144

145
    """
146
    master = self.cfg.GetMasterNode()
147

    
148
    nodelist = self.cfg.GetNodeList()
149
    if len(nodelist) != 1 or nodelist[0] != master:
150
      raise errors.OpPrereqError("There are still %d node(s) in"
151
                                 " this cluster." % (len(nodelist) - 1),
152
                                 errors.ECODE_INVAL)
153
    instancelist = self.cfg.GetInstanceList()
154
    if instancelist:
155
      raise errors.OpPrereqError("There are still %d instance(s) in"
156
                                 " this cluster." % len(instancelist),
157
                                 errors.ECODE_INVAL)
158

    
159
  def Exec(self, feedback_fn):
160
    """Destroys the cluster.
161

162
    """
163
    master_params = self.cfg.GetMasterNetworkParameters()
164

    
165
    # Run post hooks on master node before it's removed
166
    RunPostHook(self, master_params.name)
167

    
168
    ems = self.cfg.GetUseExternalMipScript()
169
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
170
                                                     master_params, ems)
171
    if result.fail_msg:
172
      self.LogWarning("Error disabling the master IP address: %s",
173
                      result.fail_msg)
174

    
175
    return master_params.name
176

    
177

    
178
class LUClusterPostInit(LogicalUnit):
179
  """Logical unit for running hooks after cluster initialization.
180

181
  """
182
  HPATH = "cluster-init"
183
  HTYPE = constants.HTYPE_CLUSTER
184

    
185
  def BuildHooksEnv(self):
186
    """Build hooks env.
187

188
    """
189
    return {
190
      "OP_TARGET": self.cfg.GetClusterName(),
191
      }
192

    
193
  def BuildHooksNodes(self):
194
    """Build hooks nodes.
195

196
    """
197
    return ([], [self.cfg.GetMasterNode()])
198

    
199
  def Exec(self, feedback_fn):
200
    """Nothing to do.
201

202
    """
203
    return True
204

    
205

    
206
class ClusterQuery(QueryBase):
207
  FIELDS = query.CLUSTER_FIELDS
208

    
209
  #: Do not sort (there is only one item)
210
  SORT_FIELD = None
211

    
212
  def ExpandNames(self, lu):
213
    lu.needed_locks = {}
214

    
215
    # The following variables interact with _QueryBase._GetNames
216
    self.wanted = locking.ALL_SET
217
    self.do_locking = self.use_locking
218

    
219
    if self.do_locking:
220
      raise errors.OpPrereqError("Can not use locking for cluster queries",
221
                                 errors.ECODE_INVAL)
222

    
223
  def DeclareLocks(self, lu, level):
224
    pass
225

    
226
  def _GetQueryData(self, lu):
227
    """Computes the list of nodes and their attributes.
228

229
    """
230
    # Locking is not used
231
    assert not (compat.any(lu.glm.is_owned(level)
232
                           for level in locking.LEVELS
233
                           if level != locking.LEVEL_CLUSTER) or
234
                self.do_locking or self.use_locking)
235

    
236
    if query.CQ_CONFIG in self.requested_data:
237
      cluster = lu.cfg.GetClusterInfo()
238
    else:
239
      cluster = NotImplemented
240

    
241
    if query.CQ_QUEUE_DRAINED in self.requested_data:
242
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
243
    else:
244
      drain_flag = NotImplemented
245

    
246
    if query.CQ_WATCHER_PAUSE in self.requested_data:
247
      master_name = lu.cfg.GetMasterNode()
248

    
249
      result = lu.rpc.call_get_watcher_pause(master_name)
250
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
251
                   master_name)
252

    
253
      watcher_pause = result.payload
254
    else:
255
      watcher_pause = NotImplemented
256

    
257
    return query.ClusterQueryData(cluster, drain_flag, watcher_pause)
258

    
259

    
260
class LUClusterQuery(NoHooksLU):
261
  """Query cluster configuration.
262

263
  """
264
  REQ_BGL = False
265

    
266
  def ExpandNames(self):
267
    self.needed_locks = {}
268

    
269
  def Exec(self, feedback_fn):
270
    """Return cluster config.
271

272
    """
273
    cluster = self.cfg.GetClusterInfo()
274
    os_hvp = {}
275

    
276
    # Filter just for enabled hypervisors
277
    for os_name, hv_dict in cluster.os_hvp.items():
278
      os_hvp[os_name] = {}
279
      for hv_name, hv_params in hv_dict.items():
280
        if hv_name in cluster.enabled_hypervisors:
281
          os_hvp[os_name][hv_name] = hv_params
282

    
283
    # Convert ip_family to ip_version
284
    primary_ip_version = constants.IP4_VERSION
285
    if cluster.primary_ip_family == netutils.IP6Address.family:
286
      primary_ip_version = constants.IP6_VERSION
287

    
288
    result = {
289
      "software_version": constants.RELEASE_VERSION,
290
      "protocol_version": constants.PROTOCOL_VERSION,
291
      "config_version": constants.CONFIG_VERSION,
292
      "os_api_version": max(constants.OS_API_VERSIONS),
293
      "export_version": constants.EXPORT_VERSION,
294
      "architecture": runtime.GetArchInfo(),
295
      "name": cluster.cluster_name,
296
      "master": cluster.master_node,
297
      "default_hypervisor": cluster.primary_hypervisor,
298
      "enabled_hypervisors": cluster.enabled_hypervisors,
299
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
300
                        for hypervisor_name in cluster.enabled_hypervisors]),
301
      "os_hvp": os_hvp,
302
      "beparams": cluster.beparams,
303
      "osparams": cluster.osparams,
304
      "ipolicy": cluster.ipolicy,
305
      "nicparams": cluster.nicparams,
306
      "ndparams": cluster.ndparams,
307
      "diskparams": cluster.diskparams,
308
      "candidate_pool_size": cluster.candidate_pool_size,
309
      "master_netdev": cluster.master_netdev,
310
      "master_netmask": cluster.master_netmask,
311
      "use_external_mip_script": cluster.use_external_mip_script,
312
      "volume_group_name": cluster.volume_group_name,
313
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
314
      "file_storage_dir": cluster.file_storage_dir,
315
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
316
      "maintain_node_health": cluster.maintain_node_health,
317
      "ctime": cluster.ctime,
318
      "mtime": cluster.mtime,
319
      "uuid": cluster.uuid,
320
      "tags": list(cluster.GetTags()),
321
      "uid_pool": cluster.uid_pool,
322
      "default_iallocator": cluster.default_iallocator,
323
      "reserved_lvs": cluster.reserved_lvs,
324
      "primary_ip_version": primary_ip_version,
325
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
326
      "hidden_os": cluster.hidden_os,
327
      "blacklisted_os": cluster.blacklisted_os,
328
      }
329

    
330
    return result
331

    
332

    
333
class LUClusterRedistConf(NoHooksLU):
334
  """Force the redistribution of cluster configuration.
335

336
  This is a very simple LU.
337

338
  """
339
  REQ_BGL = False
340

    
341
  def ExpandNames(self):
342
    self.needed_locks = {
343
      locking.LEVEL_NODE: locking.ALL_SET,
344
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
345
    }
346
    self.share_locks = ShareAll()
347

    
348
  def Exec(self, feedback_fn):
349
    """Redistribute the configuration.
350

351
    """
352
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
353
    RedistributeAncillaryFiles(self)
354

    
355

    
356
class LUClusterRename(LogicalUnit):
357
  """Rename the cluster.
358

359
  """
360
  HPATH = "cluster-rename"
361
  HTYPE = constants.HTYPE_CLUSTER
362

    
363
  def BuildHooksEnv(self):
364
    """Build hooks env.
365

366
    """
367
    return {
368
      "OP_TARGET": self.cfg.GetClusterName(),
369
      "NEW_NAME": self.op.name,
370
      }
371

    
372
  def BuildHooksNodes(self):
373
    """Build hooks nodes.
374

375
    """
376
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
377

    
378
  def CheckPrereq(self):
379
    """Verify that the passed name is a valid one.
380

381
    """
382
    hostname = netutils.GetHostname(name=self.op.name,
383
                                    family=self.cfg.GetPrimaryIPFamily())
384

    
385
    new_name = hostname.name
386
    self.ip = new_ip = hostname.ip
387
    old_name = self.cfg.GetClusterName()
388
    old_ip = self.cfg.GetMasterIP()
389
    if new_name == old_name and new_ip == old_ip:
390
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
391
                                 " cluster has changed",
392
                                 errors.ECODE_INVAL)
393
    if new_ip != old_ip:
394
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
395
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
396
                                   " reachable on the network" %
397
                                   new_ip, errors.ECODE_NOTUNIQUE)
398

    
399
    self.op.name = new_name
400

    
401
  def Exec(self, feedback_fn):
402
    """Rename the cluster.
403

404
    """
405
    clustername = self.op.name
406
    new_ip = self.ip
407

    
408
    # shutdown the master IP
409
    master_params = self.cfg.GetMasterNetworkParameters()
410
    ems = self.cfg.GetUseExternalMipScript()
411
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
412
                                                     master_params, ems)
413
    result.Raise("Could not disable the master role")
414

    
415
    try:
416
      cluster = self.cfg.GetClusterInfo()
417
      cluster.cluster_name = clustername
418
      cluster.master_ip = new_ip
419
      self.cfg.Update(cluster, feedback_fn)
420

    
421
      # update the known hosts file
422
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
423
      node_list = self.cfg.GetOnlineNodeList()
424
      try:
425
        node_list.remove(master_params.name)
426
      except ValueError:
427
        pass
428
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
429
    finally:
430
      master_params.ip = new_ip
431
      result = self.rpc.call_node_activate_master_ip(master_params.name,
432
                                                     master_params, ems)
433
      msg = result.fail_msg
434
      if msg:
435
        self.LogWarning("Could not re-enable the master role on"
436
                        " the master, please restart manually: %s", msg)
437

    
438
    return clustername
439

    
440

    
441
class LUClusterRepairDiskSizes(NoHooksLU):
442
  """Verifies the cluster disks sizes.
443

444
  """
445
  REQ_BGL = False
446

    
447
  def ExpandNames(self):
448
    if self.op.instances:
449
      self.wanted_names = GetWantedInstances(self, self.op.instances)
450
      # Not getting the node allocation lock as only a specific set of
451
      # instances (and their nodes) is going to be acquired
452
      self.needed_locks = {
453
        locking.LEVEL_NODE_RES: [],
454
        locking.LEVEL_INSTANCE: self.wanted_names,
455
        }
456
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
457
    else:
458
      self.wanted_names = None
459
      self.needed_locks = {
460
        locking.LEVEL_NODE_RES: locking.ALL_SET,
461
        locking.LEVEL_INSTANCE: locking.ALL_SET,
462

    
463
        # This opcode is acquires the node locks for all instances
464
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
465
        }
466

    
467
    self.share_locks = {
468
      locking.LEVEL_NODE_RES: 1,
469
      locking.LEVEL_INSTANCE: 0,
470
      locking.LEVEL_NODE_ALLOC: 1,
471
      }
472

    
473
  def DeclareLocks(self, level):
474
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
475
      self._LockInstancesNodes(primary_only=True, level=level)
476

    
477
  def CheckPrereq(self):
478
    """Check prerequisites.
479

480
    This only checks the optional instance list against the existing names.
481

482
    """
483
    if self.wanted_names is None:
484
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
485

    
486
    self.wanted_instances = \
487
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
488

    
489
  def _EnsureChildSizes(self, disk):
490
    """Ensure children of the disk have the needed disk size.
491

492
    This is valid mainly for DRBD8 and fixes an issue where the
493
    children have smaller disk size.
494

495
    @param disk: an L{ganeti.objects.Disk} object
496

497
    """
498
    if disk.dev_type == constants.LD_DRBD8:
499
      assert disk.children, "Empty children for DRBD8?"
500
      fchild = disk.children[0]
501
      mismatch = fchild.size < disk.size
502
      if mismatch:
503
        self.LogInfo("Child disk has size %d, parent %d, fixing",
504
                     fchild.size, disk.size)
505
        fchild.size = disk.size
506

    
507
      # and we recurse on this child only, not on the metadev
508
      return self._EnsureChildSizes(fchild) or mismatch
509
    else:
510
      return False
511

    
512
  def Exec(self, feedback_fn):
513
    """Verify the size of cluster disks.
514

515
    """
516
    # TODO: check child disks too
517
    # TODO: check differences in size between primary/secondary nodes
518
    per_node_disks = {}
519
    for instance in self.wanted_instances:
520
      pnode = instance.primary_node
521
      if pnode not in per_node_disks:
522
        per_node_disks[pnode] = []
523
      for idx, disk in enumerate(instance.disks):
524
        per_node_disks[pnode].append((instance, idx, disk))
525

    
526
    assert not (frozenset(per_node_disks.keys()) -
527
                self.owned_locks(locking.LEVEL_NODE_RES)), \
528
      "Not owning correct locks"
529
    assert not self.owned_locks(locking.LEVEL_NODE)
530

    
531
    changed = []
532
    for node, dskl in per_node_disks.items():
533
      newl = [v[2].Copy() for v in dskl]
534
      for dsk in newl:
535
        self.cfg.SetDiskID(dsk, node)
536
      result = self.rpc.call_blockdev_getsize(node, newl)
537
      if result.fail_msg:
538
        self.LogWarning("Failure in blockdev_getsize call to node"
539
                        " %s, ignoring", node)
540
        continue
541
      if len(result.payload) != len(dskl):
542
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
543
                        " result.payload=%s", node, len(dskl), result.payload)
544
        self.LogWarning("Invalid result from node %s, ignoring node results",
545
                        node)
546
        continue
547
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
548
        if size is None:
549
          self.LogWarning("Disk %d of instance %s did not return size"
550
                          " information, ignoring", idx, instance.name)
551
          continue
552
        if not isinstance(size, (int, long)):
553
          self.LogWarning("Disk %d of instance %s did not return valid"
554
                          " size information, ignoring", idx, instance.name)
555
          continue
556
        size = size >> 20
557
        if size != disk.size:
558
          self.LogInfo("Disk %d of instance %s has mismatched size,"
559
                       " correcting: recorded %d, actual %d", idx,
560
                       instance.name, disk.size, size)
561
          disk.size = size
562
          self.cfg.Update(instance, feedback_fn)
563
          changed.append((instance.name, idx, size))
564
        if self._EnsureChildSizes(disk):
565
          self.cfg.Update(instance, feedback_fn)
566
          changed.append((instance.name, idx, disk.size))
567
    return changed
568

    
569

    
570
def _ValidateNetmask(cfg, netmask):
571
  """Checks if a netmask is valid.
572

573
  @type cfg: L{config.ConfigWriter}
574
  @param cfg: The cluster configuration
575
  @type netmask: int
576
  @param netmask: the netmask to be verified
577
  @raise errors.OpPrereqError: if the validation fails
578

579
  """
580
  ip_family = cfg.GetPrimaryIPFamily()
581
  try:
582
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
583
  except errors.ProgrammerError:
584
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
585
                               ip_family, errors.ECODE_INVAL)
586
  if not ipcls.ValidateNetmask(netmask):
587
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
588
                               (netmask), errors.ECODE_INVAL)
589

    
590

    
591
class LUClusterSetParams(LogicalUnit):
592
  """Change the parameters of the cluster.
593

594
  """
595
  HPATH = "cluster-modify"
596
  HTYPE = constants.HTYPE_CLUSTER
597
  REQ_BGL = False
598

    
599
  def CheckArguments(self):
600
    """Check parameters
601

602
    """
603
    if self.op.uid_pool:
604
      uidpool.CheckUidPool(self.op.uid_pool)
605

    
606
    if self.op.add_uids:
607
      uidpool.CheckUidPool(self.op.add_uids)
608

    
609
    if self.op.remove_uids:
610
      uidpool.CheckUidPool(self.op.remove_uids)
611

    
612
    if self.op.master_netmask is not None:
613
      _ValidateNetmask(self.cfg, self.op.master_netmask)
614

    
615
    if self.op.diskparams:
616
      for dt_params in self.op.diskparams.values():
617
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
618
      try:
619
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
620
      except errors.OpPrereqError, err:
621
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
622
                                   errors.ECODE_INVAL)
623

    
624
  def ExpandNames(self):
625
    # FIXME: in the future maybe other cluster params won't require checking on
626
    # all nodes to be modified.
627
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
628
    # resource locks the right thing, shouldn't it be the BGL instead?
629
    self.needed_locks = {
630
      locking.LEVEL_NODE: locking.ALL_SET,
631
      locking.LEVEL_INSTANCE: locking.ALL_SET,
632
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
633
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
634
    }
635
    self.share_locks = ShareAll()
636

    
637
  def BuildHooksEnv(self):
638
    """Build hooks env.
639

640
    """
641
    return {
642
      "OP_TARGET": self.cfg.GetClusterName(),
643
      "NEW_VG_NAME": self.op.vg_name,
644
      }
645

    
646
  def BuildHooksNodes(self):
647
    """Build hooks nodes.
648

649
    """
650
    mn = self.cfg.GetMasterNode()
651
    return ([mn], [mn])
652

    
653
  def CheckPrereq(self):
654
    """Check prerequisites.
655

656
    This checks whether the given params don't conflict and
657
    if the given volume group is valid.
658

659
    """
660
    if self.op.vg_name is not None and not self.op.vg_name:
661
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
662
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
663
                                   " instances exist", errors.ECODE_INVAL)
664

    
665
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
666
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
667
        raise errors.OpPrereqError("Cannot disable drbd helper while"
668
                                   " drbd-based instances exist",
669
                                   errors.ECODE_INVAL)
670

    
671
    node_list = self.owned_locks(locking.LEVEL_NODE)
672

    
673
    vm_capable_nodes = [node.name
674
                        for node in self.cfg.GetAllNodesInfo().values()
675
                        if node.name in node_list and node.vm_capable]
676

    
677
    # if vg_name not None, checks given volume group on all nodes
678
    if self.op.vg_name:
679
      vglist = self.rpc.call_vg_list(vm_capable_nodes)
680
      for node in vm_capable_nodes:
681
        msg = vglist[node].fail_msg
682
        if msg:
683
          # ignoring down node
684
          self.LogWarning("Error while gathering data on node %s"
685
                          " (ignoring node): %s", node, msg)
686
          continue
687
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
688
                                              self.op.vg_name,
689
                                              constants.MIN_VG_SIZE)
690
        if vgstatus:
691
          raise errors.OpPrereqError("Error on node '%s': %s" %
692
                                     (node, vgstatus), errors.ECODE_ENVIRON)
693

    
694
    if self.op.drbd_helper:
695
      # checks given drbd helper on all nodes
696
      helpers = self.rpc.call_drbd_helper(node_list)
697
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
698
        if ninfo.offline:
699
          self.LogInfo("Not checking drbd helper on offline node %s", node)
700
          continue
701
        msg = helpers[node].fail_msg
702
        if msg:
703
          raise errors.OpPrereqError("Error checking drbd helper on node"
704
                                     " '%s': %s" % (node, msg),
705
                                     errors.ECODE_ENVIRON)
706
        node_helper = helpers[node].payload
707
        if node_helper != self.op.drbd_helper:
708
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
709
                                     (node, node_helper), errors.ECODE_ENVIRON)
710

    
711
    self.cluster = cluster = self.cfg.GetClusterInfo()
712
    # validate params changes
713
    if self.op.beparams:
714
      objects.UpgradeBeParams(self.op.beparams)
715
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
716
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
717

    
718
    if self.op.ndparams:
719
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
720
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
721

    
722
      # TODO: we need a more general way to handle resetting
723
      # cluster-level parameters to default values
724
      if self.new_ndparams["oob_program"] == "":
725
        self.new_ndparams["oob_program"] = \
726
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
727

    
728
    if self.op.hv_state:
729
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
730
                                           self.cluster.hv_state_static)
731
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
732
                               for hv, values in new_hv_state.items())
733

    
734
    if self.op.disk_state:
735
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
736
                                               self.cluster.disk_state_static)
737
      self.new_disk_state = \
738
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
739
                            for name, values in svalues.items()))
740
             for storage, svalues in new_disk_state.items())
741

    
742
    if self.op.ipolicy:
743
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
744
                                           group_policy=False)
745

    
746
      all_instances = self.cfg.GetAllInstancesInfo().values()
747
      violations = set()
748
      for group in self.cfg.GetAllNodeGroupsInfo().values():
749
        instances = frozenset([inst for inst in all_instances
750
                               if compat.any(node in group.members
751
                                             for node in inst.all_nodes)])
752
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
753
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
754
        new = ComputeNewInstanceViolations(ipol,
755
                                           new_ipolicy, instances, self.cfg)
756
        if new:
757
          violations.update(new)
758

    
759
      if violations:
760
        self.LogWarning("After the ipolicy change the following instances"
761
                        " violate them: %s",
762
                        utils.CommaJoin(utils.NiceSort(violations)))
763

    
764
    if self.op.nicparams:
765
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
766
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
767
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
768
      nic_errors = []
769

    
770
      # check all instances for consistency
771
      for instance in self.cfg.GetAllInstancesInfo().values():
772
        for nic_idx, nic in enumerate(instance.nics):
773
          params_copy = copy.deepcopy(nic.nicparams)
774
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
775

    
776
          # check parameter syntax
777
          try:
778
            objects.NIC.CheckParameterSyntax(params_filled)
779
          except errors.ConfigurationError, err:
780
            nic_errors.append("Instance %s, nic/%d: %s" %
781
                              (instance.name, nic_idx, err))
782

    
783
          # if we're moving instances to routed, check that they have an ip
784
          target_mode = params_filled[constants.NIC_MODE]
785
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
786
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
787
                              " address" % (instance.name, nic_idx))
788
      if nic_errors:
789
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
790
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
791

    
792
    # hypervisor list/parameters
793
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
794
    if self.op.hvparams:
795
      for hv_name, hv_dict in self.op.hvparams.items():
796
        if hv_name not in self.new_hvparams:
797
          self.new_hvparams[hv_name] = hv_dict
798
        else:
799
          self.new_hvparams[hv_name].update(hv_dict)
800

    
801
    # disk template parameters
802
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
803
    if self.op.diskparams:
804
      for dt_name, dt_params in self.op.diskparams.items():
805
        if dt_name not in self.op.diskparams:
806
          self.new_diskparams[dt_name] = dt_params
807
        else:
808
          self.new_diskparams[dt_name].update(dt_params)
809

    
810
    # os hypervisor parameters
811
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
812
    if self.op.os_hvp:
813
      for os_name, hvs in self.op.os_hvp.items():
814
        if os_name not in self.new_os_hvp:
815
          self.new_os_hvp[os_name] = hvs
816
        else:
817
          for hv_name, hv_dict in hvs.items():
818
            if hv_dict is None:
819
              # Delete if it exists
820
              self.new_os_hvp[os_name].pop(hv_name, None)
821
            elif hv_name not in self.new_os_hvp[os_name]:
822
              self.new_os_hvp[os_name][hv_name] = hv_dict
823
            else:
824
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
825

    
826
    # os parameters
827
    self.new_osp = objects.FillDict(cluster.osparams, {})
828
    if self.op.osparams:
829
      for os_name, osp in self.op.osparams.items():
830
        if os_name not in self.new_osp:
831
          self.new_osp[os_name] = {}
832

    
833
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
834
                                                 use_none=True)
835

    
836
        if not self.new_osp[os_name]:
837
          # we removed all parameters
838
          del self.new_osp[os_name]
839
        else:
840
          # check the parameter validity (remote check)
841
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
842
                        os_name, self.new_osp[os_name])
843

    
844
    # changes to the hypervisor list
845
    if self.op.enabled_hypervisors is not None:
846
      self.hv_list = self.op.enabled_hypervisors
847
      for hv in self.hv_list:
848
        # if the hypervisor doesn't already exist in the cluster
849
        # hvparams, we initialize it to empty, and then (in both
850
        # cases) we make sure to fill the defaults, as we might not
851
        # have a complete defaults list if the hypervisor wasn't
852
        # enabled before
853
        if hv not in new_hvp:
854
          new_hvp[hv] = {}
855
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
856
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
857
    else:
858
      self.hv_list = cluster.enabled_hypervisors
859

    
860
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
861
      # either the enabled list has changed, or the parameters have, validate
862
      for hv_name, hv_params in self.new_hvparams.items():
863
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
864
            (self.op.enabled_hypervisors and
865
             hv_name in self.op.enabled_hypervisors)):
866
          # either this is a new hypervisor, or its parameters have changed
867
          hv_class = hypervisor.GetHypervisorClass(hv_name)
868
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
869
          hv_class.CheckParameterSyntax(hv_params)
870
          CheckHVParams(self, node_list, hv_name, hv_params)
871

    
872
    self._CheckDiskTemplateConsistency()
873

    
874
    if self.op.os_hvp:
875
      # no need to check any newly-enabled hypervisors, since the
876
      # defaults have already been checked in the above code-block
877
      for os_name, os_hvp in self.new_os_hvp.items():
878
        for hv_name, hv_params in os_hvp.items():
879
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
880
          # we need to fill in the new os_hvp on top of the actual hv_p
881
          cluster_defaults = self.new_hvparams.get(hv_name, {})
882
          new_osp = objects.FillDict(cluster_defaults, hv_params)
883
          hv_class = hypervisor.GetHypervisorClass(hv_name)
884
          hv_class.CheckParameterSyntax(new_osp)
885
          CheckHVParams(self, node_list, hv_name, new_osp)
886

    
887
    if self.op.default_iallocator:
888
      alloc_script = utils.FindFile(self.op.default_iallocator,
889
                                    constants.IALLOCATOR_SEARCH_PATH,
890
                                    os.path.isfile)
891
      if alloc_script is None:
892
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
893
                                   " specified" % self.op.default_iallocator,
894
                                   errors.ECODE_INVAL)
895

    
896
  def _CheckDiskTemplateConsistency(self):
897
    """Check whether the disk templates that are going to be disabled
898
       are still in use by some instances.
899

900
    """
901
    if self.op.enabled_disk_templates:
902
      cluster = self.cfg.GetClusterInfo()
903
      instances = self.cfg.GetAllInstancesInfo()
904

    
905
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
906
        - set(self.op.enabled_disk_templates)
907
      for instance in instances.itervalues():
908
        if instance.disk_template in disk_templates_to_remove:
909
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
910
                                     " because instance '%s' is using it." %
911
                                     (instance.disk_template, instance.name))
912

    
913
  def Exec(self, feedback_fn):
914
    """Change the parameters of the cluster.
915

916
    """
917
    if self.op.vg_name is not None:
918
      new_volume = self.op.vg_name
919
      if not new_volume:
920
        new_volume = None
921
      if new_volume != self.cfg.GetVGName():
922
        self.cfg.SetVGName(new_volume)
923
      else:
924
        feedback_fn("Cluster LVM configuration already in desired"
925
                    " state, not changing")
926
    if self.op.drbd_helper is not None:
927
      new_helper = self.op.drbd_helper
928
      if not new_helper:
929
        new_helper = None
930
      if new_helper != self.cfg.GetDRBDHelper():
931
        self.cfg.SetDRBDHelper(new_helper)
932
      else:
933
        feedback_fn("Cluster DRBD helper already in desired state,"
934
                    " not changing")
935
    if self.op.hvparams:
936
      self.cluster.hvparams = self.new_hvparams
937
    if self.op.os_hvp:
938
      self.cluster.os_hvp = self.new_os_hvp
939
    if self.op.enabled_hypervisors is not None:
940
      self.cluster.hvparams = self.new_hvparams
941
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
942
    if self.op.enabled_disk_templates:
943
      self.cluster.enabled_disk_templates = \
944
        list(set(self.op.enabled_disk_templates))
945
    if self.op.beparams:
946
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
947
    if self.op.nicparams:
948
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
949
    if self.op.ipolicy:
950
      self.cluster.ipolicy = self.new_ipolicy
951
    if self.op.osparams:
952
      self.cluster.osparams = self.new_osp
953
    if self.op.ndparams:
954
      self.cluster.ndparams = self.new_ndparams
955
    if self.op.diskparams:
956
      self.cluster.diskparams = self.new_diskparams
957
    if self.op.hv_state:
958
      self.cluster.hv_state_static = self.new_hv_state
959
    if self.op.disk_state:
960
      self.cluster.disk_state_static = self.new_disk_state
961

    
962
    if self.op.candidate_pool_size is not None:
963
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
964
      # we need to update the pool size here, otherwise the save will fail
965
      AdjustCandidatePool(self, [])
966

    
967
    if self.op.maintain_node_health is not None:
968
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
969
        feedback_fn("Note: CONFD was disabled at build time, node health"
970
                    " maintenance is not useful (still enabling it)")
971
      self.cluster.maintain_node_health = self.op.maintain_node_health
972

    
973
    if self.op.prealloc_wipe_disks is not None:
974
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
975

    
976
    if self.op.add_uids is not None:
977
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
978

    
979
    if self.op.remove_uids is not None:
980
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
981

    
982
    if self.op.uid_pool is not None:
983
      self.cluster.uid_pool = self.op.uid_pool
984

    
985
    if self.op.default_iallocator is not None:
986
      self.cluster.default_iallocator = self.op.default_iallocator
987

    
988
    if self.op.reserved_lvs is not None:
989
      self.cluster.reserved_lvs = self.op.reserved_lvs
990

    
991
    if self.op.use_external_mip_script is not None:
992
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
993

    
994
    def helper_os(aname, mods, desc):
995
      desc += " OS list"
996
      lst = getattr(self.cluster, aname)
997
      for key, val in mods:
998
        if key == constants.DDM_ADD:
999
          if val in lst:
1000
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1001
          else:
1002
            lst.append(val)
1003
        elif key == constants.DDM_REMOVE:
1004
          if val in lst:
1005
            lst.remove(val)
1006
          else:
1007
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1008
        else:
1009
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
1010

    
1011
    if self.op.hidden_os:
1012
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1013

    
1014
    if self.op.blacklisted_os:
1015
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1016

    
1017
    if self.op.master_netdev:
1018
      master_params = self.cfg.GetMasterNetworkParameters()
1019
      ems = self.cfg.GetUseExternalMipScript()
1020
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1021
                  self.cluster.master_netdev)
1022
      result = self.rpc.call_node_deactivate_master_ip(master_params.name,
1023
                                                       master_params, ems)
1024
      result.Raise("Could not disable the master ip")
1025
      feedback_fn("Changing master_netdev from %s to %s" %
1026
                  (master_params.netdev, self.op.master_netdev))
1027
      self.cluster.master_netdev = self.op.master_netdev
1028

    
1029
    if self.op.master_netmask:
1030
      master_params = self.cfg.GetMasterNetworkParameters()
1031
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1032
      result = self.rpc.call_node_change_master_netmask(master_params.name,
1033
                                                        master_params.netmask,
1034
                                                        self.op.master_netmask,
1035
                                                        master_params.ip,
1036
                                                        master_params.netdev)
1037
      if result.fail_msg:
1038
        msg = "Could not change the master IP netmask: %s" % result.fail_msg
1039
        feedback_fn(msg)
1040

    
1041
      self.cluster.master_netmask = self.op.master_netmask
1042

    
1043
    self.cfg.Update(self.cluster, feedback_fn)
1044

    
1045
    if self.op.master_netdev:
1046
      master_params = self.cfg.GetMasterNetworkParameters()
1047
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
1048
                  self.op.master_netdev)
1049
      ems = self.cfg.GetUseExternalMipScript()
1050
      result = self.rpc.call_node_activate_master_ip(master_params.name,
1051
                                                     master_params, ems)
1052
      if result.fail_msg:
1053
        self.LogWarning("Could not re-enable the master ip on"
1054
                        " the master, please restart manually: %s",
1055
                        result.fail_msg)
1056

    
1057

    
1058
class LUClusterVerify(NoHooksLU):
1059
  """Submits all jobs necessary to verify the cluster.
1060

1061
  """
1062
  REQ_BGL = False
1063

    
1064
  def ExpandNames(self):
1065
    self.needed_locks = {}
1066

    
1067
  def Exec(self, feedback_fn):
1068
    jobs = []
1069

    
1070
    if self.op.group_name:
1071
      groups = [self.op.group_name]
1072
      depends_fn = lambda: None
1073
    else:
1074
      groups = self.cfg.GetNodeGroupList()
1075

    
1076
      # Verify global configuration
1077
      jobs.append([
1078
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1079
        ])
1080

    
1081
      # Always depend on global verification
1082
      depends_fn = lambda: [(-len(jobs), [])]
1083

    
1084
    jobs.extend(
1085
      [opcodes.OpClusterVerifyGroup(group_name=group,
1086
                                    ignore_errors=self.op.ignore_errors,
1087
                                    depends=depends_fn())]
1088
      for group in groups)
1089

    
1090
    # Fix up all parameters
1091
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1092
      op.debug_simulate_errors = self.op.debug_simulate_errors
1093
      op.verbose = self.op.verbose
1094
      op.error_codes = self.op.error_codes
1095
      try:
1096
        op.skip_checks = self.op.skip_checks
1097
      except AttributeError:
1098
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1099

    
1100
    return ResultWithJobs(jobs)
1101

    
1102

    
1103
class _VerifyErrors(object):
1104
  """Mix-in for cluster/group verify LUs.
1105

1106
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1107
  self.op and self._feedback_fn to be available.)
1108

1109
  """
1110

    
1111
  ETYPE_FIELD = "code"
1112
  ETYPE_ERROR = "ERROR"
1113
  ETYPE_WARNING = "WARNING"
1114

    
1115
  def _Error(self, ecode, item, msg, *args, **kwargs):
1116
    """Format an error message.
1117

1118
    Based on the opcode's error_codes parameter, either format a
1119
    parseable error code, or a simpler error string.
1120

1121
    This must be called only from Exec and functions called from Exec.
1122

1123
    """
1124
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1125
    itype, etxt, _ = ecode
1126
    # If the error code is in the list of ignored errors, demote the error to a
1127
    # warning
1128
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1129
      ltype = self.ETYPE_WARNING
1130
    # first complete the msg
1131
    if args:
1132
      msg = msg % args
1133
    # then format the whole message
1134
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1135
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1136
    else:
1137
      if item:
1138
        item = " " + item
1139
      else:
1140
        item = ""
1141
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1142
    # and finally report it via the feedback_fn
1143
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1144
    # do not mark the operation as failed for WARN cases only
1145
    if ltype == self.ETYPE_ERROR:
1146
      self.bad = True
1147

    
1148
  def _ErrorIf(self, cond, *args, **kwargs):
1149
    """Log an error message if the passed condition is True.
1150

1151
    """
1152
    if (bool(cond)
1153
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1154
      self._Error(*args, **kwargs)
1155

    
1156

    
1157
def _VerifyCertificate(filename):
1158
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1159

1160
  @type filename: string
1161
  @param filename: Path to PEM file
1162

1163
  """
1164
  try:
1165
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1166
                                           utils.ReadFile(filename))
1167
  except Exception, err: # pylint: disable=W0703
1168
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1169
            "Failed to load X509 certificate %s: %s" % (filename, err))
1170

    
1171
  (errcode, msg) = \
1172
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1173
                                constants.SSL_CERT_EXPIRATION_ERROR)
1174

    
1175
  if msg:
1176
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1177
  else:
1178
    fnamemsg = None
1179

    
1180
  if errcode is None:
1181
    return (None, fnamemsg)
1182
  elif errcode == utils.CERT_WARNING:
1183
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1184
  elif errcode == utils.CERT_ERROR:
1185
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1186

    
1187
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1188

    
1189

    
1190
def _GetAllHypervisorParameters(cluster, instances):
1191
  """Compute the set of all hypervisor parameters.
1192

1193
  @type cluster: L{objects.Cluster}
1194
  @param cluster: the cluster object
1195
  @param instances: list of L{objects.Instance}
1196
  @param instances: additional instances from which to obtain parameters
1197
  @rtype: list of (origin, hypervisor, parameters)
1198
  @return: a list with all parameters found, indicating the hypervisor they
1199
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1200

1201
  """
1202
  hvp_data = []
1203

    
1204
  for hv_name in cluster.enabled_hypervisors:
1205
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1206

    
1207
  for os_name, os_hvp in cluster.os_hvp.items():
1208
    for hv_name, hv_params in os_hvp.items():
1209
      if hv_params:
1210
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1211
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1212

    
1213
  # TODO: collapse identical parameter values in a single one
1214
  for instance in instances:
1215
    if instance.hvparams:
1216
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1217
                       cluster.FillHV(instance)))
1218

    
1219
  return hvp_data
1220

    
1221

    
1222
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1223
  """Verifies the cluster config.
1224

1225
  """
1226
  REQ_BGL = False
1227

    
1228
  def _VerifyHVP(self, hvp_data):
1229
    """Verifies locally the syntax of the hypervisor parameters.
1230

1231
    """
1232
    for item, hv_name, hv_params in hvp_data:
1233
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1234
             (item, hv_name))
1235
      try:
1236
        hv_class = hypervisor.GetHypervisorClass(hv_name)
1237
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1238
        hv_class.CheckParameterSyntax(hv_params)
1239
      except errors.GenericError, err:
1240
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1241

    
1242
  def ExpandNames(self):
1243
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1244
    self.share_locks = ShareAll()
1245

    
1246
  def CheckPrereq(self):
1247
    """Check prerequisites.
1248

1249
    """
1250
    # Retrieve all information
1251
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1252
    self.all_node_info = self.cfg.GetAllNodesInfo()
1253
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1254

    
1255
  def Exec(self, feedback_fn):
1256
    """Verify integrity of cluster, performing various test on nodes.
1257

1258
    """
1259
    self.bad = False
1260
    self._feedback_fn = feedback_fn
1261

    
1262
    feedback_fn("* Verifying cluster config")
1263

    
1264
    for msg in self.cfg.VerifyConfig():
1265
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1266

    
1267
    feedback_fn("* Verifying cluster certificate files")
1268

    
1269
    for cert_filename in pathutils.ALL_CERT_FILES:
1270
      (errcode, msg) = _VerifyCertificate(cert_filename)
1271
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1272

    
1273
    feedback_fn("* Verifying hypervisor parameters")
1274

    
1275
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1276
                                                self.all_inst_info.values()))
1277

    
1278
    feedback_fn("* Verifying all nodes belong to an existing group")
1279

    
1280
    # We do this verification here because, should this bogus circumstance
1281
    # occur, it would never be caught by VerifyGroup, which only acts on
1282
    # nodes/instances reachable from existing node groups.
1283

    
1284
    dangling_nodes = set(node.name for node in self.all_node_info.values()
1285
                         if node.group not in self.all_group_info)
1286

    
1287
    dangling_instances = {}
1288
    no_node_instances = []
1289

    
1290
    for inst in self.all_inst_info.values():
1291
      if inst.primary_node in dangling_nodes:
1292
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1293
      elif inst.primary_node not in self.all_node_info:
1294
        no_node_instances.append(inst.name)
1295

    
1296
    pretty_dangling = [
1297
        "%s (%s)" %
1298
        (node.name,
1299
         utils.CommaJoin(dangling_instances.get(node.name,
1300
                                                ["no instances"])))
1301
        for node in dangling_nodes]
1302

    
1303
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1304
                  None,
1305
                  "the following nodes (and their instances) belong to a non"
1306
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1307

    
1308
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1309
                  None,
1310
                  "the following instances have a non-existing primary-node:"
1311
                  " %s", utils.CommaJoin(no_node_instances))
1312

    
1313
    return not self.bad
1314

    
1315

    
1316
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1317
  """Verifies the status of a node group.
1318

1319
  """
1320
  HPATH = "cluster-verify"
1321
  HTYPE = constants.HTYPE_CLUSTER
1322
  REQ_BGL = False
1323

    
1324
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1325

    
1326
  class NodeImage(object):
1327
    """A class representing the logical and physical status of a node.
1328

1329
    @type name: string
1330
    @ivar name: the node name to which this object refers
1331
    @ivar volumes: a structure as returned from
1332
        L{ganeti.backend.GetVolumeList} (runtime)
1333
    @ivar instances: a list of running instances (runtime)
1334
    @ivar pinst: list of configured primary instances (config)
1335
    @ivar sinst: list of configured secondary instances (config)
1336
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1337
        instances for which this node is secondary (config)
1338
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1339
    @ivar dfree: free disk, as reported by the node (runtime)
1340
    @ivar offline: the offline status (config)
1341
    @type rpc_fail: boolean
1342
    @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1343
        not whether the individual keys were correct) (runtime)
1344
    @type lvm_fail: boolean
1345
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1346
    @type hyp_fail: boolean
1347
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1348
    @type ghost: boolean
1349
    @ivar ghost: whether this is a known node or not (config)
1350
    @type os_fail: boolean
1351
    @ivar os_fail: whether the RPC call didn't return valid OS data
1352
    @type oslist: list
1353
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1354
    @type vm_capable: boolean
1355
    @ivar vm_capable: whether the node can host instances
1356
    @type pv_min: float
1357
    @ivar pv_min: size in MiB of the smallest PVs
1358
    @type pv_max: float
1359
    @ivar pv_max: size in MiB of the biggest PVs
1360

1361
    """
1362
    def __init__(self, offline=False, name=None, vm_capable=True):
1363
      self.name = name
1364
      self.volumes = {}
1365
      self.instances = []
1366
      self.pinst = []
1367
      self.sinst = []
1368
      self.sbp = {}
1369
      self.mfree = 0
1370
      self.dfree = 0
1371
      self.offline = offline
1372
      self.vm_capable = vm_capable
1373
      self.rpc_fail = False
1374
      self.lvm_fail = False
1375
      self.hyp_fail = False
1376
      self.ghost = False
1377
      self.os_fail = False
1378
      self.oslist = {}
1379
      self.pv_min = None
1380
      self.pv_max = None
1381

    
1382
  def ExpandNames(self):
1383
    # This raises errors.OpPrereqError on its own:
1384
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1385

    
1386
    # Get instances in node group; this is unsafe and needs verification later
1387
    inst_names = \
1388
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1389

    
1390
    self.needed_locks = {
1391
      locking.LEVEL_INSTANCE: inst_names,
1392
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1393
      locking.LEVEL_NODE: [],
1394

    
1395
      # This opcode is run by watcher every five minutes and acquires all nodes
1396
      # for a group. It doesn't run for a long time, so it's better to acquire
1397
      # the node allocation lock as well.
1398
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1399
      }
1400

    
1401
    self.share_locks = ShareAll()
1402

    
1403
  def DeclareLocks(self, level):
1404
    if level == locking.LEVEL_NODE:
1405
      # Get members of node group; this is unsafe and needs verification later
1406
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1407

    
1408
      all_inst_info = self.cfg.GetAllInstancesInfo()
1409

    
1410
      # In Exec(), we warn about mirrored instances that have primary and
1411
      # secondary living in separate node groups. To fully verify that
1412
      # volumes for these instances are healthy, we will need to do an
1413
      # extra call to their secondaries. We ensure here those nodes will
1414
      # be locked.
1415
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
1416
        # Important: access only the instances whose lock is owned
1417
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
1418
          nodes.update(all_inst_info[inst].secondary_nodes)
1419

    
1420
      self.needed_locks[locking.LEVEL_NODE] = nodes
1421

    
1422
  def CheckPrereq(self):
1423
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1424
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1425

    
1426
    group_nodes = set(self.group_info.members)
1427
    group_instances = \
1428
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1429

    
1430
    unlocked_nodes = \
1431
        group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1432

    
1433
    unlocked_instances = \
1434
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
1435

    
1436
    if unlocked_nodes:
1437
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
1438
                                 utils.CommaJoin(unlocked_nodes),
1439
                                 errors.ECODE_STATE)
1440

    
1441
    if unlocked_instances:
1442
      raise errors.OpPrereqError("Missing lock for instances: %s" %
1443
                                 utils.CommaJoin(unlocked_instances),
1444
                                 errors.ECODE_STATE)
1445

    
1446
    self.all_node_info = self.cfg.GetAllNodesInfo()
1447
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1448

    
1449
    self.my_node_names = utils.NiceSort(group_nodes)
1450
    self.my_inst_names = utils.NiceSort(group_instances)
1451

    
1452
    self.my_node_info = dict((name, self.all_node_info[name])
1453
                             for name in self.my_node_names)
1454

    
1455
    self.my_inst_info = dict((name, self.all_inst_info[name])
1456
                             for name in self.my_inst_names)
1457

    
1458
    # We detect here the nodes that will need the extra RPC calls for verifying
1459
    # split LV volumes; they should be locked.
1460
    extra_lv_nodes = set()
1461

    
1462
    for inst in self.my_inst_info.values():
1463
      if inst.disk_template in constants.DTS_INT_MIRROR:
1464
        for nname in inst.all_nodes:
1465
          if self.all_node_info[nname].group != self.group_uuid:
1466
            extra_lv_nodes.add(nname)
1467

    
1468
    unlocked_lv_nodes = \
1469
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1470

    
1471
    if unlocked_lv_nodes:
1472
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1473
                                 utils.CommaJoin(unlocked_lv_nodes),
1474
                                 errors.ECODE_STATE)
1475
    self.extra_lv_nodes = list(extra_lv_nodes)
1476

    
1477
  def _VerifyNode(self, ninfo, nresult):
1478
    """Perform some basic validation on data returned from a node.
1479

1480
      - check the result data structure is well formed and has all the
1481
        mandatory fields
1482
      - check ganeti version
1483

1484
    @type ninfo: L{objects.Node}
1485
    @param ninfo: the node to check
1486
    @param nresult: the results from the node
1487
    @rtype: boolean
1488
    @return: whether overall this call was successful (and we can expect
1489
         reasonable values in the response)
1490

1491
    """
1492
    node = ninfo.name
1493
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1494

    
1495
    # main result, nresult should be a non-empty dict
1496
    test = not nresult or not isinstance(nresult, dict)
1497
    _ErrorIf(test, constants.CV_ENODERPC, node,
1498
                  "unable to verify node: no data returned")
1499
    if test:
1500
      return False
1501

    
1502
    # compares ganeti version
1503
    local_version = constants.PROTOCOL_VERSION
1504
    remote_version = nresult.get("version", None)
1505
    test = not (remote_version and
1506
                isinstance(remote_version, (list, tuple)) and
1507
                len(remote_version) == 2)
1508
    _ErrorIf(test, constants.CV_ENODERPC, node,
1509
             "connection to node returned invalid data")
1510
    if test:
1511
      return False
1512

    
1513
    test = local_version != remote_version[0]
1514
    _ErrorIf(test, constants.CV_ENODEVERSION, node,
1515
             "incompatible protocol versions: master %s,"
1516
             " node %s", local_version, remote_version[0])
1517
    if test:
1518
      return False
1519

    
1520
    # node seems compatible, we can actually try to look into its results
1521

    
1522
    # full package version
1523
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1524
                  constants.CV_ENODEVERSION, node,
1525
                  "software version mismatch: master %s, node %s",
1526
                  constants.RELEASE_VERSION, remote_version[1],
1527
                  code=self.ETYPE_WARNING)
1528

    
1529
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1530
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1531
      for hv_name, hv_result in hyp_result.iteritems():
1532
        test = hv_result is not None
1533
        _ErrorIf(test, constants.CV_ENODEHV, node,
1534
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1535

    
1536
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1537
    if ninfo.vm_capable and isinstance(hvp_result, list):
1538
      for item, hv_name, hv_result in hvp_result:
1539
        _ErrorIf(True, constants.CV_ENODEHV, node,
1540
                 "hypervisor %s parameter verify failure (source %s): %s",
1541
                 hv_name, item, hv_result)
1542

    
1543
    test = nresult.get(constants.NV_NODESETUP,
1544
                       ["Missing NODESETUP results"])
1545
    _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
1546
             "; ".join(test))
1547

    
1548
    return True
1549

    
1550
  def _VerifyNodeTime(self, ninfo, nresult,
1551
                      nvinfo_starttime, nvinfo_endtime):
1552
    """Check the node time.
1553

1554
    @type ninfo: L{objects.Node}
1555
    @param ninfo: the node to check
1556
    @param nresult: the remote results for the node
1557
    @param nvinfo_starttime: the start time of the RPC call
1558
    @param nvinfo_endtime: the end time of the RPC call
1559

1560
    """
1561
    node = ninfo.name
1562
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1563

    
1564
    ntime = nresult.get(constants.NV_TIME, None)
1565
    try:
1566
      ntime_merged = utils.MergeTime(ntime)
1567
    except (ValueError, TypeError):
1568
      _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
1569
      return
1570

    
1571
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1572
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1573
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1574
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1575
    else:
1576
      ntime_diff = None
1577

    
1578
    _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
1579
             "Node time diverges by at least %s from master node time",
1580
             ntime_diff)
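
    # Illustrative note (not part of the original code): with the clock-skew
    # allowance from constants.NODE_MAX_CLOCK_SKEW (assumed here to be 150
    # seconds), a node whose merged time falls outside the window
    # [nvinfo_starttime - skew, nvinfo_endtime + skew] is flagged, e.g.:
    #   >>> skew = 150
    #   >>> nvinfo_starttime, nvinfo_endtime = 1000.0, 1002.0
    #   >>> ntime_merged = 1200.0
    #   >>> ntime_merged > nvinfo_endtime + skew
    #   True
    # which would be reported as CV_ENODETIME with a diff of about 198s.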
1581

    
1582
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1583
    """Check the node LVM results and update info for cross-node checks.
1584

1585
    @type ninfo: L{objects.Node}
1586
    @param ninfo: the node to check
1587
    @param nresult: the remote results for the node
1588
    @param vg_name: the configured VG name
1589
    @type nimg: L{NodeImage}
1590
    @param nimg: node image
1591

1592
    """
1593
    if vg_name is None:
1594
      return
1595

    
1596
    node = ninfo.name
1597
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1598

    
1599
    # checks vg existence and size > 20G
1600
    vglist = nresult.get(constants.NV_VGLIST, None)
1601
    test = not vglist
1602
    _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
1603
    if not test:
1604
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1605
                                            constants.MIN_VG_SIZE)
1606
      _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
1607

    
1608
    # Check PVs
1609
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1610
    for em in errmsgs:
1611
      self._Error(constants.CV_ENODELVM, node, em)
1612
    if pvminmax is not None:
1613
      (nimg.pv_min, nimg.pv_max) = pvminmax
1614

    
1615
  def _VerifyGroupLVM(self, node_image, vg_name):
1616
    """Check cross-node consistency in LVM.
1617

1618
    @type node_image: dict
1619
    @param node_image: info about nodes, mapping from node names to
1620
      L{NodeImage} objects
1621
    @param vg_name: the configured VG name
1622

1623
    """
1624
    if vg_name is None:
1625
      return
1626

    
1627
    # Only exclusive storage needs this kind of check
1628
    if not self._exclusive_storage:
1629
      return
1630

    
1631
    # exclusive_storage wants all PVs to have the same size (approximately),
1632
    # if the smallest and the biggest ones are okay, everything is fine.
1633
    # pv_min is None iff pv_max is None
1634
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1635
    if not vals:
1636
      return
1637
    (pvmin, minnode) = min((ni.pv_min, ni.name) for ni in vals)
1638
    (pvmax, maxnode) = max((ni.pv_max, ni.name) for ni in vals)
1639
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1640
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MiB) is"
                  " on %s, biggest (%s MiB) is on %s",
1643
                  pvmin, minnode, pvmax, maxnode)
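
    # Illustrative sketch (an assumption, not original code): only the extremes
    # are compared, so for NodeImage objects reporting (pv_min, pv_max) of
    # (10240, 10240) and (10240, 10496), the group check boils down to:
    #   >>> vals = [(10240, 10240, "node1"), (10240, 10496, "node2")]
    #   >>> min((pmin, name) for (pmin, _, name) in vals)
    #   (10240, 'node1')
    #   >>> max((pmax, name) for (_, pmax, name) in vals)
    #   (10496, 'node2')
    # Nodes without PV data (pv_min is None) are filtered out beforehand.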
1644

    
1645
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1646
    """Check the node bridges.
1647

1648
    @type ninfo: L{objects.Node}
1649
    @param ninfo: the node to check
1650
    @param nresult: the remote results for the node
1651
    @param bridges: the expected list of bridges
1652

1653
    """
1654
    if not bridges:
1655
      return
1656

    
1657
    node = ninfo.name
1658
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1659

    
1660
    missing = nresult.get(constants.NV_BRIDGES, None)
1661
    test = not isinstance(missing, list)
1662
    _ErrorIf(test, constants.CV_ENODENET, node,
1663
             "did not return valid bridge information")
1664
    if not test:
1665
      _ErrorIf(bool(missing), constants.CV_ENODENET, node,
1666
               "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1667

    
1668
  def _VerifyNodeUserScripts(self, ninfo, nresult):
1669
    """Check the results of user scripts presence and executability on the node
1670

1671
    @type ninfo: L{objects.Node}
1672
    @param ninfo: the node to check
1673
    @param nresult: the remote results for the node
1674

1675
    """
1676
    node = ninfo.name
1677

    
1678
    test = not constants.NV_USERSCRIPTS in nresult
1679
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
1680
                  "did not return user scripts information")
1681

    
1682
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1683
    if not test:
1684
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
1685
                    "user scripts not present or not executable: %s" %
1686
                    utils.CommaJoin(sorted(broken_scripts)))
1687

    
1688
  def _VerifyNodeNetwork(self, ninfo, nresult):
1689
    """Check the node network connectivity results.
1690

1691
    @type ninfo: L{objects.Node}
1692
    @param ninfo: the node to check
1693
    @param nresult: the remote results for the node
1694

1695
    """
1696
    node = ninfo.name
1697
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1698

    
1699
    test = constants.NV_NODELIST not in nresult
1700
    _ErrorIf(test, constants.CV_ENODESSH, node,
1701
             "node hasn't returned node ssh connectivity data")
1702
    if not test:
1703
      if nresult[constants.NV_NODELIST]:
1704
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1705
          _ErrorIf(True, constants.CV_ENODESSH, node,
1706
                   "ssh communication with node '%s': %s", a_node, a_msg)
1707

    
1708
    test = constants.NV_NODENETTEST not in nresult
1709
    _ErrorIf(test, constants.CV_ENODENET, node,
1710
             "node hasn't returned node tcp connectivity data")
1711
    if not test:
1712
      if nresult[constants.NV_NODENETTEST]:
1713
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1714
        for anode in nlist:
1715
          _ErrorIf(True, constants.CV_ENODENET, node,
1716
                   "tcp communication with node '%s': %s",
1717
                   anode, nresult[constants.NV_NODENETTEST][anode])
1718

    
1719
    test = constants.NV_MASTERIP not in nresult
1720
    _ErrorIf(test, constants.CV_ENODENET, node,
1721
             "node hasn't returned node master IP reachability data")
1722
    if not test:
1723
      if not nresult[constants.NV_MASTERIP]:
1724
        if node == self.master_node:
1725
          msg = "the master node cannot reach the master IP (not configured?)"
1726
        else:
1727
          msg = "cannot reach the master IP"
1728
        _ErrorIf(True, constants.CV_ENODENET, node, msg)
1729

    
1730
  def _VerifyInstance(self, instance, inst_config, node_image,
1731
                      diskstatus):
1732
    """Verify an instance.
1733

1734
    This function checks to see if the required block devices are
1735
    available on the instance's node, and that the nodes are in the correct
1736
    state.
1737

1738
    """
1739
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1740
    pnode = inst_config.primary_node
1741
    pnode_img = node_image[pnode]
1742
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
1743

    
1744
    node_vol_should = {}
1745
    inst_config.MapLVsByNode(node_vol_should)
1746

    
1747
    cluster = self.cfg.GetClusterInfo()
1748
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1749
                                                            self.group_info)
1750
    err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
1751
    _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err),
1752
             code=self.ETYPE_WARNING)
1753

    
1754
    for node in node_vol_should:
1755
      n_img = node_image[node]
1756
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1757
        # ignore missing volumes on offline or broken nodes
1758
        continue
1759
      for volume in node_vol_should[node]:
1760
        test = volume not in n_img.volumes
1761
        _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
1762
                 "volume %s missing on node %s", volume, node)
1763

    
1764
    if inst_config.admin_state == constants.ADMINST_UP:
1765
      test = instance not in pnode_img.instances and not pnode_img.offline
1766
      _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
1767
               "instance not running on its primary node %s",
1768
               pnode)
1769
      _ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
1770
               "instance is marked as running and lives on offline node %s",
1771
               pnode)
1772

    
1773
    diskdata = [(nname, success, status, idx)
1774
                for (nname, disks) in diskstatus.items()
1775
                for idx, (success, status) in enumerate(disks)]
1776

    
1777
    for nname, success, bdev_status, idx in diskdata:
1778
      # the 'ghost node' construction in Exec() ensures that we have a
1779
      # node here
1780
      snode = node_image[nname]
1781
      bad_snode = snode.ghost or snode.offline
1782
      _ErrorIf(inst_config.disks_active and
1783
               not success and not bad_snode,
1784
               constants.CV_EINSTANCEFAULTYDISK, instance,
1785
               "couldn't retrieve status for disk/%s on %s: %s",
1786
               idx, nname, bdev_status)
1787
      _ErrorIf((inst_config.disks_active and
1788
                success and bdev_status.ldisk_status == constants.LDS_FAULTY),
1789
               constants.CV_EINSTANCEFAULTYDISK, instance,
1790
               "disk/%s on %s is faulty", idx, nname)
1791

    
1792
    _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1793
             constants.CV_ENODERPC, pnode, "instance %s, connection to"
1794
             " primary node failed", instance)
1795

    
1796
    _ErrorIf(len(inst_config.secondary_nodes) > 1,
1797
             constants.CV_EINSTANCELAYOUT,
1798
             instance, "instance has multiple secondary nodes: %s",
1799
             utils.CommaJoin(inst_config.secondary_nodes),
1800
             code=self.ETYPE_WARNING)
1801

    
1802
    if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
1803
      # Disk template not compatible with exclusive_storage: no instance
1804
      # node should have the flag set
1805
      es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
1806
                                                     inst_config.all_nodes)
1807
      es_nodes = [n for (n, es) in es_flags.items()
1808
                  if es]
1809
      _ErrorIf(es_nodes, constants.CV_EINSTANCEUNSUITABLENODE, instance,
1810
               "instance has template %s, which is not supported on nodes"
1811
               " that have exclusive storage set: %s",
1812
               inst_config.disk_template, utils.CommaJoin(es_nodes))
1813

    
1814
    if inst_config.disk_template in constants.DTS_INT_MIRROR:
1815
      instance_nodes = utils.NiceSort(inst_config.all_nodes)
1816
      instance_groups = {}
1817

    
1818
      for node in instance_nodes:
1819
        instance_groups.setdefault(self.all_node_info[node].group,
1820
                                   []).append(node)
1821

    
1822
      pretty_list = [
1823
        "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
1824
        # Sort so that we always list the primary node first.
1825
        for group, nodes in sorted(instance_groups.items(),
1826
                                   key=lambda (_, nodes): pnode in nodes,
1827
                                   reverse=True)]
1828

    
1829
      self._ErrorIf(len(instance_groups) > 1,
1830
                    constants.CV_EINSTANCESPLITGROUPS,
1831
                    instance, "instance has primary and secondary nodes in"
1832
                    " different groups: %s", utils.CommaJoin(pretty_list),
1833
                    code=self.ETYPE_WARNING)
1834

    
1835
    inst_nodes_offline = []
1836
    for snode in inst_config.secondary_nodes:
1837
      s_img = node_image[snode]
1838
      _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
1839
               snode, "instance %s, connection to secondary node failed",
1840
               instance)
1841

    
1842
      if s_img.offline:
1843
        inst_nodes_offline.append(snode)
1844

    
1845
    # warn that the instance lives on offline nodes
1846
    _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
1847
             "instance has offline secondary node(s) %s",
1848
             utils.CommaJoin(inst_nodes_offline))
1849
    # ... or ghost/non-vm_capable nodes
1850
    for node in inst_config.all_nodes:
1851
      _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
1852
               instance, "instance lives on ghost node %s", node)
1853
      _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
1854
               instance, "instance lives on non-vm_capable node %s", node)
1855

    
1856
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1857
    """Verify if there are any unknown volumes in the cluster.
1858

1859
    The .os, .swap and backup volumes are ignored. All other volumes are
1860
    reported as unknown.
1861

1862
    @type reserved: L{ganeti.utils.FieldSet}
1863
    @param reserved: a FieldSet of reserved volume names
1864

1865
    """
1866
    for node, n_img in node_image.items():
1867
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
1868
          self.all_node_info[node].group != self.group_uuid):
1869
        # skip non-healthy nodes
1870
        continue
1871
      for volume in n_img.volumes:
1872
        test = ((node not in node_vol_should or
1873
                volume not in node_vol_should[node]) and
1874
                not reserved.Matches(volume))
1875
        self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
1876
                      "volume %s is unknown", volume)
1877

    
1878
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1879
    """Verify N+1 Memory Resilience.
1880

1881
    Check that if one single node dies we can still start all the
1882
    instances it was primary for.
1883

1884
    """
1885
    cluster_info = self.cfg.GetClusterInfo()
1886
    for node, n_img in node_image.items():
1887
      # This code checks that every node which is now listed as
1888
      # secondary has enough memory to host all instances it is
1889
      # supposed to, should a single other node in the cluster fail.
1890
      # FIXME: not ready for failover to an arbitrary node
1891
      # FIXME: does not support file-backed instances
1892
      # WARNING: we currently take into account down instances as well
1893
      # as up ones, considering that even if they're down someone
1894
      # might want to start them even in the event of a node failure.
1895
      if n_img.offline or self.all_node_info[node].group != self.group_uuid:
1896
        # we're skipping nodes marked offline and nodes in other groups from
1897
        # the N+1 warning, since most likely we don't have good memory
1898
        # information from them; we already list instances living on such
1899
        # nodes, and that's enough warning
1900
        continue
1901
      #TODO(dynmem): also consider ballooning out other instances
1902
      for prinode, instances in n_img.sbp.items():
1903
        needed_mem = 0
1904
        for instance in instances:
1905
          bep = cluster_info.FillBE(instance_cfg[instance])
1906
          if bep[constants.BE_AUTO_BALANCE]:
1907
            needed_mem += bep[constants.BE_MINMEM]
1908
        test = n_img.mfree < needed_mem
1909
        self._ErrorIf(test, constants.CV_ENODEN1, node,
                      "not enough memory to accommodate instance failovers"
1911
                      " should node %s fail (%dMiB needed, %dMiB available)",
1912
                      prinode, needed_mem, n_img.mfree)
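
    # Worked example (an assumption, not original code): if node A is the
    # secondary for two auto-balanced instances whose primary is node B, with
    # BE_MINMEM values of 1024 and 2048 MiB, then needed_mem for the (A, B)
    # pair is 3072 MiB; an mfree of 2500 MiB on A would trigger CV_ENODEN1.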
1913

    
1914
  @classmethod
1915
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
1916
                   (files_all, files_opt, files_mc, files_vm)):
1917
    """Verifies file checksums collected from all nodes.
1918

1919
    @param errorif: Callback for reporting errors
1920
    @param nodeinfo: List of L{objects.Node} objects
1921
    @param master_node: Name of master node
1922
    @param all_nvinfo: RPC results
1923

1924
    """
1925
    # Define functions determining which nodes to consider for a file
1926
    files2nodefn = [
1927
      (files_all, None),
1928
      (files_mc, lambda node: (node.master_candidate or
1929
                               node.name == master_node)),
1930
      (files_vm, lambda node: node.vm_capable),
1931
      ]
1932

    
1933
    # Build mapping from filename to list of nodes which should have the file
1934
    nodefiles = {}
1935
    for (files, fn) in files2nodefn:
1936
      if fn is None:
1937
        filenodes = nodeinfo
1938
      else:
1939
        filenodes = filter(fn, nodeinfo)
1940
      nodefiles.update((filename,
1941
                        frozenset(map(operator.attrgetter("name"), filenodes)))
1942
                       for filename in files)
1943

    
1944
    assert set(nodefiles) == (files_all | files_mc | files_vm)
1945

    
1946
    fileinfo = dict((filename, {}) for filename in nodefiles)
1947
    ignore_nodes = set()
1948

    
1949
    for node in nodeinfo:
1950
      if node.offline:
1951
        ignore_nodes.add(node.name)
1952
        continue
1953

    
1954
      nresult = all_nvinfo[node.name]
1955

    
1956
      if nresult.fail_msg or not nresult.payload:
1957
        node_files = None
1958
      else:
1959
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
1960
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
1961
                          for (key, value) in fingerprints.items())
1962
        del fingerprints
1963

    
1964
      test = not (node_files and isinstance(node_files, dict))
1965
      errorif(test, constants.CV_ENODEFILECHECK, node.name,
1966
              "Node did not return file checksum data")
1967
      if test:
1968
        ignore_nodes.add(node.name)
1969
        continue
1970

    
1971
      # Build per-checksum mapping from filename to nodes having it
1972
      for (filename, checksum) in node_files.items():
1973
        assert filename in nodefiles
1974
        fileinfo[filename].setdefault(checksum, set()).add(node.name)
1975

    
1976
    for (filename, checksums) in fileinfo.items():
1977
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
1978

    
1979
      # Nodes having the file
1980
      with_file = frozenset(node_name
1981
                            for nodes in fileinfo[filename].values()
1982
                            for node_name in nodes) - ignore_nodes
1983

    
1984
      expected_nodes = nodefiles[filename] - ignore_nodes
1985

    
1986
      # Nodes missing file
1987
      missing_file = expected_nodes - with_file
1988

    
1989
      if filename in files_opt:
1990
        # All or no nodes
1991
        errorif(missing_file and missing_file != expected_nodes,
1992
                constants.CV_ECLUSTERFILECHECK, None,
1993
                "File %s is optional, but it must exist on all or no"
1994
                " nodes (not found on %s)",
1995
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
1996
      else:
1997
        errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
1998
                "File %s is missing from node(s) %s", filename,
1999
                utils.CommaJoin(utils.NiceSort(missing_file)))
2000

    
2001
        # Warn if a node has a file it shouldn't
2002
        unexpected = with_file - expected_nodes
2003
        errorif(unexpected,
2004
                constants.CV_ECLUSTERFILECHECK, None,
2005
                "File %s should not exist on node(s) %s",
2006
                filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2007

    
2008
      # See if there are multiple versions of the file
2009
      test = len(checksums) > 1
2010
      if test:
2011
        variants = ["variant %s on %s" %
2012
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2013
                    for (idx, (checksum, nodes)) in
2014
                      enumerate(sorted(checksums.items()))]
2015
      else:
2016
        variants = []
2017

    
2018
      errorif(test, constants.CV_ECLUSTERFILECHECK, None,
2019
              "File %s found with %s different checksums (%s)",
2020
              filename, len(checksums), "; ".join(variants))
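
    # Illustrative sketch (an assumption, not original code): fileinfo maps
    # each filename to a {checksum: set(node names)} dict, so a file seen with
    # two different checksums ends up roughly as
    #   {"/var/lib/ganeti/config.data": {"1a2b...": set(["node1", "node2"]),
    #                                    "3c4d...": set(["node3"])}}
    # and is then reported as found "with 2 different checksums".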
2021

    
2022
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2023
                      drbd_map):
    """Verifies the node DRBD status.
2025

2026
    @type ninfo: L{objects.Node}
2027
    @param ninfo: the node to check
2028
    @param nresult: the remote results for the node
2029
    @param instanceinfo: the dict of instances
2030
    @param drbd_helper: the configured DRBD usermode helper
2031
    @param drbd_map: the DRBD map as returned by
2032
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2033

2034
    """
2035
    node = ninfo.name
2036
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2037

    
2038
    if drbd_helper:
2039
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2040
      test = (helper_result is None)
2041
      _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2042
               "no drbd usermode helper returned")
2043
      if helper_result:
2044
        status, payload = helper_result
2045
        test = not status
2046
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2047
                 "drbd usermode helper check unsuccessful: %s", payload)
2048
        test = status and (payload != drbd_helper)
2049
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2050
                 "wrong drbd usermode helper: %s", payload)
2051

    
2052
    # compute the DRBD minors
2053
    node_drbd = {}
2054
    for minor, instance in drbd_map[node].items():
2055
      test = instance not in instanceinfo
2056
      _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2057
               "ghost instance '%s' in temporary DRBD map", instance)
2058
        # ghost instance should not be running, but otherwise we
2059
        # don't give double warnings (both ghost instance and
2060
        # unallocated minor in use)
2061
      if test:
2062
        node_drbd[minor] = (instance, False)
2063
      else:
2064
        instance = instanceinfo[instance]
2065
        node_drbd[minor] = (instance.name, instance.disks_active)
2066

    
2067
    # and now check them
2068
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2069
    test = not isinstance(used_minors, (tuple, list))
2070
    _ErrorIf(test, constants.CV_ENODEDRBD, node,
2071
             "cannot parse drbd status file: %s", str(used_minors))
2072
    if test:
2073
      # we cannot check drbd status
2074
      return
2075

    
2076
    for minor, (iname, must_exist) in node_drbd.items():
2077
      test = minor not in used_minors and must_exist
2078
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
2079
               "drbd minor %d of instance %s is not active", minor, iname)
2080
    for minor in used_minors:
2081
      test = minor not in node_drbd
2082
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
2083
               "unallocated drbd minor %d is in use", minor)
2084
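
    # Illustrative sketch (an assumption, not original code): drbd_map[node]
    # maps minor numbers to instance names, e.g. {0: "inst1", 1: "inst2"};
    # node_drbd then becomes {0: ("inst1", True), 1: ("inst2", False)} where
    # the boolean is disks_active, and each expected minor is cross-checked
    # against the used_minors list reported by the node.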

    
2085
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2086
    """Builds the node OS structures.
2087

2088
    @type ninfo: L{objects.Node}
2089
    @param ninfo: the node to check
2090
    @param nresult: the remote results for the node
2091
    @param nimg: the node image object
2092

2093
    """
2094
    node = ninfo.name
2095
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2096

    
2097
    remote_os = nresult.get(constants.NV_OSLIST, None)
2098
    test = (not isinstance(remote_os, list) or
2099
            not compat.all(isinstance(v, list) and len(v) == 7
2100
                           for v in remote_os))
2101

    
2102
    _ErrorIf(test, constants.CV_ENODEOS, node,
2103
             "node hasn't returned valid OS data")
2104

    
2105
    nimg.os_fail = test
2106

    
2107
    if test:
2108
      return
2109

    
2110
    os_dict = {}
2111

    
2112
    for (name, os_path, status, diagnose,
2113
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2114

    
2115
      if name not in os_dict:
2116
        os_dict[name] = []
2117

    
2118
      # parameters is a list of lists instead of list of tuples due to
2119
      # JSON lacking a real tuple type, fix it:
2120
      parameters = [tuple(v) for v in parameters]
2121
      os_dict[name].append((os_path, status, diagnose,
2122
                            set(variants), set(parameters), set(api_ver)))
2123

    
2124
    nimg.oslist = os_dict
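
    # Illustrative sketch (an assumption, not original code): nimg.oslist maps
    # each OS name to a list of (path, status, diagnose, variants, parameters,
    # api_versions) tuples, e.g.
    #   {"debootstrap": [("/srv/ganeti/os/debootstrap", True, "",
    #                     set(["default"]), set(), set([20]))]}
    # More than one entry for a name means duplicate OS definitions, which
    # _VerifyNodeOS reports as shadowing.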
2125

    
2126
  def _VerifyNodeOS(self, ninfo, nimg, base):
2127
    """Verifies the node OS list.
2128

2129
    @type ninfo: L{objects.Node}
2130
    @param ninfo: the node to check
2131
    @param nimg: the node image object
2132
    @param base: the 'template' node we match against (e.g. from the master)
2133

2134
    """
2135
    node = ninfo.name
2136
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2137

    
2138
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2139

    
2140
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2141
    for os_name, os_data in nimg.oslist.items():
2142
      assert os_data, "Empty OS status for OS %s?!" % os_name
2143
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2144
      _ErrorIf(not f_status, constants.CV_ENODEOS, node,
2145
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2146
      _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
2147
               "OS '%s' has multiple entries (first one shadows the rest): %s",
2148
               os_name, utils.CommaJoin([v[0] for v in os_data]))
2149
      # comparisons with the 'base' image
2150
      test = os_name not in base.oslist
2151
      _ErrorIf(test, constants.CV_ENODEOS, node,
2152
               "Extra OS %s not present on reference node (%s)",
2153
               os_name, base.name)
2154
      if test:
2155
        continue
2156
      assert base.oslist[os_name], "Base node has empty OS status?"
2157
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2158
      if not b_status:
2159
        # base OS is invalid, skipping
2160
        continue
2161
      for kind, a, b in [("API version", f_api, b_api),
2162
                         ("variants list", f_var, b_var),
2163
                         ("parameters", beautify_params(f_param),
2164
                          beautify_params(b_param))]:
2165
        _ErrorIf(a != b, constants.CV_ENODEOS, node,
2166
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2167
                 kind, os_name, base.name,
2168
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2169

    
2170
    # check any missing OSes
2171
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2172
    _ErrorIf(missing, constants.CV_ENODEOS, node,
2173
             "OSes present on reference node %s but missing on this node: %s",
2174
             base.name, utils.CommaJoin(missing))
2175

    
2176
  def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
2177
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2178

2179
    @type ninfo: L{objects.Node}
2180
    @param ninfo: the node to check
2181
    @param nresult: the remote results for the node
2182
    @type is_master: bool
2183
    @param is_master: Whether node is the master node
2184

2185
    """
2186
    node = ninfo.name
2187

    
2188
    if (is_master and
2189
        (constants.ENABLE_FILE_STORAGE or
2190
         constants.ENABLE_SHARED_FILE_STORAGE)):
2191
      try:
2192
        fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
2193
      except KeyError:
2194
        # This should never happen
2195
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
2196
                      "Node did not return forbidden file storage paths")
2197
      else:
2198
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
2199
                      "Found forbidden file storage paths: %s",
2200
                      utils.CommaJoin(fspaths))
2201
    else:
2202
      self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
2203
                    constants.CV_ENODEFILESTORAGEPATHS, node,
2204
                    "Node should not have returned forbidden file storage"
2205
                    " paths")
2206

    
2207
  def _VerifyOob(self, ninfo, nresult):
2208
    """Verifies out of band functionality of a node.
2209

2210
    @type ninfo: L{objects.Node}
2211
    @param ninfo: the node to check
2212
    @param nresult: the remote results for the node
2213

2214
    """
2215
    node = ninfo.name
2216
    # We just have to verify the paths on master and/or master candidates
2217
    # as the oob helper is invoked on the master
2218
    if ((ninfo.master_candidate or ninfo.master_capable) and
2219
        constants.NV_OOB_PATHS in nresult):
2220
      for path_result in nresult[constants.NV_OOB_PATHS]:
2221
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2222

    
2223
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2224
    """Verifies and updates the node volume data.
2225

2226
    This function will update a L{NodeImage}'s internal structures
2227
    with data from the remote call.
2228

2229
    @type ninfo: L{objects.Node}
2230
    @param ninfo: the node to check
2231
    @param nresult: the remote results for the node
2232
    @param nimg: the node image object
2233
    @param vg_name: the configured VG name
2234

2235
    """
2236
    node = ninfo.name
2237
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2238

    
2239
    nimg.lvm_fail = True
2240
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2241
    if vg_name is None:
2242
      pass
2243
    elif isinstance(lvdata, basestring):
2244
      _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
2245
               utils.SafeEncode(lvdata))
2246
    elif not isinstance(lvdata, dict):
2247
      _ErrorIf(True, constants.CV_ENODELVM, node,
2248
               "rpc call to node failed (lvlist)")
2249
    else:
2250
      nimg.volumes = lvdata
2251
      nimg.lvm_fail = False
2252

    
2253
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2254
    """Verifies and updates the node instance list.
2255

2256
    If the listing was successful, then updates this node's instance
2257
    list. Otherwise, it marks the RPC call as failed for the instance
2258
    list key.
2259

2260
    @type ninfo: L{objects.Node}
2261
    @param ninfo: the node to check
2262
    @param nresult: the remote results for the node
2263
    @param nimg: the node image object
2264

2265
    """
2266
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2267
    test = not isinstance(idata, list)
2268
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2269
                  "rpc call to node failed (instancelist): %s",
2270
                  utils.SafeEncode(str(idata)))
2271
    if test:
2272
      nimg.hyp_fail = True
2273
    else:
2274
      nimg.instances = idata
2275

    
2276
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2277
    """Verifies and computes a node information map
2278

2279
    @type ninfo: L{objects.Node}
2280
    @param ninfo: the node to check
2281
    @param nresult: the remote results for the node
2282
    @param nimg: the node image object
2283
    @param vg_name: the configured VG name
2284

2285
    """
2286
    node = ninfo.name
2287
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2288

    
2289
    # try to read free memory (from the hypervisor)
2290
    hv_info = nresult.get(constants.NV_HVINFO, None)
2291
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2292
    _ErrorIf(test, constants.CV_ENODEHV, node,
2293
             "rpc call to node failed (hvinfo)")
2294
    if not test:
2295
      try:
2296
        nimg.mfree = int(hv_info["memory_free"])
2297
      except (ValueError, TypeError):
2298
        _ErrorIf(True, constants.CV_ENODERPC, node,
2299
                 "node returned invalid nodeinfo, check hypervisor")
2300

    
2301
    # FIXME: devise a free space model for file based instances as well
2302
    if vg_name is not None:
2303
      test = (constants.NV_VGLIST not in nresult or
2304
              vg_name not in nresult[constants.NV_VGLIST])
2305
      _ErrorIf(test, constants.CV_ENODELVM, node,
2306
               "node didn't return data for the volume group '%s'"
2307
               " - it is either missing or broken", vg_name)
2308
      if not test:
2309
        try:
2310
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2311
        except (ValueError, TypeError):
2312
          _ErrorIf(True, constants.CV_ENODERPC, node,
2313
                   "node returned invalid LVM info, check LVM status")
2314

    
2315
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2316
    """Gets per-disk status information for all instances.
2317

2318
    @type nodelist: list of strings
2319
    @param nodelist: Node names
2320
    @type node_image: dict of (name, L{NodeImage})
    @param node_image: Node image objects
2322
    @type instanceinfo: dict of (name, L{objects.Instance})
2323
    @param instanceinfo: Instance objects
2324
    @rtype: {instance: {node: [(success, payload)]}}
2325
    @return: a dictionary of per-instance dictionaries with nodes as
2326
        keys and disk information as values; the disk information is a
2327
        list of tuples (success, payload)
2328

2329
    """
2330
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2331

    
2332
    node_disks = {}
2333
    node_disks_devonly = {}
2334
    diskless_instances = set()
2335
    diskless = constants.DT_DISKLESS
2336

    
2337
    for nname in nodelist:
2338
      node_instances = list(itertools.chain(node_image[nname].pinst,
2339
                                            node_image[nname].sinst))
2340
      diskless_instances.update(inst for inst in node_instances
2341
                                if instanceinfo[inst].disk_template == diskless)
2342
      disks = [(inst, disk)
2343
               for inst in node_instances
2344
               for disk in instanceinfo[inst].disks]
2345

    
2346
      if not disks:
2347
        # No need to collect data
2348
        continue
2349

    
2350
      node_disks[nname] = disks
2351

    
2352
      # _AnnotateDiskParams makes already copies of the disks
2353
      devonly = []
2354
      for (inst, dev) in disks:
2355
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
2356
        self.cfg.SetDiskID(anno_disk, nname)
2357
        devonly.append(anno_disk)
2358

    
2359
      node_disks_devonly[nname] = devonly
2360

    
2361
    assert len(node_disks) == len(node_disks_devonly)
2362

    
2363
    # Collect data from all nodes with disks
2364
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2365
                                                          node_disks_devonly)
2366

    
2367
    assert len(result) == len(node_disks)
2368

    
2369
    instdisk = {}
2370

    
2371
    for (nname, nres) in result.items():
2372
      disks = node_disks[nname]
2373

    
2374
      if nres.offline:
2375
        # No data from this node
2376
        data = len(disks) * [(False, "node offline")]
2377
      else:
2378
        msg = nres.fail_msg
2379
        _ErrorIf(msg, constants.CV_ENODERPC, nname,
2380
                 "while getting disk information: %s", msg)
2381
        if msg:
2382
          # No data from this node
2383
          data = len(disks) * [(False, msg)]
2384
        else:
2385
          data = []
2386
          for idx, i in enumerate(nres.payload):
2387
            if isinstance(i, (tuple, list)) and len(i) == 2:
2388
              data.append(i)
2389
            else:
2390
              logging.warning("Invalid result from node %s, entry %d: %s",
2391
                              nname, idx, i)
2392
              data.append((False, "Invalid result from the remote node"))
2393

    
2394
      for ((inst, _), status) in zip(disks, data):
2395
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2396

    
2397
    # Add empty entries for diskless instances.
2398
    for inst in diskless_instances:
2399
      assert inst not in instdisk
2400
      instdisk[inst] = {}
2401

    
2402
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2403
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
2404
                      compat.all(isinstance(s, (tuple, list)) and
2405
                                 len(s) == 2 for s in statuses)
2406
                      for inst, nnames in instdisk.items()
2407
                      for nname, statuses in nnames.items())
2408
    if __debug__:
2409
      instdisk_keys = set(instdisk)
2410
      instanceinfo_keys = set(instanceinfo)
2411
      assert instdisk_keys == instanceinfo_keys, \
2412
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2413
         (instdisk_keys, instanceinfo_keys))
2414

    
2415
    return instdisk
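
    # Illustrative sketch (an assumption, not original code): for a mirrored
    # instance "inst1" with one disk on "node1"/"node2", the returned mapping
    # looks like
    #   {"inst1": {"node1": [(True, status_payload)],
    #              "node2": [(True, status_payload)]}}
    # while diskless instances map to an empty dict, as added above.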
2416

    
2417
  @staticmethod
2418
  def _SshNodeSelector(group_uuid, all_nodes):
2419
    """Create endless iterators for all potential SSH check hosts.
2420

2421
    """
2422
    nodes = [node for node in all_nodes
2423
             if (node.group != group_uuid and
2424
                 not node.offline)]
2425
    keyfunc = operator.attrgetter("group")
2426

    
2427
    return map(itertools.cycle,
2428
               [sorted(map(operator.attrgetter("name"), names))
2429
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2430
                                                  keyfunc)])
2431

    
2432
  @classmethod
2433
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2434
    """Choose which nodes should talk to which other nodes.
2435

2436
    We will make nodes contact all nodes in their group, and one node from
2437
    every other group.
2438

2439
    @warning: This algorithm has a known issue if one node group is much
2440
      smaller than others (e.g. just one node). In such a case all other
2441
      nodes will talk to the single node.
2442

2443
    """
2444
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2445
    sel = cls._SshNodeSelector(group_uuid, all_nodes)
2446

    
2447
    return (online_nodes,
2448
            dict((name, sorted([i.next() for i in sel]))
2449
                 for name in online_nodes))
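
    # Illustrative sketch (an assumption, not original code): the selectors
    # built by _SshNodeSelector are per-foreign-group round-robins, so the
    # inter-group SSH checks get spread over each foreign group's members:
    #   >>> import itertools
    #   >>> sel = itertools.cycle(["node4", "node5"])   # one foreign group
    #   >>> [sel.next() for _ in range(3)]
    #   ['node4', 'node5', 'node4']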
2450

    
2451
  def BuildHooksEnv(self):
2452
    """Build hooks env.
2453

2454
    Cluster-Verify hooks just ran in the post phase and their failure makes
2455
    the output be logged in the verify output and the verification to fail.
2456

2457
    """
2458
    env = {
2459
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2460
      }
2461

    
2462
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2463
               for node in self.my_node_info.values())
2464

    
2465
    return env
2466

    
2467
  def BuildHooksNodes(self):
2468
    """Build hooks nodes.
2469

2470
    """
2471
    return ([], self.my_node_names)
2472

    
2473
  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.
2475

2476
    """
2477
    # This method has too many local variables. pylint: disable=R0914
2478
    feedback_fn("* Verifying group '%s'" % self.group_info.name)
2479

    
2480
    if not self.my_node_names:
2481
      # empty node group
2482
      feedback_fn("* Empty node group, skipping verification")
2483
      return True
2484

    
2485
    self.bad = False
2486
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2487
    verbose = self.op.verbose
2488
    self._feedback_fn = feedback_fn
2489

    
2490
    vg_name = self.cfg.GetVGName()
2491
    drbd_helper = self.cfg.GetDRBDHelper()
2492
    cluster = self.cfg.GetClusterInfo()
2493
    hypervisors = cluster.enabled_hypervisors
2494
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2495

    
2496
    i_non_redundant = [] # Non redundant instances
2497
    i_non_a_balanced = [] # Non auto-balanced instances
2498
    i_offline = 0 # Count of offline instances
2499
    n_offline = 0 # Count of offline nodes
2500
    n_drained = 0 # Count of nodes being drained
2501
    node_vol_should = {}
2502

    
2503
    # FIXME: verify OS list
2504

    
2505
    # File verification
2506
    filemap = ComputeAncillaryFiles(cluster, False)
2507

    
2508
    # do local checksums
2509
    master_node = self.master_node = self.cfg.GetMasterNode()
2510
    master_ip = self.cfg.GetMasterIP()
2511

    
2512
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2513

    
2514
    user_scripts = []
2515
    if self.cfg.GetUseExternalMipScript():
2516
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2517

    
2518
    node_verify_param = {
2519
      constants.NV_FILELIST:
2520
        map(vcluster.MakeVirtualPath,
2521
            utils.UniqueSequence(filename
2522
                                 for files in filemap
2523
                                 for filename in files)),
2524
      constants.NV_NODELIST:
2525
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2526
                                  self.all_node_info.values()),
2527
      constants.NV_HYPERVISOR: hypervisors,
2528
      constants.NV_HVPARAMS:
2529
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2530
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2531
                                 for node in node_data_list
2532
                                 if not node.offline],
2533
      constants.NV_INSTANCELIST: hypervisors,
2534
      constants.NV_VERSION: None,
2535
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2536
      constants.NV_NODESETUP: None,
2537
      constants.NV_TIME: None,
2538
      constants.NV_MASTERIP: (master_node, master_ip),
2539
      constants.NV_OSLIST: None,
2540
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2541
      constants.NV_USERSCRIPTS: user_scripts,
2542
      }
2543

    
2544
    if vg_name is not None:
2545
      node_verify_param[constants.NV_VGLIST] = None
2546
      node_verify_param[constants.NV_LVLIST] = vg_name
2547
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2548

    
2549
    if drbd_helper:
2550
      node_verify_param[constants.NV_DRBDLIST] = None
2551
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2552

    
2553
    if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
2554
      # Load file storage paths only from master node
2555
      node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node
2556

    
2557
    # bridge checks
2558
    # FIXME: this needs to be changed per node-group, not cluster-wide
2559
    bridges = set()
2560
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2561
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2562
      bridges.add(default_nicpp[constants.NIC_LINK])
2563
    for instance in self.my_inst_info.values():
2564
      for nic in instance.nics:
2565
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
2566
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2567
          bridges.add(full_nic[constants.NIC_LINK])
2568

    
2569
    if bridges:
2570
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2571

    
2572
    # Build our expected cluster state
2573
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2574
                                                 name=node.name,
2575
                                                 vm_capable=node.vm_capable))
2576
                      for node in node_data_list)
2577

    
2578
    # Gather OOB paths
2579
    oob_paths = []
2580
    for node in self.all_node_info.values():
2581
      path = SupportsOob(self.cfg, node)
2582
      if path and path not in oob_paths:
2583
        oob_paths.append(path)
2584

    
2585
    if oob_paths:
2586
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2587

    
2588
    for instance in self.my_inst_names:
2589
      inst_config = self.my_inst_info[instance]
2590
      if inst_config.admin_state == constants.ADMINST_OFFLINE:
2591
        i_offline += 1
2592

    
2593
      for nname in inst_config.all_nodes:
2594
        if nname not in node_image:
2595
          gnode = self.NodeImage(name=nname)
2596
          gnode.ghost = (nname not in self.all_node_info)
2597
          node_image[nname] = gnode
2598

    
2599
      inst_config.MapLVsByNode(node_vol_should)
2600

    
2601
      pnode = inst_config.primary_node
2602
      node_image[pnode].pinst.append(instance)
2603

    
2604
      for snode in inst_config.secondary_nodes:
2605
        nimg = node_image[snode]
2606
        nimg.sinst.append(instance)
2607
        if pnode not in nimg.sbp:
2608
          nimg.sbp[pnode] = []
2609
        nimg.sbp[pnode].append(instance)
2610

    
2611
    es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg, self.my_node_names)
2612
    # The value of exclusive_storage should be the same across the group, so if
2613
    # it's True for at least a node, we act as if it were set for all the nodes
2614
    self._exclusive_storage = compat.any(es_flags.values())
2615
    if self._exclusive_storage:
2616
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2617

    
2618
    # At this point, we have the in-memory data structures complete,
2619
    # except for the runtime information, which we'll gather next
2620

    
2621
    # Due to the way our RPC system works, exact response times cannot be
2622
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2623
    # time before and after executing the request, we can at least have a time
2624
    # window.
2625
    nvinfo_starttime = time.time()
2626
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2627
                                           node_verify_param,
2628
                                           self.cfg.GetClusterName())
2629
    nvinfo_endtime = time.time()
2630

    
2631
    if self.extra_lv_nodes and vg_name is not None:
2632
      extra_lv_nvinfo = \
2633
          self.rpc.call_node_verify(self.extra_lv_nodes,
2634
                                    {constants.NV_LVLIST: vg_name},
2635
                                    self.cfg.GetClusterName())
2636
    else:
2637
      extra_lv_nvinfo = {}
2638

    
2639
    all_drbd_map = self.cfg.ComputeDRBDMap()
2640

    
2641
    feedback_fn("* Gathering disk information (%s nodes)" %
2642
                len(self.my_node_names))
2643
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2644
                                     self.my_inst_info)
2645

    
2646
    feedback_fn("* Verifying configuration file consistency")
2647

    
2648
    # If not all nodes are being checked, we need to make sure the master node
2649
    # and a non-checked vm_capable node are in the list.
2650
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2651
    if absent_nodes:
2652
      vf_nvinfo = all_nvinfo.copy()
2653
      vf_node_info = list(self.my_node_info.values())
2654
      additional_nodes = []
2655
      if master_node not in self.my_node_info:
2656
        additional_nodes.append(master_node)
2657
        vf_node_info.append(self.all_node_info[master_node])
2658
      # Add the first vm_capable node we find which is not included,
2659
      # excluding the master node (which we already have)
2660
      for node in absent_nodes:
2661
        nodeinfo = self.all_node_info[node]
2662
        if (nodeinfo.vm_capable and not nodeinfo.offline and
2663
            node != master_node):
2664
          additional_nodes.append(node)
2665
          vf_node_info.append(self.all_node_info[node])
2666
          break
2667
      key = constants.NV_FILELIST
2668
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2669
                                                 {key: node_verify_param[key]},
2670
                                                 self.cfg.GetClusterName()))
2671
    else:
2672
      vf_nvinfo = all_nvinfo
2673
      vf_node_info = self.my_node_info.values()
2674

    
2675
    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2676

    
2677
    feedback_fn("* Verifying node status")
2678

    
2679
    refos_img = None
2680

    
2681
    for node_i in node_data_list:
      node = node_i.name
      nimg = node_image[node]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline += 1
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
               msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyFileStoragePaths(node_i, nresult,
                                   node == master_node)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)

        for inst in non_primary_inst:
          test = inst in self.all_inst_info
          _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
                   "instance should not run on node %s", node_i.name)
          _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                   "node is running unknown instance %s", inst)

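    # Group-level LVM consistency checks, based on the per-node data collected
    # in the loop above.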
    self._VerifyGroupLVM(node_image, vg_name)

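    # Fold the LV lists collected from the out-of-group nodes into their node
    # images, so that their volumes are taken into account by the checks below.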
    for node, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
                              node_image[node], vg_name)

    feedback_fn("* Verifying instance status")
    for instance in self.my_inst_names:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.my_inst_info[instance]
      self._VerifyInstance(instance, inst_config, node_image,
                           instdisk[instance])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if inst_config.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for inst in self.all_inst_info.values():
      for secondary in inst.secondary_nodes:
        if (secondary in self.my_node_info
            and inst.name not in self.my_inst_info):
          inst.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

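    # Optional N+1 memory redundancy check (roughly: whether the nodes have
    # enough free memory to absorb failovers); it is skipped when
    # constants.VERIFY_NPLUSONE_MEM is listed in self.op.skip_checks.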
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

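    # self.bad was set by the _ErrorIf checks above whenever a problem was
    # reported, so the LU result is simply its negation.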
    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_names:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

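      # Report each node's hook results; the output of a failed script is
      # re-indented and fed back, and any failure turns the overall verify
      # result negative.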
      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
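    # All node groups are locked in shared mode; the actual disk verification
    # is delegated to the per-group jobs submitted in Exec below.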
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])