#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob

import ganeti.masterd.instance


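# Illustrative sketch (not used by this module itself): the logical units
# below are not instantiated directly.  They are normally reached through
# the opcode layer, roughly like this from a client (the exact client
# helper, e.g. cli.SubmitOpCode, may differ between Ganeti versions):
#
#   op = opcodes.OpClusterQuery()
#   info = cli.SubmitOpCode(op)
#
# The master daemon's processor then maps each opcode to the LU with the
# matching name (OpClusterQuery -> LUClusterQuery) and drives its
# ExpandNames/CheckPrereq/Exec life cycle.

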
class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, master_params.name)

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    if result.fail_msg:
      self.LogWarning("Error disabling the master IP address: %s",
                      result.fail_msg)

    return master_params.name


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
    else:
      cluster = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_name = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_name)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   master_name)

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, drain_flag, watcher_pause)


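# Note on the pattern above: _GetQueryData only gathers the pieces of data
# that were actually requested (self.requested_data); everything else is set
# to the NotImplemented sentinel, which the query layer is expected to treat
# as "not available in this run" rather than as an error.  Rough shape of the
# returned object (illustrative):
#
#   query.ClusterQueryData(cluster,        # objects.Cluster or NotImplemented
#                          drain_flag,     # bool or NotImplemented
#                          watcher_pause)  # RPC payload or NotImplemented

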
class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.name)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                     master_params, ems)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      self.wanted_names = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsize(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsize call to node"
                        " %s, ignoring", node)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
                        " result.payload=%s", node, len(dskl), result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))
    return changed


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)


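# Rough usage sketch for _ValidateNetmask (values are illustrative): the
# netmask is a CIDR prefix length given as an integer and is checked against
# the cluster's primary IP family, so on an IPv4 cluster something like:
#
#   _ValidateNetmask(self.cfg, 24)    # acceptable, returns None
#   _ValidateNetmask(self.cfg, 99)    # too long for IPv4, raises
#                                     # errors.OpPrereqError
#
# while larger prefixes (e.g. 64) only make sense for IPv6 clusters.

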
class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_list = self.owned_locks(locking.LEVEL_NODE)

    vm_capable_nodes = [node.name
                        for node in self.cfg.GetAllNodesInfo().values()
                        if node.name in node_list and node.vm_capable]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(vm_capable_nodes)
      for node in vm_capable_nodes:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_list)
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s", node)
          continue
        msg = helpers[node].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (node, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[node].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (node, node_helper), errors.ECODE_ENVIRON)

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(node in group.members
                                             for node in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol,
                                           new_ipolicy, instances, self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_list, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.drbd_helper is not None:
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(set(self.op.enabled_disk_templates))
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(master_params.name,
                                                        master_params.netmask,
                                                        self.op.master_netmask,
                                                        master_params.ip,
                                                        master_params.netdev)
      if result.fail_msg:
        msg = "Could not change the master IP netmask: %s" % result.fail_msg
        feedback_fn(msg)

      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                     master_params, ems)
      if result.fail_msg:
        self.LogWarning("Could not re-enable the master ip on"
                        " the master, please restart manually: %s",
                        result.fail_msg)


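# Note on the Exec flow above when master_netdev changes: the master IP is
# first shut down on the old device, the new settings are written out with
# self.cfg.Update(), and only then is the IP re-activated on the new device.
# A failure in that final activation is reported as a warning rather than an
# error, since the configuration change itself has already been committed.

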
class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


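# Note on the job list built above: when no specific group is given, a
# config-verification job is submitted first and each per-group
# OpClusterVerifyGroup job carries a dependency of (-len(jobs), []).  The
# negative value is a relative job reference, counting backwards through the
# jobs of this same submission, so it always resolves to that first
# config-verification job ("Always depend on global verification").

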
class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


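# For reference, _Error above produces one of two message shapes (values are
# illustrative):
#
#   with self.op.error_codes set:  "ERROR:<etxt>:<itype>:<item>:<message>"
#   otherwise:                     "ERROR: <itype> <item>: <message>"
#
# and every message is emitted through feedback_fn prefixed with "  - ".

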
def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)


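# _VerifyCertificate above returns an (error_type, message) pair, for example
# (illustrative values):
#
#   (None, None)                                          # certificate is fine
#   (LUClusterVerifyConfig.ETYPE_WARNING, "While verifying <path>: ...")
#   (LUClusterVerifyConfig.ETYPE_ERROR, "Failed to load X509 certificate ...")
#
# which LUClusterVerifyConfig.Exec feeds into _ErrorIf.

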
def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


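# The hvp_data list returned above is a flat list of
# (origin, hypervisor_name, parameter_dict) tuples, for example
# (hypothetical values):
#
#   [("cluster", "kvm", {...}),
#    ("os debian-image", "kvm", {...}),
#    ("instance web1", "kvm", {...})]
#
# LUClusterVerifyConfig._VerifyHVP then syntax-checks each entry locally.

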
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.CONFD_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.CONFD_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node.name for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in dangling_nodes:
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst.name)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(dangling_instances.get(node.name,
                                                ["no instances"])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type name: string
    @ivar name: the node name to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, name=None, vm_capable=True):
      self.name = name
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_names = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: inst_names,
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      all_inst_info = self.cfg.GetAllInstancesInfo()

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
          nodes.update(all_inst_info[inst].secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_nodes = set(self.group_info.members)
    group_instances = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_nodes = \
        group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_instances = \
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))

    if unlocked_nodes:
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
                                 utils.CommaJoin(unlocked_nodes),
                                 errors.ECODE_STATE)

    if unlocked_instances:
      raise errors.OpPrereqError("Missing lock for instances: %s" %
                                 utils.CommaJoin(unlocked_instances),
                                 errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_names = utils.NiceSort(group_nodes)
    self.my_inst_names = utils.NiceSort(group_instances)

    self.my_node_info = dict((name, self.all_node_info[name])
                             for name in self.my_node_names)

    self.my_inst_info = dict((name, self.all_inst_info[name])
                             for name in self.my_inst_names)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nname in inst.all_nodes:
          if self.all_node_info[nname].group != self.group_uuid:
            extra_lv_nodes.add(nname)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1487
                                 utils.CommaJoin(unlocked_lv_nodes),
1488
                                 errors.ECODE_STATE)
1489
    self.extra_lv_nodes = list(extra_lv_nodes)
1490

    
1491
  def _VerifyNode(self, ninfo, nresult):
1492
    """Perform some basic validation on data returned from a node.
1493

1494
      - check the result data structure is well formed and has all the
1495
        mandatory fields
1496
      - check ganeti version
1497

1498
    @type ninfo: L{objects.Node}
1499
    @param ninfo: the node to check
1500
    @param nresult: the results from the node
1501
    @rtype: boolean
1502
    @return: whether overall this call was successful (and we can expect
1503
         reasonable values in the respose)
1504

1505
    """
1506
    node = ninfo.name
1507
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1508

    
1509
    # main result, nresult should be a non-empty dict
1510
    test = not nresult or not isinstance(nresult, dict)
1511
    _ErrorIf(test, constants.CV_ENODERPC, node,
1512
                  "unable to verify node: no data returned")
1513
    if test:
1514
      return False
1515

    
1516
    # compares ganeti version
1517
    local_version = constants.PROTOCOL_VERSION
1518
    remote_version = nresult.get("version", None)
1519
    test = not (remote_version and
1520
                isinstance(remote_version, (list, tuple)) and
1521
                len(remote_version) == 2)
1522
    _ErrorIf(test, constants.CV_ENODERPC, node,
1523
             "connection to node returned invalid data")
1524
    if test:
1525
      return False
1526

    
1527
    test = local_version != remote_version[0]
1528
    _ErrorIf(test, constants.CV_ENODEVERSION, node,
1529
             "incompatible protocol versions: master %s,"
1530
             " node %s", local_version, remote_version[0])
1531
    if test:
1532
      return False
1533

    
1534
    # node seems compatible, we can actually try to look into its results
1535

    
1536
    # full package version
1537
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1538
                  constants.CV_ENODEVERSION, node,
1539
                  "software version mismatch: master %s, node %s",
1540
                  constants.RELEASE_VERSION, remote_version[1],
1541
                  code=self.ETYPE_WARNING)
1542

    
1543
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1544
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1545
      for hv_name, hv_result in hyp_result.iteritems():
1546
        test = hv_result is not None
1547
        _ErrorIf(test, constants.CV_ENODEHV, node,
1548
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1549

    
1550
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1551
    if ninfo.vm_capable and isinstance(hvp_result, list):
1552
      for item, hv_name, hv_result in hvp_result:
1553
        _ErrorIf(True, constants.CV_ENODEHV, node,
1554
                 "hypervisor %s parameter verify failure (source %s): %s",
1555
                 hv_name, item, hv_result)
1556

    
1557
    test = nresult.get(constants.NV_NODESETUP,
1558
                       ["Missing NODESETUP results"])
1559
    _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
1560
             "; ".join(test))
1561

    
1562
    return True
1563

    
1564
  def _VerifyNodeTime(self, ninfo, nresult,
1565
                      nvinfo_starttime, nvinfo_endtime):
1566
    """Check the node time.
1567

1568
    @type ninfo: L{objects.Node}
1569
    @param ninfo: the node to check
1570
    @param nresult: the remote results for the node
1571
    @param nvinfo_starttime: the start time of the RPC call
1572
    @param nvinfo_endtime: the end time of the RPC call
1573

1574
    """
1575
    node = ninfo.name
1576
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1577

    
1578
    ntime = nresult.get(constants.NV_TIME, None)
1579
    try:
1580
      ntime_merged = utils.MergeTime(ntime)
1581
    except (ValueError, TypeError):
1582
      _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
1583
      return
1584

    
1585
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1586
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1587
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1588
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1589
    else:
1590
      ntime_diff = None
1591

    
1592
    _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
1593
             "Node time diverges by at least %s from master node time",
1594
             ntime_diff)
1595

    
1596
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1597
    """Check the node LVM results and update info for cross-node checks.
1598

1599
    @type ninfo: L{objects.Node}
1600
    @param ninfo: the node to check
1601
    @param nresult: the remote results for the node
1602
    @param vg_name: the configured VG name
1603
    @type nimg: L{NodeImage}
1604
    @param nimg: node image
1605

1606
    """
1607
    if vg_name is None:
1608
      return
1609

    
1610
    node = ninfo.name
1611
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1612

    
1613
    # checks vg existence and size > 20G
1614
    vglist = nresult.get(constants.NV_VGLIST, None)
1615
    test = not vglist
1616
    _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
1617
    if not test:
1618
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1619
                                            constants.MIN_VG_SIZE)
1620
      _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
1621

    
1622
    # Check PVs
1623
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1624
    for em in errmsgs:
1625
      self._Error(constants.CV_ENODELVM, node, em)
1626
    if pvminmax is not None:
1627
      (nimg.pv_min, nimg.pv_max) = pvminmax
1628

    
1629
  def _VerifyGroupLVM(self, node_image, vg_name):
1630
    """Check cross-node consistency in LVM.
1631

1632
    @type node_image: dict
1633
    @param node_image: info about nodes, mapping from node to names to
1634
      L{NodeImage} objects
1635
    @param vg_name: the configured VG name
1636

1637
    """
1638
    if vg_name is None:
1639
      return
1640

    
1641
    # Only exlcusive storage needs this kind of checks
1642
    if not self._exclusive_storage:
1643
      return
1644

    
1645
    # exclusive_storage wants all PVs to have the same size (approximately),
1646
    # if the smallest and the biggest ones are okay, everything is fine.
1647
    # pv_min is None iff pv_max is None
1648
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1649
    if not vals:
1650
      return
1651
    (pvmin, minnode) = min((ni.pv_min, ni.name) for ni in vals)
1652
    (pvmax, maxnode) = max((ni.pv_max, ni.name) for ni in vals)
1653
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1654
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1655
                  "PV sizes differ too much in the group; smallest (%s MB) is"
1656
                  " on %s, biggest (%s MB) is on %s",
1657
                  pvmin, minnode, pvmax, maxnode)
1658

    
1659
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1660
    """Check the node bridges.
1661

1662
    @type ninfo: L{objects.Node}
1663
    @param ninfo: the node to check
1664
    @param nresult: the remote results for the node
1665
    @param bridges: the expected list of bridges
1666

1667
    """
1668
    if not bridges:
1669
      return
1670

    
1671
    node = ninfo.name
1672
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1673

    
1674
    missing = nresult.get(constants.NV_BRIDGES, None)
1675
    test = not isinstance(missing, list)
1676
    _ErrorIf(test, constants.CV_ENODENET, node,
1677
             "did not return valid bridge information")
1678
    if not test:
1679
      _ErrorIf(bool(missing), constants.CV_ENODENET, node,
1680
               "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1681

    
1682
  def _VerifyNodeUserScripts(self, ninfo, nresult):
1683
    """Check the results of user scripts presence and executability on the node
1684

1685
    @type ninfo: L{objects.Node}
1686
    @param ninfo: the node to check
1687
    @param nresult: the remote results for the node
1688

1689
    """
1690
    node = ninfo.name
1691

    
1692
    test = not constants.NV_USERSCRIPTS in nresult
1693
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
1694
                  "did not return user scripts information")
1695

    
1696
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1697
    if not test:
1698
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
1699
                    "user scripts not present or not executable: %s" %
1700
                    utils.CommaJoin(sorted(broken_scripts)))
1701

    
1702
  def _VerifyNodeNetwork(self, ninfo, nresult):
1703
    """Check the node network connectivity results.
1704

1705
    @type ninfo: L{objects.Node}
1706
    @param ninfo: the node to check
1707
    @param nresult: the remote results for the node
1708

1709
    """
1710
    node = ninfo.name
1711
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1712

    
1713
    test = constants.NV_NODELIST not in nresult
1714
    _ErrorIf(test, constants.CV_ENODESSH, node,
1715
             "node hasn't returned node ssh connectivity data")
1716
    if not test:
1717
      if nresult[constants.NV_NODELIST]:
1718
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1719
          _ErrorIf(True, constants.CV_ENODESSH, node,
1720
                   "ssh communication with node '%s': %s", a_node, a_msg)
1721

    
1722
    test = constants.NV_NODENETTEST not in nresult
1723
    _ErrorIf(test, constants.CV_ENODENET, node,
1724
             "node hasn't returned node tcp connectivity data")
1725
    if not test:
1726
      if nresult[constants.NV_NODENETTEST]:
1727
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1728
        for anode in nlist:
1729
          _ErrorIf(True, constants.CV_ENODENET, node,
1730
                   "tcp communication with node '%s': %s",
1731
                   anode, nresult[constants.NV_NODENETTEST][anode])
1732

    
1733
    test = constants.NV_MASTERIP not in nresult
1734
    _ErrorIf(test, constants.CV_ENODENET, node,
1735
             "node hasn't returned node master IP reachability data")
1736
    if not test:
1737
      if not nresult[constants.NV_MASTERIP]:
1738
        if node == self.master_node:
1739
          msg = "the master node cannot reach the master IP (not configured?)"
1740
        else:
1741
          msg = "cannot reach the master IP"
1742
        _ErrorIf(True, constants.CV_ENODENET, node, msg)
1743

    
1744
  def _VerifyInstance(self, instance, inst_config, node_image,
1745
                      diskstatus):
1746
    """Verify an instance.
1747

1748
    This function checks to see if the required block devices are
1749
    available on the instance's node, and that the nodes are in the correct
1750
    state.
1751

1752
    """
1753
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1754
    pnode = inst_config.primary_node
1755
    pnode_img = node_image[pnode]
1756
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
1757

    
1758
    node_vol_should = {}
1759
    inst_config.MapLVsByNode(node_vol_should)
1760

    
1761
    cluster = self.cfg.GetClusterInfo()
1762
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1763
                                                            self.group_info)
1764
    err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
1765
    _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err),
1766
             code=self.ETYPE_WARNING)
1767

    
1768
    for node in node_vol_should:
1769
      n_img = node_image[node]
1770
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1771
        # ignore missing volumes on offline or broken nodes
1772
        continue
1773
      for volume in node_vol_should[node]:
1774
        test = volume not in n_img.volumes
1775
        _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
1776
                 "volume %s missing on node %s", volume, node)
1777

    
1778
    if inst_config.admin_state == constants.ADMINST_UP:
1779
      test = instance not in pnode_img.instances and not pnode_img.offline
1780
      _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
1781
               "instance not running on its primary node %s",
1782
               pnode)
1783
      _ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
1784
               "instance is marked as running and lives on offline node %s",
1785
               pnode)
1786

    
1787
    diskdata = [(nname, success, status, idx)
1788
                for (nname, disks) in diskstatus.items()
1789
                for idx, (success, status) in enumerate(disks)]
1790

    
1791
    for nname, success, bdev_status, idx in diskdata:
1792
      # the 'ghost node' construction in Exec() ensures that we have a
1793
      # node here
1794
      snode = node_image[nname]
1795
      bad_snode = snode.ghost or snode.offline
1796
      _ErrorIf(inst_config.disks_active and
1797
               not success and not bad_snode,
1798
               constants.CV_EINSTANCEFAULTYDISK, instance,
1799
               "couldn't retrieve status for disk/%s on %s: %s",
1800
               idx, nname, bdev_status)
1801
      _ErrorIf((inst_config.disks_active and
1802
                success and bdev_status.ldisk_status == constants.LDS_FAULTY),
1803
               constants.CV_EINSTANCEFAULTYDISK, instance,
1804
               "disk/%s on %s is faulty", idx, nname)
1805

    
1806
    _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1807
             constants.CV_ENODERPC, pnode, "instance %s, connection to"
1808
             " primary node failed", instance)
1809

    
1810
    _ErrorIf(len(inst_config.secondary_nodes) > 1,
1811
             constants.CV_EINSTANCELAYOUT,
1812
             instance, "instance has multiple secondary nodes: %s",
1813
             utils.CommaJoin(inst_config.secondary_nodes),
1814
             code=self.ETYPE_WARNING)
1815

    
1816
    if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
1817
      # Disk template not compatible with exclusive_storage: no instance
1818
      # node should have the flag set
1819
      es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
1820
                                                     inst_config.all_nodes)
1821
      es_nodes = [n for (n, es) in es_flags.items()
1822
                  if es]
1823
      _ErrorIf(es_nodes, constants.CV_EINSTANCEUNSUITABLENODE, instance,
1824
               "instance has template %s, which is not supported on nodes"
1825
               " that have exclusive storage set: %s",
1826
               inst_config.disk_template, utils.CommaJoin(es_nodes))
1827

    
1828
    if inst_config.disk_template in constants.DTS_INT_MIRROR:
1829
      instance_nodes = utils.NiceSort(inst_config.all_nodes)
1830
      instance_groups = {}
1831

    
1832
      for node in instance_nodes:
1833
        instance_groups.setdefault(self.all_node_info[node].group,
1834
                                   []).append(node)
1835

    
1836
      pretty_list = [
1837
        "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
1838
        # Sort so that we always list the primary node first.
1839
        for group, nodes in sorted(instance_groups.items(),
1840
                                   key=lambda (_, nodes): pnode in nodes,
1841
                                   reverse=True)]
1842

    
1843
      self._ErrorIf(len(instance_groups) > 1,
1844
                    constants.CV_EINSTANCESPLITGROUPS,
1845
                    instance, "instance has primary and secondary nodes in"
1846
                    " different groups: %s", utils.CommaJoin(pretty_list),
1847
                    code=self.ETYPE_WARNING)
1848

    
1849
    inst_nodes_offline = []
1850
    for snode in inst_config.secondary_nodes:
1851
      s_img = node_image[snode]
1852
      _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
1853
               snode, "instance %s, connection to secondary node failed",
1854
               instance)
1855

    
1856
      if s_img.offline:
1857
        inst_nodes_offline.append(snode)
1858

    
1859
    # warn that the instance lives on offline nodes
1860
    _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
1861
             "instance has offline secondary node(s) %s",
1862
             utils.CommaJoin(inst_nodes_offline))
1863
    # ... or ghost/non-vm_capable nodes
1864
    for node in inst_config.all_nodes:
1865
      _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
1866
               instance, "instance lives on ghost node %s", node)
1867
      _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
1868
               instance, "instance lives on non-vm_capable node %s", node)
1869

    
1870
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1871
    """Verify if there are any unknown volumes in the cluster.
1872

1873
    The .os, .swap and backup volumes are ignored. All other volumes are
1874
    reported as unknown.
1875

1876
    @type reserved: L{ganeti.utils.FieldSet}
1877
    @param reserved: a FieldSet of reserved volume names
1878

1879
    """
1880
    for node, n_img in node_image.items():
1881
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
1882
          self.all_node_info[node].group != self.group_uuid):
1883
        # skip non-healthy nodes
1884
        continue
1885
      for volume in n_img.volumes:
1886
        test = ((node not in node_vol_should or
1887
                volume not in node_vol_should[node]) and
1888
                not reserved.Matches(volume))
1889
        self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
1890
                      "volume %s is unknown", volume)
1891

    
1892
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1893
    """Verify N+1 Memory Resilience.
1894

1895
    Check that if one single node dies we can still start all the
1896
    instances it was primary for.
1897

1898
    """
1899
    cluster_info = self.cfg.GetClusterInfo()
1900
    for node, n_img in node_image.items():
1901
      # This code checks that every node which is now listed as
1902
      # secondary has enough memory to host all instances it is
1903
      # supposed to should a single other node in the cluster fail.
1904
      # FIXME: not ready for failover to an arbitrary node
1905
      # FIXME: does not support file-backed instances
1906
      # WARNING: we currently take into account down instances as well
1907
      # as up ones, considering that even if they're down someone
1908
      # might want to start them even in the event of a node failure.
1909
      if n_img.offline or self.all_node_info[node].group != self.group_uuid:
1910
        # we're skipping nodes marked offline and nodes in other groups from
1911
        # the N+1 warning, since most likely we don't have good memory
1912
        # infromation from them; we already list instances living on such
1913
        # nodes, and that's enough warning
1914
        continue
1915
      #TODO(dynmem): also consider ballooning out other instances
1916
      for prinode, instances in n_img.sbp.items():
1917
        needed_mem = 0
1918
        for instance in instances:
1919
          bep = cluster_info.FillBE(instance_cfg[instance])
1920
          if bep[constants.BE_AUTO_BALANCE]:
1921
            needed_mem += bep[constants.BE_MINMEM]
1922
        test = n_img.mfree < needed_mem
1923
        self._ErrorIf(test, constants.CV_ENODEN1, node,
1924
                      "not enough memory to accomodate instance failovers"
1925
                      " should node %s fail (%dMiB needed, %dMiB available)",
1926
                      prinode, needed_mem, n_img.mfree)
1927

    
1928
  @classmethod
1929
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
1930
                   (files_all, files_opt, files_mc, files_vm)):
1931
    """Verifies file checksums collected from all nodes.
1932

1933
    @param errorif: Callback for reporting errors
1934
    @param nodeinfo: List of L{objects.Node} objects
1935
    @param master_node: Name of master node
1936
    @param all_nvinfo: RPC results
1937

1938
    """
1939
    # Define functions determining which nodes to consider for a file
1940
    files2nodefn = [
1941
      (files_all, None),
1942
      (files_mc, lambda node: (node.master_candidate or
1943
                               node.name == master_node)),
1944
      (files_vm, lambda node: node.vm_capable),
1945
      ]
1946

    
1947
    # Build mapping from filename to list of nodes which should have the file
1948
    nodefiles = {}
1949
    for (files, fn) in files2nodefn:
1950
      if fn is None:
1951
        filenodes = nodeinfo
1952
      else:
1953
        filenodes = filter(fn, nodeinfo)
1954
      nodefiles.update((filename,
1955
                        frozenset(map(operator.attrgetter("name"), filenodes)))
1956
                       for filename in files)
1957

    
1958
    assert set(nodefiles) == (files_all | files_mc | files_vm)
1959

    
1960
    fileinfo = dict((filename, {}) for filename in nodefiles)
1961
    ignore_nodes = set()
1962

    
1963
    for node in nodeinfo:
1964
      if node.offline:
1965
        ignore_nodes.add(node.name)
1966
        continue
1967

    
1968
      nresult = all_nvinfo[node.name]
1969

    
1970
      if nresult.fail_msg or not nresult.payload:
1971
        node_files = None
1972
      else:
1973
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
1974
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
1975
                          for (key, value) in fingerprints.items())
1976
        del fingerprints
1977

    
1978
      test = not (node_files and isinstance(node_files, dict))
1979
      errorif(test, constants.CV_ENODEFILECHECK, node.name,
1980
              "Node did not return file checksum data")
1981
      if test:
1982
        ignore_nodes.add(node.name)
1983
        continue
1984

    
1985
      # Build per-checksum mapping from filename to nodes having it
1986
      for (filename, checksum) in node_files.items():
1987
        assert filename in nodefiles
1988
        fileinfo[filename].setdefault(checksum, set()).add(node.name)
1989

    
1990
    for (filename, checksums) in fileinfo.items():
1991
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
1992

    
1993
      # Nodes having the file
1994
      with_file = frozenset(node_name
1995
                            for nodes in fileinfo[filename].values()
1996
                            for node_name in nodes) - ignore_nodes
1997

    
1998
      expected_nodes = nodefiles[filename] - ignore_nodes
1999

    
2000
      # Nodes missing file
2001
      missing_file = expected_nodes - with_file
2002

    
2003
      if filename in files_opt:
2004
        # All or no nodes
2005
        errorif(missing_file and missing_file != expected_nodes,
2006
                constants.CV_ECLUSTERFILECHECK, None,
2007
                "File %s is optional, but it must exist on all or no"
2008
                " nodes (not found on %s)",
2009
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2010
      else:
2011
        errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2012
                "File %s is missing from node(s) %s", filename,
2013
                utils.CommaJoin(utils.NiceSort(missing_file)))
2014

    
2015
        # Warn if a node has a file it shouldn't
2016
        unexpected = with_file - expected_nodes
2017
        errorif(unexpected,
2018
                constants.CV_ECLUSTERFILECHECK, None,
2019
                "File %s should not exist on node(s) %s",
2020
                filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2021

    
2022
      # See if there are multiple versions of the file
2023
      test = len(checksums) > 1
2024
      if test:
2025
        variants = ["variant %s on %s" %
2026
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2027
                    for (idx, (checksum, nodes)) in
2028
                      enumerate(sorted(checksums.items()))]
2029
      else:
2030
        variants = []
2031

    
2032
      errorif(test, constants.CV_ECLUSTERFILECHECK, None,
2033
              "File %s found with %s different checksums (%s)",
2034
              filename, len(checksums), "; ".join(variants))
2035

    
2036
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2037
                      drbd_map):
2038
    """Verifies and the node DRBD status.
2039

2040
    @type ninfo: L{objects.Node}
2041
    @param ninfo: the node to check
2042
    @param nresult: the remote results for the node
2043
    @param instanceinfo: the dict of instances
2044
    @param drbd_helper: the configured DRBD usermode helper
2045
    @param drbd_map: the DRBD map as returned by
2046
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2047

2048
    """
2049
    node = ninfo.name
2050
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2051

    
2052
    if drbd_helper:
2053
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2054
      test = (helper_result is None)
2055
      _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2056
               "no drbd usermode helper returned")
2057
      if helper_result:
2058
        status, payload = helper_result
2059
        test = not status
2060
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2061
                 "drbd usermode helper check unsuccessful: %s", payload)
2062
        test = status and (payload != drbd_helper)
2063
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2064
                 "wrong drbd usermode helper: %s", payload)
2065

    
2066
    # compute the DRBD minors
2067
    node_drbd = {}
2068
    for minor, instance in drbd_map[node].items():
2069
      test = instance not in instanceinfo
2070
      _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2071
               "ghost instance '%s' in temporary DRBD map", instance)
2072
        # ghost instance should not be running, but otherwise we
2073
        # don't give double warnings (both ghost instance and
2074
        # unallocated minor in use)
2075
      if test:
2076
        node_drbd[minor] = (instance, False)
2077
      else:
2078
        instance = instanceinfo[instance]
2079
        node_drbd[minor] = (instance.name, instance.disks_active)
2080

    
2081
    # and now check them
2082
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2083
    test = not isinstance(used_minors, (tuple, list))
2084
    _ErrorIf(test, constants.CV_ENODEDRBD, node,
2085
             "cannot parse drbd status file: %s", str(used_minors))
2086
    if test:
2087
      # we cannot check drbd status
2088
      return
2089

    
2090
    for minor, (iname, must_exist) in node_drbd.items():
2091
      test = minor not in used_minors and must_exist
2092
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
2093
               "drbd minor %d of instance %s is not active", minor, iname)
2094
    for minor in used_minors:
2095
      test = minor not in node_drbd
2096
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
2097
               "unallocated drbd minor %d is in use", minor)
2098

    
2099
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2100
    """Builds the node OS structures.
2101

2102
    @type ninfo: L{objects.Node}
2103
    @param ninfo: the node to check
2104
    @param nresult: the remote results for the node
2105
    @param nimg: the node image object
2106

2107
    """
2108
    node = ninfo.name
2109
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2110

    
2111
    remote_os = nresult.get(constants.NV_OSLIST, None)
2112
    test = (not isinstance(remote_os, list) or
2113
            not compat.all(isinstance(v, list) and len(v) == 7
2114
                           for v in remote_os))
2115

    
2116
    _ErrorIf(test, constants.CV_ENODEOS, node,
2117
             "node hasn't returned valid OS data")
2118

    
2119
    nimg.os_fail = test
2120

    
2121
    if test:
2122
      return
2123

    
2124
    os_dict = {}
2125

    
2126
    for (name, os_path, status, diagnose,
2127
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2128

    
2129
      if name not in os_dict:
2130
        os_dict[name] = []
2131

    
2132
      # parameters is a list of lists instead of list of tuples due to
2133
      # JSON lacking a real tuple type, fix it:
2134
      parameters = [tuple(v) for v in parameters]
2135
      os_dict[name].append((os_path, status, diagnose,
2136
                            set(variants), set(parameters), set(api_ver)))
2137

    
2138
    nimg.oslist = os_dict
2139

    
2140
  def _VerifyNodeOS(self, ninfo, nimg, base):
2141
    """Verifies the node OS list.
2142

2143
    @type ninfo: L{objects.Node}
2144
    @param ninfo: the node to check
2145
    @param nimg: the node image object
2146
    @param base: the 'template' node we match against (e.g. from the master)
2147

2148
    """
2149
    node = ninfo.name
2150
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2151

    
2152
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2153

    
2154
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2155
    for os_name, os_data in nimg.oslist.items():
2156
      assert os_data, "Empty OS status for OS %s?!" % os_name
2157
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2158
      _ErrorIf(not f_status, constants.CV_ENODEOS, node,
2159
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2160
      _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
2161
               "OS '%s' has multiple entries (first one shadows the rest): %s",
2162
               os_name, utils.CommaJoin([v[0] for v in os_data]))
2163
      # comparisons with the 'base' image
2164
      test = os_name not in base.oslist
2165
      _ErrorIf(test, constants.CV_ENODEOS, node,
2166
               "Extra OS %s not present on reference node (%s)",
2167
               os_name, base.name)
2168
      if test:
2169
        continue
2170
      assert base.oslist[os_name], "Base node has empty OS status?"
2171
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2172
      if not b_status:
2173
        # base OS is invalid, skipping
2174
        continue
2175
      for kind, a, b in [("API version", f_api, b_api),
2176
                         ("variants list", f_var, b_var),
2177
                         ("parameters", beautify_params(f_param),
2178
                          beautify_params(b_param))]:
2179
        _ErrorIf(a != b, constants.CV_ENODEOS, node,
2180
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2181
                 kind, os_name, base.name,
2182
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2183

    
2184
    # check any missing OSes
2185
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2186
    _ErrorIf(missing, constants.CV_ENODEOS, node,
2187
             "OSes present on reference node %s but missing on this node: %s",
2188
             base.name, utils.CommaJoin(missing))
2189

    
2190
  def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
2191
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2192

2193
    @type ninfo: L{objects.Node}
2194
    @param ninfo: the node to check
2195
    @param nresult: the remote results for the node
2196
    @type is_master: bool
2197
    @param is_master: Whether node is the master node
2198

2199
    """
2200
    node = ninfo.name
2201

    
2202
    if (is_master and
2203
        (constants.ENABLE_FILE_STORAGE or
2204
         constants.ENABLE_SHARED_FILE_STORAGE)):
2205
      try:
2206
        fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
2207
      except KeyError:
2208
        # This should never happen
2209
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
2210
                      "Node did not return forbidden file storage paths")
2211
      else:
2212
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
2213
                      "Found forbidden file storage paths: %s",
2214
                      utils.CommaJoin(fspaths))
2215
    else:
2216
      self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
2217
                    constants.CV_ENODEFILESTORAGEPATHS, node,
2218
                    "Node should not have returned forbidden file storage"
2219
                    " paths")
2220

    
2221
  def _VerifyOob(self, ninfo, nresult):
2222
    """Verifies out of band functionality of a node.
2223

2224
    @type ninfo: L{objects.Node}
2225
    @param ninfo: the node to check
2226
    @param nresult: the remote results for the node
2227

2228
    """
2229
    node = ninfo.name
2230
    # We just have to verify the paths on master and/or master candidates
2231
    # as the oob helper is invoked on the master
2232
    if ((ninfo.master_candidate or ninfo.master_capable) and
2233
        constants.NV_OOB_PATHS in nresult):
2234
      for path_result in nresult[constants.NV_OOB_PATHS]:
2235
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2236

    
2237
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2238
    """Verifies and updates the node volume data.
2239

2240
    This function will update a L{NodeImage}'s internal structures
2241
    with data from the remote call.
2242

2243
    @type ninfo: L{objects.Node}
2244
    @param ninfo: the node to check
2245
    @param nresult: the remote results for the node
2246
    @param nimg: the node image object
2247
    @param vg_name: the configured VG name
2248

2249
    """
2250
    node = ninfo.name
2251
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2252

    
2253
    nimg.lvm_fail = True
2254
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2255
    if vg_name is None:
2256
      pass
2257
    elif isinstance(lvdata, basestring):
2258
      _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
2259
               utils.SafeEncode(lvdata))
2260
    elif not isinstance(lvdata, dict):
2261
      _ErrorIf(True, constants.CV_ENODELVM, node,
2262
               "rpc call to node failed (lvlist)")
2263
    else:
2264
      nimg.volumes = lvdata
2265
      nimg.lvm_fail = False
2266

    
2267
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2268
    """Verifies and updates the node instance list.
2269

2270
    If the listing was successful, then updates this node's instance
2271
    list. Otherwise, it marks the RPC call as failed for the instance
2272
    list key.
2273

2274
    @type ninfo: L{objects.Node}
2275
    @param ninfo: the node to check
2276
    @param nresult: the remote results for the node
2277
    @param nimg: the node image object
2278

2279
    """
2280
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2281
    test = not isinstance(idata, list)
2282
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2283
                  "rpc call to node failed (instancelist): %s",
2284
                  utils.SafeEncode(str(idata)))
2285
    if test:
2286
      nimg.hyp_fail = True
2287
    else:
2288
      nimg.instances = idata
2289

    
2290
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2291
    """Verifies and computes a node information map
2292

2293
    @type ninfo: L{objects.Node}
2294
    @param ninfo: the node to check
2295
    @param nresult: the remote results for the node
2296
    @param nimg: the node image object
2297
    @param vg_name: the configured VG name
2298

2299
    """
2300
    node = ninfo.name
2301
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2302

    
2303
    # try to read free memory (from the hypervisor)
2304
    hv_info = nresult.get(constants.NV_HVINFO, None)
2305
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2306
    _ErrorIf(test, constants.CV_ENODEHV, node,
2307
             "rpc call to node failed (hvinfo)")
2308
    if not test:
2309
      try:
2310
        nimg.mfree = int(hv_info["memory_free"])
2311
      except (ValueError, TypeError):
2312
        _ErrorIf(True, constants.CV_ENODERPC, node,
2313
                 "node returned invalid nodeinfo, check hypervisor")
2314

    
2315
    # FIXME: devise a free space model for file based instances as well
2316
    if vg_name is not None:
2317
      test = (constants.NV_VGLIST not in nresult or
2318
              vg_name not in nresult[constants.NV_VGLIST])
2319
      _ErrorIf(test, constants.CV_ENODELVM, node,
2320
               "node didn't return data for the volume group '%s'"
2321
               " - it is either missing or broken", vg_name)
2322
      if not test:
2323
        try:
2324
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2325
        except (ValueError, TypeError):
2326
          _ErrorIf(True, constants.CV_ENODERPC, node,
2327
                   "node returned invalid LVM info, check LVM status")
2328

    
2329
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2330
    """Gets per-disk status information for all instances.
2331

2332
    @type nodelist: list of strings
2333
    @param nodelist: Node names
2334
    @type node_image: dict of (name, L{objects.Node})
2335
    @param node_image: Node objects
2336
    @type instanceinfo: dict of (name, L{objects.Instance})
2337
    @param instanceinfo: Instance objects
2338
    @rtype: {instance: {node: [(succes, payload)]}}
2339
    @return: a dictionary of per-instance dictionaries with nodes as
2340
        keys and disk information as values; the disk information is a
2341
        list of tuples (success, payload)
2342

2343
    """
2344
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2345

    
2346
    node_disks = {}
2347
    node_disks_devonly = {}
2348
    diskless_instances = set()
2349
    diskless = constants.DT_DISKLESS
2350

    
2351
    for nname in nodelist:
2352
      node_instances = list(itertools.chain(node_image[nname].pinst,
2353
                                            node_image[nname].sinst))
2354
      diskless_instances.update(inst for inst in node_instances
2355
                                if instanceinfo[inst].disk_template == diskless)
2356
      disks = [(inst, disk)
2357
               for inst in node_instances
2358
               for disk in instanceinfo[inst].disks]
2359

    
2360
      if not disks:
2361
        # No need to collect data
2362
        continue
2363

    
2364
      node_disks[nname] = disks
2365

    
2366
      # _AnnotateDiskParams makes already copies of the disks
2367
      devonly = []
2368
      for (inst, dev) in disks:
2369
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
2370
        self.cfg.SetDiskID(anno_disk, nname)
2371
        devonly.append(anno_disk)
2372

    
2373
      node_disks_devonly[nname] = devonly
2374

    
2375
    assert len(node_disks) == len(node_disks_devonly)
2376

    
2377
    # Collect data from all nodes with disks
2378
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2379
                                                          node_disks_devonly)
2380

    
2381
    assert len(result) == len(node_disks)
2382

    
2383
    instdisk = {}
2384

    
2385
    for (nname, nres) in result.items():
2386
      disks = node_disks[nname]
2387

    
2388
      if nres.offline:
2389
        # No data from this node
2390
        data = len(disks) * [(False, "node offline")]
2391
      else:
2392
        msg = nres.fail_msg
2393
        _ErrorIf(msg, constants.CV_ENODERPC, nname,
2394
                 "while getting disk information: %s", msg)
2395
        if msg:
2396
          # No data from this node
2397
          data = len(disks) * [(False, msg)]
2398
        else:
2399
          data = []
2400
          for idx, i in enumerate(nres.payload):
2401
            if isinstance(i, (tuple, list)) and len(i) == 2:
2402
              data.append(i)
2403
            else:
2404
              logging.warning("Invalid result from node %s, entry %d: %s",
2405
                              nname, idx, i)
2406
              data.append((False, "Invalid result from the remote node"))
2407

    
2408
      for ((inst, _), status) in zip(disks, data):
2409
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2410

    
2411
    # Add empty entries for diskless instances.
2412
    for inst in diskless_instances:
2413
      assert inst not in instdisk
2414
      instdisk[inst] = {}
2415

    
2416
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2417
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
2418
                      compat.all(isinstance(s, (tuple, list)) and
2419
                                 len(s) == 2 for s in statuses)
2420
                      for inst, nnames in instdisk.items()
2421
                      for nname, statuses in nnames.items())
2422
    if __debug__:
2423
      instdisk_keys = set(instdisk)
2424
      instanceinfo_keys = set(instanceinfo)
2425
      assert instdisk_keys == instanceinfo_keys, \
2426
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2427
         (instdisk_keys, instanceinfo_keys))
2428

    
2429
    return instdisk
2430

    
2431
  @staticmethod
2432
  def _SshNodeSelector(group_uuid, all_nodes):
2433
    """Create endless iterators for all potential SSH check hosts.
2434

2435
    """
2436
    nodes = [node for node in all_nodes
2437
             if (node.group != group_uuid and
2438
                 not node.offline)]
2439
    keyfunc = operator.attrgetter("group")
2440

    
2441
    return map(itertools.cycle,
2442
               [sorted(map(operator.attrgetter("name"), names))
2443
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2444
                                                  keyfunc)])
2445

    
2446
  @classmethod
2447
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2448
    """Choose which nodes should talk to which other nodes.
2449

2450
    We will make nodes contact all nodes in their group, and one node from
2451
    every other group.
2452

2453
    @warning: This algorithm has a known issue if one node group is much
2454
      smaller than others (e.g. just one node). In such a case all other
2455
      nodes will talk to the single node.
2456

2457
    """
2458
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2459
    sel = cls._SshNodeSelector(group_uuid, all_nodes)
2460

    
2461
    return (online_nodes,
2462
            dict((name, sorted([i.next() for i in sel]))
2463
                 for name in online_nodes))
2464

    
2465
  def BuildHooksEnv(self):
2466
    """Build hooks env.
2467

2468
    Cluster-Verify hooks just ran in the post phase and their failure makes
2469
    the output be logged in the verify output and the verification to fail.
2470

2471
    """
2472
    env = {
2473
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2474
      }
2475

    
2476
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2477
               for node in self.my_node_info.values())
2478

    
2479
    return env
2480

    
2481
  def BuildHooksNodes(self):
2482
    """Build hooks nodes.
2483

2484
    """
2485
    return ([], self.my_node_names)
2486

    
2487
  def Exec(self, feedback_fn):
2488
    """Verify integrity of the node group, performing various test on nodes.
2489

2490
    """
2491
    # This method has too many local variables. pylint: disable=R0914
2492
    feedback_fn("* Verifying group '%s'" % self.group_info.name)
2493

    
2494
    if not self.my_node_names:
2495
      # empty node group
2496
      feedback_fn("* Empty node group, skipping verification")
2497
      return True
2498

    
2499
    self.bad = False
2500
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2501
    verbose = self.op.verbose
2502
    self._feedback_fn = feedback_fn
2503

    
2504
    vg_name = self.cfg.GetVGName()
2505
    drbd_helper = self.cfg.GetDRBDHelper()
2506
    cluster = self.cfg.GetClusterInfo()
2507
    hypervisors = cluster.enabled_hypervisors
2508
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2509

    
2510
    i_non_redundant = [] # Non redundant instances
2511
    i_non_a_balanced = [] # Non auto-balanced instances
2512
    i_offline = 0 # Count of offline instances
2513
    n_offline = 0 # Count of offline nodes
2514
    n_drained = 0 # Count of nodes being drained
2515
    node_vol_should = {}
2516

    
2517
    # FIXME: verify OS list
2518

    
2519
    # File verification
2520
    filemap = ComputeAncillaryFiles(cluster, False)
2521

    
2522
    # do local checksums
2523
    master_node = self.master_node = self.cfg.GetMasterNode()
2524
    master_ip = self.cfg.GetMasterIP()
2525

    
2526
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2527

    
2528
    user_scripts = []
2529
    if self.cfg.GetUseExternalMipScript():
2530
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2531

    
2532
    node_verify_param = {
2533
      constants.NV_FILELIST:
2534
        map(vcluster.MakeVirtualPath,
2535
            utils.UniqueSequence(filename
2536
                                 for files in filemap
2537
                                 for filename in files)),
2538
      constants.NV_NODELIST:
2539
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2540
                                  self.all_node_info.values()),
2541
      constants.NV_HYPERVISOR: hypervisors,
2542
      constants.NV_HVPARAMS:
2543
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2544
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2545
                                 for node in node_data_list
2546
                                 if not node.offline],
2547
      constants.NV_INSTANCELIST: hypervisors,
2548
      constants.NV_VERSION: None,
2549
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2550
      constants.NV_NODESETUP: None,
2551
      constants.NV_TIME: None,
2552
      constants.NV_MASTERIP: (master_node, master_ip),
2553
      constants.NV_OSLIST: None,
2554
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2555
      constants.NV_USERSCRIPTS: user_scripts,
2556
      }
2557

    
2558
    if vg_name is not None:
2559
      node_verify_param[constants.NV_VGLIST] = None
2560
      node_verify_param[constants.NV_LVLIST] = vg_name
2561
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2562

    
2563
    if drbd_helper:
2564
      node_verify_param[constants.NV_DRBDLIST] = None
2565
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2566

    
2567
    if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
2568
      # Load file storage paths only from master node
2569
      node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node
2570

    
2571
    # bridge checks
2572
    # FIXME: this needs to be changed per node-group, not cluster-wide
2573
    bridges = set()
2574
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2575
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2576
      bridges.add(default_nicpp[constants.NIC_LINK])
2577
    for instance in self.my_inst_info.values():
2578
      for nic in instance.nics:
2579
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
2580
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2581
          bridges.add(full_nic[constants.NIC_LINK])
2582

    
2583
    if bridges:
2584
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2585

    
2586
    # Build our expected cluster state
2587
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2588
                                                 name=node.name,
2589
                                                 vm_capable=node.vm_capable))
2590
                      for node in node_data_list)
2591

    
2592
    # Gather OOB paths
2593
    oob_paths = []
2594
    for node in self.all_node_info.values():
2595
      path = SupportsOob(self.cfg, node)
2596
      if path and path not in oob_paths:
2597
        oob_paths.append(path)
2598

    
2599
    if oob_paths:
2600
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2601

    
2602
    for instance in self.my_inst_names:
2603
      inst_config = self.my_inst_info[instance]
2604
      if inst_config.admin_state == constants.ADMINST_OFFLINE:
2605
        i_offline += 1
2606

    
2607
      for nname in inst_config.all_nodes:
2608
        if nname not in node_image:
2609
          gnode = self.NodeImage(name=nname)
2610
          gnode.ghost = (nname not in self.all_node_info)
2611
          node_image[nname] = gnode
2612

    
2613
      inst_config.MapLVsByNode(node_vol_should)
2614

    
2615
      pnode = inst_config.primary_node
2616
      node_image[pnode].pinst.append(instance)
2617

    
2618
      for snode in inst_config.secondary_nodes:
2619
        nimg = node_image[snode]
2620
        nimg.sinst.append(instance)
2621
        if pnode not in nimg.sbp:
2622
          nimg.sbp[pnode] = []
2623
        nimg.sbp[pnode].append(instance)
2624

    
2625
    es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg, self.my_node_names)
2626
    # The value of exclusive_storage should be the same across the group, so if
2627
    # it's True for at least a node, we act as if it were set for all the nodes
2628
    self._exclusive_storage = compat.any(es_flags.values())
2629
    if self._exclusive_storage:
2630
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2631

    
2632
    # At this point, we have the in-memory data structures complete,
2633
    # except for the runtime information, which we'll gather next
2634

    
2635
    # Due to the way our RPC system works, exact response times cannot be
2636
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2637
    # time before and after executing the request, we can at least have a time
2638
    # window.
2639
    nvinfo_starttime = time.time()
2640
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2641
                                           node_verify_param,
2642
                                           self.cfg.GetClusterName())
2643
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName())
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_names))
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
    if absent_nodes:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_nodes = []
      if master_node not in self.my_node_info:
        additional_nodes.append(master_node)
        vf_node_info.append(self.all_node_info[master_node])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node in absent_nodes:
        nodeinfo = self.all_node_info[node]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node != master_node):
          additional_nodes.append(node)
          vf_node_info.append(self.all_node_info[node])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
                                                 {key: node_verify_param[key]},
                                                 self.cfg.GetClusterName()))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()
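
    # (vf_node_info now covers all checked nodes plus, when needed, the
    # master node and one vm_capable node from outside this group, so the
    # file consistency check below has a reference outside the group.)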
    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      node = node_i.name
      nimg = node_image[node]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline += 1
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
               msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyFileStoragePaths(node_i, nresult,
                                   node == master_node)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)
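
        # (Two cases are reported below: an instance known to the cluster
        # but running on the wrong node, and an instance unknown to the
        # cluster configuration entirely.)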
        for inst in non_primary_inst:
          test = inst in self.all_inst_info
          _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
                   "instance should not run on node %s", node_i.name)
          _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                   "node is running unknown instance %s", inst)

    self._VerifyGroupLVM(node_image, vg_name)

    for node, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
                              node_image[node], vg_name)

    feedback_fn("* Verifying instance status")
    for instance in self.my_inst_names:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.my_inst_info[instance]
      self._VerifyInstance(instance, inst_config, node_image,
                           instdisk[instance])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if inst_config.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)
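
      # (For example, an instance using the "plain" disk template has no
      # mirrored disks and ends up in i_non_redundant, while a "drbd"
      # instance does not; the second list collects instances whose
      # BE_AUTO_BALANCE backend parameter is disabled.)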

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for inst in self.all_inst_info.values():
      for secondary in inst.secondary_nodes:
        if (secondary in self.my_node_info
            and inst.name not in self.my_inst_info):
          inst.MapLVsByNode(node_vol_should)
          break
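
    # (node_vol_should now lists every logical volume that legitimately
    # belongs on a node of this group, including volumes of instances whose
    # primary node is in another group; volumes found but not listed here,
    # and not matching the reserved set, are reported as orphans.)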
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_names:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"
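
      # (Each per-node hook result payload is a list of (script, hkr, output)
      # tuples; a script whose status is constants.HKR_FAIL is reported, its
      # output echoed with extra indentation, and the overall result marked
      # as failed.)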
      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
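    # (ResultWithJobs submits each inner list as a separate follow-up job,
    # so a cluster with, say, three node groups gets three independent
    # OpGroupVerifyDisks jobs.)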
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])