#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, master_params.name)

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    if result.fail_msg:
      self.LogWarning("Error disabling the master IP address: %s",
                      result.fail_msg)

    return master_params.name


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the cluster data for the query.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
    else:
      cluster = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_name = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_name)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   master_name)

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      }

    return result
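    # Illustrative note (not part of the original source): the keys above
    # mirror the cluster configuration fields verbatim. As far as I recall,
    # "gnt-cluster info" and the RAPI "/2/info" resource are the usual
    # consumers of OpClusterQuery, but treat that as an assumption rather
    # than something this module guarantees.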


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.name)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                     master_params, ems)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disk sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      self.wanted_names = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsize(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsize call to node"
                        " %s, ignoring", node)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node, len(dskl), result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))
    return changed
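    # Illustrative note (not part of the original source): "changed" is a list
    # of (instance_name, disk_index, new_size) tuples with sizes in MiB (the
    # node payload is shifted right by 20 bits above), e.g.
    # [("inst1.example.com", 0, 10240)]; the instance name is a made-up
    # example.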


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)
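# Illustrative usage sketch (not part of the original source): the netmask
# argument is a prefix length, so on an IPv4 cluster something like
# _ValidateNetmask(cfg, 24) is expected to pass while _ValidateNetmask(cfg, 64)
# should raise errors.OpPrereqError with errors.ECODE_INVAL; the concrete
# values are assumptions, the accepted range is decided by the IP class.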


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" % err,
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    whether the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_list = self.owned_locks(locking.LEVEL_NODE)

    vm_capable_nodes = [node.name
                        for node in self.cfg.GetAllNodesInfo().values()
                        if node.name in node_list and node.vm_capable]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(vm_capable_nodes)
      for node in vm_capable_nodes:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_list)
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s", node)
          continue
        msg = helpers[node].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (node, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[node].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (node, node_helper), errors.ECODE_ENVIRON)

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(node in group.members
                                             for node in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol,
                                           new_ipolicy, instances, self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_list, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.drbd_helper is not None:
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(set(self.op.enabled_disk_templates))
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
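    # Illustrative note (not part of the original source): "mods" is a list of
    # (action, os_name) pairs, e.g. [(constants.DDM_ADD, "debian-image")] to
    # add an OS to the hidden or blacklisted list, or (constants.DDM_REMOVE,
    # "debian-image") to take it off again; the OS name is a made-up example.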

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(master_params.name,
                                                        master_params.netmask,
                                                        self.op.master_netmask,
                                                        master_params.ip,
                                                        master_params.netdev)
      if result.fail_msg:
        msg = "Could not change the master IP netmask: %s" % result.fail_msg
        feedback_fn(msg)

      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                     master_params, ems)
      if result.fail_msg:
        self.LogWarning("Could not re-enable the master ip on"
                        " the master, please restart manually: %s",
                        result.fail_msg)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]
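      # Illustrative note (not part of the original source): the negative job
      # ID is a relative dependency, so -len(jobs) points back at the
      # verify-config job appended above and every per-group verification
      # waits for it; the exact semantics of the empty status list are not
      # restated here.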

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True
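    # Illustrative note (not part of the original source), following the two
    # format strings above: with error_codes set the output is machine
    # readable, roughly "ERROR:ENODEHV:node:node1.example.com:<msg>", otherwise
    # human readable, roughly "ERROR: node node1.example.com: <msg>"; the error
    # code, item type and node name are made-up examples.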

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
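# Illustrative note (not part of the original source): following the branches
# above, _VerifyCertificate returns (None, None) for a healthy certificate,
# (ETYPE_WARNING, "While verifying <file>: <reason>") when expiry is getting
# close, and (ETYPE_ERROR, ...) for expired or unreadable certificates.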


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data
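  # Illustrative note (not part of the original source): the result is a flat
  # list such as [("cluster", "kvm", {...}), ("os debootstrap", "kvm", {...}),
  # ("instance web1.example.com", "kvm", {...})]; the hypervisor, OS and
  # instance names are made-up examples.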
1227

    
1228

    
1229
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1230
  """Verifies the cluster config.
1231

1232
  """
1233
  REQ_BGL = False
1234

    
1235
  def _VerifyHVP(self, hvp_data):
1236
    """Verifies locally the syntax of the hypervisor parameters.
1237

1238
    """
1239
    for item, hv_name, hv_params in hvp_data:
1240
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1241
             (item, hv_name))
1242
      try:
1243
        hv_class = hypervisor.GetHypervisorClass(hv_name)
1244
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1245
        hv_class.CheckParameterSyntax(hv_params)
1246
      except errors.GenericError, err:
1247
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1248

    
1249
  def ExpandNames(self):
1250
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1251
    self.share_locks = ShareAll()
1252

    
1253
  def CheckPrereq(self):
1254
    """Check prerequisites.
1255

1256
    """
1257
    # Retrieve all information
1258
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1259
    self.all_node_info = self.cfg.GetAllNodesInfo()
1260
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1261

    
1262
  def Exec(self, feedback_fn):
1263
    """Verify integrity of cluster, performing various test on nodes.
1264

1265
    """
1266
    self.bad = False
1267
    self._feedback_fn = feedback_fn
1268

    
1269
    feedback_fn("* Verifying cluster config")
1270

    
1271
    for msg in self.cfg.VerifyConfig():
1272
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1273

    
1274
    feedback_fn("* Verifying cluster certificate files")
1275

    
1276
    for cert_filename in pathutils.ALL_CERT_FILES:
1277
      (errcode, msg) = _VerifyCertificate(cert_filename)
1278
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1279

    
1280
    feedback_fn("* Verifying hypervisor parameters")
1281

    
1282
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1283
                                                self.all_inst_info.values()))
1284

    
1285
    feedback_fn("* Verifying all nodes belong to an existing group")
1286

    
1287
    # We do this verification here because, should this bogus circumstance
1288
    # occur, it would never be caught by VerifyGroup, which only acts on
1289
    # nodes/instances reachable from existing node groups.
1290

    
1291
    dangling_nodes = set(node.name for node in self.all_node_info.values()
1292
                         if node.group not in self.all_group_info)
1293

    
1294
    dangling_instances = {}
1295
    no_node_instances = []
1296

    
1297
    for inst in self.all_inst_info.values():
1298
      if inst.primary_node in dangling_nodes:
1299
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1300
      elif inst.primary_node not in self.all_node_info:
1301
        no_node_instances.append(inst.name)
1302

    
1303
    pretty_dangling = [
1304
        "%s (%s)" %
1305
        (node.name,
1306
         utils.CommaJoin(dangling_instances.get(node.name,
1307
                                                ["no instances"])))
1308
        for node in dangling_nodes]
1309

    
1310
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1311
                  None,
1312
                  "the following nodes (and their instances) belong to a non"
1313
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1314

    
1315
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1316
                  None,
1317
                  "the following instances have a non-existing primary-node:"
1318
                  " %s", utils.CommaJoin(no_node_instances))
1319

    
1320
    return not self.bad
1321

    
1322

    
1323
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1324
  """Verifies the status of a node group.
1325

1326
  """
1327
  HPATH = "cluster-verify"
1328
  HTYPE = constants.HTYPE_CLUSTER
1329
  REQ_BGL = False
1330

    
1331
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1332

    
1333
  class NodeImage(object):
1334
    """A class representing the logical and physical status of a node.
1335

1336
    @type name: string
1337
    @ivar name: the node name to which this object refers
1338
    @ivar volumes: a structure as returned from
1339
        L{ganeti.backend.GetVolumeList} (runtime)
1340
    @ivar instances: a list of running instances (runtime)
1341
    @ivar pinst: list of configured primary instances (config)
1342
    @ivar sinst: list of configured secondary instances (config)
1343
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1344
        instances for which this node is secondary (config)
1345
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1346
    @ivar dfree: free disk, as reported by the node (runtime)
1347
    @ivar offline: the offline status (config)
1348
    @type rpc_fail: boolean
1349
    @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1350
        not whether the individual keys were correct) (runtime)
1351
    @type lvm_fail: boolean
1352
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1353
    @type hyp_fail: boolean
1354
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1355
    @type ghost: boolean
1356
    @ivar ghost: whether this is a known node or not (config)
1357
    @type os_fail: boolean
1358
    @ivar os_fail: whether the RPC call didn't return valid OS data
1359
    @type oslist: list
1360
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1361
    @type vm_capable: boolean
1362
    @ivar vm_capable: whether the node can host instances
1363
    @type pv_min: float
1364
    @ivar pv_min: size in MiB of the smallest PVs
1365
    @type pv_max: float
1366
    @ivar pv_max: size in MiB of the biggest PVs
1367

1368
    """
1369
    def __init__(self, offline=False, name=None, vm_capable=True):
1370
      self.name = name
1371
      self.volumes = {}
1372
      self.instances = []
1373
      self.pinst = []
1374
      self.sinst = []
1375
      self.sbp = {}
1376
      self.mfree = 0
1377
      self.dfree = 0
1378
      self.offline = offline
1379
      self.vm_capable = vm_capable
1380
      self.rpc_fail = False
1381
      self.lvm_fail = False
1382
      self.hyp_fail = False
1383
      self.ghost = False
1384
      self.os_fail = False
1385
      self.oslist = {}
1386
      self.pv_min = None
1387
      self.pv_max = None
1388

    
1389
  def ExpandNames(self):
1390
    # This raises errors.OpPrereqError on its own:
1391
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1392

    
1393
    # Get instances in node group; this is unsafe and needs verification later
1394
    inst_names = \
1395
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1396

    
1397
    self.needed_locks = {
1398
      locking.LEVEL_INSTANCE: inst_names,
1399
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1400
      locking.LEVEL_NODE: [],
1401

    
1402
      # This opcode is run by watcher every five minutes and acquires all nodes
1403
      # for a group. It doesn't run for a long time, so it's better to acquire
1404
      # the node allocation lock as well.
1405
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1406
      }
1407

    
1408
    self.share_locks = ShareAll()
1409

    
1410
  def DeclareLocks(self, level):
1411
    if level == locking.LEVEL_NODE:
1412
      # Get members of node group; this is unsafe and needs verification later
1413
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1414

    
1415
      all_inst_info = self.cfg.GetAllInstancesInfo()
1416

    
1417
      # In Exec(), we warn about mirrored instances that have primary and
1418
      # secondary living in separate node groups. To fully verify that
1419
      # volumes for these instances are healthy, we will need to do an
1420
      # extra call to their secondaries. We ensure here those nodes will
1421
      # be locked.
1422
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
1423
        # Important: access only the instances whose lock is owned
1424
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
1425
          nodes.update(all_inst_info[inst].secondary_nodes)
1426

    
1427
      self.needed_locks[locking.LEVEL_NODE] = nodes
1428
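  # Illustrative sketch (editor's note, hypothetical names): for a mirrored
  # instance (disk_template in constants.DTS_INT_MIRROR, e.g. drbd) whose
  # primary is in this group but whose secondary lives in another group,
  # the secondary is added to the node lock set as well:
  #
  #   inst.secondary_nodes == ["nodeX.other-group.example.com"]
  #   => that node ends up in self.needed_locks[locking.LEVEL_NODE]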

    
1429
  def CheckPrereq(self):
1430
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1431
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1432

    
1433
    group_nodes = set(self.group_info.members)
1434
    group_instances = \
1435
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1436

    
1437
    unlocked_nodes = \
1438
        group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1439

    
1440
    unlocked_instances = \
1441
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
1442

    
1443
    if unlocked_nodes:
1444
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
1445
                                 utils.CommaJoin(unlocked_nodes),
1446
                                 errors.ECODE_STATE)
1447

    
1448
    if unlocked_instances:
1449
      raise errors.OpPrereqError("Missing lock for instances: %s" %
1450
                                 utils.CommaJoin(unlocked_instances),
1451
                                 errors.ECODE_STATE)
1452

    
1453
    self.all_node_info = self.cfg.GetAllNodesInfo()
1454
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1455

    
1456
    self.my_node_names = utils.NiceSort(group_nodes)
1457
    self.my_inst_names = utils.NiceSort(group_instances)
1458

    
1459
    self.my_node_info = dict((name, self.all_node_info[name])
1460
                             for name in self.my_node_names)
1461

    
1462
    self.my_inst_info = dict((name, self.all_inst_info[name])
1463
                             for name in self.my_inst_names)
1464

    
1465
    # We detect here the nodes that will need the extra RPC calls for verifying
1466
    # split LV volumes; they should be locked.
1467
    extra_lv_nodes = set()
1468

    
1469
    for inst in self.my_inst_info.values():
1470
      if inst.disk_template in constants.DTS_INT_MIRROR:
1471
        for nname in inst.all_nodes:
1472
          if self.all_node_info[nname].group != self.group_uuid:
1473
            extra_lv_nodes.add(nname)
1474

    
1475
    unlocked_lv_nodes = \
1476
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1477

    
1478
    if unlocked_lv_nodes:
1479
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1480
                                 utils.CommaJoin(unlocked_lv_nodes),
1481
                                 errors.ECODE_STATE)
1482
    self.extra_lv_nodes = list(extra_lv_nodes)
1483

    
1484
  def _VerifyNode(self, ninfo, nresult):
1485
    """Perform some basic validation on data returned from a node.
1486

1487
      - check the result data structure is well formed and has all the
1488
        mandatory fields
1489
      - check ganeti version
1490

1491
    @type ninfo: L{objects.Node}
1492
    @param ninfo: the node to check
1493
    @param nresult: the results from the node
1494
    @rtype: boolean
1495
    @return: whether overall this call was successful (and we can expect
1496
         reasonable values in the response)
1497

1498
    """
1499
    node = ninfo.name
1500
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1501

    
1502
    # main result, nresult should be a non-empty dict
1503
    test = not nresult or not isinstance(nresult, dict)
1504
    _ErrorIf(test, constants.CV_ENODERPC, node,
1505
                  "unable to verify node: no data returned")
1506
    if test:
1507
      return False
1508

    
1509
    # compares ganeti version
1510
    local_version = constants.PROTOCOL_VERSION
1511
    remote_version = nresult.get("version", None)
1512
    test = not (remote_version and
1513
                isinstance(remote_version, (list, tuple)) and
1514
                len(remote_version) == 2)
1515
    _ErrorIf(test, constants.CV_ENODERPC, node,
1516
             "connection to node returned invalid data")
1517
    if test:
1518
      return False
1519

    
1520
    test = local_version != remote_version[0]
1521
    _ErrorIf(test, constants.CV_ENODEVERSION, node,
1522
             "incompatible protocol versions: master %s,"
1523
             " node %s", local_version, remote_version[0])
1524
    if test:
1525
      return False
1526

    
1527
    # node seems compatible, we can actually try to look into its results
1528

    
1529
    # full package version
1530
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1531
                  constants.CV_ENODEVERSION, node,
1532
                  "software version mismatch: master %s, node %s",
1533
                  constants.RELEASE_VERSION, remote_version[1],
1534
                  code=self.ETYPE_WARNING)
1535

    
1536
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1537
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1538
      for hv_name, hv_result in hyp_result.iteritems():
1539
        test = hv_result is not None
1540
        _ErrorIf(test, constants.CV_ENODEHV, node,
1541
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1542

    
1543
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1544
    if ninfo.vm_capable and isinstance(hvp_result, list):
1545
      for item, hv_name, hv_result in hvp_result:
1546
        _ErrorIf(True, constants.CV_ENODEHV, node,
1547
                 "hypervisor %s parameter verify failure (source %s): %s",
1548
                 hv_name, item, hv_result)
1549

    
1550
    test = nresult.get(constants.NV_NODESETUP,
1551
                       ["Missing NODESETUP results"])
1552
    _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
1553
             "; ".join(test))
1554

    
1555
    return True
1556
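  # Illustrative sketch (editor's note, hypothetical release number): the node
  # returns a (protocol, release) pair, e.g.
  #
  #   nresult["version"] == (constants.PROTOCOL_VERSION, "2.9.1")
  #
  # A protocol mismatch aborts the remaining per-node checks, while a
  # differing release version only raises an ETYPE_WARNING.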

    
1557
  def _VerifyNodeTime(self, ninfo, nresult,
1558
                      nvinfo_starttime, nvinfo_endtime):
1559
    """Check the node time.
1560

1561
    @type ninfo: L{objects.Node}
1562
    @param ninfo: the node to check
1563
    @param nresult: the remote results for the node
1564
    @param nvinfo_starttime: the start time of the RPC call
1565
    @param nvinfo_endtime: the end time of the RPC call
1566

1567
    """
1568
    node = ninfo.name
1569
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1570

    
1571
    ntime = nresult.get(constants.NV_TIME, None)
1572
    try:
1573
      ntime_merged = utils.MergeTime(ntime)
1574
    except (ValueError, TypeError):
1575
      _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
1576
      return
1577

    
1578
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1579
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1580
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1581
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1582
    else:
1583
      ntime_diff = None
1584

    
1585
    _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
1586
             "Node time diverges by at least %s from master node time",
1587
             ntime_diff)
1588
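  # Illustrative example (editor's note, hypothetical numbers): with an
  # allowed skew of, say, 150s and an RPC window of [1000.0, 1002.0], a node
  # clock reporting 1200.0 is flagged as diverging by at least
  # 1200.0 - 1002.0 = 198.0s.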

    
1589
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1590
    """Check the node LVM results and update info for cross-node checks.
1591

1592
    @type ninfo: L{objects.Node}
1593
    @param ninfo: the node to check
1594
    @param nresult: the remote results for the node
1595
    @param vg_name: the configured VG name
1596
    @type nimg: L{NodeImage}
1597
    @param nimg: node image
1598

1599
    """
1600
    if vg_name is None:
1601
      return
1602

    
1603
    node = ninfo.name
1604
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1605

    
1606
    # checks vg existence and size > 20G
1607
    vglist = nresult.get(constants.NV_VGLIST, None)
1608
    test = not vglist
1609
    _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
1610
    if not test:
1611
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1612
                                            constants.MIN_VG_SIZE)
1613
      _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
1614

    
1615
    # Check PVs
1616
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1617
    for em in errmsgs:
1618
      self._Error(constants.CV_ENODELVM, node, em)
1619
    if pvminmax is not None:
1620
      (nimg.pv_min, nimg.pv_max) = pvminmax
1621

    
1622
  def _VerifyGroupLVM(self, node_image, vg_name):
1623
    """Check cross-node consistency in LVM.
1624

1625
    @type node_image: dict
1626
    @param node_image: info about nodes, mapping from node names to
1627
      L{NodeImage} objects
1628
    @param vg_name: the configured VG name
1629

1630
    """
1631
    if vg_name is None:
1632
      return
1633

    
1634
    # Only exclusive storage needs this kind of check
1635
    if not self._exclusive_storage:
1636
      return
1637

    
1638
    # exclusive_storage wants all PVs to have the same size (approximately),
1639
    # if the smallest and the biggest ones are okay, everything is fine.
1640
    # pv_min is None iff pv_max is None
1641
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1642
    if not vals:
1643
      return
1644
    (pvmin, minnode) = min((ni.pv_min, ni.name) for ni in vals)
1645
    (pvmax, maxnode) = max((ni.pv_max, ni.name) for ni in vals)
1646
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1647
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1648
                  "PV sizes differ too much in the group; smallest (%s MB) is"
1649
                  " on %s, biggest (%s MB) is on %s",
1650
                  pvmin, minnode, pvmax, maxnode)
1651
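  # Illustrative example (editor's note, hypothetical sizes): with exclusive
  # storage, if the smallest PV in the group is 10000 MB (node1) and the
  # biggest is 10020 MB (node2), the pair (10000, 10020) is passed to
  # utils.LvmExclusiveTestBadPvSizes(); CV_EGROUPDIFFERENTPVSIZE is raised
  # only if that helper considers the spread too large.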

    
1652
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1653
    """Check the node bridges.
1654

1655
    @type ninfo: L{objects.Node}
1656
    @param ninfo: the node to check
1657
    @param nresult: the remote results for the node
1658
    @param bridges: the expected list of bridges
1659

1660
    """
1661
    if not bridges:
1662
      return
1663

    
1664
    node = ninfo.name
1665
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1666

    
1667
    missing = nresult.get(constants.NV_BRIDGES, None)
1668
    test = not isinstance(missing, list)
1669
    _ErrorIf(test, constants.CV_ENODENET, node,
1670
             "did not return valid bridge information")
1671
    if not test:
1672
      _ErrorIf(bool(missing), constants.CV_ENODENET, node,
1673
               "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1674

    
1675
  def _VerifyNodeUserScripts(self, ninfo, nresult):
1676
    """Check the results of user scripts presence and executability on the node
1677

1678
    @type ninfo: L{objects.Node}
1679
    @param ninfo: the node to check
1680
    @param nresult: the remote results for the node
1681

1682
    """
1683
    node = ninfo.name
1684

    
1685
    test = not constants.NV_USERSCRIPTS in nresult
1686
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
1687
                  "did not return user scripts information")
1688

    
1689
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1690
    if not test:
1691
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
1692
                    "user scripts not present or not executable: %s" %
1693
                    utils.CommaJoin(sorted(broken_scripts)))
1694

    
1695
  def _VerifyNodeNetwork(self, ninfo, nresult):
1696
    """Check the node network connectivity results.
1697

1698
    @type ninfo: L{objects.Node}
1699
    @param ninfo: the node to check
1700
    @param nresult: the remote results for the node
1701

1702
    """
1703
    node = ninfo.name
1704
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1705

    
1706
    test = constants.NV_NODELIST not in nresult
1707
    _ErrorIf(test, constants.CV_ENODESSH, node,
1708
             "node hasn't returned node ssh connectivity data")
1709
    if not test:
1710
      if nresult[constants.NV_NODELIST]:
1711
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1712
          _ErrorIf(True, constants.CV_ENODESSH, node,
1713
                   "ssh communication with node '%s': %s", a_node, a_msg)
1714

    
1715
    test = constants.NV_NODENETTEST not in nresult
1716
    _ErrorIf(test, constants.CV_ENODENET, node,
1717
             "node hasn't returned node tcp connectivity data")
1718
    if not test:
1719
      if nresult[constants.NV_NODENETTEST]:
1720
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1721
        for anode in nlist:
1722
          _ErrorIf(True, constants.CV_ENODENET, node,
1723
                   "tcp communication with node '%s': %s",
1724
                   anode, nresult[constants.NV_NODENETTEST][anode])
1725

    
1726
    test = constants.NV_MASTERIP not in nresult
1727
    _ErrorIf(test, constants.CV_ENODENET, node,
1728
             "node hasn't returned node master IP reachability data")
1729
    if not test:
1730
      if not nresult[constants.NV_MASTERIP]:
1731
        if node == self.master_node:
1732
          msg = "the master node cannot reach the master IP (not configured?)"
1733
        else:
1734
          msg = "cannot reach the master IP"
1735
        _ErrorIf(True, constants.CV_ENODENET, node, msg)
1736

    
1737
  def _VerifyInstance(self, instance, inst_config, node_image,
1738
                      diskstatus):
1739
    """Verify an instance.
1740

1741
    This function checks to see if the required block devices are
1742
    available on the instance's node, and that the nodes are in the correct
1743
    state.
1744

1745
    """
1746
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1747
    pnode = inst_config.primary_node
1748
    pnode_img = node_image[pnode]
1749
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
1750

    
1751
    node_vol_should = {}
1752
    inst_config.MapLVsByNode(node_vol_should)
1753

    
1754
    cluster = self.cfg.GetClusterInfo()
1755
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1756
                                                            self.group_info)
1757
    err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
1758
    _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err),
1759
             code=self.ETYPE_WARNING)
1760

    
1761
    for node in node_vol_should:
1762
      n_img = node_image[node]
1763
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1764
        # ignore missing volumes on offline or broken nodes
1765
        continue
1766
      for volume in node_vol_should[node]:
1767
        test = volume not in n_img.volumes
1768
        _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
1769
                 "volume %s missing on node %s", volume, node)
1770

    
1771
    if inst_config.admin_state == constants.ADMINST_UP:
1772
      test = instance not in pnode_img.instances and not pnode_img.offline
1773
      _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
1774
               "instance not running on its primary node %s",
1775
               pnode)
1776
      _ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
1777
               "instance is marked as running and lives on offline node %s",
1778
               pnode)
1779

    
1780
    diskdata = [(nname, success, status, idx)
1781
                for (nname, disks) in diskstatus.items()
1782
                for idx, (success, status) in enumerate(disks)]
1783

    
1784
    for nname, success, bdev_status, idx in diskdata:
1785
      # the 'ghost node' construction in Exec() ensures that we have a
1786
      # node here
1787
      snode = node_image[nname]
1788
      bad_snode = snode.ghost or snode.offline
1789
      _ErrorIf(inst_config.disks_active and
1790
               not success and not bad_snode,
1791
               constants.CV_EINSTANCEFAULTYDISK, instance,
1792
               "couldn't retrieve status for disk/%s on %s: %s",
1793
               idx, nname, bdev_status)
1794
      _ErrorIf((inst_config.disks_active and
1795
                success and bdev_status.ldisk_status == constants.LDS_FAULTY),
1796
               constants.CV_EINSTANCEFAULTYDISK, instance,
1797
               "disk/%s on %s is faulty", idx, nname)
1798

    
1799
    _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1800
             constants.CV_ENODERPC, pnode, "instance %s, connection to"
1801
             " primary node failed", instance)
1802

    
1803
    _ErrorIf(len(inst_config.secondary_nodes) > 1,
1804
             constants.CV_EINSTANCELAYOUT,
1805
             instance, "instance has multiple secondary nodes: %s",
1806
             utils.CommaJoin(inst_config.secondary_nodes),
1807
             code=self.ETYPE_WARNING)
1808

    
1809
    if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
1810
      # Disk template not compatible with exclusive_storage: no instance
1811
      # node should have the flag set
1812
      es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
1813
                                                     inst_config.all_nodes)
1814
      es_nodes = [n for (n, es) in es_flags.items()
1815
                  if es]
1816
      _ErrorIf(es_nodes, constants.CV_EINSTANCEUNSUITABLENODE, instance,
1817
               "instance has template %s, which is not supported on nodes"
1818
               " that have exclusive storage set: %s",
1819
               inst_config.disk_template, utils.CommaJoin(es_nodes))
1820

    
1821
    if inst_config.disk_template in constants.DTS_INT_MIRROR:
1822
      instance_nodes = utils.NiceSort(inst_config.all_nodes)
1823
      instance_groups = {}
1824

    
1825
      for node in instance_nodes:
1826
        instance_groups.setdefault(self.all_node_info[node].group,
1827
                                   []).append(node)
1828

    
1829
      pretty_list = [
1830
        "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
1831
        # Sort so that we always list the primary node first.
1832
        for group, nodes in sorted(instance_groups.items(),
1833
                                   key=lambda (_, nodes): pnode in nodes,
1834
                                   reverse=True)]
1835

    
1836
      self._ErrorIf(len(instance_groups) > 1,
1837
                    constants.CV_EINSTANCESPLITGROUPS,
1838
                    instance, "instance has primary and secondary nodes in"
1839
                    " different groups: %s", utils.CommaJoin(pretty_list),
1840
                    code=self.ETYPE_WARNING)
1841

    
1842
    inst_nodes_offline = []
1843
    for snode in inst_config.secondary_nodes:
1844
      s_img = node_image[snode]
1845
      _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
1846
               snode, "instance %s, connection to secondary node failed",
1847
               instance)
1848

    
1849
      if s_img.offline:
1850
        inst_nodes_offline.append(snode)
1851

    
1852
    # warn that the instance lives on offline nodes
1853
    _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
1854
             "instance has offline secondary node(s) %s",
1855
             utils.CommaJoin(inst_nodes_offline))
1856
    # ... or ghost/non-vm_capable nodes
1857
    for node in inst_config.all_nodes:
1858
      _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
1859
               instance, "instance lives on ghost node %s", node)
1860
      _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
1861
               instance, "instance lives on non-vm_capable node %s", node)
1862

    
1863
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1864
    """Verify if there are any unknown volumes in the cluster.
1865

1866
    The .os, .swap and backup volumes are ignored. All other volumes are
1867
    reported as unknown.
1868

1869
    @type reserved: L{ganeti.utils.FieldSet}
1870
    @param reserved: a FieldSet of reserved volume names
1871

1872
    """
1873
    for node, n_img in node_image.items():
1874
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
1875
          self.all_node_info[node].group != self.group_uuid):
1876
        # skip non-healthy nodes
1877
        continue
1878
      for volume in n_img.volumes:
1879
        test = ((node not in node_vol_should or
1880
                volume not in node_vol_should[node]) and
1881
                not reserved.Matches(volume))
1882
        self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
1883
                      "volume %s is unknown", volume)
1884

    
1885
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1886
    """Verify N+1 Memory Resilience.
1887

1888
    Check that if one single node dies we can still start all the
1889
    instances it was primary for.
1890

1891
    """
1892
    cluster_info = self.cfg.GetClusterInfo()
1893
    for node, n_img in node_image.items():
1894
      # This code checks that every node which is now listed as
1895
      # secondary has enough memory to host all instances it is
1896
      # supposed to, should a single other node in the cluster fail.
1897
      # FIXME: not ready for failover to an arbitrary node
1898
      # FIXME: does not support file-backed instances
1899
      # WARNING: we currently take into account down instances as well
1900
      # as up ones, considering that even if they're down someone
1901
      # might want to start them even in the event of a node failure.
1902
      if n_img.offline or self.all_node_info[node].group != self.group_uuid:
1903
        # we're skipping nodes marked offline and nodes in other groups from
1904
        # the N+1 warning, since most likely we don't have good memory
1905
        # information from them; we already list instances living on such
1906
        # nodes, and that's enough warning
1907
        continue
1908
      #TODO(dynmem): also consider ballooning out other instances
1909
      for prinode, instances in n_img.sbp.items():
1910
        needed_mem = 0
1911
        for instance in instances:
1912
          bep = cluster_info.FillBE(instance_cfg[instance])
1913
          if bep[constants.BE_AUTO_BALANCE]:
1914
            needed_mem += bep[constants.BE_MINMEM]
1915
        test = n_img.mfree < needed_mem
1916
        self._ErrorIf(test, constants.CV_ENODEN1, node,
1917
                      "not enough memory to accomodate instance failovers"
1918
                      " should node %s fail (%dMiB needed, %dMiB available)",
1919
                      prinode, needed_mem, n_img.mfree)
1920
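  # Illustrative example (editor's note, hypothetical numbers): if node2 is
  # secondary for instances whose primary is node1 and their auto-balanced
  # BE_MINMEM values add up to 3072 MiB, while node2 reports mfree == 2048
  # MiB, CV_ENODEN1 is reported for node2: a failure of node1 could not be
  # absorbed.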

    
1921
  @classmethod
1922
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
1923
                   (files_all, files_opt, files_mc, files_vm)):
1924
    """Verifies file checksums collected from all nodes.
1925

1926
    @param errorif: Callback for reporting errors
1927
    @param nodeinfo: List of L{objects.Node} objects
1928
    @param master_node: Name of master node
1929
    @param all_nvinfo: RPC results
1930

1931
    """
1932
    # Define functions determining which nodes to consider for a file
1933
    files2nodefn = [
1934
      (files_all, None),
1935
      (files_mc, lambda node: (node.master_candidate or
1936
                               node.name == master_node)),
1937
      (files_vm, lambda node: node.vm_capable),
1938
      ]
1939

    
1940
    # Build mapping from filename to list of nodes which should have the file
1941
    nodefiles = {}
1942
    for (files, fn) in files2nodefn:
1943
      if fn is None:
1944
        filenodes = nodeinfo
1945
      else:
1946
        filenodes = filter(fn, nodeinfo)
1947
      nodefiles.update((filename,
1948
                        frozenset(map(operator.attrgetter("name"), filenodes)))
1949
                       for filename in files)
1950

    
1951
    assert set(nodefiles) == (files_all | files_mc | files_vm)
1952

    
1953
    fileinfo = dict((filename, {}) for filename in nodefiles)
1954
    ignore_nodes = set()
1955

    
1956
    for node in nodeinfo:
1957
      if node.offline:
1958
        ignore_nodes.add(node.name)
1959
        continue
1960

    
1961
      nresult = all_nvinfo[node.name]
1962

    
1963
      if nresult.fail_msg or not nresult.payload:
1964
        node_files = None
1965
      else:
1966
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
1967
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
1968
                          for (key, value) in fingerprints.items())
1969
        del fingerprints
1970

    
1971
      test = not (node_files and isinstance(node_files, dict))
1972
      errorif(test, constants.CV_ENODEFILECHECK, node.name,
1973
              "Node did not return file checksum data")
1974
      if test:
1975
        ignore_nodes.add(node.name)
1976
        continue
1977

    
1978
      # Build per-checksum mapping from filename to nodes having it
1979
      for (filename, checksum) in node_files.items():
1980
        assert filename in nodefiles
1981
        fileinfo[filename].setdefault(checksum, set()).add(node.name)
1982

    
1983
    for (filename, checksums) in fileinfo.items():
1984
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
1985

    
1986
      # Nodes having the file
1987
      with_file = frozenset(node_name
1988
                            for nodes in fileinfo[filename].values()
1989
                            for node_name in nodes) - ignore_nodes
1990

    
1991
      expected_nodes = nodefiles[filename] - ignore_nodes
1992

    
1993
      # Nodes missing file
1994
      missing_file = expected_nodes - with_file
1995

    
1996
      if filename in files_opt:
1997
        # All or no nodes
1998
        errorif(missing_file and missing_file != expected_nodes,
1999
                constants.CV_ECLUSTERFILECHECK, None,
2000
                "File %s is optional, but it must exist on all or no"
2001
                " nodes (not found on %s)",
2002
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2003
      else:
2004
        errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2005
                "File %s is missing from node(s) %s", filename,
2006
                utils.CommaJoin(utils.NiceSort(missing_file)))
2007

    
2008
        # Warn if a node has a file it shouldn't
2009
        unexpected = with_file - expected_nodes
2010
        errorif(unexpected,
2011
                constants.CV_ECLUSTERFILECHECK, None,
2012
                "File %s should not exist on node(s) %s",
2013
                filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2014

    
2015
      # See if there are multiple versions of the file
2016
      test = len(checksums) > 1
2017
      if test:
2018
        variants = ["variant %s on %s" %
2019
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2020
                    for (idx, (checksum, nodes)) in
2021
                      enumerate(sorted(checksums.items()))]
2022
      else:
2023
        variants = []
2024

    
2025
      errorif(test, constants.CV_ECLUSTERFILECHECK, None,
2026
              "File %s found with %s different checksums (%s)",
2027
              filename, len(checksums), "; ".join(variants))
2028
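  # Illustrative sketch (editor's note, hypothetical data): fileinfo maps each
  # expected file to the nodes holding every checksum variant, e.g.
  #
  #   fileinfo["/path/to/some/file"] == {
  #     "<checksum A>": set(["node1", "node2"]),
  #     "<checksum B>": set(["node3"]),
  #   }
  #
  # Two variants of the same file trigger CV_ECLUSTERFILECHECK; files in
  # files_opt must be present on all expected nodes or on none of them.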

    
2029
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2030
                      drbd_map):
2031
    """Verifies and the node DRBD status.
2032

2033
    @type ninfo: L{objects.Node}
2034
    @param ninfo: the node to check
2035
    @param nresult: the remote results for the node
2036
    @param instanceinfo: the dict of instances
2037
    @param drbd_helper: the configured DRBD usermode helper
2038
    @param drbd_map: the DRBD map as returned by
2039
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2040

2041
    """
2042
    node = ninfo.name
2043
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2044

    
2045
    if drbd_helper:
2046
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2047
      test = (helper_result is None)
2048
      _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2049
               "no drbd usermode helper returned")
2050
      if helper_result:
2051
        status, payload = helper_result
2052
        test = not status
2053
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2054
                 "drbd usermode helper check unsuccessful: %s", payload)
2055
        test = status and (payload != drbd_helper)
2056
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2057
                 "wrong drbd usermode helper: %s", payload)
2058

    
2059
    # compute the DRBD minors
2060
    node_drbd = {}
2061
    for minor, instance in drbd_map[node].items():
2062
      test = instance not in instanceinfo
2063
      _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2064
               "ghost instance '%s' in temporary DRBD map", instance)
2065
        # ghost instance should not be running, but otherwise we
2066
        # don't give double warnings (both ghost instance and
2067
        # unallocated minor in use)
2068
      if test:
2069
        node_drbd[minor] = (instance, False)
2070
      else:
2071
        instance = instanceinfo[instance]
2072
        node_drbd[minor] = (instance.name, instance.disks_active)
2073

    
2074
    # and now check them
2075
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2076
    test = not isinstance(used_minors, (tuple, list))
2077
    _ErrorIf(test, constants.CV_ENODEDRBD, node,
2078
             "cannot parse drbd status file: %s", str(used_minors))
2079
    if test:
2080
      # we cannot check drbd status
2081
      return
2082

    
2083
    for minor, (iname, must_exist) in node_drbd.items():
2084
      test = minor not in used_minors and must_exist
2085
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
2086
               "drbd minor %d of instance %s is not active", minor, iname)
2087
    for minor in used_minors:
2088
      test = minor not in node_drbd
2089
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
2090
               "unallocated drbd minor %d is in use", minor)
2091
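  # Illustrative example (editor's note, hypothetical data): the config-based
  # map is compared against the minors the node reports as used, e.g.
  #
  #   node_drbd == {0: ("inst1", True), 1: ("inst2", False)}
  #   used_minors == [0, 2]
  #
  # -> minor 1 is fine (inst2's disks are not active), but minor 2 is
  #    reported as an unallocated minor in use.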

    
2092
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2093
    """Builds the node OS structures.
2094

2095
    @type ninfo: L{objects.Node}
2096
    @param ninfo: the node to check
2097
    @param nresult: the remote results for the node
2098
    @param nimg: the node image object
2099

2100
    """
2101
    node = ninfo.name
2102
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2103

    
2104
    remote_os = nresult.get(constants.NV_OSLIST, None)
2105
    test = (not isinstance(remote_os, list) or
2106
            not compat.all(isinstance(v, list) and len(v) == 7
2107
                           for v in remote_os))
2108

    
2109
    _ErrorIf(test, constants.CV_ENODEOS, node,
2110
             "node hasn't returned valid OS data")
2111

    
2112
    nimg.os_fail = test
2113

    
2114
    if test:
2115
      return
2116

    
2117
    os_dict = {}
2118

    
2119
    for (name, os_path, status, diagnose,
2120
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2121

    
2122
      if name not in os_dict:
2123
        os_dict[name] = []
2124

    
2125
      # parameters is a list of lists instead of list of tuples due to
2126
      # JSON lacking a real tuple type, fix it:
2127
      parameters = [tuple(v) for v in parameters]
2128
      os_dict[name].append((os_path, status, diagnose,
2129
                            set(variants), set(parameters), set(api_ver)))
2130

    
2131
    nimg.oslist = os_dict
2132
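  # Illustrative sketch (editor's note, hypothetical entry): nimg.oslist maps
  # every OS name to the list of its detected installations, e.g.
  #
  #   nimg.oslist["debootstrap"] == [
  #     ("/srv/ganeti/os/debootstrap",  # os_path
  #      True,                          # status
  #      "",                            # diagnose message
  #      set(["default"]),              # variants
  #      set([]),                       # parameters (as tuples)
  #      set([20])),                    # API versions
  #     ]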

    
2133
  def _VerifyNodeOS(self, ninfo, nimg, base):
2134
    """Verifies the node OS list.
2135

2136
    @type ninfo: L{objects.Node}
2137
    @param ninfo: the node to check
2138
    @param nimg: the node image object
2139
    @param base: the 'template' node we match against (e.g. from the master)
2140

2141
    """
2142
    node = ninfo.name
2143
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2144

    
2145
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2146

    
2147
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2148
    for os_name, os_data in nimg.oslist.items():
2149
      assert os_data, "Empty OS status for OS %s?!" % os_name
2150
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2151
      _ErrorIf(not f_status, constants.CV_ENODEOS, node,
2152
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2153
      _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
2154
               "OS '%s' has multiple entries (first one shadows the rest): %s",
2155
               os_name, utils.CommaJoin([v[0] for v in os_data]))
2156
      # comparisons with the 'base' image
2157
      test = os_name not in base.oslist
2158
      _ErrorIf(test, constants.CV_ENODEOS, node,
2159
               "Extra OS %s not present on reference node (%s)",
2160
               os_name, base.name)
2161
      if test:
2162
        continue
2163
      assert base.oslist[os_name], "Base node has empty OS status?"
2164
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2165
      if not b_status:
2166
        # base OS is invalid, skipping
2167
        continue
2168
      for kind, a, b in [("API version", f_api, b_api),
2169
                         ("variants list", f_var, b_var),
2170
                         ("parameters", beautify_params(f_param),
2171
                          beautify_params(b_param))]:
2172
        _ErrorIf(a != b, constants.CV_ENODEOS, node,
2173
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2174
                 kind, os_name, base.name,
2175
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2176

    
2177
    # check any missing OSes
2178
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2179
    _ErrorIf(missing, constants.CV_ENODEOS, node,
2180
             "OSes present on reference node %s but missing on this node: %s",
2181
             base.name, utils.CommaJoin(missing))
2182

    
2183
  def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
2184
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2185

2186
    @type ninfo: L{objects.Node}
2187
    @param ninfo: the node to check
2188
    @param nresult: the remote results for the node
2189
    @type is_master: bool
2190
    @param is_master: Whether node is the master node
2191

2192
    """
2193
    node = ninfo.name
2194

    
2195
    if (is_master and
2196
        (constants.ENABLE_FILE_STORAGE or
2197
         constants.ENABLE_SHARED_FILE_STORAGE)):
2198
      try:
2199
        fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
2200
      except KeyError:
2201
        # This should never happen
2202
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
2203
                      "Node did not return forbidden file storage paths")
2204
      else:
2205
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
2206
                      "Found forbidden file storage paths: %s",
2207
                      utils.CommaJoin(fspaths))
2208
    else:
2209
      self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
2210
                    constants.CV_ENODEFILESTORAGEPATHS, node,
2211
                    "Node should not have returned forbidden file storage"
2212
                    " paths")
2213

    
2214
  def _VerifyOob(self, ninfo, nresult):
2215
    """Verifies out of band functionality of a node.
2216

2217
    @type ninfo: L{objects.Node}
2218
    @param ninfo: the node to check
2219
    @param nresult: the remote results for the node
2220

2221
    """
2222
    node = ninfo.name
2223
    # We just have to verify the paths on master and/or master candidates
2224
    # as the oob helper is invoked on the master
2225
    if ((ninfo.master_candidate or ninfo.master_capable) and
2226
        constants.NV_OOB_PATHS in nresult):
2227
      for path_result in nresult[constants.NV_OOB_PATHS]:
2228
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2229

    
2230
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2231
    """Verifies and updates the node volume data.
2232

2233
    This function will update a L{NodeImage}'s internal structures
2234
    with data from the remote call.
2235

2236
    @type ninfo: L{objects.Node}
2237
    @param ninfo: the node to check
2238
    @param nresult: the remote results for the node
2239
    @param nimg: the node image object
2240
    @param vg_name: the configured VG name
2241

2242
    """
2243
    node = ninfo.name
2244
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2245

    
2246
    nimg.lvm_fail = True
2247
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2248
    if vg_name is None:
2249
      pass
2250
    elif isinstance(lvdata, basestring):
2251
      _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
2252
               utils.SafeEncode(lvdata))
2253
    elif not isinstance(lvdata, dict):
2254
      _ErrorIf(True, constants.CV_ENODELVM, node,
2255
               "rpc call to node failed (lvlist)")
2256
    else:
2257
      nimg.volumes = lvdata
2258
      nimg.lvm_fail = False
2259

    
2260
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2261
    """Verifies and updates the node instance list.
2262

2263
    If the listing was successful, then updates this node's instance
2264
    list. Otherwise, it marks the RPC call as failed for the instance
2265
    list key.
2266

2267
    @type ninfo: L{objects.Node}
2268
    @param ninfo: the node to check
2269
    @param nresult: the remote results for the node
2270
    @param nimg: the node image object
2271

2272
    """
2273
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2274
    test = not isinstance(idata, list)
2275
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2276
                  "rpc call to node failed (instancelist): %s",
2277
                  utils.SafeEncode(str(idata)))
2278
    if test:
2279
      nimg.hyp_fail = True
2280
    else:
2281
      nimg.instances = idata
2282

    
2283
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2284
    """Verifies and computes a node information map
2285

2286
    @type ninfo: L{objects.Node}
2287
    @param ninfo: the node to check
2288
    @param nresult: the remote results for the node
2289
    @param nimg: the node image object
2290
    @param vg_name: the configured VG name
2291

2292
    """
2293
    node = ninfo.name
2294
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2295

    
2296
    # try to read free memory (from the hypervisor)
2297
    hv_info = nresult.get(constants.NV_HVINFO, None)
2298
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2299
    _ErrorIf(test, constants.CV_ENODEHV, node,
2300
             "rpc call to node failed (hvinfo)")
2301
    if not test:
2302
      try:
2303
        nimg.mfree = int(hv_info["memory_free"])
2304
      except (ValueError, TypeError):
2305
        _ErrorIf(True, constants.CV_ENODERPC, node,
2306
                 "node returned invalid nodeinfo, check hypervisor")
2307

    
2308
    # FIXME: devise a free space model for file based instances as well
2309
    if vg_name is not None:
2310
      test = (constants.NV_VGLIST not in nresult or
2311
              vg_name not in nresult[constants.NV_VGLIST])
2312
      _ErrorIf(test, constants.CV_ENODELVM, node,
2313
               "node didn't return data for the volume group '%s'"
2314
               " - it is either missing or broken", vg_name)
2315
      if not test:
2316
        try:
2317
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2318
        except (ValueError, TypeError):
2319
          _ErrorIf(True, constants.CV_ENODERPC, node,
2320
                   "node returned invalid LVM info, check LVM status")
2321
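  # Illustrative sketch (editor's note, hypothetical payload): the relevant
  # parts of nresult look like
  #
  #   nresult[constants.NV_HVINFO] == {"memory_free": 8192}
  #   nresult[constants.NV_VGLIST] == {"xenvg": 204800}
  #
  # from which nimg.mfree = 8192 and nimg.dfree = 204800 are taken.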

    
2322
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2323
    """Gets per-disk status information for all instances.
2324

2325
    @type nodelist: list of strings
2326
    @param nodelist: Node names
2327
    @type node_image: dict of (name, L{NodeImage})
2328
    @param node_image: Node images
2329
    @type instanceinfo: dict of (name, L{objects.Instance})
2330
    @param instanceinfo: Instance objects
2331
    @rtype: {instance: {node: [(success, payload)]}}
2332
    @return: a dictionary of per-instance dictionaries with nodes as
2333
        keys and disk information as values; the disk information is a
2334
        list of tuples (success, payload)
2335

2336
    """
2337
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2338

    
2339
    node_disks = {}
2340
    node_disks_devonly = {}
2341
    diskless_instances = set()
2342
    diskless = constants.DT_DISKLESS
2343

    
2344
    for nname in nodelist:
2345
      node_instances = list(itertools.chain(node_image[nname].pinst,
2346
                                            node_image[nname].sinst))
2347
      diskless_instances.update(inst for inst in node_instances
2348
                                if instanceinfo[inst].disk_template == diskless)
2349
      disks = [(inst, disk)
2350
               for inst in node_instances
2351
               for disk in instanceinfo[inst].disks]
2352

    
2353
      if not disks:
2354
        # No need to collect data
2355
        continue
2356

    
2357
      node_disks[nname] = disks
2358

    
2359
      # _AnnotateDiskParams already makes copies of the disks
2360
      devonly = []
2361
      for (inst, dev) in disks:
2362
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
2363
        self.cfg.SetDiskID(anno_disk, nname)
2364
        devonly.append(anno_disk)
2365

    
2366
      node_disks_devonly[nname] = devonly
2367

    
2368
    assert len(node_disks) == len(node_disks_devonly)
2369

    
2370
    # Collect data from all nodes with disks
2371
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2372
                                                          node_disks_devonly)
2373

    
2374
    assert len(result) == len(node_disks)
2375

    
2376
    instdisk = {}
2377

    
2378
    for (nname, nres) in result.items():
2379
      disks = node_disks[nname]
2380

    
2381
      if nres.offline:
2382
        # No data from this node
2383
        data = len(disks) * [(False, "node offline")]
2384
      else:
2385
        msg = nres.fail_msg
2386
        _ErrorIf(msg, constants.CV_ENODERPC, nname,
2387
                 "while getting disk information: %s", msg)
2388
        if msg:
2389
          # No data from this node
2390
          data = len(disks) * [(False, msg)]
2391
        else:
2392
          data = []
2393
          for idx, i in enumerate(nres.payload):
2394
            if isinstance(i, (tuple, list)) and len(i) == 2:
2395
              data.append(i)
2396
            else:
2397
              logging.warning("Invalid result from node %s, entry %d: %s",
2398
                              nname, idx, i)
2399
              data.append((False, "Invalid result from the remote node"))
2400

    
2401
      for ((inst, _), status) in zip(disks, data):
2402
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2403

    
2404
    # Add empty entries for diskless instances.
2405
    for inst in diskless_instances:
2406
      assert inst not in instdisk
2407
      instdisk[inst] = {}
2408

    
2409
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2410
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
2411
                      compat.all(isinstance(s, (tuple, list)) and
2412
                                 len(s) == 2 for s in statuses)
2413
                      for inst, nnames in instdisk.items()
2414
                      for nname, statuses in nnames.items())
2415
    if __debug__:
2416
      instdisk_keys = set(instdisk)
2417
      instanceinfo_keys = set(instanceinfo)
2418
      assert instdisk_keys == instanceinfo_keys, \
2419
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2420
         (instdisk_keys, instanceinfo_keys))
2421

    
2422
    return instdisk
2423
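  # Illustrative sketch (editor's note, hypothetical data): the returned
  # structure is keyed by instance, then by node, e.g.
  #
  #   instdisk == {
  #     "inst1": {"node1": [(True, <status disk/0>), (True, <status disk/1>)],
  #               "node2": [(False, "node offline")]},
  #     "diskless-inst": {},
  #   }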

    
2424
  @staticmethod
2425
  def _SshNodeSelector(group_uuid, all_nodes):
2426
    """Create endless iterators for all potential SSH check hosts.
2427

2428
    """
2429
    nodes = [node for node in all_nodes
2430
             if (node.group != group_uuid and
2431
                 not node.offline)]
2432
    keyfunc = operator.attrgetter("group")
2433

    
2434
    return map(itertools.cycle,
2435
               [sorted(map(operator.attrgetter("name"), names))
2436
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2437
                                                  keyfunc)])
2438

    
2439
  @classmethod
2440
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2441
    """Choose which nodes should talk to which other nodes.
2442

2443
    We will make nodes contact all nodes in their group, and one node from
2444
    every other group.
2445

2446
    @warning: This algorithm has a known issue if one node group is much
2447
      smaller than others (e.g. just one node). In such a case all other
2448
      nodes will talk to the single node.
2449

2450
    """
2451
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2452
    sel = cls._SshNodeSelector(group_uuid, all_nodes)
2453

    
2454
    return (online_nodes,
2455
            dict((name, sorted([i.next() for i in sel]))
2456
                 for name in online_nodes))
2457
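  # Illustrative example (editor's note, hypothetical layout): with this group
  # being {A, B} and a second group {C, D}, the selector cycles through the
  # other group so the checks are spread out, e.g.
  #
  #   (["A", "B"], {"A": ["C"], "B": ["D"]})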

    
2458
  def BuildHooksEnv(self):
2459
    """Build hooks env.
2460

2461
    Cluster-Verify hooks run only in the post phase; if they fail, their
2462
    output is logged in the verify output and the verification fails.
2463

2464
    """
2465
    env = {
2466
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2467
      }
2468

    
2469
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2470
               for node in self.my_node_info.values())
2471

    
2472
    return env
2473

    
2474
  def BuildHooksNodes(self):
2475
    """Build hooks nodes.
2476

2477
    """
2478
    return ([], self.my_node_names)
2479

    
2480
  def Exec(self, feedback_fn):
2481
    """Verify integrity of the node group, performing various test on nodes.
2482

2483
    """
2484
    # This method has too many local variables. pylint: disable=R0914
2485
    feedback_fn("* Verifying group '%s'" % self.group_info.name)
2486

    
2487
    if not self.my_node_names:
2488
      # empty node group
2489
      feedback_fn("* Empty node group, skipping verification")
2490
      return True
2491

    
2492
    self.bad = False
2493
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2494
    verbose = self.op.verbose
2495
    self._feedback_fn = feedback_fn
2496

    
2497
    vg_name = self.cfg.GetVGName()
2498
    drbd_helper = self.cfg.GetDRBDHelper()
2499
    cluster = self.cfg.GetClusterInfo()
2500
    hypervisors = cluster.enabled_hypervisors
2501
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2502

    
2503
    i_non_redundant = [] # Non redundant instances
2504
    i_non_a_balanced = [] # Non auto-balanced instances
2505
    i_offline = 0 # Count of offline instances
2506
    n_offline = 0 # Count of offline nodes
2507
    n_drained = 0 # Count of nodes being drained
2508
    node_vol_should = {}
2509

    
2510
    # FIXME: verify OS list
2511

    
2512
    # File verification
2513
    filemap = ComputeAncillaryFiles(cluster, False)
2514

    
2515
    # do local checksums
2516
    master_node = self.master_node = self.cfg.GetMasterNode()
2517
    master_ip = self.cfg.GetMasterIP()
2518

    
2519
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2520

    
2521
    user_scripts = []
2522
    if self.cfg.GetUseExternalMipScript():
2523
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2524

    
2525
    node_verify_param = {
2526
      constants.NV_FILELIST:
2527
        map(vcluster.MakeVirtualPath,
2528
            utils.UniqueSequence(filename
2529
                                 for files in filemap
2530
                                 for filename in files)),
2531
      constants.NV_NODELIST:
2532
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2533
                                  self.all_node_info.values()),
2534
      constants.NV_HYPERVISOR: hypervisors,
2535
      constants.NV_HVPARAMS:
2536
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2537
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2538
                                 for node in node_data_list
2539
                                 if not node.offline],
2540
      constants.NV_INSTANCELIST: hypervisors,
2541
      constants.NV_VERSION: None,
2542
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2543
      constants.NV_NODESETUP: None,
2544
      constants.NV_TIME: None,
2545
      constants.NV_MASTERIP: (master_node, master_ip),
2546
      constants.NV_OSLIST: None,
2547
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2548
      constants.NV_USERSCRIPTS: user_scripts,
2549
      }
2550

    
2551
    if vg_name is not None:
2552
      node_verify_param[constants.NV_VGLIST] = None
2553
      node_verify_param[constants.NV_LVLIST] = vg_name
2554
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2555

    
2556
    if drbd_helper:
2557
      node_verify_param[constants.NV_DRBDLIST] = None
2558
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2559

    
2560
    if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
2561
      # Load file storage paths only from master node
2562
      node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node
2563

    
2564
    # bridge checks
2565
    # FIXME: this needs to be changed per node-group, not cluster-wide
2566
    bridges = set()
2567
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2568
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2569
      bridges.add(default_nicpp[constants.NIC_LINK])
2570
    for instance in self.my_inst_info.values():
2571
      for nic in instance.nics:
2572
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
2573
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2574
          bridges.add(full_nic[constants.NIC_LINK])
2575

    
2576
    if bridges:
2577
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2578

    
2579
    # Build our expected cluster state
2580
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2581
                                                 name=node.name,
2582
                                                 vm_capable=node.vm_capable))
2583
                      for node in node_data_list)
2584

    
2585
    # Gather OOB paths
2586
    oob_paths = []
2587
    for node in self.all_node_info.values():
2588
      path = SupportsOob(self.cfg, node)
2589
      if path and path not in oob_paths:
2590
        oob_paths.append(path)
2591

    
2592
    if oob_paths:
2593
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2594

    
2595
    for instance in self.my_inst_names:
2596
      inst_config = self.my_inst_info[instance]
2597
      if inst_config.admin_state == constants.ADMINST_OFFLINE:
2598
        i_offline += 1
2599

    
2600
      for nname in inst_config.all_nodes:
2601
        if nname not in node_image:
2602
          gnode = self.NodeImage(name=nname)
2603
          gnode.ghost = (nname not in self.all_node_info)
2604
          node_image[nname] = gnode
2605

    
2606
      inst_config.MapLVsByNode(node_vol_should)
2607

    
2608
      pnode = inst_config.primary_node
2609
      node_image[pnode].pinst.append(instance)
2610

    
2611
      for snode in inst_config.secondary_nodes:
2612
        nimg = node_image[snode]
2613
        nimg.sinst.append(instance)
2614
        if pnode not in nimg.sbp:
2615
          nimg.sbp[pnode] = []
2616
        nimg.sbp[pnode].append(instance)
2617

    
2618
    es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg, self.my_node_names)
2619
    # The value of exclusive_storage should be the same across the group, so if
2620
    # it's True for at least one node, we act as if it were set for all the nodes
2621
    self._exclusive_storage = compat.any(es_flags.values())
2622
    if self._exclusive_storage:
2623
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2624
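    # Illustrative example (editor's note, hypothetical values):
    # es_flags == {"node1": True, "node2": False} makes
    # self._exclusive_storage True and enables the NV_EXCLUSIVEPVS check for
    # the whole group.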

    
2625
    # At this point, we have the in-memory data structures complete,
2626
    # except for the runtime information, which we'll gather next
2627

    
2628
    # Due to the way our RPC system works, exact response times cannot be
2629
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2630
    # time before and after executing the request, we can at least have a time
2631
    # window.
2632
    nvinfo_starttime = time.time()
2633
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2634
                                           node_verify_param,
2635
                                           self.cfg.GetClusterName())
2636
    nvinfo_endtime = time.time()
2637

    
2638
    if self.extra_lv_nodes and vg_name is not None:
2639
      extra_lv_nvinfo = \
2640
          self.rpc.call_node_verify(self.extra_lv_nodes,
2641
                                    {constants.NV_LVLIST: vg_name},
2642
                                    self.cfg.GetClusterName())
2643
    else:
2644
      extra_lv_nvinfo = {}
2645

    
2646
    all_drbd_map = self.cfg.ComputeDRBDMap()
2647

    
2648
    feedback_fn("* Gathering disk information (%s nodes)" %
2649
                len(self.my_node_names))
2650
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2651
                                     self.my_inst_info)
2652

    
2653
    feedback_fn("* Verifying configuration file consistency")
2654

    
2655
    # If not all nodes are being checked, we need to make sure the master node
2656
    # and a non-checked vm_capable node are in the list.
2657
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2658
    if absent_nodes:
2659
      vf_nvinfo = all_nvinfo.copy()
2660
      vf_node_info = list(self.my_node_info.values())
2661
      additional_nodes = []
2662
      if master_node not in self.my_node_info:
2663
        additional_nodes.append(master_node)
2664
        vf_node_info.append(self.all_node_info[master_node])
2665
      # Add the first vm_capable node we find which is not included,
2666
      # excluding the master node (which we already have)
2667
      for node in absent_nodes:
2668
        nodeinfo = self.all_node_info[node]
2669
        if (nodeinfo.vm_capable and not nodeinfo.offline and
2670
            node != master_node):
2671
          additional_nodes.append(node)
2672
          vf_node_info.append(self.all_node_info[node])
2673
          break
2674
      key = constants.NV_FILELIST
2675
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2676
                                                 {key: node_verify_param[key]},
2677
                                                 self.cfg.GetClusterName()))
2678
    else:
2679
      vf_nvinfo = all_nvinfo
2680
      vf_node_info = self.my_node_info.values()
2681

    
2682

    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      node = node_i.name
      nimg = node_image[node]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline += 1
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
               msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyFileStoragePaths(node_i, nresult,
                                   node == master_node)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)
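        # Illustrative sketch (made-up names, not executed): if the node
        # reports running instances ["inst1", "inst2"] while the config only
        # lists "inst1" as primary here (nimg.pinst), non_primary_inst ends
        # up as {"inst2"}: a known instance running on the wrong node, or an
        # orphan if it is not in self.all_inst_info at all.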

        for inst in non_primary_inst:
          test = inst in self.all_inst_info
          _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
                   "instance should not run on node %s", node_i.name)
          _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                   "node is running unknown instance %s", inst)

    self._VerifyGroupLVM(node_image, vg_name)

    for node, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
                              node_image[node], vg_name)

    feedback_fn("* Verifying instance status")
    for instance in self.my_inst_names:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.my_inst_info[instance]
      self._VerifyInstance(instance, inst_config, node_image,
                           instdisk[instance])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if inst_config.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for inst in self.all_inst_info.values():
      for secondary in inst.secondary_nodes:
        if (secondary in self.my_node_info
            and inst.name not in self.my_inst_info):
          inst.MapLVsByNode(node_vol_should)
          break
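
    # Illustrative sketch (made-up names, not executed): node_vol_should maps
    # node name -> expected logical volumes, e.g.
    #   {"node2": ["xenvg/aa-disk0_data", "xenvg/aa-disk0_meta"]}
    # MapLVsByNode() extends it with the volumes of cross-group DRBD
    # secondaries so _VerifyOrphanVolumes below does not report them as
    # unknown.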

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
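    # Illustrative sketch (made-up values, not executed): hooks_results maps
    # node name -> RPC result whose payload is a list of
    # (script, status, output) tuples, e.g.
    #   [("10check-foo", constants.HKR_SUCCESS, ""),
    #    ("20check-bar", constants.HKR_FAIL, "bridge br0 missing")]
    # and only HKR_FAIL entries turn the overall verify result into a failure.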
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_names:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
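    # For illustration (made-up group names, not executed): with groups
    # "default" and "storage" the list comprehension below builds
    #   [[OpGroupVerifyDisks(group_name="default")],
    #    [OpGroupVerifyDisks(group_name="storage")]]
    # i.e. one single-opcode job per group, which ResultWithJobs hands over
    # to the job queue so the per-group disk checks run as independent jobs.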
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])