root / lib / cmdlib / cluster.py @ 75f2ff7d

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, master_params.name)

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    if result.fail_msg:
      self.LogWarning("Error disabling the master IP address: %s",
                      result.fail_msg)

    return master_params.name
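# Editorial note (not part of the original file): Exec above only runs the
# post hooks and takes down the master IP; the master node's name is returned,
# presumably so that the caller can finish the remaining teardown on that
# node.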


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True
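# Editorial note (not part of the original file): this LU does no work itself;
# it exists so that the "cluster-init" hooks declared above get executed on
# the master node once cluster initialization has finished.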


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
    else:
      cluster = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_name = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_name)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   master_name)

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, drain_flag, watcher_pause)
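# Editorial note (not part of the original file): ClusterQueryData is filled
# lazily above -- the cluster object, the queue-drain flag and the watcher
# pause value are only computed when the corresponding CQ_* field was actually
# requested; otherwise NotImplemented is passed as a placeholder.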


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      }

    return result
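# Editorial note (not part of the original file): the dictionary returned by
# LUClusterQuery.Exec is a plain mapping of configuration values, for example
# (values illustrative only):
#   result["enabled_hypervisors"] -> a list such as ["kvm", "xen-pvm"]
#   result["primary_ip_version"]  -> 4 or 6, depending on the primary family
#   result["os_hvp"]              -> per-OS hypervisor overrides, filtered
#                                    above to the enabled hypervisors only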


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.name)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                     master_params, ems)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)

    return clustername
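# Editorial note (not part of the original file): during the rename the master
# IP is taken down first and re-activated in the "finally" block, so even a
# failed known_hosts redistribution leaves the master role enabled (possibly
# with a warning); the new cluster name is returned as the result.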


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      self.wanted_names = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getsize(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getsize call to node"
                        " %s, ignoring", node)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node, len(dskl), result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
        if size is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, size))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, disk.size))
    return changed
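# Editorial note (not part of the original file): the "changed" list returned
# above holds one (instance_name, disk_index, size_in_MiB) tuple per corrected
# disk, e.g. [("instance1.example.com", 0, 10240)] -- the instance name in
# this example is purely illustrative.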


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)
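# Editorial note (not part of the original file): the netmask checked above is
# a CIDR prefix length (an int), so on an IPv4 cluster a value such as 24 is
# accepted, while values outside the range supported by the primary address
# family make _ValidateNetmask raise errors.OpPrereqError.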


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_list = self.owned_locks(locking.LEVEL_NODE)

    vm_capable_nodes = [node.name
                        for node in self.cfg.GetAllNodesInfo().values()
                        if node.name in node_list and node.vm_capable]

    # if vg_name not None, checks given volume group on all nodes
    if self.op.vg_name:
      vglist = self.rpc.call_vg_list(vm_capable_nodes)
      for node in vm_capable_nodes:
        msg = vglist[node].fail_msg
        if msg:
          # ignoring down node
          self.LogWarning("Error while gathering data on node %s"
                          " (ignoring node): %s", node, msg)
          continue
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                              self.op.vg_name,
                                              constants.MIN_VG_SIZE)
        if vgstatus:
          raise errors.OpPrereqError("Error on node '%s': %s" %
                                     (node, vgstatus), errors.ECODE_ENVIRON)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_list)
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s", node)
          continue
        msg = helpers[node].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (node, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[node].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (node, node_helper), errors.ECODE_ENVIRON)

    self.cluster = cluster = self.cfg.GetClusterInfo()
    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(node in group.members
                                             for node in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol,
                                           new_ipolicy, instances, self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_list, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    if self.op.drbd_helper is not None:
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(set(self.op.enabled_disk_templates))
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(master_params.name,
                                                        master_params.netmask,
                                                        self.op.master_netmask,
                                                        master_params.ip,
                                                        master_params.netdev)
      if result.fail_msg:
        msg = "Could not change the master IP netmask: %s" % result.fail_msg
        feedback_fn(msg)

      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                     master_params, ems)
      if result.fail_msg:
        self.LogWarning("Could not re-enable the master ip on"
                        " the master, please restart manually: %s",
                        result.fail_msg)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.CONFD_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.CONFD_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node.name for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in dangling_nodes:
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst.name)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(dangling_instances.get(node.name,
                                                ["no instances"])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type name: string
    @ivar name: the node name to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, name=None, vm_capable=True):
      self.name = name
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_names = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: inst_names,
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      all_inst_info = self.cfg.GetAllInstancesInfo()

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
          nodes.update(all_inst_info[inst].secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_nodes = set(self.group_info.members)
    group_instances = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_nodes = \
        group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_instances = \
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))

    if unlocked_nodes:
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
                                 utils.CommaJoin(unlocked_nodes),
                                 errors.ECODE_STATE)

    if unlocked_instances:
      raise errors.OpPrereqError("Missing lock for instances: %s" %
                                 utils.CommaJoin(unlocked_instances),
                                 errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_names = utils.NiceSort(group_nodes)
    self.my_inst_names = utils.NiceSort(group_instances)

    self.my_node_info = dict((name, self.all_node_info[name])
                             for name in self.my_node_names)

    self.my_inst_info = dict((name, self.all_inst_info[name])
                             for name in self.my_inst_names)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nname in inst.all_nodes:
          if self.all_node_info[nname].group != self.group_uuid:
            extra_lv_nodes.add(nname)

    unlocked_lv_nodes = \
1486
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1487

    
1488
    if unlocked_lv_nodes:
1489
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1490
                                 utils.CommaJoin(unlocked_lv_nodes),
1491
                                 errors.ECODE_STATE)
1492
    self.extra_lv_nodes = list(extra_lv_nodes)
1493

    
1494
  def _VerifyNode(self, ninfo, nresult):
1495
    """Perform some basic validation on data returned from a node.
1496

1497
      - check the result data structure is well formed and has all the
1498
        mandatory fields
1499
      - check ganeti version
1500

1501
    @type ninfo: L{objects.Node}
1502
    @param ninfo: the node to check
1503
    @param nresult: the results from the node
1504
    @rtype: boolean
1505
    @return: whether overall this call was successful (and we can expect
1506
         reasonable values in the response)
1507

1508
    """
1509
    node = ninfo.name
1510
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1511

    
1512
    # main result, nresult should be a non-empty dict
1513
    test = not nresult or not isinstance(nresult, dict)
1514
    _ErrorIf(test, constants.CV_ENODERPC, node,
1515
                  "unable to verify node: no data returned")
1516
    if test:
1517
      return False
1518

    
1519
    # compares ganeti version
1520
    local_version = constants.PROTOCOL_VERSION
1521
    remote_version = nresult.get("version", None)
1522
    test = not (remote_version and
1523
                isinstance(remote_version, (list, tuple)) and
1524
                len(remote_version) == 2)
1525
    _ErrorIf(test, constants.CV_ENODERPC, node,
1526
             "connection to node returned invalid data")
1527
    if test:
1528
      return False
1529

    
1530
    test = local_version != remote_version[0]
1531
    _ErrorIf(test, constants.CV_ENODEVERSION, node,
1532
             "incompatible protocol versions: master %s,"
1533
             " node %s", local_version, remote_version[0])
1534
    if test:
1535
      return False
1536

    
1537
    # node seems compatible, we can actually try to look into its results
1538

    
1539
    # full package version
1540
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1541
                  constants.CV_ENODEVERSION, node,
1542
                  "software version mismatch: master %s, node %s",
1543
                  constants.RELEASE_VERSION, remote_version[1],
1544
                  code=self.ETYPE_WARNING)
1545

    
1546
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1547
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1548
      for hv_name, hv_result in hyp_result.iteritems():
1549
        test = hv_result is not None
1550
        _ErrorIf(test, constants.CV_ENODEHV, node,
1551
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1552

    
1553
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1554
    if ninfo.vm_capable and isinstance(hvp_result, list):
1555
      for item, hv_name, hv_result in hvp_result:
1556
        _ErrorIf(True, constants.CV_ENODEHV, node,
1557
                 "hypervisor %s parameter verify failure (source %s): %s",
1558
                 hv_name, item, hv_result)
1559

    
1560
    test = nresult.get(constants.NV_NODESETUP,
1561
                       ["Missing NODESETUP results"])
1562
    _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
1563
             "; ".join(test))
1564

    
1565
    return True
1566

    
1567
  def _VerifyNodeTime(self, ninfo, nresult,
1568
                      nvinfo_starttime, nvinfo_endtime):
1569
    """Check the node time.
1570

1571
    @type ninfo: L{objects.Node}
1572
    @param ninfo: the node to check
1573
    @param nresult: the remote results for the node
1574
    @param nvinfo_starttime: the start time of the RPC call
1575
    @param nvinfo_endtime: the end time of the RPC call
1576

1577
    """
1578
    node = ninfo.name
1579
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1580

    
1581
    ntime = nresult.get(constants.NV_TIME, None)
1582
    try:
1583
      ntime_merged = utils.MergeTime(ntime)
1584
    except (ValueError, TypeError):
1585
      _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
1586
      return
1587

    
1588
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1589
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1590
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1591
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1592
    else:
1593
      ntime_diff = None
1594

    
1595
    _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
1596
             "Node time diverges by at least %s from master node time",
1597
             ntime_diff)
1598
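  # Worked example for the skew window above (numbers are made up, and
  # constants.NODE_MAX_CLOCK_SKEW is assumed to be 150 seconds): with
  # nvinfo_starttime = 100.0 and nvinfo_endtime = 102.0, a node whose merged
  # time is 260.0 exceeds nvinfo_endtime + 150 = 252.0, so
  # ntime_diff = abs(260.0 - 102.0) = "158.0s" and CV_ENODETIME is reported;
  # a merged time of 200.0 stays inside the window and passes.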

    
1599
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1600
    """Check the node LVM results and update info for cross-node checks.
1601

1602
    @type ninfo: L{objects.Node}
1603
    @param ninfo: the node to check
1604
    @param nresult: the remote results for the node
1605
    @param vg_name: the configured VG name
1606
    @type nimg: L{NodeImage}
1607
    @param nimg: node image
1608

1609
    """
1610
    if vg_name is None:
1611
      return
1612

    
1613
    node = ninfo.name
1614
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1615

    
1616
    # checks vg existence and size > 20G
1617
    vglist = nresult.get(constants.NV_VGLIST, None)
1618
    test = not vglist
1619
    _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
1620
    if not test:
1621
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1622
                                            constants.MIN_VG_SIZE)
1623
      _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
1624

    
1625
    # Check PVs
1626
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1627
    for em in errmsgs:
1628
      self._Error(constants.CV_ENODELVM, node, em)
1629
    if pvminmax is not None:
1630
      (nimg.pv_min, nimg.pv_max) = pvminmax
1631

    
1632
  def _VerifyGroupLVM(self, node_image, vg_name):
1633
    """Check cross-node consistency in LVM.
1634

1635
    @type node_image: dict
1636
    @param node_image: info about nodes, mapping from node names to
1637
      L{NodeImage} objects
1638
    @param vg_name: the configured VG name
1639

1640
    """
1641
    if vg_name is None:
1642
      return
1643

    
1644
    # Only exclusive storage needs this kind of check
1645
    if not self._exclusive_storage:
1646
      return
1647

    
1648
    # exclusive_storage wants all PVs to have the same size (approximately),
1649
    # if the smallest and the biggest ones are okay, everything is fine.
1650
    # pv_min is None iff pv_max is None
1651
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1652
    if not vals:
1653
      return
1654
    (pvmin, minnode) = min((ni.pv_min, ni.name) for ni in vals)
1655
    (pvmax, maxnode) = max((ni.pv_max, ni.name) for ni in vals)
1656
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1657
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1658
                  "PV sizes differ too much in the group; smallest (%s MB) is"
1659
                  " on %s, biggest (%s MB) is on %s",
1660
                  pvmin, minnode, pvmax, maxnode)
1661
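  # Worked example for the check above (sizes are made up): if the collected
  # per-node extremes are node1: (pv_min=1024.0, pv_max=1024.0) and
  # node2: (pv_min=1024.0, pv_max=4096.0), the group-wide smallest PV is
  # 1024 MB on node1 and the biggest is 4096 MB on node2;
  # utils.LvmExclusiveTestBadPvSizes(1024.0, 4096.0) is then expected to
  # consider the spread too large, and CV_EGROUPDIFFERENTPVSIZE is reported
  # for the group.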

    
1662
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1663
    """Check the node bridges.
1664

1665
    @type ninfo: L{objects.Node}
1666
    @param ninfo: the node to check
1667
    @param nresult: the remote results for the node
1668
    @param bridges: the expected list of bridges
1669

1670
    """
1671
    if not bridges:
1672
      return
1673

    
1674
    node = ninfo.name
1675
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1676

    
1677
    missing = nresult.get(constants.NV_BRIDGES, None)
1678
    test = not isinstance(missing, list)
1679
    _ErrorIf(test, constants.CV_ENODENET, node,
1680
             "did not return valid bridge information")
1681
    if not test:
1682
      _ErrorIf(bool(missing), constants.CV_ENODENET, node,
1683
               "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1684

    
1685
  def _VerifyNodeUserScripts(self, ninfo, nresult):
1686
    """Check the results of user scripts presence and executability on the node
1687

1688
    @type ninfo: L{objects.Node}
1689
    @param ninfo: the node to check
1690
    @param nresult: the remote results for the node
1691

1692
    """
1693
    node = ninfo.name
1694

    
1695
    test = not constants.NV_USERSCRIPTS in nresult
1696
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
1697
                  "did not return user scripts information")
1698

    
1699
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1700
    if not test:
1701
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
1702
                    "user scripts not present or not executable: %s" %
1703
                    utils.CommaJoin(sorted(broken_scripts)))
1704

    
1705
  def _VerifyNodeNetwork(self, ninfo, nresult):
1706
    """Check the node network connectivity results.
1707

1708
    @type ninfo: L{objects.Node}
1709
    @param ninfo: the node to check
1710
    @param nresult: the remote results for the node
1711

1712
    """
1713
    node = ninfo.name
1714
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1715

    
1716
    test = constants.NV_NODELIST not in nresult
1717
    _ErrorIf(test, constants.CV_ENODESSH, node,
1718
             "node hasn't returned node ssh connectivity data")
1719
    if not test:
1720
      if nresult[constants.NV_NODELIST]:
1721
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1722
          _ErrorIf(True, constants.CV_ENODESSH, node,
1723
                   "ssh communication with node '%s': %s", a_node, a_msg)
1724

    
1725
    test = constants.NV_NODENETTEST not in nresult
1726
    _ErrorIf(test, constants.CV_ENODENET, node,
1727
             "node hasn't returned node tcp connectivity data")
1728
    if not test:
1729
      if nresult[constants.NV_NODENETTEST]:
1730
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1731
        for anode in nlist:
1732
          _ErrorIf(True, constants.CV_ENODENET, node,
1733
                   "tcp communication with node '%s': %s",
1734
                   anode, nresult[constants.NV_NODENETTEST][anode])
1735

    
1736
    test = constants.NV_MASTERIP not in nresult
1737
    _ErrorIf(test, constants.CV_ENODENET, node,
1738
             "node hasn't returned node master IP reachability data")
1739
    if not test:
1740
      if not nresult[constants.NV_MASTERIP]:
1741
        if node == self.master_node:
1742
          msg = "the master node cannot reach the master IP (not configured?)"
1743
        else:
1744
          msg = "cannot reach the master IP"
1745
        _ErrorIf(True, constants.CV_ENODENET, node, msg)
1746

    
1747
  def _VerifyInstance(self, instance, inst_config, node_image,
1748
                      diskstatus):
1749
    """Verify an instance.
1750

1751
    This function checks to see if the required block devices are
1752
    available on the instance's node, and that the nodes are in the correct
1753
    state.
1754

1755
    """
1756
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1757
    pnode = inst_config.primary_node
1758
    pnode_img = node_image[pnode]
1759
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
1760

    
1761
    node_vol_should = {}
1762
    inst_config.MapLVsByNode(node_vol_should)
1763

    
1764
    cluster = self.cfg.GetClusterInfo()
1765
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1766
                                                            self.group_info)
1767
    err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
1768
    _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err),
1769
             code=self.ETYPE_WARNING)
1770

    
1771
    for node in node_vol_should:
1772
      n_img = node_image[node]
1773
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1774
        # ignore missing volumes on offline or broken nodes
1775
        continue
1776
      for volume in node_vol_should[node]:
1777
        test = volume not in n_img.volumes
1778
        _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
1779
                 "volume %s missing on node %s", volume, node)
1780

    
1781
    if inst_config.admin_state == constants.ADMINST_UP:
1782
      test = instance not in pnode_img.instances and not pnode_img.offline
1783
      _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
1784
               "instance not running on its primary node %s",
1785
               pnode)
1786
      _ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
1787
               "instance is marked as running and lives on offline node %s",
1788
               pnode)
1789

    
1790
    diskdata = [(nname, success, status, idx)
1791
                for (nname, disks) in diskstatus.items()
1792
                for idx, (success, status) in enumerate(disks)]
1793

    
1794
    for nname, success, bdev_status, idx in diskdata:
1795
      # the 'ghost node' construction in Exec() ensures that we have a
1796
      # node here
1797
      snode = node_image[nname]
1798
      bad_snode = snode.ghost or snode.offline
1799
      _ErrorIf(inst_config.disks_active and
1800
               not success and not bad_snode,
1801
               constants.CV_EINSTANCEFAULTYDISK, instance,
1802
               "couldn't retrieve status for disk/%s on %s: %s",
1803
               idx, nname, bdev_status)
1804
      _ErrorIf((inst_config.disks_active and
1805
                success and bdev_status.ldisk_status == constants.LDS_FAULTY),
1806
               constants.CV_EINSTANCEFAULTYDISK, instance,
1807
               "disk/%s on %s is faulty", idx, nname)
1808

    
1809
    _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1810
             constants.CV_ENODERPC, pnode, "instance %s, connection to"
1811
             " primary node failed", instance)
1812

    
1813
    _ErrorIf(len(inst_config.secondary_nodes) > 1,
1814
             constants.CV_EINSTANCELAYOUT,
1815
             instance, "instance has multiple secondary nodes: %s",
1816
             utils.CommaJoin(inst_config.secondary_nodes),
1817
             code=self.ETYPE_WARNING)
1818

    
1819
    if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
1820
      # Disk template not compatible with exclusive_storage: no instance
1821
      # node should have the flag set
1822
      es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
1823
                                                     inst_config.all_nodes)
1824
      es_nodes = [n for (n, es) in es_flags.items()
1825
                  if es]
1826
      _ErrorIf(es_nodes, constants.CV_EINSTANCEUNSUITABLENODE, instance,
1827
               "instance has template %s, which is not supported on nodes"
1828
               " that have exclusive storage set: %s",
1829
               inst_config.disk_template, utils.CommaJoin(es_nodes))
1830

    
1831
    if inst_config.disk_template in constants.DTS_INT_MIRROR:
1832
      instance_nodes = utils.NiceSort(inst_config.all_nodes)
1833
      instance_groups = {}
1834

    
1835
      for node in instance_nodes:
1836
        instance_groups.setdefault(self.all_node_info[node].group,
1837
                                   []).append(node)
1838

    
1839
      pretty_list = [
1840
        "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
1841
        # Sort so that we always list the primary node first.
1842
        for group, nodes in sorted(instance_groups.items(),
1843
                                   key=lambda (_, nodes): pnode in nodes,
1844
                                   reverse=True)]
1845

    
1846
      self._ErrorIf(len(instance_groups) > 1,
1847
                    constants.CV_EINSTANCESPLITGROUPS,
1848
                    instance, "instance has primary and secondary nodes in"
1849
                    " different groups: %s", utils.CommaJoin(pretty_list),
1850
                    code=self.ETYPE_WARNING)
1851

    
1852
    inst_nodes_offline = []
1853
    for snode in inst_config.secondary_nodes:
1854
      s_img = node_image[snode]
1855
      _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
1856
               snode, "instance %s, connection to secondary node failed",
1857
               instance)
1858

    
1859
      if s_img.offline:
1860
        inst_nodes_offline.append(snode)
1861

    
1862
    # warn that the instance lives on offline nodes
1863
    _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
1864
             "instance has offline secondary node(s) %s",
1865
             utils.CommaJoin(inst_nodes_offline))
1866
    # ... or ghost/non-vm_capable nodes
1867
    for node in inst_config.all_nodes:
1868
      _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
1869
               instance, "instance lives on ghost node %s", node)
1870
      _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
1871
               instance, "instance lives on non-vm_capable node %s", node)
1872

    
1873
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1874
    """Verify if there are any unknown volumes in the cluster.
1875

1876
    The .os, .swap and backup volumes are ignored. All other volumes are
1877
    reported as unknown.
1878

1879
    @type reserved: L{ganeti.utils.FieldSet}
1880
    @param reserved: a FieldSet of reserved volume names
1881

1882
    """
1883
    for node, n_img in node_image.items():
1884
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
1885
          self.all_node_info[node].group != self.group_uuid):
1886
        # skip non-healthy nodes
1887
        continue
1888
      for volume in n_img.volumes:
1889
        test = ((node not in node_vol_should or
1890
                volume not in node_vol_should[node]) and
1891
                not reserved.Matches(volume))
1892
        self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
1893
                      "volume %s is unknown", volume)
1894

    
1895
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1896
    """Verify N+1 Memory Resilience.
1897

1898
    Check that if one single node dies we can still start all the
1899
    instances it was primary for.
1900

1901
    """
1902
    cluster_info = self.cfg.GetClusterInfo()
1903
    for node, n_img in node_image.items():
1904
      # This code checks that every node which is now listed as
1905
      # secondary has enough memory to host all instances it is
1906
      # supposed to should a single other node in the cluster fail.
1907
      # FIXME: not ready for failover to an arbitrary node
1908
      # FIXME: does not support file-backed instances
1909
      # WARNING: we currently take into account down instances as well
1910
      # as up ones, considering that even if they're down someone
1911
      # might want to start them even in the event of a node failure.
1912
      if n_img.offline or self.all_node_info[node].group != self.group_uuid:
1913
        # we're skipping nodes marked offline and nodes in other groups from
1914
        # the N+1 warning, since most likely we don't have good memory
1915
        # information from them; we already list instances living on such
1916
        # nodes, and that's enough warning
1917
        continue
1918
      #TODO(dynmem): also consider ballooning out other instances
1919
      for prinode, instances in n_img.sbp.items():
1920
        needed_mem = 0
1921
        for instance in instances:
1922
          bep = cluster_info.FillBE(instance_cfg[instance])
1923
          if bep[constants.BE_AUTO_BALANCE]:
1924
            needed_mem += bep[constants.BE_MINMEM]
1925
        test = n_img.mfree < needed_mem
1926
        self._ErrorIf(test, constants.CV_ENODEN1, node,
1927
                      "not enough memory to accomodate instance failovers"
1928
                      " should node %s fail (%dMiB needed, %dMiB available)",
1929
                      prinode, needed_mem, n_img.mfree)
1930
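  # Worked example for the N+1 computation above (numbers are made up): if
  # this node is secondary for instances inst1 and inst2 whose primary is
  # node "prim1", and their filled BE parameters are
  # {auto_balance: True, minmem: 2048} and {auto_balance: True, minmem: 1024},
  # then needed_mem = 2048 + 1024 = 3072 MiB; with n_img.mfree == 2560 the
  # test fires and CV_ENODEN1 is reported for this node.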

    
1931
  @classmethod
1932
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
1933
                   (files_all, files_opt, files_mc, files_vm)):
1934
    """Verifies file checksums collected from all nodes.
1935

1936
    @param errorif: Callback for reporting errors
1937
    @param nodeinfo: List of L{objects.Node} objects
1938
    @param master_node: Name of master node
1939
    @param all_nvinfo: RPC results
1940

1941
    """
1942
    # Define functions determining which nodes to consider for a file
1943
    files2nodefn = [
1944
      (files_all, None),
1945
      (files_mc, lambda node: (node.master_candidate or
1946
                               node.name == master_node)),
1947
      (files_vm, lambda node: node.vm_capable),
1948
      ]
1949

    
1950
    # Build mapping from filename to list of nodes which should have the file
1951
    nodefiles = {}
1952
    for (files, fn) in files2nodefn:
1953
      if fn is None:
1954
        filenodes = nodeinfo
1955
      else:
1956
        filenodes = filter(fn, nodeinfo)
1957
      nodefiles.update((filename,
1958
                        frozenset(map(operator.attrgetter("name"), filenodes)))
1959
                       for filename in files)
1960

    
1961
    assert set(nodefiles) == (files_all | files_mc | files_vm)
1962

    
1963
    fileinfo = dict((filename, {}) for filename in nodefiles)
1964
    ignore_nodes = set()
1965

    
1966
    for node in nodeinfo:
1967
      if node.offline:
1968
        ignore_nodes.add(node.name)
1969
        continue
1970

    
1971
      nresult = all_nvinfo[node.name]
1972

    
1973
      if nresult.fail_msg or not nresult.payload:
1974
        node_files = None
1975
      else:
1976
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
1977
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
1978
                          for (key, value) in fingerprints.items())
1979
        del fingerprints
1980

    
1981
      test = not (node_files and isinstance(node_files, dict))
1982
      errorif(test, constants.CV_ENODEFILECHECK, node.name,
1983
              "Node did not return file checksum data")
1984
      if test:
1985
        ignore_nodes.add(node.name)
1986
        continue
1987

    
1988
      # Build per-checksum mapping from filename to nodes having it
1989
      for (filename, checksum) in node_files.items():
1990
        assert filename in nodefiles
1991
        fileinfo[filename].setdefault(checksum, set()).add(node.name)
1992

    
1993
    for (filename, checksums) in fileinfo.items():
1994
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
1995

    
1996
      # Nodes having the file
1997
      with_file = frozenset(node_name
1998
                            for nodes in fileinfo[filename].values()
1999
                            for node_name in nodes) - ignore_nodes
2000

    
2001
      expected_nodes = nodefiles[filename] - ignore_nodes
2002

    
2003
      # Nodes missing file
2004
      missing_file = expected_nodes - with_file
2005

    
2006
      if filename in files_opt:
2007
        # All or no nodes
2008
        errorif(missing_file and missing_file != expected_nodes,
2009
                constants.CV_ECLUSTERFILECHECK, None,
2010
                "File %s is optional, but it must exist on all or no"
2011
                " nodes (not found on %s)",
2012
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2013
      else:
2014
        errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2015
                "File %s is missing from node(s) %s", filename,
2016
                utils.CommaJoin(utils.NiceSort(missing_file)))
2017

    
2018
        # Warn if a node has a file it shouldn't
2019
        unexpected = with_file - expected_nodes
2020
        errorif(unexpected,
2021
                constants.CV_ECLUSTERFILECHECK, None,
2022
                "File %s should not exist on node(s) %s",
2023
                filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2024

    
2025
      # See if there are multiple versions of the file
2026
      test = len(checksums) > 1
2027
      if test:
2028
        variants = ["variant %s on %s" %
2029
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2030
                    for (idx, (checksum, nodes)) in
2031
                      enumerate(sorted(checksums.items()))]
2032
      else:
2033
        variants = []
2034

    
2035
      errorif(test, constants.CV_ECLUSTERFILECHECK, None,
2036
              "File %s found with %s different checksums (%s)",
2037
              filename, len(checksums), "; ".join(variants))
2038
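  # Example for the optional-file rule above (the path is made up): if an
  # entry of files_opt such as "/etc/ganeti/extra.conf" is found on 2 of the
  # 5 expected nodes, missing_file != expected_nodes and
  # CV_ECLUSTERFILECHECK is raised; if it is absent from all 5 nodes the
  # check passes, because an optional file may exist on every node or on
  # none of them.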

    
2039
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2040
                      drbd_map):
2041
    """Verifies and the node DRBD status.
2042

2043
    @type ninfo: L{objects.Node}
2044
    @param ninfo: the node to check
2045
    @param nresult: the remote results for the node
2046
    @param instanceinfo: the dict of instances
2047
    @param drbd_helper: the configured DRBD usermode helper
2048
    @param drbd_map: the DRBD map as returned by
2049
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2050

2051
    """
2052
    node = ninfo.name
2053
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2054

    
2055
    if drbd_helper:
2056
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2057
      test = (helper_result is None)
2058
      _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2059
               "no drbd usermode helper returned")
2060
      if helper_result:
2061
        status, payload = helper_result
2062
        test = not status
2063
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2064
                 "drbd usermode helper check unsuccessful: %s", payload)
2065
        test = status and (payload != drbd_helper)
2066
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2067
                 "wrong drbd usermode helper: %s", payload)
2068

    
2069
    # compute the DRBD minors
2070
    node_drbd = {}
2071
    for minor, instance in drbd_map[node].items():
2072
      test = instance not in instanceinfo
2073
      _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2074
               "ghost instance '%s' in temporary DRBD map", instance)
2075
        # ghost instance should not be running, but otherwise we
2076
        # don't give double warnings (both ghost instance and
2077
        # unallocated minor in use)
2078
      if test:
2079
        node_drbd[minor] = (instance, False)
2080
      else:
2081
        instance = instanceinfo[instance]
2082
        node_drbd[minor] = (instance.name, instance.disks_active)
2083

    
2084
    # and now check them
2085
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2086
    test = not isinstance(used_minors, (tuple, list))
2087
    _ErrorIf(test, constants.CV_ENODEDRBD, node,
2088
             "cannot parse drbd status file: %s", str(used_minors))
2089
    if test:
2090
      # we cannot check drbd status
2091
      return
2092

    
2093
    for minor, (iname, must_exist) in node_drbd.items():
2094
      test = minor not in used_minors and must_exist
2095
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
2096
               "drbd minor %d of instance %s is not active", minor, iname)
2097
    for minor in used_minors:
2098
      test = minor not in node_drbd
2099
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
2100
               "unallocated drbd minor %d is in use", minor)
2101

    
2102
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2103
    """Builds the node OS structures.
2104

2105
    @type ninfo: L{objects.Node}
2106
    @param ninfo: the node to check
2107
    @param nresult: the remote results for the node
2108
    @param nimg: the node image object
2109

2110
    """
2111
    node = ninfo.name
2112
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2113

    
2114
    remote_os = nresult.get(constants.NV_OSLIST, None)
2115
    test = (not isinstance(remote_os, list) or
2116
            not compat.all(isinstance(v, list) and len(v) == 7
2117
                           for v in remote_os))
2118

    
2119
    _ErrorIf(test, constants.CV_ENODEOS, node,
2120
             "node hasn't returned valid OS data")
2121

    
2122
    nimg.os_fail = test
2123

    
2124
    if test:
2125
      return
2126

    
2127
    os_dict = {}
2128

    
2129
    for (name, os_path, status, diagnose,
2130
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2131

    
2132
      if name not in os_dict:
2133
        os_dict[name] = []
2134

    
2135
      # parameters is a list of lists instead of list of tuples due to
2136
      # JSON lacking a real tuple type, fix it:
2137
      parameters = [tuple(v) for v in parameters]
2138
      os_dict[name].append((os_path, status, diagnose,
2139
                            set(variants), set(parameters), set(api_ver)))
2140

    
2141
    nimg.oslist = os_dict
2142

    
2143
  def _VerifyNodeOS(self, ninfo, nimg, base):
2144
    """Verifies the node OS list.
2145

2146
    @type ninfo: L{objects.Node}
2147
    @param ninfo: the node to check
2148
    @param nimg: the node image object
2149
    @param base: the 'template' node we match against (e.g. from the master)
2150

2151
    """
2152
    node = ninfo.name
2153
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2154

    
2155
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2156

    
2157
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2158
    for os_name, os_data in nimg.oslist.items():
2159
      assert os_data, "Empty OS status for OS %s?!" % os_name
2160
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2161
      _ErrorIf(not f_status, constants.CV_ENODEOS, node,
2162
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2163
      _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
2164
               "OS '%s' has multiple entries (first one shadows the rest): %s",
2165
               os_name, utils.CommaJoin([v[0] for v in os_data]))
2166
      # comparisons with the 'base' image
2167
      test = os_name not in base.oslist
2168
      _ErrorIf(test, constants.CV_ENODEOS, node,
2169
               "Extra OS %s not present on reference node (%s)",
2170
               os_name, base.name)
2171
      if test:
2172
        continue
2173
      assert base.oslist[os_name], "Base node has empty OS status?"
2174
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2175
      if not b_status:
2176
        # base OS is invalid, skipping
2177
        continue
2178
      for kind, a, b in [("API version", f_api, b_api),
2179
                         ("variants list", f_var, b_var),
2180
                         ("parameters", beautify_params(f_param),
2181
                          beautify_params(b_param))]:
2182
        _ErrorIf(a != b, constants.CV_ENODEOS, node,
2183
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2184
                 kind, os_name, base.name,
2185
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2186

    
2187
    # check any missing OSes
2188
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2189
    _ErrorIf(missing, constants.CV_ENODEOS, node,
2190
             "OSes present on reference node %s but missing on this node: %s",
2191
             base.name, utils.CommaJoin(missing))
2192

    
2193
  def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
2194
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2195

2196
    @type ninfo: L{objects.Node}
2197
    @param ninfo: the node to check
2198
    @param nresult: the remote results for the node
2199
    @type is_master: bool
2200
    @param is_master: Whether node is the master node
2201

2202
    """
2203
    node = ninfo.name
2204

    
2205
    if (is_master and
2206
        (constants.ENABLE_FILE_STORAGE or
2207
         constants.ENABLE_SHARED_FILE_STORAGE)):
2208
      try:
2209
        fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
2210
      except KeyError:
2211
        # This should never happen
2212
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
2213
                      "Node did not return forbidden file storage paths")
2214
      else:
2215
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
2216
                      "Found forbidden file storage paths: %s",
2217
                      utils.CommaJoin(fspaths))
2218
    else:
2219
      self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
2220
                    constants.CV_ENODEFILESTORAGEPATHS, node,
2221
                    "Node should not have returned forbidden file storage"
2222
                    " paths")
2223

    
2224
  def _VerifyOob(self, ninfo, nresult):
2225
    """Verifies out of band functionality of a node.
2226

2227
    @type ninfo: L{objects.Node}
2228
    @param ninfo: the node to check
2229
    @param nresult: the remote results for the node
2230

2231
    """
2232
    node = ninfo.name
2233
    # We just have to verify the paths on master and/or master candidates
2234
    # as the oob helper is invoked on the master
2235
    if ((ninfo.master_candidate or ninfo.master_capable) and
2236
        constants.NV_OOB_PATHS in nresult):
2237
      for path_result in nresult[constants.NV_OOB_PATHS]:
2238
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2239

    
2240
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2241
    """Verifies and updates the node volume data.
2242

2243
    This function will update a L{NodeImage}'s internal structures
2244
    with data from the remote call.
2245

2246
    @type ninfo: L{objects.Node}
2247
    @param ninfo: the node to check
2248
    @param nresult: the remote results for the node
2249
    @param nimg: the node image object
2250
    @param vg_name: the configured VG name
2251

2252
    """
2253
    node = ninfo.name
2254
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2255

    
2256
    nimg.lvm_fail = True
2257
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2258
    if vg_name is None:
2259
      pass
2260
    elif isinstance(lvdata, basestring):
2261
      _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
2262
               utils.SafeEncode(lvdata))
2263
    elif not isinstance(lvdata, dict):
2264
      _ErrorIf(True, constants.CV_ENODELVM, node,
2265
               "rpc call to node failed (lvlist)")
2266
    else:
2267
      nimg.volumes = lvdata
2268
      nimg.lvm_fail = False
2269

    
2270
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2271
    """Verifies and updates the node instance list.
2272

2273
    If the listing was successful, then updates this node's instance
2274
    list. Otherwise, it marks the RPC call as failed for the instance
2275
    list key.
2276

2277
    @type ninfo: L{objects.Node}
2278
    @param ninfo: the node to check
2279
    @param nresult: the remote results for the node
2280
    @param nimg: the node image object
2281

2282
    """
2283
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2284
    test = not isinstance(idata, list)
2285
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2286
                  "rpc call to node failed (instancelist): %s",
2287
                  utils.SafeEncode(str(idata)))
2288
    if test:
2289
      nimg.hyp_fail = True
2290
    else:
2291
      nimg.instances = idata
2292

    
2293
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2294
    """Verifies and computes a node information map
2295

2296
    @type ninfo: L{objects.Node}
2297
    @param ninfo: the node to check
2298
    @param nresult: the remote results for the node
2299
    @param nimg: the node image object
2300
    @param vg_name: the configured VG name
2301

2302
    """
2303
    node = ninfo.name
2304
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2305

    
2306
    # try to read free memory (from the hypervisor)
2307
    hv_info = nresult.get(constants.NV_HVINFO, None)
2308
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2309
    _ErrorIf(test, constants.CV_ENODEHV, node,
2310
             "rpc call to node failed (hvinfo)")
2311
    if not test:
2312
      try:
2313
        nimg.mfree = int(hv_info["memory_free"])
2314
      except (ValueError, TypeError):
2315
        _ErrorIf(True, constants.CV_ENODERPC, node,
2316
                 "node returned invalid nodeinfo, check hypervisor")
2317

    
2318
    # FIXME: devise a free space model for file based instances as well
2319
    if vg_name is not None:
2320
      test = (constants.NV_VGLIST not in nresult or
2321
              vg_name not in nresult[constants.NV_VGLIST])
2322
      _ErrorIf(test, constants.CV_ENODELVM, node,
2323
               "node didn't return data for the volume group '%s'"
2324
               " - it is either missing or broken", vg_name)
2325
      if not test:
2326
        try:
2327
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2328
        except (ValueError, TypeError):
2329
          _ErrorIf(True, constants.CV_ENODERPC, node,
2330
                   "node returned invalid LVM info, check LVM status")
2331

    
2332
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2333
    """Gets per-disk status information for all instances.
2334

2335
    @type nodelist: list of strings
2336
    @param nodelist: Node names
2337
    @type node_image: dict of (name, L{objects.Node})
2338
    @param node_image: Node objects
2339
    @type instanceinfo: dict of (name, L{objects.Instance})
2340
    @param instanceinfo: Instance objects
2341
    @rtype: {instance: {node: [(success, payload)]}}
2342
    @return: a dictionary of per-instance dictionaries with nodes as
2343
        keys and disk information as values; the disk information is a
2344
        list of tuples (success, payload)
2345

2346
    """
2347
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2348

    
2349
    node_disks = {}
2350
    node_disks_devonly = {}
2351
    diskless_instances = set()
2352
    diskless = constants.DT_DISKLESS
2353

    
2354
    for nname in nodelist:
2355
      node_instances = list(itertools.chain(node_image[nname].pinst,
2356
                                            node_image[nname].sinst))
2357
      diskless_instances.update(inst for inst in node_instances
2358
                                if instanceinfo[inst].disk_template == diskless)
2359
      disks = [(inst, disk)
2360
               for inst in node_instances
2361
               for disk in instanceinfo[inst].disks]
2362

    
2363
      if not disks:
2364
        # No need to collect data
2365
        continue
2366

    
2367
      node_disks[nname] = disks
2368

    
2369
      # _AnnotateDiskParams makes already copies of the disks
2370
      devonly = []
2371
      for (inst, dev) in disks:
2372
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
2373
        self.cfg.SetDiskID(anno_disk, nname)
2374
        devonly.append(anno_disk)
2375

    
2376
      node_disks_devonly[nname] = devonly
2377

    
2378
    assert len(node_disks) == len(node_disks_devonly)
2379

    
2380
    # Collect data from all nodes with disks
2381
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2382
                                                          node_disks_devonly)
2383

    
2384
    assert len(result) == len(node_disks)
2385

    
2386
    instdisk = {}
2387

    
2388
    for (nname, nres) in result.items():
2389
      disks = node_disks[nname]
2390

    
2391
      if nres.offline:
2392
        # No data from this node
2393
        data = len(disks) * [(False, "node offline")]
2394
      else:
2395
        msg = nres.fail_msg
2396
        _ErrorIf(msg, constants.CV_ENODERPC, nname,
2397
                 "while getting disk information: %s", msg)
2398
        if msg:
2399
          # No data from this node
2400
          data = len(disks) * [(False, msg)]
2401
        else:
2402
          data = []
2403
          for idx, i in enumerate(nres.payload):
2404
            if isinstance(i, (tuple, list)) and len(i) == 2:
2405
              data.append(i)
2406
            else:
2407
              logging.warning("Invalid result from node %s, entry %d: %s",
2408
                              nname, idx, i)
2409
              data.append((False, "Invalid result from the remote node"))
2410

    
2411
      for ((inst, _), status) in zip(disks, data):
2412
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2413

    
2414
    # Add empty entries for diskless instances.
2415
    for inst in diskless_instances:
2416
      assert inst not in instdisk
2417
      instdisk[inst] = {}
2418

    
2419
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2420
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
2421
                      compat.all(isinstance(s, (tuple, list)) and
2422
                                 len(s) == 2 for s in statuses)
2423
                      for inst, nnames in instdisk.items()
2424
                      for nname, statuses in nnames.items())
2425
    if __debug__:
2426
      instdisk_keys = set(instdisk)
2427
      instanceinfo_keys = set(instanceinfo)
2428
      assert instdisk_keys == instanceinfo_keys, \
2429
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2430
         (instdisk_keys, instanceinfo_keys))
2431

    
2432
    return instdisk
2433

    
2434
  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

2468
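  # Illustrative sketch (group and node names are made up): with the verified
  # group g1 = {n1, n2} and a foreign group g2 = {n3, n4},
  # _SshNodeSelector(g1, ...) yields one endless iterator per foreign group,
  # here cycle(["n3", "n4"]), and _SelectSshCheckNodes returns something like
  #   (["n1", "n2"], {"n1": ["n3"], "n2": ["n4"]})
  # i.e. a mapping from each online node of the group to one peer per foreign
  # group, which is what the NV_NODELIST ssh check in Exec() consumes.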
  def BuildHooksEnv(self):
2469
    """Build hooks env.
2470

2471
    Cluster-Verify hooks just ran in the post phase and their failure makes
2472
    the output be logged in the verify output and the verification to fail.
2473

2474
    """
2475
    env = {
2476
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2477
      }
2478

    
2479
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2480
               for node in self.my_node_info.values())
2481

    
2482
    return env
2483

    
2484
  def BuildHooksNodes(self):
2485
    """Build hooks nodes.
2486

2487
    """
2488
    return ([], self.my_node_names)
2489

    
2490
  def Exec(self, feedback_fn):
2491
    """Verify integrity of the node group, performing various test on nodes.
2492

2493
    """
2494
    # This method has too many local variables. pylint: disable=R0914
2495
    feedback_fn("* Verifying group '%s'" % self.group_info.name)
2496

    
2497
    if not self.my_node_names:
2498
      # empty node group
2499
      feedback_fn("* Empty node group, skipping verification")
2500
      return True
2501

    
2502
    self.bad = False
2503
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2504
    verbose = self.op.verbose
2505
    self._feedback_fn = feedback_fn
2506

    
2507
    vg_name = self.cfg.GetVGName()
2508
    drbd_helper = self.cfg.GetDRBDHelper()
2509
    cluster = self.cfg.GetClusterInfo()
2510
    hypervisors = cluster.enabled_hypervisors
2511
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2512

    
2513
    i_non_redundant = [] # Non redundant instances
2514
    i_non_a_balanced = [] # Non auto-balanced instances
2515
    i_offline = 0 # Count of offline instances
2516
    n_offline = 0 # Count of offline nodes
2517
    n_drained = 0 # Count of nodes being drained
2518
    node_vol_should = {}
2519

    
2520
    # FIXME: verify OS list
2521

    
2522
    # File verification
2523
    filemap = ComputeAncillaryFiles(cluster, False)
2524

    
2525
    # do local checksums
2526
    master_node = self.master_node = self.cfg.GetMasterNode()
2527
    master_ip = self.cfg.GetMasterIP()
2528

    
2529
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2530

    
2531
    user_scripts = []
2532
    if self.cfg.GetUseExternalMipScript():
2533
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2534

    
2535
    node_verify_param = {
2536
      constants.NV_FILELIST:
2537
        map(vcluster.MakeVirtualPath,
2538
            utils.UniqueSequence(filename
2539
                                 for files in filemap
2540
                                 for filename in files)),
2541
      constants.NV_NODELIST:
2542
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2543
                                  self.all_node_info.values()),
2544
      constants.NV_HYPERVISOR: hypervisors,
2545
      constants.NV_HVPARAMS:
2546
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2547
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2548
                                 for node in node_data_list
2549
                                 if not node.offline],
2550
      constants.NV_INSTANCELIST: hypervisors,
2551
      constants.NV_VERSION: None,
2552
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2553
      constants.NV_NODESETUP: None,
2554
      constants.NV_TIME: None,
2555
      constants.NV_MASTERIP: (master_node, master_ip),
2556
      constants.NV_OSLIST: None,
2557
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2558
      constants.NV_USERSCRIPTS: user_scripts,
2559
      }
2560

    
2561
    if vg_name is not None:
2562
      node_verify_param[constants.NV_VGLIST] = None
2563
      node_verify_param[constants.NV_LVLIST] = vg_name
2564
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2565

    
2566
    if drbd_helper:
2567
      node_verify_param[constants.NV_DRBDLIST] = None
2568
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2569

    
2570
    if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
2571
      # Load file storage paths only from master node
2572
      node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node
2573

    
2574
    # bridge checks
2575
    # FIXME: this needs to be changed per node-group, not cluster-wide
2576
    bridges = set()
2577
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2578
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2579
      bridges.add(default_nicpp[constants.NIC_LINK])
2580
    for instance in self.my_inst_info.values():
2581
      for nic in instance.nics:
2582
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
2583
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2584
          bridges.add(full_nic[constants.NIC_LINK])
2585

    
2586
    if bridges:
2587
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2588

    
2589
    # Build our expected cluster state
2590
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2591
                                                 name=node.name,
2592
                                                 vm_capable=node.vm_capable))
2593
                      for node in node_data_list)
2594

    
2595
    # Gather OOB paths
2596
    oob_paths = []
2597
    for node in self.all_node_info.values():
2598
      path = SupportsOob(self.cfg, node)
2599
      if path and path not in oob_paths:
2600
        oob_paths.append(path)
2601

    
2602
    if oob_paths:
2603
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2604

    
2605
    for instance in self.my_inst_names:
2606
      inst_config = self.my_inst_info[instance]
2607
      if inst_config.admin_state == constants.ADMINST_OFFLINE:
2608
        i_offline += 1
2609

    
2610
      for nname in inst_config.all_nodes:
2611
        if nname not in node_image:
2612
          gnode = self.NodeImage(name=nname)
2613
          gnode.ghost = (nname not in self.all_node_info)
2614
          node_image[nname] = gnode
2615

    
2616
      inst_config.MapLVsByNode(node_vol_should)
2617

    
2618
      pnode = inst_config.primary_node
2619
      node_image[pnode].pinst.append(instance)
2620

    
2621
      for snode in inst_config.secondary_nodes:
2622
        nimg = node_image[snode]
2623
        nimg.sinst.append(instance)
2624
        if pnode not in nimg.sbp:
2625
          nimg.sbp[pnode] = []
2626
        nimg.sbp[pnode].append(instance)
2627

    
2628
    es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg, self.my_node_names)
2629
    # The value of exclusive_storage should be the same across the group, so if
2630
    # it's True for at least a node, we act as if it were set for all the nodes
2631
    self._exclusive_storage = compat.any(es_flags.values())
2632
    if self._exclusive_storage:
2633
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2634

    
2635
    # At this point, we have the in-memory data structures complete,
2636
    # except for the runtime information, which we'll gather next
2637

    
2638
    # Due to the way our RPC system works, exact response times cannot be
2639
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2640
    # time before and after executing the request, we can at least have a time
2641
    # window.
2642
    nvinfo_starttime = time.time()
2643
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2644
                                           node_verify_param,
2645
                                           self.cfg.GetClusterName())
2646
    nvinfo_endtime = time.time()
2647

    
2648
    if self.extra_lv_nodes and vg_name is not None:
2649
      extra_lv_nvinfo = \
2650
          self.rpc.call_node_verify(self.extra_lv_nodes,
2651
                                    {constants.NV_LVLIST: vg_name},
2652
                                    self.cfg.GetClusterName())
2653
    else:
2654
      extra_lv_nvinfo = {}
2655

    
2656
    all_drbd_map = self.cfg.ComputeDRBDMap()
2657

    
2658
    feedback_fn("* Gathering disk information (%s nodes)" %
2659
                len(self.my_node_names))
2660
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2661
                                     self.my_inst_info)
2662

    
2663
    feedback_fn("* Verifying configuration file consistency")
2664

    
2665
    # If not all nodes are being checked, we need to make sure the master node
2666
    # and a non-checked vm_capable node are in the list.
2667
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2668
    if absent_nodes:
2669
      vf_nvinfo = all_nvinfo.copy()
2670
      vf_node_info = list(self.my_node_info.values())
2671
      additional_nodes = []
2672
      if master_node not in self.my_node_info:
2673
        additional_nodes.append(master_node)
2674
        vf_node_info.append(self.all_node_info[master_node])
2675
      # Add the first vm_capable node we find which is not included,
2676
      # excluding the master node (which we already have)
2677
      for node in absent_nodes:
2678
        nodeinfo = self.all_node_info[node]
2679
        if (nodeinfo.vm_capable and not nodeinfo.offline and
2680
            node != master_node):
2681
          additional_nodes.append(node)
2682
          vf_node_info.append(self.all_node_info[node])
2683
          break
2684
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
                                                 {key: node_verify_param[key]},
                                                 self.cfg.GetClusterName()))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      node = node_i.name
      nimg = node_image[node]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline += 1
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
               msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyFileStoragePaths(node_i, nresult,
                                   node == master_node)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

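        # The first node with usable OS data becomes the reference image
        # against which the OS information of the other nodes is compared.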
        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)

        for inst in non_primary_inst:
          test = inst in self.all_inst_info
          _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
                   "instance should not run on node %s", node_i.name)
          _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                   "node is running unknown instance %s", inst)

    self._VerifyGroupLVM(node_image, vg_name)

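    # Fold the LV data gathered from the extra LV nodes (queried above) into
    # the corresponding node images.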
    for node, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
                              node_image[node], vg_name)

    feedback_fn("* Verifying instance status")
    for instance in self.my_inst_names:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.my_inst_info[instance]
      self._VerifyInstance(instance, inst_config, node_image,
                           instdisk[instance])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if inst_config.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for inst in self.all_inst_info.values():
      for secondary in inst.secondary_nodes:
        if (secondary in self.my_node_info
            and inst.name not in self.my_inst_info):
          inst.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

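    # Unless this check was explicitly skipped, verify that enough memory is
    # available to restart instances should a single node fail (N+1).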
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_names:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
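        # Each payload entry is a (script, status, output) tuple from the
        # node's hook runner; HKR_FAIL marks a script that failed.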
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])