
root / lib / cmdlib / cluster.py @ 5e4475de


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with the cluster."""
23

    
24
import OpenSSL
25

    
26
import copy
27
import itertools
28
import logging
29
import operator
30
import os
31
import re
32
import time
33

    
34
from ganeti import compat
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import hypervisor
38
from ganeti import locking
39
from ganeti import masterd
40
from ganeti import netutils
41
from ganeti import objects
42
from ganeti import opcodes
43
from ganeti import pathutils
44
from ganeti import query
45
from ganeti import rpc
46
from ganeti import runtime
47
from ganeti import ssh
48
from ganeti import uidpool
49
from ganeti import utils
50
from ganeti import vcluster
51

    
52
from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53
  ResultWithJobs
54
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob
60

    
61
import ganeti.masterd.instance
62

    
63

    
64
class LUClusterActivateMasterIp(NoHooksLU):
65
  """Activate the master IP on the master node.
66

67
  """
68
  def Exec(self, feedback_fn):
69
    """Activate the master IP.
70

71
    """
72
    master_params = self.cfg.GetMasterNetworkParameters()
73
    ems = self.cfg.GetUseExternalMipScript()
74
    result = self.rpc.call_node_activate_master_ip(master_params.name,
75
                                                   master_params, ems)
76
    result.Raise("Could not activate the master IP")
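  # Illustrative note (not part of the original file): this LU is normally
  # reached through the opcode layer rather than instantiated directly; a
  # minimal sketch, assuming a luxi client object named "cl":
  #
  #   cl.SubmitJob([opcodes.OpClusterActivateMasterIp()])
  #
  # which is roughly what "gnt-cluster activate-master-ip" ends up submitting.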
77

    
78

    
79
class LUClusterDeactivateMasterIp(NoHooksLU):
80
  """Deactivate the master IP on the master node.
81

82
  """
83
  def Exec(self, feedback_fn):
84
    """Deactivate the master IP.
85

86
    """
87
    master_params = self.cfg.GetMasterNetworkParameters()
88
    ems = self.cfg.GetUseExternalMipScript()
89
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
90
                                                     master_params, ems)
91
    result.Raise("Could not deactivate the master IP")
92

    
93

    
94
class LUClusterConfigQuery(NoHooksLU):
95
  """Return configuration values.
96

97
  """
98
  REQ_BGL = False
99

    
100
  def CheckArguments(self):
101
    self.cq = ClusterQuery(None, self.op.output_fields, False)
102

    
103
  def ExpandNames(self):
104
    self.cq.ExpandNames(self)
105

    
106
  def DeclareLocks(self, level):
107
    self.cq.DeclareLocks(self, level)
108

    
109
  def Exec(self, feedback_fn):
110
    result = self.cq.OldStyleQuery(self)
111

    
112
    assert len(result) == 1
113

    
114
    return result[0]
115

    
116

    
117
class LUClusterDestroy(LogicalUnit):
118
  """Logical unit for destroying the cluster.
119

120
  """
121
  HPATH = "cluster-destroy"
122
  HTYPE = constants.HTYPE_CLUSTER
123

    
124
  def BuildHooksEnv(self):
125
    """Build hooks env.
126

127
    """
128
    return {
129
      "OP_TARGET": self.cfg.GetClusterName(),
130
      }
131

    
132
  def BuildHooksNodes(self):
133
    """Build hooks nodes.
134

135
    """
136
    return ([], [])
137

    
138
  def CheckPrereq(self):
139
    """Check prerequisites.
140

141
    This checks whether the cluster is empty.
142

143
    Any errors are signaled by raising errors.OpPrereqError.
144

145
    """
146
    master = self.cfg.GetMasterNode()
147

    
148
    nodelist = self.cfg.GetNodeList()
149
    if len(nodelist) != 1 or nodelist[0] != master:
150
      raise errors.OpPrereqError("There are still %d node(s) in"
151
                                 " this cluster." % (len(nodelist) - 1),
152
                                 errors.ECODE_INVAL)
153
    instancelist = self.cfg.GetInstanceList()
154
    if instancelist:
155
      raise errors.OpPrereqError("There are still %d instance(s) in"
156
                                 " this cluster." % len(instancelist),
157
                                 errors.ECODE_INVAL)
158

    
159
  def Exec(self, feedback_fn):
160
    """Destroys the cluster.
161

162
    """
163
    master_params = self.cfg.GetMasterNetworkParameters()
164

    
165
    # Run post hooks on master node before it's removed
166
    RunPostHook(self, master_params.name)
167

    
168
    ems = self.cfg.GetUseExternalMipScript()
169
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
170
                                                     master_params, ems)
171
    if result.fail_msg:
172
      self.LogWarning("Error disabling the master IP address: %s",
173
                      result.fail_msg)
174

    
175
    return master_params.name
176

    
177

    
178
class LUClusterPostInit(LogicalUnit):
179
  """Logical unit for running hooks after cluster initialization.
180

181
  """
182
  HPATH = "cluster-init"
183
  HTYPE = constants.HTYPE_CLUSTER
184

    
185
  def BuildHooksEnv(self):
186
    """Build hooks env.
187

188
    """
189
    return {
190
      "OP_TARGET": self.cfg.GetClusterName(),
191
      }
192

    
193
  def BuildHooksNodes(self):
194
    """Build hooks nodes.
195

196
    """
197
    return ([], [self.cfg.GetMasterNode()])
198

    
199
  def Exec(self, feedback_fn):
200
    """Nothing to do.
201

202
    """
203
    return True
204

    
205

    
206
class ClusterQuery(QueryBase):
207
  FIELDS = query.CLUSTER_FIELDS
208

    
209
  #: Do not sort (there is only one item)
210
  SORT_FIELD = None
211

    
212
  def ExpandNames(self, lu):
213
    lu.needed_locks = {}
214

    
215
    # The following variables interact with _QueryBase._GetNames
216
    self.wanted = locking.ALL_SET
217
    self.do_locking = self.use_locking
218

    
219
    if self.do_locking:
220
      raise errors.OpPrereqError("Can not use locking for cluster queries",
221
                                 errors.ECODE_INVAL)
222

    
223
  def DeclareLocks(self, lu, level):
224
    pass
225

    
226
  def _GetQueryData(self, lu):
227
    """Computes the list of nodes and their attributes.
228

229
    """
230
    # Locking is not used
231
    assert not (compat.any(lu.glm.is_owned(level)
232
                           for level in locking.LEVELS
233
                           if level != locking.LEVEL_CLUSTER) or
234
                self.do_locking or self.use_locking)
235

    
236
    if query.CQ_CONFIG in self.requested_data:
237
      cluster = lu.cfg.GetClusterInfo()
238
    else:
239
      cluster = NotImplemented
240

    
241
    if query.CQ_QUEUE_DRAINED in self.requested_data:
242
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
243
    else:
244
      drain_flag = NotImplemented
245

    
246
    if query.CQ_WATCHER_PAUSE in self.requested_data:
247
      master_name = lu.cfg.GetMasterNode()
248

    
249
      result = lu.rpc.call_get_watcher_pause(master_name)
250
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
251
                   master_name)
252

    
253
      watcher_pause = result.payload
254
    else:
255
      watcher_pause = NotImplemented
256

    
257
    return query.ClusterQueryData(cluster, drain_flag, watcher_pause)
258

    
259

    
260
class LUClusterQuery(NoHooksLU):
261
  """Query cluster configuration.
262

263
  """
264
  REQ_BGL = False
265

    
266
  def ExpandNames(self):
267
    self.needed_locks = {}
268

    
269
  def Exec(self, feedback_fn):
270
    """Return cluster config.
271

272
    """
273
    cluster = self.cfg.GetClusterInfo()
274
    os_hvp = {}
275

    
276
    # Filter just for enabled hypervisors
277
    for os_name, hv_dict in cluster.os_hvp.items():
278
      os_hvp[os_name] = {}
279
      for hv_name, hv_params in hv_dict.items():
280
        if hv_name in cluster.enabled_hypervisors:
281
          os_hvp[os_name][hv_name] = hv_params
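    # Illustrative sketch (not part of the original file): with
    # cluster.enabled_hypervisors == ["kvm"] and
    # cluster.os_hvp == {"debian": {"kvm": {...}, "xen-pvm": {...}}},
    # the loop above leaves os_hvp == {"debian": {"kvm": {...}}}, i.e. only
    # parameters of enabled hypervisors are reported back.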
282

    
283
    # Convert ip_family to ip_version
284
    primary_ip_version = constants.IP4_VERSION
285
    if cluster.primary_ip_family == netutils.IP6Address.family:
286
      primary_ip_version = constants.IP6_VERSION
287

    
288
    result = {
289
      "software_version": constants.RELEASE_VERSION,
290
      "protocol_version": constants.PROTOCOL_VERSION,
291
      "config_version": constants.CONFIG_VERSION,
292
      "os_api_version": max(constants.OS_API_VERSIONS),
293
      "export_version": constants.EXPORT_VERSION,
294
      "vcs_version": constants.VCS_VERSION,
295
      "architecture": runtime.GetArchInfo(),
296
      "name": cluster.cluster_name,
297
      "master": cluster.master_node,
298
      "default_hypervisor": cluster.primary_hypervisor,
299
      "enabled_hypervisors": cluster.enabled_hypervisors,
300
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
301
                        for hypervisor_name in cluster.enabled_hypervisors]),
302
      "os_hvp": os_hvp,
303
      "beparams": cluster.beparams,
304
      "osparams": cluster.osparams,
305
      "ipolicy": cluster.ipolicy,
306
      "nicparams": cluster.nicparams,
307
      "ndparams": cluster.ndparams,
308
      "diskparams": cluster.diskparams,
309
      "candidate_pool_size": cluster.candidate_pool_size,
310
      "master_netdev": cluster.master_netdev,
311
      "master_netmask": cluster.master_netmask,
312
      "use_external_mip_script": cluster.use_external_mip_script,
313
      "volume_group_name": cluster.volume_group_name,
314
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
315
      "file_storage_dir": cluster.file_storage_dir,
316
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
317
      "maintain_node_health": cluster.maintain_node_health,
318
      "ctime": cluster.ctime,
319
      "mtime": cluster.mtime,
320
      "uuid": cluster.uuid,
321
      "tags": list(cluster.GetTags()),
322
      "uid_pool": cluster.uid_pool,
323
      "default_iallocator": cluster.default_iallocator,
324
      "reserved_lvs": cluster.reserved_lvs,
325
      "primary_ip_version": primary_ip_version,
326
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
327
      "hidden_os": cluster.hidden_os,
328
      "blacklisted_os": cluster.blacklisted_os,
329
      "enabled_disk_templates": cluster.enabled_disk_templates,
330
      }
331

    
332
    return result
333

    
334

    
335
class LUClusterRedistConf(NoHooksLU):
336
  """Force the redistribution of cluster configuration.
337

338
  This is a very simple LU.
339

340
  """
341
  REQ_BGL = False
342

    
343
  def ExpandNames(self):
344
    self.needed_locks = {
345
      locking.LEVEL_NODE: locking.ALL_SET,
346
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
347
    }
348
    self.share_locks = ShareAll()
349

    
350
  def Exec(self, feedback_fn):
351
    """Redistribute the configuration.
352

353
    """
354
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
355
    RedistributeAncillaryFiles(self)
356

    
357

    
358
class LUClusterRename(LogicalUnit):
359
  """Rename the cluster.
360

361
  """
362
  HPATH = "cluster-rename"
363
  HTYPE = constants.HTYPE_CLUSTER
364

    
365
  def BuildHooksEnv(self):
366
    """Build hooks env.
367

368
    """
369
    return {
370
      "OP_TARGET": self.cfg.GetClusterName(),
371
      "NEW_NAME": self.op.name,
372
      }
373

    
374
  def BuildHooksNodes(self):
375
    """Build hooks nodes.
376

377
    """
378
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
379

    
380
  def CheckPrereq(self):
381
    """Verify that the passed name is a valid one.
382

383
    """
384
    hostname = netutils.GetHostname(name=self.op.name,
385
                                    family=self.cfg.GetPrimaryIPFamily())
386

    
387
    new_name = hostname.name
388
    self.ip = new_ip = hostname.ip
389
    old_name = self.cfg.GetClusterName()
390
    old_ip = self.cfg.GetMasterIP()
391
    if new_name == old_name and new_ip == old_ip:
392
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
393
                                 " cluster has changed",
394
                                 errors.ECODE_INVAL)
395
    if new_ip != old_ip:
396
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
397
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
398
                                   " reachable on the network" %
399
                                   new_ip, errors.ECODE_NOTUNIQUE)
400

    
401
    self.op.name = new_name
402

    
403
  def Exec(self, feedback_fn):
404
    """Rename the cluster.
405

406
    """
407
    clustername = self.op.name
408
    new_ip = self.ip
409

    
410
    # shutdown the master IP
411
    master_params = self.cfg.GetMasterNetworkParameters()
412
    ems = self.cfg.GetUseExternalMipScript()
413
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
414
                                                     master_params, ems)
415
    result.Raise("Could not disable the master role")
416

    
417
    try:
418
      cluster = self.cfg.GetClusterInfo()
419
      cluster.cluster_name = clustername
420
      cluster.master_ip = new_ip
421
      self.cfg.Update(cluster, feedback_fn)
422

    
423
      # update the known hosts file
424
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
425
      node_list = self.cfg.GetOnlineNodeList()
426
      try:
427
        node_list.remove(master_params.name)
428
      except ValueError:
429
        pass
430
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
431
    finally:
432
      master_params.ip = new_ip
433
      result = self.rpc.call_node_activate_master_ip(master_params.name,
434
                                                     master_params, ems)
435
      msg = result.fail_msg
436
      if msg:
437
        self.LogWarning("Could not re-enable the master role on"
438
                        " the master, please restart manually: %s", msg)
439

    
440
    return clustername
441

    
442

    
443
class LUClusterRepairDiskSizes(NoHooksLU):
444
  """Verifies the cluster disks sizes.
445

446
  """
447
  REQ_BGL = False
448

    
449
  def ExpandNames(self):
450
    if self.op.instances:
451
      self.wanted_names = GetWantedInstances(self, self.op.instances)
452
      # Not getting the node allocation lock as only a specific set of
453
      # instances (and their nodes) is going to be acquired
454
      self.needed_locks = {
455
        locking.LEVEL_NODE_RES: [],
456
        locking.LEVEL_INSTANCE: self.wanted_names,
457
        }
458
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
459
    else:
460
      self.wanted_names = None
461
      self.needed_locks = {
462
        locking.LEVEL_NODE_RES: locking.ALL_SET,
463
        locking.LEVEL_INSTANCE: locking.ALL_SET,
464

    
465
        # This opcode acquires the node locks for all instances
466
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
467
        }
468

    
469
    self.share_locks = {
470
      locking.LEVEL_NODE_RES: 1,
471
      locking.LEVEL_INSTANCE: 0,
472
      locking.LEVEL_NODE_ALLOC: 1,
473
      }
474

    
475
  def DeclareLocks(self, level):
476
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
477
      self._LockInstancesNodes(primary_only=True, level=level)
478

    
479
  def CheckPrereq(self):
480
    """Check prerequisites.
481

482
    This only checks the optional instance list against the existing names.
483

484
    """
485
    if self.wanted_names is None:
486
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
487

    
488
    self.wanted_instances = \
489
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
490

    
491
  def _EnsureChildSizes(self, disk):
492
    """Ensure children of the disk have the needed disk size.
493

494
    This is valid mainly for DRBD8 and fixes an issue where the
495
    children have smaller disk size.
496

497
    @param disk: an L{ganeti.objects.Disk} object
498

499
    """
500
    if disk.dev_type == constants.LD_DRBD8:
501
      assert disk.children, "Empty children for DRBD8?"
502
      fchild = disk.children[0]
503
      mismatch = fchild.size < disk.size
504
      if mismatch:
505
        self.LogInfo("Child disk has size %d, parent %d, fixing",
506
                     fchild.size, disk.size)
507
        fchild.size = disk.size
508

    
509
      # and we recurse on this child only, not on the metadev
510
      return self._EnsureChildSizes(fchild) or mismatch
511
    else:
512
      return False
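  # Illustrative sketch (not part of the original file): for a DRBD8 disk of
  # size 10240 MiB whose data child reports only 10200 MiB, the method above
  # bumps the child to 10240 and returns True, telling the caller that the
  # instance configuration must be written back; non-DRBD8 disks always
  # return False.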
513

    
514
  def Exec(self, feedback_fn):
515
    """Verify the size of cluster disks.
516

517
    """
518
    # TODO: check child disks too
519
    # TODO: check differences in size between primary/secondary nodes
520
    per_node_disks = {}
521
    for instance in self.wanted_instances:
522
      pnode = instance.primary_node
523
      if pnode not in per_node_disks:
524
        per_node_disks[pnode] = []
525
      for idx, disk in enumerate(instance.disks):
526
        per_node_disks[pnode].append((instance, idx, disk))
527

    
528
    assert not (frozenset(per_node_disks.keys()) -
529
                self.owned_locks(locking.LEVEL_NODE_RES)), \
530
      "Not owning correct locks"
531
    assert not self.owned_locks(locking.LEVEL_NODE)
532

    
533
    changed = []
534
    for node, dskl in per_node_disks.items():
535
      newl = [v[2].Copy() for v in dskl]
536
      for dsk in newl:
537
        self.cfg.SetDiskID(dsk, node)
538
      result = self.rpc.call_blockdev_getsize(node, newl)
539
      if result.fail_msg:
540
        self.LogWarning("Failure in blockdev_getsize call to node"
541
                        " %s, ignoring", node)
542
        continue
543
      if len(result.payload) != len(dskl):
544
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
545
                        " result.payload=%s", node, len(dskl), result.payload)
546
        self.LogWarning("Invalid result from node %s, ignoring node results",
547
                        node)
548
        continue
549
      for ((instance, idx, disk), size) in zip(dskl, result.payload):
550
        if size is None:
551
          self.LogWarning("Disk %d of instance %s did not return size"
552
                          " information, ignoring", idx, instance.name)
553
          continue
554
        if not isinstance(size, (int, long)):
555
          self.LogWarning("Disk %d of instance %s did not return valid"
556
                          " size information, ignoring", idx, instance.name)
557
          continue
558
        size = size >> 20
559
        if size != disk.size:
560
          self.LogInfo("Disk %d of instance %s has mismatched size,"
561
                       " correcting: recorded %d, actual %d", idx,
562
                       instance.name, disk.size, size)
563
          disk.size = size
564
          self.cfg.Update(instance, feedback_fn)
565
          changed.append((instance.name, idx, size))
566
        if self._EnsureChildSizes(disk):
567
          self.cfg.Update(instance, feedback_fn)
568
          changed.append((instance.name, idx, disk.size))
569
    return changed
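  # Illustrative note (not part of the original file): the list returned above
  # contains one (instance_name, disk_index, new_size_in_MiB) tuple per
  # corrected disk, e.g. [("inst1.example.com", 0, 10240)]; the RPC payload is
  # reported in bytes and shifted down by 20 bits to MiB before comparison.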
570

    
571

    
572
def _ValidateNetmask(cfg, netmask):
573
  """Checks if a netmask is valid.
574

575
  @type cfg: L{config.ConfigWriter}
576
  @param cfg: The cluster configuration
577
  @type netmask: int
578
  @param netmask: the netmask to be verified
579
  @raise errors.OpPrereqError: if the validation fails
580

581
  """
582
  ip_family = cfg.GetPrimaryIPFamily()
583
  try:
584
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
585
  except errors.ProgrammerError:
586
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
587
                               ip_family, errors.ECODE_INVAL)
588
  if not ipcls.ValidateNetmask(netmask):
589
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
590
                               (netmask), errors.ECODE_INVAL)
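# Illustrative sketch (not part of the original file), assuming an IPv4
# cluster: _ValidateNetmask(cfg, 24) returns silently, while something like
# _ValidateNetmask(cfg, 33) raises errors.OpPrereqError because the IPv4
# address class rejects it as a CIDR prefix length.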
591

    
592

    
593
class LUClusterSetParams(LogicalUnit):
594
  """Change the parameters of the cluster.
595

596
  """
597
  HPATH = "cluster-modify"
598
  HTYPE = constants.HTYPE_CLUSTER
599
  REQ_BGL = False
600

    
601
  def CheckArguments(self):
602
    """Check parameters
603

604
    """
605
    if self.op.uid_pool:
606
      uidpool.CheckUidPool(self.op.uid_pool)
607

    
608
    if self.op.add_uids:
609
      uidpool.CheckUidPool(self.op.add_uids)
610

    
611
    if self.op.remove_uids:
612
      uidpool.CheckUidPool(self.op.remove_uids)
613

    
614
    if self.op.master_netmask is not None:
615
      _ValidateNetmask(self.cfg, self.op.master_netmask)
616

    
617
    if self.op.diskparams:
618
      for dt_params in self.op.diskparams.values():
619
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
620
      try:
621
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
622
      except errors.OpPrereqError, err:
623
        raise errors.OpPrereqError("While verifying diskparams options: %s" % err,
624
                                   errors.ECODE_INVAL)
625

    
626
  def ExpandNames(self):
627
    # FIXME: in the future maybe other cluster params won't require checking on
628
    # all nodes to be modified.
629
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
630
    # resource locks the right thing, shouldn't it be the BGL instead?
631
    self.needed_locks = {
632
      locking.LEVEL_NODE: locking.ALL_SET,
633
      locking.LEVEL_INSTANCE: locking.ALL_SET,
634
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
635
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
636
    }
637
    self.share_locks = ShareAll()
638

    
639
  def BuildHooksEnv(self):
640
    """Build hooks env.
641

642
    """
643
    return {
644
      "OP_TARGET": self.cfg.GetClusterName(),
645
      "NEW_VG_NAME": self.op.vg_name,
646
      }
647

    
648
  def BuildHooksNodes(self):
649
    """Build hooks nodes.
650

651
    """
652
    mn = self.cfg.GetMasterNode()
653
    return ([mn], [mn])
654

    
655
  def CheckPrereq(self):
656
    """Check prerequisites.
657

658
    This checks that the given parameters do not conflict and that the
659
    given volume group is valid.
660

661
    """
662
    if self.op.vg_name is not None and not self.op.vg_name:
663
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
664
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
665
                                   " instances exist", errors.ECODE_INVAL)
666

    
667
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
668
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
669
        raise errors.OpPrereqError("Cannot disable drbd helper while"
670
                                   " drbd-based instances exist",
671
                                   errors.ECODE_INVAL)
672

    
673
    node_list = self.owned_locks(locking.LEVEL_NODE)
674

    
675
    vm_capable_nodes = [node.name
676
                        for node in self.cfg.GetAllNodesInfo().values()
677
                        if node.name in node_list and node.vm_capable]
678

    
679
    # if vg_name not None, checks given volume group on all nodes
680
    if self.op.vg_name:
681
      vglist = self.rpc.call_vg_list(vm_capable_nodes)
682
      for node in vm_capable_nodes:
683
        msg = vglist[node].fail_msg
684
        if msg:
685
          # ignoring down node
686
          self.LogWarning("Error while gathering data on node %s"
687
                          " (ignoring node): %s", node, msg)
688
          continue
689
        vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
690
                                              self.op.vg_name,
691
                                              constants.MIN_VG_SIZE)
692
        if vgstatus:
693
          raise errors.OpPrereqError("Error on node '%s': %s" %
694
                                     (node, vgstatus), errors.ECODE_ENVIRON)
695

    
696
    if self.op.drbd_helper:
697
      # checks given drbd helper on all nodes
698
      helpers = self.rpc.call_drbd_helper(node_list)
699
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
700
        if ninfo.offline:
701
          self.LogInfo("Not checking drbd helper on offline node %s", node)
702
          continue
703
        msg = helpers[node].fail_msg
704
        if msg:
705
          raise errors.OpPrereqError("Error checking drbd helper on node"
706
                                     " '%s': %s" % (node, msg),
707
                                     errors.ECODE_ENVIRON)
708
        node_helper = helpers[node].payload
709
        if node_helper != self.op.drbd_helper:
710
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
711
                                     (node, node_helper), errors.ECODE_ENVIRON)
712

    
713
    self.cluster = cluster = self.cfg.GetClusterInfo()
714
    # validate params changes
715
    if self.op.beparams:
716
      objects.UpgradeBeParams(self.op.beparams)
717
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
718
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
719

    
720
    if self.op.ndparams:
721
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
722
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
723

    
724
      # TODO: we need a more general way to handle resetting
725
      # cluster-level parameters to default values
726
      if self.new_ndparams["oob_program"] == "":
727
        self.new_ndparams["oob_program"] = \
728
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
729

    
730
    if self.op.hv_state:
731
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
732
                                           self.cluster.hv_state_static)
733
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
734
                               for hv, values in new_hv_state.items())
735

    
736
    if self.op.disk_state:
737
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
738
                                               self.cluster.disk_state_static)
739
      self.new_disk_state = \
740
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
741
                            for name, values in svalues.items()))
742
             for storage, svalues in new_disk_state.items())
743

    
744
    if self.op.ipolicy:
745
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
746
                                           group_policy=False)
747

    
748
      all_instances = self.cfg.GetAllInstancesInfo().values()
749
      violations = set()
750
      for group in self.cfg.GetAllNodeGroupsInfo().values():
751
        instances = frozenset([inst for inst in all_instances
752
                               if compat.any(node in group.members
753
                                             for node in inst.all_nodes)])
754
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
755
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
756
        new = ComputeNewInstanceViolations(ipol,
757
                                           new_ipolicy, instances, self.cfg)
758
        if new:
759
          violations.update(new)
760

    
761
      if violations:
762
        self.LogWarning("After the ipolicy change the following instances"
763
                        " violate them: %s",
764
                        utils.CommaJoin(utils.NiceSort(violations)))
765

    
766
    if self.op.nicparams:
767
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
768
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
769
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
770
      nic_errors = []
771

    
772
      # check all instances for consistency
773
      for instance in self.cfg.GetAllInstancesInfo().values():
774
        for nic_idx, nic in enumerate(instance.nics):
775
          params_copy = copy.deepcopy(nic.nicparams)
776
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
777

    
778
          # check parameter syntax
779
          try:
780
            objects.NIC.CheckParameterSyntax(params_filled)
781
          except errors.ConfigurationError, err:
782
            nic_errors.append("Instance %s, nic/%d: %s" %
783
                              (instance.name, nic_idx, err))
784

    
785
          # if we're moving instances to routed, check that they have an ip
786
          target_mode = params_filled[constants.NIC_MODE]
787
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
788
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
789
                              " address" % (instance.name, nic_idx))
790
      if nic_errors:
791
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
792
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
793

    
794
    # hypervisor list/parameters
795
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
796
    if self.op.hvparams:
797
      for hv_name, hv_dict in self.op.hvparams.items():
798
        if hv_name not in self.new_hvparams:
799
          self.new_hvparams[hv_name] = hv_dict
800
        else:
801
          self.new_hvparams[hv_name].update(hv_dict)
802

    
803
    # disk template parameters
804
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
805
    if self.op.diskparams:
806
      for dt_name, dt_params in self.op.diskparams.items():
807
        if dt_name not in self.new_diskparams:
808
          self.new_diskparams[dt_name] = dt_params
809
        else:
810
          self.new_diskparams[dt_name].update(dt_params)
811

    
812
    # os hypervisor parameters
813
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
814
    if self.op.os_hvp:
815
      for os_name, hvs in self.op.os_hvp.items():
816
        if os_name not in self.new_os_hvp:
817
          self.new_os_hvp[os_name] = hvs
818
        else:
819
          for hv_name, hv_dict in hvs.items():
820
            if hv_dict is None:
821
              # Delete if it exists
822
              self.new_os_hvp[os_name].pop(hv_name, None)
823
            elif hv_name not in self.new_os_hvp[os_name]:
824
              self.new_os_hvp[os_name][hv_name] = hv_dict
825
            else:
826
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
827

    
828
    # os parameters
829
    self.new_osp = objects.FillDict(cluster.osparams, {})
830
    if self.op.osparams:
831
      for os_name, osp in self.op.osparams.items():
832
        if os_name not in self.new_osp:
833
          self.new_osp[os_name] = {}
834

    
835
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
836
                                                 use_none=True)
837

    
838
        if not self.new_osp[os_name]:
839
          # we removed all parameters
840
          del self.new_osp[os_name]
841
        else:
842
          # check the parameter validity (remote check)
843
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
844
                        os_name, self.new_osp[os_name])
845

    
846
    # changes to the hypervisor list
847
    if self.op.enabled_hypervisors is not None:
848
      self.hv_list = self.op.enabled_hypervisors
849
      for hv in self.hv_list:
850
        # if the hypervisor doesn't already exist in the cluster
851
        # hvparams, we initialize it to empty, and then (in both
852
        # cases) we make sure to fill the defaults, as we might not
853
        # have a complete defaults list if the hypervisor wasn't
854
        # enabled before
855
        if hv not in new_hvp:
856
          new_hvp[hv] = {}
857
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
858
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
859
    else:
860
      self.hv_list = cluster.enabled_hypervisors
861

    
862
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
863
      # either the enabled list has changed, or the parameters have, validate
864
      for hv_name, hv_params in self.new_hvparams.items():
865
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
866
            (self.op.enabled_hypervisors and
867
             hv_name in self.op.enabled_hypervisors)):
868
          # either this is a new hypervisor, or its parameters have changed
869
          hv_class = hypervisor.GetHypervisorClass(hv_name)
870
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
871
          hv_class.CheckParameterSyntax(hv_params)
872
          CheckHVParams(self, node_list, hv_name, hv_params)
873

    
874
    self._CheckDiskTemplateConsistency()
875

    
876
    if self.op.os_hvp:
877
      # no need to check any newly-enabled hypervisors, since the
878
      # defaults have already been checked in the above code-block
879
      for os_name, os_hvp in self.new_os_hvp.items():
880
        for hv_name, hv_params in os_hvp.items():
881
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
882
          # we need to fill in the new os_hvp on top of the actual hv_p
883
          cluster_defaults = self.new_hvparams.get(hv_name, {})
884
          new_osp = objects.FillDict(cluster_defaults, hv_params)
885
          hv_class = hypervisor.GetHypervisorClass(hv_name)
886
          hv_class.CheckParameterSyntax(new_osp)
887
          CheckHVParams(self, node_list, hv_name, new_osp)
888

    
889
    if self.op.default_iallocator:
890
      alloc_script = utils.FindFile(self.op.default_iallocator,
891
                                    constants.IALLOCATOR_SEARCH_PATH,
892
                                    os.path.isfile)
893
      if alloc_script is None:
894
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
895
                                   " specified" % self.op.default_iallocator,
896
                                   errors.ECODE_INVAL)
897

    
898
  def _CheckDiskTemplateConsistency(self):
899
    """Check whether the disk templates that are going to be disabled
900
       are still in use by some instances.
901

902
    """
903
    if self.op.enabled_disk_templates:
904
      cluster = self.cfg.GetClusterInfo()
905
      instances = self.cfg.GetAllInstancesInfo()
906

    
907
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
908
        - set(self.op.enabled_disk_templates)
909
      for instance in instances.itervalues():
910
        if instance.disk_template in disk_templates_to_remove:
911
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
912
                                     " because instance '%s' is using it." %
913
                                     (instance.disk_template, instance.name))
914

    
915
  def Exec(self, feedback_fn):
916
    """Change the parameters of the cluster.
917

918
    """
919
    if self.op.vg_name is not None:
920
      new_volume = self.op.vg_name
921
      if not new_volume:
922
        new_volume = None
923
      if new_volume != self.cfg.GetVGName():
924
        self.cfg.SetVGName(new_volume)
925
      else:
926
        feedback_fn("Cluster LVM configuration already in desired"
927
                    " state, not changing")
928
    if self.op.drbd_helper is not None:
929
      new_helper = self.op.drbd_helper
930
      if not new_helper:
931
        new_helper = None
932
      if new_helper != self.cfg.GetDRBDHelper():
933
        self.cfg.SetDRBDHelper(new_helper)
934
      else:
935
        feedback_fn("Cluster DRBD helper already in desired state,"
936
                    " not changing")
937
    if self.op.hvparams:
938
      self.cluster.hvparams = self.new_hvparams
939
    if self.op.os_hvp:
940
      self.cluster.os_hvp = self.new_os_hvp
941
    if self.op.enabled_hypervisors is not None:
942
      self.cluster.hvparams = self.new_hvparams
943
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
944
    if self.op.enabled_disk_templates:
945
      self.cluster.enabled_disk_templates = \
946
        list(set(self.op.enabled_disk_templates))
947
    if self.op.beparams:
948
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
949
    if self.op.nicparams:
950
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
951
    if self.op.ipolicy:
952
      self.cluster.ipolicy = self.new_ipolicy
953
    if self.op.osparams:
954
      self.cluster.osparams = self.new_osp
955
    if self.op.ndparams:
956
      self.cluster.ndparams = self.new_ndparams
957
    if self.op.diskparams:
958
      self.cluster.diskparams = self.new_diskparams
959
    if self.op.hv_state:
960
      self.cluster.hv_state_static = self.new_hv_state
961
    if self.op.disk_state:
962
      self.cluster.disk_state_static = self.new_disk_state
963

    
964
    if self.op.candidate_pool_size is not None:
965
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
966
      # we need to update the pool size here, otherwise the save will fail
967
      AdjustCandidatePool(self, [])
968

    
969
    if self.op.maintain_node_health is not None:
970
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
971
        feedback_fn("Note: CONFD was disabled at build time, node health"
972
                    " maintenance is not useful (still enabling it)")
973
      self.cluster.maintain_node_health = self.op.maintain_node_health
974

    
975
    if self.op.modify_etc_hosts is not None:
976
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
977

    
978
    if self.op.prealloc_wipe_disks is not None:
979
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
980

    
981
    if self.op.add_uids is not None:
982
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
983

    
984
    if self.op.remove_uids is not None:
985
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
986

    
987
    if self.op.uid_pool is not None:
988
      self.cluster.uid_pool = self.op.uid_pool
989

    
990
    if self.op.default_iallocator is not None:
991
      self.cluster.default_iallocator = self.op.default_iallocator
992

    
993
    if self.op.reserved_lvs is not None:
994
      self.cluster.reserved_lvs = self.op.reserved_lvs
995

    
996
    if self.op.use_external_mip_script is not None:
997
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
998

    
999
    def helper_os(aname, mods, desc):
1000
      desc += " OS list"
1001
      lst = getattr(self.cluster, aname)
1002
      for key, val in mods:
1003
        if key == constants.DDM_ADD:
1004
          if val in lst:
1005
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1006
          else:
1007
            lst.append(val)
1008
        elif key == constants.DDM_REMOVE:
1009
          if val in lst:
1010
            lst.remove(val)
1011
          else:
1012
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1013
        else:
1014
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
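    # Illustrative note (not part of the original file): "mods" is a list of
    # (action, os_name) pairs, e.g. [(constants.DDM_ADD, "debian-image")] adds
    # the OS to the corresponding list (hidden or blacklisted) and
    # constants.DDM_REMOVE takes it out again; any other action is a
    # programming error.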
1015

    
1016
    if self.op.hidden_os:
1017
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1018

    
1019
    if self.op.blacklisted_os:
1020
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1021

    
1022
    if self.op.master_netdev:
1023
      master_params = self.cfg.GetMasterNetworkParameters()
1024
      ems = self.cfg.GetUseExternalMipScript()
1025
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1026
                  self.cluster.master_netdev)
1027
      result = self.rpc.call_node_deactivate_master_ip(master_params.name,
1028
                                                       master_params, ems)
1029
      if not self.op.force:
1030
        result.Raise("Could not disable the master ip")
1031
      else:
1032
        if result.fail_msg:
1033
          msg = ("Could not disable the master ip (continuing anyway): %s" %
1034
                 result.fail_msg)
1035
          feedback_fn(msg)
1036
      feedback_fn("Changing master_netdev from %s to %s" %
1037
                  (master_params.netdev, self.op.master_netdev))
1038
      self.cluster.master_netdev = self.op.master_netdev
1039

    
1040
    if self.op.master_netmask:
1041
      master_params = self.cfg.GetMasterNetworkParameters()
1042
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1043
      result = self.rpc.call_node_change_master_netmask(master_params.name,
1044
                                                        master_params.netmask,
1045
                                                        self.op.master_netmask,
1046
                                                        master_params.ip,
1047
                                                        master_params.netdev)
1048
      if result.fail_msg:
1049
        msg = "Could not change the master IP netmask: %s" % result.fail_msg
1050
        feedback_fn(msg)
1051

    
1052
      self.cluster.master_netmask = self.op.master_netmask
1053

    
1054
    self.cfg.Update(self.cluster, feedback_fn)
1055

    
1056
    if self.op.master_netdev:
1057
      master_params = self.cfg.GetMasterNetworkParameters()
1058
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
1059
                  self.op.master_netdev)
1060
      ems = self.cfg.GetUseExternalMipScript()
1061
      result = self.rpc.call_node_activate_master_ip(master_params.name,
1062
                                                     master_params, ems)
1063
      if result.fail_msg:
1064
        self.LogWarning("Could not re-enable the master ip on"
1065
                        " the master, please restart manually: %s",
1066
                        result.fail_msg)
1067

    
1068

    
1069
class LUClusterVerify(NoHooksLU):
1070
  """Submits all jobs necessary to verify the cluster.
1071

1072
  """
1073
  REQ_BGL = False
1074

    
1075
  def ExpandNames(self):
1076
    self.needed_locks = {}
1077

    
1078
  def Exec(self, feedback_fn):
1079
    jobs = []
1080

    
1081
    if self.op.group_name:
1082
      groups = [self.op.group_name]
1083
      depends_fn = lambda: None
1084
    else:
1085
      groups = self.cfg.GetNodeGroupList()
1086

    
1087
      # Verify global configuration
1088
      jobs.append([
1089
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1090
        ])
1091

    
1092
      # Always depend on global verification
1093
      depends_fn = lambda: [(-len(jobs), [])]
1094

    
1095
    jobs.extend(
1096
      [opcodes.OpClusterVerifyGroup(group_name=group,
1097
                                    ignore_errors=self.op.ignore_errors,
1098
                                    depends=depends_fn())]
1099
      for group in groups)
1100

    
1101
    # Fix up all parameters
1102
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1103
      op.debug_simulate_errors = self.op.debug_simulate_errors
1104
      op.verbose = self.op.verbose
1105
      op.error_codes = self.op.error_codes
1106
      try:
1107
        op.skip_checks = self.op.skip_checks
1108
      except AttributeError:
1109
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1110

    
1111
    return ResultWithJobs(jobs)
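  # Illustrative note (not part of the original file): with node groups
  # "group1" and "group2" and no group_name restriction, "jobs" ends up
  # roughly as
  #
  #   [[OpClusterVerifyConfig(...)],
  #    [OpClusterVerifyGroup(group_name="group1", depends=[(-1, [])], ...)],
  #    [OpClusterVerifyGroup(group_name="group2", depends=[(-2, [])], ...)]]
  #
  # i.e. one single-opcode job per group, each carrying a relative dependency
  # on the global configuration check submitted first.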
1112

    
1113

    
1114
class _VerifyErrors(object):
1115
  """Mix-in for cluster/group verify LUs.
1116

1117
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1118
  self.op and self._feedback_fn to be available.)
1119

1120
  """
1121

    
1122
  ETYPE_FIELD = "code"
1123
  ETYPE_ERROR = "ERROR"
1124
  ETYPE_WARNING = "WARNING"
1125

    
1126
  def _Error(self, ecode, item, msg, *args, **kwargs):
1127
    """Format an error message.
1128

1129
    Based on the opcode's error_codes parameter, either format a
1130
    parseable error code, or a simpler error string.
1131

1132
    This must be called only from Exec and functions called from Exec.
1133

1134
    """
1135
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1136
    itype, etxt, _ = ecode
1137
    # If the error code is in the list of ignored errors, demote the error to a
1138
    # warning
1139
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1140
      ltype = self.ETYPE_WARNING
1141
    # first complete the msg
1142
    if args:
1143
      msg = msg % args
1144
    # then format the whole message
1145
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1146
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1147
    else:
1148
      if item:
1149
        item = " " + item
1150
      else:
1151
        item = ""
1152
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1153
    # and finally report it via the feedback_fn
1154
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1155
    # do not mark the operation as failed for warning-only cases
1156
    if ltype == self.ETYPE_ERROR:
1157
      self.bad = True
1158

    
1159
  def _ErrorIf(self, cond, *args, **kwargs):
1160
    """Log an error message if the passed condition is True.
1161

1162
    """
1163
    if (bool(cond)
1164
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1165
      self._Error(*args, **kwargs)
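  # Illustrative note (not part of the original file): with op.error_codes
  # set, a failing check is reported through feedback_fn in the
  # machine-parseable form "  - <type>:<code>:<item-type>:<item>:<message>",
  # otherwise in the plainer "  - <type>: <item-type> <item>: <message>";
  # codes listed in op.ignore_errors are demoted from ERROR to WARNING.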
1166

    
1167

    
1168
def _VerifyCertificate(filename):
1169
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1170

1171
  @type filename: string
1172
  @param filename: Path to PEM file
1173

1174
  """
1175
  try:
1176
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1177
                                           utils.ReadFile(filename))
1178
  except Exception, err: # pylint: disable=W0703
1179
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1180
            "Failed to load X509 certificate %s: %s" % (filename, err))
1181

    
1182
  (errcode, msg) = \
1183
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1184
                                constants.SSL_CERT_EXPIRATION_ERROR)
1185

    
1186
  if msg:
1187
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1188
  else:
1189
    fnamemsg = None
1190

    
1191
  if errcode is None:
1192
    return (None, fnamemsg)
1193
  elif errcode == utils.CERT_WARNING:
1194
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1195
  elif errcode == utils.CERT_ERROR:
1196
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1197

    
1198
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
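# Illustrative note (not part of the original file): callers use this as
#
#   (errcode, msg) = _VerifyCertificate(pathutils.NODED_CERT_FILE)
#
# where errcode is None for a healthy certificate, ETYPE_WARNING when it is
# close to expiring and ETYPE_ERROR when it is expired or cannot be loaded;
# msg carries the human-readable explanation (or None).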
1199

    
1200

    
1201
def _GetAllHypervisorParameters(cluster, instances):
1202
  """Compute the set of all hypervisor parameters.
1203

1204
  @type cluster: L{objects.Cluster}
1205
  @param cluster: the cluster object
1206
  @type instances: list of L{objects.Instance}
1207
  @param instances: additional instances from which to obtain parameters
1208
  @rtype: list of (origin, hypervisor, parameters)
1209
  @return: a list with all parameters found, indicating the hypervisor they
1210
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1211

1212
  """
1213
  hvp_data = []
1214

    
1215
  for hv_name in cluster.enabled_hypervisors:
1216
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1217

    
1218
  for os_name, os_hvp in cluster.os_hvp.items():
1219
    for hv_name, hv_params in os_hvp.items():
1220
      if hv_params:
1221
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1222
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1223

    
1224
  # TODO: collapse identical parameter values in a single one
1225
  for instance in instances:
1226
    if instance.hvparams:
1227
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1228
                       cluster.FillHV(instance)))
1229

    
1230
  return hvp_data
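# Illustrative note (not part of the original file): the list built above
# looks roughly like
#
#   [("cluster", "kvm", {...}),
#    ("os debian-image", "kvm", {...}),
#    ("instance inst1.example.com", "kvm", {...})]
#
# with one entry per (origin, hypervisor) combination that defines parameters.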
1231

    
1232

    
1233
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1234
  """Verifies the cluster config.
1235

1236
  """
1237
  REQ_BGL = False
1238

    
1239
  def _VerifyHVP(self, hvp_data):
1240
    """Verifies locally the syntax of the hypervisor parameters.
1241

1242
    """
1243
    for item, hv_name, hv_params in hvp_data:
1244
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1245
             (item, hv_name))
1246
      try:
1247
        hv_class = hypervisor.GetHypervisorClass(hv_name)
1248
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1249
        hv_class.CheckParameterSyntax(hv_params)
1250
      except errors.GenericError, err:
1251
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1252

    
1253
  def ExpandNames(self):
1254
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1255
    self.share_locks = ShareAll()
1256

    
1257
  def CheckPrereq(self):
1258
    """Check prerequisites.
1259

1260
    """
1261
    # Retrieve all information
1262
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1263
    self.all_node_info = self.cfg.GetAllNodesInfo()
1264
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1265

    
1266
  def Exec(self, feedback_fn):
1267
    """Verify integrity of cluster, performing various tests on nodes.
1268

1269
    """
1270
    self.bad = False
1271
    self._feedback_fn = feedback_fn
1272

    
1273
    feedback_fn("* Verifying cluster config")
1274

    
1275
    for msg in self.cfg.VerifyConfig():
1276
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1277

    
1278
    feedback_fn("* Verifying cluster certificate files")
1279

    
1280
    for cert_filename in pathutils.ALL_CERT_FILES:
1281
      (errcode, msg) = _VerifyCertificate(cert_filename)
1282
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1283

    
1284
    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
1285
                                    pathutils.NODED_CERT_FILE),
1286
                  constants.CV_ECLUSTERCERT,
1287
                  None,
1288
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
1289
                    constants.LUXID_USER + " user")
1290

    
1291
    feedback_fn("* Verifying hypervisor parameters")
1292

    
1293
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1294
                                                self.all_inst_info.values()))
1295

    
1296
    feedback_fn("* Verifying all nodes belong to an existing group")
1297

    
1298
    # We do this verification here because, should this bogus circumstance
1299
    # occur, it would never be caught by VerifyGroup, which only acts on
1300
    # nodes/instances reachable from existing node groups.
1301

    
1302
    dangling_nodes = set(node.name for node in self.all_node_info.values()
1303
                         if node.group not in self.all_group_info)
1304

    
1305
    dangling_instances = {}
1306
    no_node_instances = []
1307

    
1308
    for inst in self.all_inst_info.values():
1309
      if inst.primary_node in dangling_nodes:
1310
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1311
      elif inst.primary_node not in self.all_node_info:
1312
        no_node_instances.append(inst.name)
1313

    
1314
    pretty_dangling = [
1315
        "%s (%s)" %
1316
        (node.name,
1317
         utils.CommaJoin(dangling_instances.get(node.name,
1318
                                                ["no instances"])))
1319
        for node in dangling_nodes]
1320

    
1321
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1322
                  None,
1323
                  "the following nodes (and their instances) belong to a non"
1324
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1325

    
1326
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1327
                  None,
1328
                  "the following instances have a non-existing primary-node:"
1329
                  " %s", utils.CommaJoin(no_node_instances))
1330

    
1331
    return not self.bad
1332

    
1333

    
1334
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1335
  """Verifies the status of a node group.
1336

1337
  """
1338
  HPATH = "cluster-verify"
1339
  HTYPE = constants.HTYPE_CLUSTER
1340
  REQ_BGL = False
1341

    
1342
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1343

    
1344
  class NodeImage(object):
1345
    """A class representing the logical and physical status of a node.
1346

1347
    @type name: string
1348
    @ivar name: the node name to which this object refers
1349
    @ivar volumes: a structure as returned from
1350
        L{ganeti.backend.GetVolumeList} (runtime)
1351
    @ivar instances: a list of running instances (runtime)
1352
    @ivar pinst: list of configured primary instances (config)
1353
    @ivar sinst: list of configured secondary instances (config)
1354
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1355
        instances for which this node is secondary (config)
1356
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1357
    @ivar dfree: free disk, as reported by the node (runtime)
1358
    @ivar offline: the offline status (config)
1359
    @type rpc_fail: boolean
1360
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1361
        not whether the individual keys were correct) (runtime)
1362
    @type lvm_fail: boolean
1363
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1364
    @type hyp_fail: boolean
1365
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1366
    @type ghost: boolean
1367
    @ivar ghost: whether this is a known node or not (config)
1368
    @type os_fail: boolean
1369
    @ivar os_fail: whether the RPC call didn't return valid OS data
1370
    @type oslist: list
1371
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1372
    @type vm_capable: boolean
1373
    @ivar vm_capable: whether the node can host instances
1374
    @type pv_min: float
1375
    @ivar pv_min: size in MiB of the smallest PVs
1376
    @type pv_max: float
1377
    @ivar pv_max: size in MiB of the biggest PVs
1378

1379
    """
1380
    def __init__(self, offline=False, name=None, vm_capable=True):
1381
      self.name = name
1382
      self.volumes = {}
1383
      self.instances = []
1384
      self.pinst = []
1385
      self.sinst = []
1386
      self.sbp = {}
1387
      self.mfree = 0
1388
      self.dfree = 0
1389
      self.offline = offline
1390
      self.vm_capable = vm_capable
1391
      self.rpc_fail = False
1392
      self.lvm_fail = False
1393
      self.hyp_fail = False
1394
      self.ghost = False
1395
      self.os_fail = False
1396
      self.oslist = {}
1397
      self.pv_min = None
1398
      self.pv_max = None
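    # Illustrative note (not part of the original file): the verification code
    # elsewhere in this LU builds one of these objects per node, e.g.
    #
    #   nimg = self.NodeImage(name="node1.example.com", offline=False)
    #
    # and then fills in the runtime fields (volumes, instances, mfree, ...)
    # from RPC results, while config-only fields such as pinst/sinst come from
    # the cluster configuration.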
1399

    
1400
  def ExpandNames(self):
1401
    # This raises errors.OpPrereqError on its own:
1402
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1403

    
1404
    # Get instances in node group; this is unsafe and needs verification later
1405
    inst_names = \
1406
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1407

    
1408
    self.needed_locks = {
1409
      locking.LEVEL_INSTANCE: inst_names,
1410
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1411
      locking.LEVEL_NODE: [],
1412

    
1413
      # This opcode is run by watcher every five minutes and acquires all nodes
1414
      # for a group. It doesn't run for a long time, so it's better to acquire
1415
      # the node allocation lock as well.
1416
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1417
      }
1418

    
1419
    self.share_locks = ShareAll()
1420

    
1421
  def DeclareLocks(self, level):
1422
    if level == locking.LEVEL_NODE:
1423
      # Get members of node group; this is unsafe and needs verification later
1424
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1425

    
1426
      all_inst_info = self.cfg.GetAllInstancesInfo()
1427

    
1428
      # In Exec(), we warn about mirrored instances that have primary and
1429
      # secondary living in separate node groups. To fully verify that
1430
      # volumes for these instances are healthy, we will need to do an
1431
      # extra call to their secondaries. We ensure here those nodes will
1432
      # be locked.
1433
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
1434
        # Important: access only the instances whose lock is owned
1435
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
1436
          nodes.update(all_inst_info[inst].secondary_nodes)
1437

    
1438
      self.needed_locks[locking.LEVEL_NODE] = nodes
1439

    
1440
  def CheckPrereq(self):
1441
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1442
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1443

    
1444
    group_nodes = set(self.group_info.members)
1445
    group_instances = \
1446
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1447

    
1448
    unlocked_nodes = \
1449
        group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1450

    
1451
    unlocked_instances = \
1452
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
1453

    
1454
    if unlocked_nodes:
1455
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
1456
                                 utils.CommaJoin(unlocked_nodes),
1457
                                 errors.ECODE_STATE)
1458

    
1459
    if unlocked_instances:
1460
      raise errors.OpPrereqError("Missing lock for instances: %s" %
1461
                                 utils.CommaJoin(unlocked_instances),
1462
                                 errors.ECODE_STATE)
1463

    
1464
    self.all_node_info = self.cfg.GetAllNodesInfo()
1465
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1466

    
1467
    self.my_node_names = utils.NiceSort(group_nodes)
1468
    self.my_inst_names = utils.NiceSort(group_instances)
1469

    
1470
    self.my_node_info = dict((name, self.all_node_info[name])
1471
                             for name in self.my_node_names)
1472

    
1473
    self.my_inst_info = dict((name, self.all_inst_info[name])
1474
                             for name in self.my_inst_names)
1475

    
1476
    # We detect here the nodes that will need the extra RPC calls for verifying
1477
    # split LV volumes; they should be locked.
1478
    extra_lv_nodes = set()
1479

    
1480
    for inst in self.my_inst_info.values():
1481
      if inst.disk_template in constants.DTS_INT_MIRROR:
1482
        for nname in inst.all_nodes:
1483
          if self.all_node_info[nname].group != self.group_uuid:
1484
            extra_lv_nodes.add(nname)
1485

    
1486
    unlocked_lv_nodes = \
1487
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1488

    
1489
    if unlocked_lv_nodes:
1490
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1491
                                 utils.CommaJoin(unlocked_lv_nodes),
1492
                                 errors.ECODE_STATE)
1493
    self.extra_lv_nodes = list(extra_lv_nodes)
1494

    
1495
  def _VerifyNode(self, ninfo, nresult):
1496
    """Perform some basic validation on data returned from a node.
1497

1498
      - check the result data structure is well formed and has all the
1499
        mandatory fields
1500
      - check ganeti version
1501

1502
    @type ninfo: L{objects.Node}
1503
    @param ninfo: the node to check
1504
    @param nresult: the results from the node
1505
    @rtype: boolean
1506
    @return: whether overall this call was successful (and we can expect
1507
         reasonable values in the response)
1508

1509
    """
1510
    node = ninfo.name
1511
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1512

    
1513
    # main result, nresult should be a non-empty dict
1514
    test = not nresult or not isinstance(nresult, dict)
1515
    _ErrorIf(test, constants.CV_ENODERPC, node,
1516
                  "unable to verify node: no data returned")
1517
    if test:
1518
      return False
1519

    
1520
    # compares ganeti version
1521
    local_version = constants.PROTOCOL_VERSION
1522
    remote_version = nresult.get("version", None)
1523
    test = not (remote_version and
1524
                isinstance(remote_version, (list, tuple)) and
1525
                len(remote_version) == 2)
1526
    _ErrorIf(test, constants.CV_ENODERPC, node,
1527
             "connection to node returned invalid data")
1528
    if test:
1529
      return False
1530

    
1531
    test = local_version != remote_version[0]
1532
    _ErrorIf(test, constants.CV_ENODEVERSION, node,
1533
             "incompatible protocol versions: master %s,"
1534
             " node %s", local_version, remote_version[0])
1535
    if test:
1536
      return False
1537

    
1538
    # node seems compatible, we can actually try to look into its results
1539

    
1540
    # full package version
1541
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1542
                  constants.CV_ENODEVERSION, node,
1543
                  "software version mismatch: master %s, node %s",
1544
                  constants.RELEASE_VERSION, remote_version[1],
1545
                  code=self.ETYPE_WARNING)
1546

    
1547
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1548
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1549
      for hv_name, hv_result in hyp_result.iteritems():
1550
        test = hv_result is not None
1551
        _ErrorIf(test, constants.CV_ENODEHV, node,
1552
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1553

    
1554
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1555
    if ninfo.vm_capable and isinstance(hvp_result, list):
1556
      for item, hv_name, hv_result in hvp_result:
1557
        _ErrorIf(True, constants.CV_ENODEHV, node,
1558
                 "hypervisor %s parameter verify failure (source %s): %s",
1559
                 hv_name, item, hv_result)
1560

    
1561
    test = nresult.get(constants.NV_NODESETUP,
1562
                       ["Missing NODESETUP results"])
1563
    _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
1564
             "; ".join(test))
1565

    
1566
    return True
1567

    
1568
  def _VerifyNodeTime(self, ninfo, nresult,
1569
                      nvinfo_starttime, nvinfo_endtime):
1570
    """Check the node time.
1571

1572
    @type ninfo: L{objects.Node}
1573
    @param ninfo: the node to check
1574
    @param nresult: the remote results for the node
1575
    @param nvinfo_starttime: the start time of the RPC call
1576
    @param nvinfo_endtime: the end time of the RPC call
1577

1578
    """
1579
    node = ninfo.name
1580
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1581

    
1582
    ntime = nresult.get(constants.NV_TIME, None)
1583
    try:
1584
      ntime_merged = utils.MergeTime(ntime)
1585
    except (ValueError, TypeError):
1586
      _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
1587
      return
1588

    
1589
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1590
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1591
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1592
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1593
    else:
1594
      ntime_diff = None
1595

    
1596
    _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
1597
             "Node time diverges by at least %s from master node time",
1598
             ntime_diff)
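    # Illustrative summary (not part of the original code): with an RPC issued
    # between nvinfo_starttime and nvinfo_endtime, any merged node time inside
    # [starttime - NODE_MAX_CLOCK_SKEW, endtime + NODE_MAX_CLOCK_SKEW] passes;
    # anything outside is reported with the measured divergence, e.g. "4.2s".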

  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1601
    """Check the node LVM results and update info for cross-node checks.
1602

1603
    @type ninfo: L{objects.Node}
1604
    @param ninfo: the node to check
1605
    @param nresult: the remote results for the node
1606
    @param vg_name: the configured VG name
1607
    @type nimg: L{NodeImage}
1608
    @param nimg: node image
1609

1610
    """
1611
    if vg_name is None:
1612
      return
1613

    
1614
    node = ninfo.name
1615
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1616

    
1617
    # checks vg existence and size > 20G
1618
    vglist = nresult.get(constants.NV_VGLIST, None)
1619
    test = not vglist
1620
    _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
1621
    if not test:
1622
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1623
                                            constants.MIN_VG_SIZE)
1624
      _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
1625

    
1626
    # Check PVs
1627
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1628
    for em in errmsgs:
1629
      self._Error(constants.CV_ENODELVM, node, em)
1630
    if pvminmax is not None:
1631
      (nimg.pv_min, nimg.pv_max) = pvminmax
1632

    
1633
  def _VerifyGroupLVM(self, node_image, vg_name):
1634
    """Check cross-node consistency in LVM.
1635

1636
    @type node_image: dict
1637
    @param node_image: info about nodes, mapping from node to names to
1638
      L{NodeImage} objects
1639
    @param vg_name: the configured VG name
1640

1641
    """
1642
    if vg_name is None:
1643
      return
1644

    
1645
    # Only exclusive storage needs this kind of check
1646
    if not self._exclusive_storage:
1647
      return
1648

    
1649
    # exclusive_storage wants all PVs to have the same size (approximately),
1650
    # if the smallest and the biggest ones are okay, everything is fine.
1651
    # pv_min is None iff pv_max is None
1652
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1653
    if not vals:
1654
      return
1655
    (pvmin, minnode) = min((ni.pv_min, ni.name) for ni in vals)
1656
    (pvmax, maxnode) = max((ni.pv_max, ni.name) for ni in vals)
1657
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1658
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1659
                  "PV sizes differ too much in the group; smallest (%s MB) is"
1660
                  " on %s, biggest (%s MB) is on %s",
1661
                  pvmin, minnode, pvmax, maxnode)
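    # Hypothetical example: with exclusive_storage enabled, pv_min=10240 MB on
    # nodeA and pv_max=10260 MB on nodeB would normally pass, whereas a spread
    # like 10240 MB vs. 20480 MB would raise CV_EGROUPDIFFERENTPVSIZE; the
    # actual tolerance is whatever utils.LvmExclusiveTestBadPvSizes implements.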

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1664
    """Check the node bridges.
1665

1666
    @type ninfo: L{objects.Node}
1667
    @param ninfo: the node to check
1668
    @param nresult: the remote results for the node
1669
    @param bridges: the expected list of bridges
1670

1671
    """
1672
    if not bridges:
1673
      return
1674

    
1675
    node = ninfo.name
1676
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1677

    
1678
    missing = nresult.get(constants.NV_BRIDGES, None)
1679
    test = not isinstance(missing, list)
1680
    _ErrorIf(test, constants.CV_ENODENET, node,
1681
             "did not return valid bridge information")
1682
    if not test:
1683
      _ErrorIf(bool(missing), constants.CV_ENODENET, node,
1684
               "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1685

    
1686
  def _VerifyNodeUserScripts(self, ninfo, nresult):
1687
    """Check the results of user scripts presence and executability on the node
1688

1689
    @type ninfo: L{objects.Node}
1690
    @param ninfo: the node to check
1691
    @param nresult: the remote results for the node
1692

1693
    """
1694
    node = ninfo.name
1695

    
1696
    test = not constants.NV_USERSCRIPTS in nresult
1697
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
1698
                  "did not return user scripts information")
1699

    
1700
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1701
    if not test:
1702
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
1703
                    "user scripts not present or not executable: %s" %
1704
                    utils.CommaJoin(sorted(broken_scripts)))
1705

    
1706
  def _VerifyNodeNetwork(self, ninfo, nresult):
1707
    """Check the node network connectivity results.
1708

1709
    @type ninfo: L{objects.Node}
1710
    @param ninfo: the node to check
1711
    @param nresult: the remote results for the node
1712

1713
    """
1714
    node = ninfo.name
1715
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1716

    
1717
    test = constants.NV_NODELIST not in nresult
1718
    _ErrorIf(test, constants.CV_ENODESSH, node,
1719
             "node hasn't returned node ssh connectivity data")
1720
    if not test:
1721
      if nresult[constants.NV_NODELIST]:
1722
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1723
          _ErrorIf(True, constants.CV_ENODESSH, node,
1724
                   "ssh communication with node '%s': %s", a_node, a_msg)
1725

    
1726
    test = constants.NV_NODENETTEST not in nresult
1727
    _ErrorIf(test, constants.CV_ENODENET, node,
1728
             "node hasn't returned node tcp connectivity data")
1729
    if not test:
1730
      if nresult[constants.NV_NODENETTEST]:
1731
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1732
        for anode in nlist:
1733
          _ErrorIf(True, constants.CV_ENODENET, node,
1734
                   "tcp communication with node '%s': %s",
1735
                   anode, nresult[constants.NV_NODENETTEST][anode])
1736

    
1737
    test = constants.NV_MASTERIP not in nresult
1738
    _ErrorIf(test, constants.CV_ENODENET, node,
1739
             "node hasn't returned node master IP reachability data")
1740
    if not test:
1741
      if not nresult[constants.NV_MASTERIP]:
1742
        if node == self.master_node:
1743
          msg = "the master node cannot reach the master IP (not configured?)"
1744
        else:
1745
          msg = "cannot reach the master IP"
1746
        _ErrorIf(True, constants.CV_ENODENET, node, msg)
1747

    
1748
  def _VerifyInstance(self, instance, inst_config, node_image,
1749
                      diskstatus):
1750
    """Verify an instance.
1751

1752
    This function checks to see if the required block devices are
1753
    available on the instance's node, and that the nodes are in the correct
1754
    state.
1755

1756
    """
1757
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1758
    pnode = inst_config.primary_node
1759
    pnode_img = node_image[pnode]
1760
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
1761

    
1762
    node_vol_should = {}
1763
    inst_config.MapLVsByNode(node_vol_should)
1764

    
1765
    cluster = self.cfg.GetClusterInfo()
1766
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1767
                                                            self.group_info)
1768
    err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
1769
    _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err),
1770
             code=self.ETYPE_WARNING)
1771

    
1772
    for node in node_vol_should:
1773
      n_img = node_image[node]
1774
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1775
        # ignore missing volumes on offline or broken nodes
1776
        continue
1777
      for volume in node_vol_should[node]:
1778
        test = volume not in n_img.volumes
1779
        _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
1780
                 "volume %s missing on node %s", volume, node)
1781

    
1782
    if inst_config.admin_state == constants.ADMINST_UP:
1783
      test = instance not in pnode_img.instances and not pnode_img.offline
1784
      _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
1785
               "instance not running on its primary node %s",
1786
               pnode)
1787
      _ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
1788
               "instance is marked as running and lives on offline node %s",
1789
               pnode)
1790

    
1791
    diskdata = [(nname, success, status, idx)
1792
                for (nname, disks) in diskstatus.items()
1793
                for idx, (success, status) in enumerate(disks)]
1794

    
1795
    for nname, success, bdev_status, idx in diskdata:
1796
      # the 'ghost node' construction in Exec() ensures that we have a
1797
      # node here
1798
      snode = node_image[nname]
1799
      bad_snode = snode.ghost or snode.offline
1800
      _ErrorIf(inst_config.disks_active and
1801
               not success and not bad_snode,
1802
               constants.CV_EINSTANCEFAULTYDISK, instance,
1803
               "couldn't retrieve status for disk/%s on %s: %s",
1804
               idx, nname, bdev_status)
1805
      _ErrorIf((inst_config.disks_active and
1806
                success and bdev_status.ldisk_status == constants.LDS_FAULTY),
1807
               constants.CV_EINSTANCEFAULTYDISK, instance,
1808
               "disk/%s on %s is faulty", idx, nname)
1809

    
1810
    _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1811
             constants.CV_ENODERPC, pnode, "instance %s, connection to"
1812
             " primary node failed", instance)
1813

    
1814
    _ErrorIf(len(inst_config.secondary_nodes) > 1,
1815
             constants.CV_EINSTANCELAYOUT,
1816
             instance, "instance has multiple secondary nodes: %s",
1817
             utils.CommaJoin(inst_config.secondary_nodes),
1818
             code=self.ETYPE_WARNING)
1819

    
1820
    if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
1821
      # Disk template not compatible with exclusive_storage: no instance
1822
      # node should have the flag set
1823
      es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
1824
                                                     inst_config.all_nodes)
1825
      es_nodes = [n for (n, es) in es_flags.items()
1826
                  if es]
1827
      _ErrorIf(es_nodes, constants.CV_EINSTANCEUNSUITABLENODE, instance,
1828
               "instance has template %s, which is not supported on nodes"
1829
               " that have exclusive storage set: %s",
1830
               inst_config.disk_template, utils.CommaJoin(es_nodes))
1831

    
1832
    if inst_config.disk_template in constants.DTS_INT_MIRROR:
1833
      instance_nodes = utils.NiceSort(inst_config.all_nodes)
1834
      instance_groups = {}
1835

    
1836
      for node in instance_nodes:
1837
        instance_groups.setdefault(self.all_node_info[node].group,
1838
                                   []).append(node)
1839

    
1840
      pretty_list = [
1841
        "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
1842
        # Sort so that we always list the primary node first.
1843
        for group, nodes in sorted(instance_groups.items(),
1844
                                   key=lambda (_, nodes): pnode in nodes,
1845
                                   reverse=True)]
1846

    
1847
      self._ErrorIf(len(instance_groups) > 1,
1848
                    constants.CV_EINSTANCESPLITGROUPS,
1849
                    instance, "instance has primary and secondary nodes in"
1850
                    " different groups: %s", utils.CommaJoin(pretty_list),
1851
                    code=self.ETYPE_WARNING)
1852

    
1853
    inst_nodes_offline = []
1854
    for snode in inst_config.secondary_nodes:
1855
      s_img = node_image[snode]
1856
      _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
1857
               snode, "instance %s, connection to secondary node failed",
1858
               instance)
1859

    
1860
      if s_img.offline:
1861
        inst_nodes_offline.append(snode)
1862

    
1863
    # warn that the instance lives on offline nodes
1864
    _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
1865
             "instance has offline secondary node(s) %s",
1866
             utils.CommaJoin(inst_nodes_offline))
1867
    # ... or ghost/non-vm_capable nodes
1868
    for node in inst_config.all_nodes:
1869
      _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
1870
               instance, "instance lives on ghost node %s", node)
1871
      _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
1872
               instance, "instance lives on non-vm_capable node %s", node)
1873

    
1874
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1875
    """Verify if there are any unknown volumes in the cluster.
1876

1877
    The .os, .swap and backup volumes are ignored. All other volumes are
1878
    reported as unknown.
1879

1880
    @type reserved: L{ganeti.utils.FieldSet}
1881
    @param reserved: a FieldSet of reserved volume names
1882

1883
    """
1884
    for node, n_img in node_image.items():
1885
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
1886
          self.all_node_info[node].group != self.group_uuid):
1887
        # skip non-healthy nodes
1888
        continue
1889
      for volume in n_img.volumes:
1890
        test = ((node not in node_vol_should or
1891
                volume not in node_vol_should[node]) and
1892
                not reserved.Matches(volume))
1893
        self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
1894
                      "volume %s is unknown", volume)
1895

    
1896
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1897
    """Verify N+1 Memory Resilience.
1898

1899
    Check that if one single node dies we can still start all the
1900
    instances it was primary for.
1901

1902
    """
1903
    cluster_info = self.cfg.GetClusterInfo()
1904
    for node, n_img in node_image.items():
1905
      # This code checks that every node which is now listed as
1906
      # secondary has enough memory to host all instances it is
1907
      # supposed to, should a single other node in the cluster fail.
1908
      # FIXME: not ready for failover to an arbitrary node
1909
      # FIXME: does not support file-backed instances
1910
      # WARNING: we currently take into account down instances as well
1911
      # as up ones, considering that even if they're down someone
1912
      # might want to start them even in the event of a node failure.
1913
      if n_img.offline or self.all_node_info[node].group != self.group_uuid:
1914
        # we're skipping nodes marked offline and nodes in other groups from
1915
        # the N+1 warning, since most likely we don't have good memory
1916
        # information from them; we already list instances living on such
1917
        # nodes, and that's enough warning
1918
        continue
1919
      #TODO(dynmem): also consider ballooning out other instances
1920
      for prinode, instances in n_img.sbp.items():
1921
        needed_mem = 0
1922
        for instance in instances:
1923
          bep = cluster_info.FillBE(instance_cfg[instance])
1924
          if bep[constants.BE_AUTO_BALANCE]:
1925
            needed_mem += bep[constants.BE_MINMEM]
1926
        test = n_img.mfree < needed_mem
1927
        self._ErrorIf(test, constants.CV_ENODEN1, node,
1928
                      "not enough memory to accomodate instance failovers"
1929
                      " should node %s fail (%dMiB needed, %dMiB available)",
1930
                      prinode, needed_mem, n_img.mfree)
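    # Worked example with made-up numbers: if this node is secondary for
    # instances of primary node P whose BE_MINMEM values sum to 4096 MiB while
    # the node reports mfree = 3072 MiB, CV_ENODEN1 is raised, because a
    # failover from P could not be accommodated.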

  @classmethod
1933
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
1934
                   (files_all, files_opt, files_mc, files_vm)):
1935
    """Verifies file checksums collected from all nodes.
1936

1937
    @param errorif: Callback for reporting errors
1938
    @param nodeinfo: List of L{objects.Node} objects
1939
    @param master_node: Name of master node
1940
    @param all_nvinfo: RPC results
1941

1942
    """
1943
    # Define functions determining which nodes to consider for a file
1944
    files2nodefn = [
1945
      (files_all, None),
1946
      (files_mc, lambda node: (node.master_candidate or
1947
                               node.name == master_node)),
1948
      (files_vm, lambda node: node.vm_capable),
1949
      ]
1950

    
1951
    # Build mapping from filename to list of nodes which should have the file
1952
    nodefiles = {}
1953
    for (files, fn) in files2nodefn:
1954
      if fn is None:
1955
        filenodes = nodeinfo
1956
      else:
1957
        filenodes = filter(fn, nodeinfo)
1958
      nodefiles.update((filename,
1959
                        frozenset(map(operator.attrgetter("name"), filenodes)))
1960
                       for filename in files)
1961

    
1962
    assert set(nodefiles) == (files_all | files_mc | files_vm)
1963

    
1964
    fileinfo = dict((filename, {}) for filename in nodefiles)
1965
    ignore_nodes = set()
1966

    
1967
    for node in nodeinfo:
1968
      if node.offline:
1969
        ignore_nodes.add(node.name)
1970
        continue
1971

    
1972
      nresult = all_nvinfo[node.name]
1973

    
1974
      if nresult.fail_msg or not nresult.payload:
1975
        node_files = None
1976
      else:
1977
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
1978
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
1979
                          for (key, value) in fingerprints.items())
1980
        del fingerprints
1981

    
1982
      test = not (node_files and isinstance(node_files, dict))
1983
      errorif(test, constants.CV_ENODEFILECHECK, node.name,
1984
              "Node did not return file checksum data")
1985
      if test:
1986
        ignore_nodes.add(node.name)
1987
        continue
1988

    
1989
      # Build per-checksum mapping from filename to nodes having it
1990
      for (filename, checksum) in node_files.items():
1991
        assert filename in nodefiles
1992
        fileinfo[filename].setdefault(checksum, set()).add(node.name)
1993

    
1994
    for (filename, checksums) in fileinfo.items():
1995
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
1996

    
1997
      # Nodes having the file
1998
      with_file = frozenset(node_name
1999
                            for nodes in fileinfo[filename].values()
2000
                            for node_name in nodes) - ignore_nodes
2001

    
2002
      expected_nodes = nodefiles[filename] - ignore_nodes
2003

    
2004
      # Nodes missing file
2005
      missing_file = expected_nodes - with_file
2006

    
2007
      if filename in files_opt:
2008
        # All or no nodes
2009
        errorif(missing_file and missing_file != expected_nodes,
2010
                constants.CV_ECLUSTERFILECHECK, None,
2011
                "File %s is optional, but it must exist on all or no"
2012
                " nodes (not found on %s)",
2013
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2014
      else:
2015
        errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2016
                "File %s is missing from node(s) %s", filename,
2017
                utils.CommaJoin(utils.NiceSort(missing_file)))
2018

    
2019
        # Warn if a node has a file it shouldn't
2020
        unexpected = with_file - expected_nodes
2021
        errorif(unexpected,
2022
                constants.CV_ECLUSTERFILECHECK, None,
2023
                "File %s should not exist on node(s) %s",
2024
                filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2025

    
2026
      # See if there are multiple versions of the file
2027
      test = len(checksums) > 1
2028
      if test:
2029
        variants = ["variant %s on %s" %
2030
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2031
                    for (idx, (checksum, nodes)) in
2032
                      enumerate(sorted(checksums.items()))]
2033
      else:
2034
        variants = []
2035

    
2036
      errorif(test, constants.CV_ECLUSTERFILECHECK, None,
2037
              "File %s found with %s different checksums (%s)",
2038
              filename, len(checksums), "; ".join(variants))

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2041
                      drbd_map):
2042
    """Verifies and the node DRBD status.
2043

2044
    @type ninfo: L{objects.Node}
2045
    @param ninfo: the node to check
2046
    @param nresult: the remote results for the node
2047
    @param instanceinfo: the dict of instances
2048
    @param drbd_helper: the configured DRBD usermode helper
2049
    @param drbd_map: the DRBD map as returned by
2050
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2051

2052
    """
2053
    node = ninfo.name
2054
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2055

    
2056
    if drbd_helper:
2057
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2058
      test = (helper_result is None)
2059
      _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2060
               "no drbd usermode helper returned")
2061
      if helper_result:
2062
        status, payload = helper_result
2063
        test = not status
2064
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2065
                 "drbd usermode helper check unsuccessful: %s", payload)
2066
        test = status and (payload != drbd_helper)
2067
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2068
                 "wrong drbd usermode helper: %s", payload)
2069

    
2070
    # compute the DRBD minors
2071
    node_drbd = {}
2072
    for minor, instance in drbd_map[node].items():
2073
      test = instance not in instanceinfo
2074
      _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2075
               "ghost instance '%s' in temporary DRBD map", instance)
2076
        # ghost instance should not be running, but otherwise we
2077
        # don't give double warnings (both ghost instance and
2078
        # unallocated minor in use)
2079
      if test:
2080
        node_drbd[minor] = (instance, False)
2081
      else:
2082
        instance = instanceinfo[instance]
2083
        node_drbd[minor] = (instance.name, instance.disks_active)
2084

    
2085
    # and now check them
2086
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2087
    test = not isinstance(used_minors, (tuple, list))
2088
    _ErrorIf(test, constants.CV_ENODEDRBD, node,
2089
             "cannot parse drbd status file: %s", str(used_minors))
2090
    if test:
2091
      # we cannot check drbd status
2092
      return
2093

    
2094
    for minor, (iname, must_exist) in node_drbd.items():
2095
      test = minor not in used_minors and must_exist
2096
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
2097
               "drbd minor %d of instance %s is not active", minor, iname)
2098
    for minor in used_minors:
2099
      test = minor not in node_drbd
2100
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
2101
               "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2104
    """Builds the node OS structures.
2105

2106
    @type ninfo: L{objects.Node}
2107
    @param ninfo: the node to check
2108
    @param nresult: the remote results for the node
2109
    @param nimg: the node image object
2110

2111
    """
2112
    node = ninfo.name
2113
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2114

    
2115
    remote_os = nresult.get(constants.NV_OSLIST, None)
2116
    test = (not isinstance(remote_os, list) or
2117
            not compat.all(isinstance(v, list) and len(v) == 7
2118
                           for v in remote_os))
2119

    
2120
    _ErrorIf(test, constants.CV_ENODEOS, node,
2121
             "node hasn't returned valid OS data")
2122

    
2123
    nimg.os_fail = test
2124

    
2125
    if test:
2126
      return
2127

    
2128
    os_dict = {}
2129

    
2130
    for (name, os_path, status, diagnose,
2131
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2132

    
2133
      if name not in os_dict:
2134
        os_dict[name] = []
2135

    
2136
      # parameters is a list of lists instead of list of tuples due to
2137
      # JSON lacking a real tuple type, fix it:
2138
      parameters = [tuple(v) for v in parameters]
2139
      os_dict[name].append((os_path, status, diagnose,
2140
                            set(variants), set(parameters), set(api_ver)))
2141

    
2142
    nimg.oslist = os_dict
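    # Resulting shape (illustrative, hypothetical values): nimg.oslist maps an
    # OS name to a list of tuples, e.g.
    #   {"debootstrap": [("/srv/ganeti/os/debootstrap", True, "",
    #                     set(["default"]), set(), set([20]))]}
    # More than one entry for a name means the OS was found in several search
    # paths and only the first one is effective.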

  def _VerifyNodeOS(self, ninfo, nimg, base):
2145
    """Verifies the node OS list.
2146

2147
    @type ninfo: L{objects.Node}
2148
    @param ninfo: the node to check
2149
    @param nimg: the node image object
2150
    @param base: the 'template' node we match against (e.g. from the master)
2151

2152
    """
2153
    node = ninfo.name
2154
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2155

    
2156
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2157

    
2158
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2159
    for os_name, os_data in nimg.oslist.items():
2160
      assert os_data, "Empty OS status for OS %s?!" % os_name
2161
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2162
      _ErrorIf(not f_status, constants.CV_ENODEOS, node,
2163
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2164
      _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
2165
               "OS '%s' has multiple entries (first one shadows the rest): %s",
2166
               os_name, utils.CommaJoin([v[0] for v in os_data]))
2167
      # comparisons with the 'base' image
2168
      test = os_name not in base.oslist
2169
      _ErrorIf(test, constants.CV_ENODEOS, node,
2170
               "Extra OS %s not present on reference node (%s)",
2171
               os_name, base.name)
2172
      if test:
2173
        continue
2174
      assert base.oslist[os_name], "Base node has empty OS status?"
2175
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2176
      if not b_status:
2177
        # base OS is invalid, skipping
2178
        continue
2179
      for kind, a, b in [("API version", f_api, b_api),
2180
                         ("variants list", f_var, b_var),
2181
                         ("parameters", beautify_params(f_param),
2182
                          beautify_params(b_param))]:
2183
        _ErrorIf(a != b, constants.CV_ENODEOS, node,
2184
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2185
                 kind, os_name, base.name,
2186
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2187

    
2188
    # check any missing OSes
2189
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2190
    _ErrorIf(missing, constants.CV_ENODEOS, node,
2191
             "OSes present on reference node %s but missing on this node: %s",
2192
             base.name, utils.CommaJoin(missing))
2193

    
2194
  def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
2195
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2196

2197
    @type ninfo: L{objects.Node}
2198
    @param ninfo: the node to check
2199
    @param nresult: the remote results for the node
2200
    @type is_master: bool
2201
    @param is_master: Whether node is the master node
2202

2203
    """
2204
    node = ninfo.name
2205

    
2206
    if (is_master and
2207
        (constants.ENABLE_FILE_STORAGE or
2208
         constants.ENABLE_SHARED_FILE_STORAGE)):
2209
      try:
2210
        fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
2211
      except KeyError:
2212
        # This should never happen
2213
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
2214
                      "Node did not return forbidden file storage paths")
2215
      else:
2216
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
2217
                      "Found forbidden file storage paths: %s",
2218
                      utils.CommaJoin(fspaths))
2219
    else:
2220
      self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
2221
                    constants.CV_ENODEFILESTORAGEPATHS, node,
2222
                    "Node should not have returned forbidden file storage"
2223
                    " paths")
2224

    
2225
  def _VerifyOob(self, ninfo, nresult):
2226
    """Verifies out of band functionality of a node.
2227

2228
    @type ninfo: L{objects.Node}
2229
    @param ninfo: the node to check
2230
    @param nresult: the remote results for the node
2231

2232
    """
2233
    node = ninfo.name
2234
    # We just have to verify the paths on master and/or master candidates
2235
    # as the oob helper is invoked on the master
2236
    if ((ninfo.master_candidate or ninfo.master_capable) and
2237
        constants.NV_OOB_PATHS in nresult):
2238
      for path_result in nresult[constants.NV_OOB_PATHS]:
2239
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2240

    
2241
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2242
    """Verifies and updates the node volume data.
2243

2244
    This function will update a L{NodeImage}'s internal structures
2245
    with data from the remote call.
2246

2247
    @type ninfo: L{objects.Node}
2248
    @param ninfo: the node to check
2249
    @param nresult: the remote results for the node
2250
    @param nimg: the node image object
2251
    @param vg_name: the configured VG name
2252

2253
    """
2254
    node = ninfo.name
2255
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2256

    
2257
    nimg.lvm_fail = True
2258
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2259
    if vg_name is None:
2260
      pass
2261
    elif isinstance(lvdata, basestring):
2262
      _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
2263
               utils.SafeEncode(lvdata))
2264
    elif not isinstance(lvdata, dict):
2265
      _ErrorIf(True, constants.CV_ENODELVM, node,
2266
               "rpc call to node failed (lvlist)")
2267
    else:
2268
      nimg.volumes = lvdata
2269
      nimg.lvm_fail = False
2270

    
2271
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2272
    """Verifies and updates the node instance list.
2273

2274
    If the listing was successful, then updates this node's instance
2275
    list. Otherwise, it marks the RPC call as failed for the instance
2276
    list key.
2277

2278
    @type ninfo: L{objects.Node}
2279
    @param ninfo: the node to check
2280
    @param nresult: the remote results for the node
2281
    @param nimg: the node image object
2282

2283
    """
2284
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2285
    test = not isinstance(idata, list)
2286
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2287
                  "rpc call to node failed (instancelist): %s",
2288
                  utils.SafeEncode(str(idata)))
2289
    if test:
2290
      nimg.hyp_fail = True
2291
    else:
2292
      nimg.instances = idata
2293

    
2294
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2295
    """Verifies and computes a node information map
2296

2297
    @type ninfo: L{objects.Node}
2298
    @param ninfo: the node to check
2299
    @param nresult: the remote results for the node
2300
    @param nimg: the node image object
2301
    @param vg_name: the configured VG name
2302

2303
    """
2304
    node = ninfo.name
2305
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2306

    
2307
    # try to read free memory (from the hypervisor)
2308
    hv_info = nresult.get(constants.NV_HVINFO, None)
2309
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2310
    _ErrorIf(test, constants.CV_ENODEHV, node,
2311
             "rpc call to node failed (hvinfo)")
2312
    if not test:
2313
      try:
2314
        nimg.mfree = int(hv_info["memory_free"])
2315
      except (ValueError, TypeError):
2316
        _ErrorIf(True, constants.CV_ENODERPC, node,
2317
                 "node returned invalid nodeinfo, check hypervisor")
2318

    
2319
    # FIXME: devise a free space model for file based instances as well
2320
    if vg_name is not None:
2321
      test = (constants.NV_VGLIST not in nresult or
2322
              vg_name not in nresult[constants.NV_VGLIST])
2323
      _ErrorIf(test, constants.CV_ENODELVM, node,
2324
               "node didn't return data for the volume group '%s'"
2325
               " - it is either missing or broken", vg_name)
2326
      if not test:
2327
        try:
2328
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2329
        except (ValueError, TypeError):
2330
          _ErrorIf(True, constants.CV_ENODERPC, node,
2331
                   "node returned invalid LVM info, check LVM status")
2332

    
2333
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2334
    """Gets per-disk status information for all instances.
2335

2336
    @type nodelist: list of strings
2337
    @param nodelist: Node names
2338
    @type node_image: dict of (name, L{objects.Node})
2339
    @param node_image: Node objects
2340
    @type instanceinfo: dict of (name, L{objects.Instance})
2341
    @param instanceinfo: Instance objects
2342
    @rtype: {instance: {node: [(success, payload)]}}
2343
    @return: a dictionary of per-instance dictionaries with nodes as
2344
        keys and disk information as values; the disk information is a
2345
        list of tuples (success, payload)
2346

2347
    """
2348
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2349

    
2350
    node_disks = {}
2351
    node_disks_devonly = {}
2352
    diskless_instances = set()
2353
    nodisk_instances = set()
2354
    diskless = constants.DT_DISKLESS
2355

    
2356
    for nname in nodelist:
2357
      node_instances = list(itertools.chain(node_image[nname].pinst,
2358
                                            node_image[nname].sinst))
2359
      diskless_instances.update(inst for inst in node_instances
2360
                                if instanceinfo[inst].disk_template == diskless)
2361
      disks = [(inst, disk)
2362
               for inst in node_instances
2363
               for disk in instanceinfo[inst].disks]
2364

    
2365
      if not disks:
2366
        nodisk_instances.update(inst for inst in node_instances
2367
                                if instanceinfo[inst].disk_template != diskless)
2368
        # No need to collect data
2369
        continue
2370

    
2371
      node_disks[nname] = disks
2372

    
2373
      # _AnnotateDiskParams makes already copies of the disks
2374
      devonly = []
2375
      for (inst, dev) in disks:
2376
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
2377
        self.cfg.SetDiskID(anno_disk, nname)
2378
        devonly.append(anno_disk)
2379

    
2380
      node_disks_devonly[nname] = devonly
2381

    
2382
    assert len(node_disks) == len(node_disks_devonly)
2383

    
2384
    # Collect data from all nodes with disks
2385
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2386
                                                          node_disks_devonly)
2387

    
2388
    assert len(result) == len(node_disks)
2389

    
2390
    instdisk = {}
2391

    
2392
    for (nname, nres) in result.items():
2393
      disks = node_disks[nname]
2394

    
2395
      if nres.offline:
2396
        # No data from this node
2397
        data = len(disks) * [(False, "node offline")]
2398
      else:
2399
        msg = nres.fail_msg
2400
        _ErrorIf(msg, constants.CV_ENODERPC, nname,
2401
                 "while getting disk information: %s", msg)
2402
        if msg:
2403
          # No data from this node
2404
          data = len(disks) * [(False, msg)]
2405
        else:
2406
          data = []
2407
          for idx, i in enumerate(nres.payload):
2408
            if isinstance(i, (tuple, list)) and len(i) == 2:
2409
              data.append(i)
2410
            else:
2411
              logging.warning("Invalid result from node %s, entry %d: %s",
2412
                              nname, idx, i)
2413
              data.append((False, "Invalid result from the remote node"))
2414

    
2415
      for ((inst, _), status) in zip(disks, data):
2416
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2417

    
2418
    # Add empty entries for diskless instances.
2419
    for inst in diskless_instances:
2420
      assert inst not in instdisk
2421
      instdisk[inst] = {}
2422
    # ...and disk-full instances that happen to have no disks
2423
    for inst in nodisk_instances:
2424
      assert inst not in instdisk
2425
      instdisk[inst] = {}
2426

    
2427
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2428
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
2429
                      compat.all(isinstance(s, (tuple, list)) and
2430
                                 len(s) == 2 for s in statuses)
2431
                      for inst, nnames in instdisk.items()
2432
                      for nname, statuses in nnames.items())
2433
    if __debug__:
2434
      instdisk_keys = set(instdisk)
2435
      instanceinfo_keys = set(instanceinfo)
2436
      assert instdisk_keys == instanceinfo_keys, \
2437
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2438
         (instdisk_keys, instanceinfo_keys))
2439

    
2440
    return instdisk
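    # Example of the returned mapping (hypothetical names):
    #   {"inst1.example.com": {"node1.example.com": [(True, <bdev status>)]}}
    # Diskless instances keep an empty inner dict; nodes that could not be
    # queried contribute (False, <error message>) entries instead.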

  @staticmethod
2443
  def _SshNodeSelector(group_uuid, all_nodes):
2444
    """Create endless iterators for all potential SSH check hosts.
2445

2446
    """
2447
    nodes = [node for node in all_nodes
2448
             if (node.group != group_uuid and
2449
                 not node.offline)]
2450
    keyfunc = operator.attrgetter("group")
2451

    
2452
    return map(itertools.cycle,
2453
               [sorted(map(operator.attrgetter("name"), names))
2454
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2455
                                                  keyfunc)])
2456

    
2457
  @classmethod
2458
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2459
    """Choose which nodes should talk to which other nodes.
2460

2461
    We will make nodes contact all nodes in their group, and one node from
2462
    every other group.
2463

2464
    @warning: This algorithm has a known issue if one node group is much
2465
      smaller than others (e.g. just one node). In such a case all other
2466
      nodes will talk to the single node.
2467

2468
    """
2469
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2470
    sel = cls._SshNodeSelector(group_uuid, all_nodes)
2471

    
2472
    return (online_nodes,
2473
            dict((name, sorted([i.next() for i in sel]))
2474
                 for name in online_nodes))
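    # Illustrative result for a two-node group in a three-group cluster
    # (hypothetical names):
    #   (["node1", "node2"],
    #    {"node1": ["grpB-node1", "grpC-node4"],
    #     "node2": ["grpB-node2", "grpC-node5"]})
    # i.e. each online node also probes one rotating node per foreign group.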

  def BuildHooksEnv(self):
2477
    """Build hooks env.
2478

2479
    Cluster-Verify hooks are run in the post phase only; their failure is
    logged in the verify output and makes the verification fail.
2481

2482
    """
2483
    env = {
2484
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2485
      }
2486

    
2487
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2488
               for node in self.my_node_info.values())
2489

    
2490
    return env
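    # Example environment (hypothetical tags): {"CLUSTER_TAGS": "prod",
    # "NODE_TAGS_node1.example.com": "rack1 ssd"}; one NODE_TAGS_<name> entry
    # is emitted per node in the verified group.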

  def BuildHooksNodes(self):
2493
    """Build hooks nodes.
2494

2495
    """
2496
    return ([], self.my_node_names)
2497

    
2498
  def Exec(self, feedback_fn):
2499
    """Verify integrity of the node group, performing various test on nodes.
2500

2501
    """
2502
    # This method has too many local variables. pylint: disable=R0914
2503
    feedback_fn("* Verifying group '%s'" % self.group_info.name)
2504

    
2505
    if not self.my_node_names:
2506
      # empty node group
2507
      feedback_fn("* Empty node group, skipping verification")
2508
      return True
2509

    
2510
    self.bad = False
2511
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2512
    verbose = self.op.verbose
2513
    self._feedback_fn = feedback_fn
2514

    
2515
    vg_name = self.cfg.GetVGName()
2516
    drbd_helper = self.cfg.GetDRBDHelper()
2517
    cluster = self.cfg.GetClusterInfo()
2518
    hypervisors = cluster.enabled_hypervisors
2519
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2520

    
2521
    i_non_redundant = [] # Non redundant instances
2522
    i_non_a_balanced = [] # Non auto-balanced instances
2523
    i_offline = 0 # Count of offline instances
2524
    n_offline = 0 # Count of offline nodes
2525
    n_drained = 0 # Count of nodes being drained
2526
    node_vol_should = {}
2527

    
2528
    # FIXME: verify OS list
2529

    
2530
    # File verification
2531
    filemap = ComputeAncillaryFiles(cluster, False)
2532

    
2533
    # do local checksums
2534
    master_node = self.master_node = self.cfg.GetMasterNode()
2535
    master_ip = self.cfg.GetMasterIP()
2536

    
2537
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2538

    
2539
    user_scripts = []
2540
    if self.cfg.GetUseExternalMipScript():
2541
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2542

    
2543
    node_verify_param = {
2544
      constants.NV_FILELIST:
2545
        map(vcluster.MakeVirtualPath,
2546
            utils.UniqueSequence(filename
2547
                                 for files in filemap
2548
                                 for filename in files)),
2549
      constants.NV_NODELIST:
2550
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2551
                                  self.all_node_info.values()),
2552
      constants.NV_HYPERVISOR: hypervisors,
2553
      constants.NV_HVPARAMS:
2554
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2555
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2556
                                 for node in node_data_list
2557
                                 if not node.offline],
2558
      constants.NV_INSTANCELIST: hypervisors,
2559
      constants.NV_VERSION: None,
2560
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2561
      constants.NV_NODESETUP: None,
2562
      constants.NV_TIME: None,
2563
      constants.NV_MASTERIP: (master_node, master_ip),
2564
      constants.NV_OSLIST: None,
2565
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2566
      constants.NV_USERSCRIPTS: user_scripts,
2567
      }
2568

    
2569
    if vg_name is not None:
2570
      node_verify_param[constants.NV_VGLIST] = None
2571
      node_verify_param[constants.NV_LVLIST] = vg_name
2572
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2573

    
2574
    if drbd_helper:
2575
      node_verify_param[constants.NV_DRBDLIST] = None
2576
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2577

    
2578
    if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
2579
      # Load file storage paths only from master node
2580
      node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node
2581

    
2582
    # bridge checks
2583
    # FIXME: this needs to be changed per node-group, not cluster-wide
2584
    bridges = set()
2585
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2586
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2587
      bridges.add(default_nicpp[constants.NIC_LINK])
2588
    for instance in self.my_inst_info.values():
2589
      for nic in instance.nics:
2590
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
2591
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2592
          bridges.add(full_nic[constants.NIC_LINK])
2593

    
2594
    if bridges:
2595
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2596

    
2597
    # Build our expected cluster state
2598
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2599
                                                 name=node.name,
2600
                                                 vm_capable=node.vm_capable))
2601
                      for node in node_data_list)
2602

    
2603
    # Gather OOB paths
2604
    oob_paths = []
2605
    for node in self.all_node_info.values():
2606
      path = SupportsOob(self.cfg, node)
2607
      if path and path not in oob_paths:
2608
        oob_paths.append(path)
2609

    
2610
    if oob_paths:
2611
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2612

    
2613
    for instance in self.my_inst_names:
2614
      inst_config = self.my_inst_info[instance]
2615
      if inst_config.admin_state == constants.ADMINST_OFFLINE:
2616
        i_offline += 1
2617

    
2618
      for nname in inst_config.all_nodes:
2619
        if nname not in node_image:
2620
          gnode = self.NodeImage(name=nname)
2621
          gnode.ghost = (nname not in self.all_node_info)
2622
          node_image[nname] = gnode
2623

    
2624
      inst_config.MapLVsByNode(node_vol_should)
2625

    
2626
      pnode = inst_config.primary_node
2627
      node_image[pnode].pinst.append(instance)
2628

    
2629
      for snode in inst_config.secondary_nodes:
2630
        nimg = node_image[snode]
2631
        nimg.sinst.append(instance)
2632
        if pnode not in nimg.sbp:
2633
          nimg.sbp[pnode] = []
2634
        nimg.sbp[pnode].append(instance)
2635

    
2636
    es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg, self.my_node_names)
2637
    # The value of exclusive_storage should be the same across the group, so if
2638
    # it's True for at least a node, we act as if it were set for all the nodes
2639
    self._exclusive_storage = compat.any(es_flags.values())
2640
    if self._exclusive_storage:
2641
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2642

    
2643
    # At this point, we have the in-memory data structures complete,
2644
    # except for the runtime information, which we'll gather next
2645

    
2646
    # Due to the way our RPC system works, exact response times cannot be
2647
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2648
    # time before and after executing the request, we can at least have a time
2649
    # window.
2650
    nvinfo_starttime = time.time()
2651
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
2652
                                           node_verify_param,
2653
                                           self.cfg.GetClusterName())
2654
    nvinfo_endtime = time.time()
2655

    
2656
    if self.extra_lv_nodes and vg_name is not None:
2657
      extra_lv_nvinfo = \
2658
          self.rpc.call_node_verify(self.extra_lv_nodes,
2659
                                    {constants.NV_LVLIST: vg_name},
2660
                                    self.cfg.GetClusterName())
2661
    else:
2662
      extra_lv_nvinfo = {}
2663

    
2664
    all_drbd_map = self.cfg.ComputeDRBDMap()
2665

    
2666
    feedback_fn("* Gathering disk information (%s nodes)" %
2667
                len(self.my_node_names))
2668
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
2669
                                     self.my_inst_info)
2670

    
2671
    feedback_fn("* Verifying configuration file consistency")
2672

    
2673
    # If not all nodes are being checked, we need to make sure the master node
2674
    # and a non-checked vm_capable node are in the list.
2675
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
2676
    if absent_nodes:
2677
      vf_nvinfo = all_nvinfo.copy()
2678
      vf_node_info = list(self.my_node_info.values())
2679
      additional_nodes = []
2680
      if master_node not in self.my_node_info:
2681
        additional_nodes.append(master_node)
2682
        vf_node_info.append(self.all_node_info[master_node])
2683
      # Add the first vm_capable node we find which is not included,
2684
      # excluding the master node (which we already have)
2685
      for node in absent_nodes:
2686
        nodeinfo = self.all_node_info[node]
2687
        if (nodeinfo.vm_capable and not nodeinfo.offline and
2688
            node != master_node):
2689
          additional_nodes.append(node)
2690
          vf_node_info.append(self.all_node_info[node])
2691
          break
2692
      key = constants.NV_FILELIST
2693
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
2694
                                                 {key: node_verify_param[key]},
2695
                                                 self.cfg.GetClusterName()))
2696
    else:
2697
      vf_nvinfo = all_nvinfo
2698
      vf_node_info = self.my_node_info.values()
2699

    
2700
    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)
2701

    
2702
    feedback_fn("* Verifying node status")
2703

    
2704
    refos_img = None
2705

    
2706
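    # Per-node verification: walk the gathered RPC results and run the node
    # checks (basic node state, time, network, user scripts, OOB, file storage
    # paths); vm_capable nodes get additional storage and OS checks.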
    for node_i in node_data_list:
      node = node_i.name
      nimg = node_image[node]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline += 1
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
               msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyFileStoragePaths(node_i, nresult,
                                   node == master_node)

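      # vm_capable nodes additionally have their LVM state, DRBD usage,
      # volumes, instance list, node info and OS verified below.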
      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)

        for inst in non_primary_inst:
          test = inst in self.all_inst_info
          _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
                   "instance should not run on node %s", node_i.name)
          _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                   "node is running unknown instance %s", inst)

    self._VerifyGroupLVM(node_image, vg_name)

    for node, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
                              node_image[node], vg_name)

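    # Per-instance verification: check each instance of this group against the
    # node and disk data gathered above, and note instances that are not
    # redundant or not auto-balanced for the summary at the end.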
    feedback_fn("* Verifying instance status")
2780
    for instance in self.my_inst_names:
2781
      if verbose:
2782
        feedback_fn("* Verifying instance %s" % instance)
2783
      inst_config = self.my_inst_info[instance]
2784
      self._VerifyInstance(instance, inst_config, node_image,
2785
                           instdisk[instance])
2786

    
2787
      # If the instance is non-redundant we cannot survive losing its primary
2788
      # node, so we are not N+1 compliant.
2789
      if inst_config.disk_template not in constants.DTS_MIRRORED:
2790
        i_non_redundant.append(instance)
2791

    
2792
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2793
        i_non_a_balanced.append(instance)
2794

    
2795
    feedback_fn("* Verifying orphan volumes")
2796
    reserved = utils.FieldSet(*cluster.reserved_lvs)
2797

    
2798
    # We will get spurious "unknown volume" warnings if any node of this group
2799
    # is secondary for an instance whose primary is in another group. To avoid
2800
    # them, we find these instances and add their volumes to node_vol_should.
2801
    for inst in self.all_inst_info.values():
2802
      for secondary in inst.secondary_nodes:
2803
        if (secondary in self.my_node_info
2804
            and inst.name not in self.my_inst_info):
2805
          inst.MapLVsByNode(node_vol_should)
2806
          break
2807

    
2808
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2809

    
2810
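    # The N+1 memory check is optional and can be disabled through the
    # opcode's skip_checks field.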
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyzes the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_names:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

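  # The work is delegated to one OpGroupVerifyDisks job per node group (see
  # Exec below), so only shared locks on the node groups are needed here.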
  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])