Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / cluster.py @ 6ef8077e

History | View | Annotate | Download (107.5 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with the cluster."""
23

    
24
import OpenSSL
25

    
26
import copy
27
import itertools
28
import logging
29
import operator
30
import os
31
import re
32
import time
33

    
34
from ganeti import compat
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import hypervisor
38
from ganeti import locking
39
from ganeti import masterd
40
from ganeti import netutils
41
from ganeti import objects
42
from ganeti import opcodes
43
from ganeti import pathutils
44
from ganeti import query
45
from ganeti import rpc
46
from ganeti import runtime
47
from ganeti import ssh
48
from ganeti import uidpool
49
from ganeti import utils
50
from ganeti import vcluster
51

    
52
from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53
  ResultWithJobs
54
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob
60

    
61
import ganeti.masterd.instance
62

    
63

    
64
class LUClusterActivateMasterIp(NoHooksLU):
65
  """Activate the master IP on the master node.
66

67
  """
68
  def Exec(self, feedback_fn):
69
    """Activate the master IP.
70

71
    """
72
    master_params = self.cfg.GetMasterNetworkParameters()
73
    ems = self.cfg.GetUseExternalMipScript()
74
    result = self.rpc.call_node_activate_master_ip(master_params.name,
75
                                                   master_params, ems)
76
    result.Raise("Could not activate the master IP")
77

    
78

    
79
class LUClusterDeactivateMasterIp(NoHooksLU):
80
  """Deactivate the master IP on the master node.
81

82
  """
83
  def Exec(self, feedback_fn):
84
    """Deactivate the master IP.
85

86
    """
87
    master_params = self.cfg.GetMasterNetworkParameters()
88
    ems = self.cfg.GetUseExternalMipScript()
89
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
90
                                                     master_params, ems)
91
    result.Raise("Could not deactivate the master IP")
92

    
93

    
94
class LUClusterConfigQuery(NoHooksLU):
95
  """Return configuration values.
96

97
  """
98
  REQ_BGL = False
99

    
100
  def CheckArguments(self):
101
    self.cq = ClusterQuery(None, self.op.output_fields, False)
102

    
103
  def ExpandNames(self):
104
    self.cq.ExpandNames(self)
105

    
106
  def DeclareLocks(self, level):
107
    self.cq.DeclareLocks(self, level)
108

    
109
  def Exec(self, feedback_fn):
110
    result = self.cq.OldStyleQuery(self)
111

    
112
    assert len(result) == 1
113

    
114
    return result[0]
115

    
116

    
117
class LUClusterDestroy(LogicalUnit):
118
  """Logical unit for destroying the cluster.
119

120
  """
121
  HPATH = "cluster-destroy"
122
  HTYPE = constants.HTYPE_CLUSTER
123

    
124
  def BuildHooksEnv(self):
125
    """Build hooks env.
126

127
    """
128
    return {
129
      "OP_TARGET": self.cfg.GetClusterName(),
130
      }
131

    
132
  def BuildHooksNodes(self):
133
    """Build hooks nodes.
134

135
    """
136
    return ([], [])
137

    
138
  def CheckPrereq(self):
139
    """Check prerequisites.
140

141
    This checks whether the cluster is empty.
142

143
    Any errors are signaled by raising errors.OpPrereqError.
144

145
    """
146
    master = self.cfg.GetMasterNode()
147

    
148
    nodelist = self.cfg.GetNodeList()
149
    if len(nodelist) != 1 or nodelist[0] != master:
150
      raise errors.OpPrereqError("There are still %d node(s) in"
151
                                 " this cluster." % (len(nodelist) - 1),
152
                                 errors.ECODE_INVAL)
153
    instancelist = self.cfg.GetInstanceList()
154
    if instancelist:
155
      raise errors.OpPrereqError("There are still %d instance(s) in"
156
                                 " this cluster." % len(instancelist),
157
                                 errors.ECODE_INVAL)
158

    
159
  def Exec(self, feedback_fn):
160
    """Destroys the cluster.
161

162
    """
163
    master_params = self.cfg.GetMasterNetworkParameters()
164

    
165
    # Run post hooks on master node before it's removed
166
    RunPostHook(self, master_params.name)
167

    
168
    ems = self.cfg.GetUseExternalMipScript()
169
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
170
                                                     master_params, ems)
171
    if result.fail_msg:
172
      self.LogWarning("Error disabling the master IP address: %s",
173
                      result.fail_msg)
174

    
175
    return master_params.name
176

    
177

    
178
class LUClusterPostInit(LogicalUnit):
179
  """Logical unit for running hooks after cluster initialization.
180

181
  """
182
  HPATH = "cluster-init"
183
  HTYPE = constants.HTYPE_CLUSTER
184

    
185
  def BuildHooksEnv(self):
186
    """Build hooks env.
187

188
    """
189
    return {
190
      "OP_TARGET": self.cfg.GetClusterName(),
191
      }
192

    
193
  def BuildHooksNodes(self):
194
    """Build hooks nodes.
195

196
    """
197
    return ([], [self.cfg.GetMasterNode()])
198

    
199
  def Exec(self, feedback_fn):
200
    """Nothing to do.
201

202
    """
203
    return True
204

    
205

    
206
class ClusterQuery(QueryBase):
207
  FIELDS = query.CLUSTER_FIELDS
208

    
209
  #: Do not sort (there is only one item)
210
  SORT_FIELD = None
211

    
212
  def ExpandNames(self, lu):
213
    lu.needed_locks = {}
214

    
215
    # The following variables interact with _QueryBase._GetNames
216
    self.wanted = locking.ALL_SET
217
    self.do_locking = self.use_locking
218

    
219
    if self.do_locking:
220
      raise errors.OpPrereqError("Can not use locking for cluster queries",
221
                                 errors.ECODE_INVAL)
222

    
223
  def DeclareLocks(self, lu, level):
224
    pass
225

    
226
  def _GetQueryData(self, lu):
227
    """Computes the list of nodes and their attributes.
228

229
    """
230
    # Locking is not used
231
    assert not (compat.any(lu.glm.is_owned(level)
232
                           for level in locking.LEVELS
233
                           if level != locking.LEVEL_CLUSTER) or
234
                self.do_locking or self.use_locking)
235

    
236
    if query.CQ_CONFIG in self.requested_data:
237
      cluster = lu.cfg.GetClusterInfo()
238
    else:
239
      cluster = NotImplemented
240

    
241
    if query.CQ_QUEUE_DRAINED in self.requested_data:
242
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
243
    else:
244
      drain_flag = NotImplemented
245

    
246
    if query.CQ_WATCHER_PAUSE in self.requested_data:
247
      master_name = lu.cfg.GetMasterNode()
248

    
249
      result = lu.rpc.call_get_watcher_pause(master_name)
250
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
251
                   master_name)
252

    
253
      watcher_pause = result.payload
254
    else:
255
      watcher_pause = NotImplemented
256

    
257
    return query.ClusterQueryData(cluster, drain_flag, watcher_pause)
258

    
259

    
260
class LUClusterQuery(NoHooksLU):
261
  """Query cluster configuration.
262

263
  """
264
  REQ_BGL = False
265

    
266
  def ExpandNames(self):
267
    self.needed_locks = {}
268

    
269
  def Exec(self, feedback_fn):
270
    """Return cluster config.
271

272
    """
273
    cluster = self.cfg.GetClusterInfo()
274
    os_hvp = {}
275

    
276
    # Filter just for enabled hypervisors
277
    for os_name, hv_dict in cluster.os_hvp.items():
278
      os_hvp[os_name] = {}
279
      for hv_name, hv_params in hv_dict.items():
280
        if hv_name in cluster.enabled_hypervisors:
281
          os_hvp[os_name][hv_name] = hv_params
282

    
283
    # Convert ip_family to ip_version
284
    primary_ip_version = constants.IP4_VERSION
285
    if cluster.primary_ip_family == netutils.IP6Address.family:
286
      primary_ip_version = constants.IP6_VERSION
287

    
288
    result = {
289
      "software_version": constants.RELEASE_VERSION,
290
      "protocol_version": constants.PROTOCOL_VERSION,
291
      "config_version": constants.CONFIG_VERSION,
292
      "os_api_version": max(constants.OS_API_VERSIONS),
293
      "export_version": constants.EXPORT_VERSION,
294
      "architecture": runtime.GetArchInfo(),
295
      "name": cluster.cluster_name,
296
      "master": cluster.master_node,
297
      "default_hypervisor": cluster.primary_hypervisor,
298
      "enabled_hypervisors": cluster.enabled_hypervisors,
299
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
300
                        for hypervisor_name in cluster.enabled_hypervisors]),
301
      "os_hvp": os_hvp,
302
      "beparams": cluster.beparams,
303
      "osparams": cluster.osparams,
304
      "ipolicy": cluster.ipolicy,
305
      "nicparams": cluster.nicparams,
306
      "ndparams": cluster.ndparams,
307
      "diskparams": cluster.diskparams,
308
      "candidate_pool_size": cluster.candidate_pool_size,
309
      "master_netdev": cluster.master_netdev,
310
      "master_netmask": cluster.master_netmask,
311
      "use_external_mip_script": cluster.use_external_mip_script,
312
      "volume_group_name": cluster.volume_group_name,
313
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
314
      "file_storage_dir": cluster.file_storage_dir,
315
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
316
      "maintain_node_health": cluster.maintain_node_health,
317
      "ctime": cluster.ctime,
318
      "mtime": cluster.mtime,
319
      "uuid": cluster.uuid,
320
      "tags": list(cluster.GetTags()),
321
      "uid_pool": cluster.uid_pool,
322
      "default_iallocator": cluster.default_iallocator,
323
      "reserved_lvs": cluster.reserved_lvs,
324
      "primary_ip_version": primary_ip_version,
325
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
326
      "hidden_os": cluster.hidden_os,
327
      "blacklisted_os": cluster.blacklisted_os,
328
      }
329

    
330
    return result
331

    
332

    
333
class LUClusterRedistConf(NoHooksLU):
334
  """Force the redistribution of cluster configuration.
335

336
  This is a very simple LU.
337

338
  """
339
  REQ_BGL = False
340

    
341
  def ExpandNames(self):
342
    self.needed_locks = {
343
      locking.LEVEL_NODE: locking.ALL_SET,
344
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
345
    }
346
    self.share_locks = ShareAll()
347

    
348
  def Exec(self, feedback_fn):
349
    """Redistribute the configuration.
350

351
    """
352
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
353
    RedistributeAncillaryFiles(self)
354

    
355

    
356
class LUClusterRename(LogicalUnit):
357
  """Rename the cluster.
358

359
  """
360
  HPATH = "cluster-rename"
361
  HTYPE = constants.HTYPE_CLUSTER
362

    
363
  def BuildHooksEnv(self):
364
    """Build hooks env.
365

366
    """
367
    return {
368
      "OP_TARGET": self.cfg.GetClusterName(),
369
      "NEW_NAME": self.op.name,
370
      }
371

    
372
  def BuildHooksNodes(self):
373
    """Build hooks nodes.
374

375
    """
376
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
377

    
378
  def CheckPrereq(self):
379
    """Verify that the passed name is a valid one.
380

381
    """
382
    hostname = netutils.GetHostname(name=self.op.name,
383
                                    family=self.cfg.GetPrimaryIPFamily())
384

    
385
    new_name = hostname.name
386
    self.ip = new_ip = hostname.ip
387
    old_name = self.cfg.GetClusterName()
388
    old_ip = self.cfg.GetMasterIP()
389
    if new_name == old_name and new_ip == old_ip:
390
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
391
                                 " cluster has changed",
392
                                 errors.ECODE_INVAL)
393
    if new_ip != old_ip:
394
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
395
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
396
                                   " reachable on the network" %
397
                                   new_ip, errors.ECODE_NOTUNIQUE)
398

    
399
    self.op.name = new_name
400

    
401
  def Exec(self, feedback_fn):
402
    """Rename the cluster.
403

404
    """
405
    clustername = self.op.name
406
    new_ip = self.ip
407

    
408
    # shutdown the master IP
409
    master_params = self.cfg.GetMasterNetworkParameters()
410
    ems = self.cfg.GetUseExternalMipScript()
411
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
412
                                                     master_params, ems)
413
    result.Raise("Could not disable the master role")
414

    
415
    try:
416
      cluster = self.cfg.GetClusterInfo()
417
      cluster.cluster_name = clustername
418
      cluster.master_ip = new_ip
419
      self.cfg.Update(cluster, feedback_fn)
420

    
421
      # update the known hosts file
422
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
423
      node_list = self.cfg.GetOnlineNodeList()
424
      try:
425
        node_list.remove(master_params.name)
426
      except ValueError:
427
        pass
428
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
429
    finally:
430
      master_params.ip = new_ip
431
      result = self.rpc.call_node_activate_master_ip(master_params.name,
432
                                                     master_params, ems)
433
      msg = result.fail_msg
434
      if msg:
435
        self.LogWarning("Could not re-enable the master role on"
436
                        " the master, please restart manually: %s", msg)
437

    
438
    return clustername
439

    
440

    
441
class LUClusterRepairDiskSizes(NoHooksLU):
442
  """Verifies the cluster disks sizes.
443

444
  """
445
  REQ_BGL = False
446

    
447
  def ExpandNames(self):
448
    if self.op.instances:
449
      self.wanted_names = GetWantedInstances(self, self.op.instances)
450
      # Not getting the node allocation lock as only a specific set of
451
      # instances (and their nodes) is going to be acquired
452
      self.needed_locks = {
453
        locking.LEVEL_NODE_RES: [],
454
        locking.LEVEL_INSTANCE: self.wanted_names,
455
        }
456
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
457
    else:
458
      self.wanted_names = None
459
      self.needed_locks = {
460
        locking.LEVEL_NODE_RES: locking.ALL_SET,
461
        locking.LEVEL_INSTANCE: locking.ALL_SET,
462

    
463
        # This opcode is acquires the node locks for all instances
464
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
465
        }
466

    
467
    self.share_locks = {
468
      locking.LEVEL_NODE_RES: 1,
469
      locking.LEVEL_INSTANCE: 0,
470
      locking.LEVEL_NODE_ALLOC: 1,
471
      }
472

    
473
  def DeclareLocks(self, level):
474
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
475
      self._LockInstancesNodes(primary_only=True, level=level)
476

    
477
  def CheckPrereq(self):
478
    """Check prerequisites.
479

480
    This only checks the optional instance list against the existing names.
481

482
    """
483
    if self.wanted_names is None:
484
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
485

    
486
    self.wanted_instances = \
487
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))
488

    
489
  def _EnsureChildSizes(self, disk):
490
    """Ensure children of the disk have the needed disk size.
491

492
    This is valid mainly for DRBD8 and fixes an issue where the
493
    children have smaller disk size.
494

495
    @param disk: an L{ganeti.objects.Disk} object
496

497
    """
498
    if disk.dev_type == constants.LD_DRBD8:
499
      assert disk.children, "Empty children for DRBD8?"
500
      fchild = disk.children[0]
501
      mismatch = fchild.size < disk.size
502
      if mismatch:
503
        self.LogInfo("Child disk has size %d, parent %d, fixing",
504
                     fchild.size, disk.size)
505
        fchild.size = disk.size
506

    
507
      # and we recurse on this child only, not on the metadev
508
      return self._EnsureChildSizes(fchild) or mismatch
509
    else:
510
      return False
511

    
512
  def Exec(self, feedback_fn):
513
    """Verify the size of cluster disks.
514

515
    """
516
    # TODO: check child disks too
517
    # TODO: check differences in size between primary/secondary nodes
518
    per_node_disks = {}
519
    for instance in self.wanted_instances:
520
      pnode = instance.primary_node
521
      if pnode not in per_node_disks:
522
        per_node_disks[pnode] = []
523
      for idx, disk in enumerate(instance.disks):
524
        per_node_disks[pnode].append((instance, idx, disk))
525

    
526
    assert not (frozenset(per_node_disks.keys()) -
527
                self.owned_locks(locking.LEVEL_NODE_RES)), \
528
      "Not owning correct locks"
529
    assert not self.owned_locks(locking.LEVEL_NODE)
530

    
531
    changed = []
532
    for node, dskl in per_node_disks.items():
533
      newl = [v[2].Copy() for v in dskl]
534
      for dsk in newl:
535
        self.cfg.SetDiskID(dsk, node)
536
      result = self.rpc.call_blockdev_getdimensions(node, newl)
537
      if result.fail_msg:
538
        self.LogWarning("Failure in blockdev_getdimensions call to node"
539
                        " %s, ignoring", node)
540
        continue
541
      if len(result.payload) != len(dskl):
542
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
543
                        " result.payload=%s", node, len(dskl), result.payload)
544
        self.LogWarning("Invalid result from node %s, ignoring node results",
545
                        node)
546
        continue
547
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
548
        if dimensions is None:
549
          self.LogWarning("Disk %d of instance %s did not return size"
550
                          " information, ignoring", idx, instance.name)
551
          continue
552
        if not isinstance(dimensions, (tuple, list)):
553
          self.LogWarning("Disk %d of instance %s did not return valid"
554
                          " dimension information, ignoring", idx,
555
                          instance.name)
556
          continue
557
        (size, _) = dimensions
558
        if not isinstance(size, (int, long)):
559
          self.LogWarning("Disk %d of instance %s did not return valid"
560
                          " size information, ignoring", idx, instance.name)
561
          continue
562
        size = size >> 20
563
        if size != disk.size:
564
          self.LogInfo("Disk %d of instance %s has mismatched size,"
565
                       " correcting: recorded %d, actual %d", idx,
566
                       instance.name, disk.size, size)
567
          disk.size = size
568
          self.cfg.Update(instance, feedback_fn)
569
          changed.append((instance.name, idx, size))
570
        if self._EnsureChildSizes(disk):
571
          self.cfg.Update(instance, feedback_fn)
572
          changed.append((instance.name, idx, disk.size))
573
    return changed
574

    
575

    
576
def _ValidateNetmask(cfg, netmask):
577
  """Checks if a netmask is valid.
578

579
  @type cfg: L{config.ConfigWriter}
580
  @param cfg: The cluster configuration
581
  @type netmask: int
582
  @param netmask: the netmask to be verified
583
  @raise errors.OpPrereqError: if the validation fails
584

585
  """
586
  ip_family = cfg.GetPrimaryIPFamily()
587
  try:
588
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
589
  except errors.ProgrammerError:
590
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
591
                               ip_family, errors.ECODE_INVAL)
592
  if not ipcls.ValidateNetmask(netmask):
593
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
594
                               (netmask), errors.ECODE_INVAL)
595

    
596

    
597
class LUClusterSetParams(LogicalUnit):
598
  """Change the parameters of the cluster.
599

600
  """
601
  HPATH = "cluster-modify"
602
  HTYPE = constants.HTYPE_CLUSTER
603
  REQ_BGL = False
604

    
605
  def CheckArguments(self):
606
    """Check parameters
607

608
    """
609
    if self.op.uid_pool:
610
      uidpool.CheckUidPool(self.op.uid_pool)
611

    
612
    if self.op.add_uids:
613
      uidpool.CheckUidPool(self.op.add_uids)
614

    
615
    if self.op.remove_uids:
616
      uidpool.CheckUidPool(self.op.remove_uids)
617

    
618
    if self.op.master_netmask is not None:
619
      _ValidateNetmask(self.cfg, self.op.master_netmask)
620

    
621
    if self.op.diskparams:
622
      for dt_params in self.op.diskparams.values():
623
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
624
      try:
625
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
626
      except errors.OpPrereqError, err:
627
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
628
                                   errors.ECODE_INVAL)
629

    
630
  def ExpandNames(self):
631
    # FIXME: in the future maybe other cluster params won't require checking on
632
    # all nodes to be modified.
633
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
634
    # resource locks the right thing, shouldn't it be the BGL instead?
635
    self.needed_locks = {
636
      locking.LEVEL_NODE: locking.ALL_SET,
637
      locking.LEVEL_INSTANCE: locking.ALL_SET,
638
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
639
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
640
    }
641
    self.share_locks = ShareAll()
642

    
643
  def BuildHooksEnv(self):
644
    """Build hooks env.
645

646
    """
647
    return {
648
      "OP_TARGET": self.cfg.GetClusterName(),
649
      "NEW_VG_NAME": self.op.vg_name,
650
      }
651

    
652
  def BuildHooksNodes(self):
653
    """Build hooks nodes.
654

655
    """
656
    mn = self.cfg.GetMasterNode()
657
    return ([mn], [mn])
658

    
659
  def _CheckVgName(self, node_list, enabled_disk_templates,
660
                   new_enabled_disk_templates):
661
    """Check the consistency of the vg name on all nodes and in case it gets
662
       unset whether there are instances still using it.
663

664
    """
665
    if self.op.vg_name is not None and not self.op.vg_name:
666
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
667
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
668
                                   " instances exist", errors.ECODE_INVAL)
669

    
670
    if (self.op.vg_name is not None and
671
        utils.IsLvmEnabled(enabled_disk_templates)) or \
672
           (self.cfg.GetVGName() is not None and
673
            utils.LvmGetsEnabled(enabled_disk_templates,
674
                                 new_enabled_disk_templates)):
675
      self._CheckVgNameOnNodes(node_list)
676

    
677
  def _CheckVgNameOnNodes(self, node_list):
678
    """Check the status of the volume group on each node.
679

680
    """
681
    vglist = self.rpc.call_vg_list(node_list)
682
    for node in node_list:
683
      msg = vglist[node].fail_msg
684
      if msg:
685
        # ignoring down node
686
        self.LogWarning("Error while gathering data on node %s"
687
                        " (ignoring node): %s", node, msg)
688
        continue
689
      vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
690
                                            self.op.vg_name,
691
                                            constants.MIN_VG_SIZE)
692
      if vgstatus:
693
        raise errors.OpPrereqError("Error on node '%s': %s" %
694
                                   (node, vgstatus), errors.ECODE_ENVIRON)
695

    
696
  def _GetEnabledDiskTemplates(self, cluster):
697
    """Determines the enabled disk templates and the subset of disk templates
698
       that are newly enabled by this operation.
699

700
    """
701
    enabled_disk_templates = None
702
    new_enabled_disk_templates = []
703
    if self.op.enabled_disk_templates:
704
      enabled_disk_templates = self.op.enabled_disk_templates
705
      new_enabled_disk_templates = \
706
        list(set(enabled_disk_templates)
707
             - set(cluster.enabled_disk_templates))
708
    else:
709
      enabled_disk_templates = cluster.enabled_disk_templates
710
    return (enabled_disk_templates, new_enabled_disk_templates)
711

    
712
  def CheckPrereq(self):
713
    """Check prerequisites.
714

715
    This checks whether the given params don't conflict and
716
    if the given volume group is valid.
717

718
    """
719
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
720
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
721
        raise errors.OpPrereqError("Cannot disable drbd helper while"
722
                                   " drbd-based instances exist",
723
                                   errors.ECODE_INVAL)
724

    
725
    node_list = self.owned_locks(locking.LEVEL_NODE)
726
    self.cluster = cluster = self.cfg.GetClusterInfo()
727

    
728
    vm_capable_nodes = [node.name
729
                        for node in self.cfg.GetAllNodesInfo().values()
730
                        if node.name in node_list and node.vm_capable]
731

    
732
    (enabled_disk_templates, new_enabled_disk_templates) = \
733
      self._GetEnabledDiskTemplates(cluster)
734

    
735
    self._CheckVgName(vm_capable_nodes, enabled_disk_templates,
736
                      new_enabled_disk_templates)
737

    
738
    if self.op.drbd_helper:
739
      # checks given drbd helper on all nodes
740
      helpers = self.rpc.call_drbd_helper(node_list)
741
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
742
        if ninfo.offline:
743
          self.LogInfo("Not checking drbd helper on offline node %s", node)
744
          continue
745
        msg = helpers[node].fail_msg
746
        if msg:
747
          raise errors.OpPrereqError("Error checking drbd helper on node"
748
                                     " '%s': %s" % (node, msg),
749
                                     errors.ECODE_ENVIRON)
750
        node_helper = helpers[node].payload
751
        if node_helper != self.op.drbd_helper:
752
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
753
                                     (node, node_helper), errors.ECODE_ENVIRON)
754

    
755
    # validate params changes
756
    if self.op.beparams:
757
      objects.UpgradeBeParams(self.op.beparams)
758
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
759
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
760

    
761
    if self.op.ndparams:
762
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
763
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
764

    
765
      # TODO: we need a more general way to handle resetting
766
      # cluster-level parameters to default values
767
      if self.new_ndparams["oob_program"] == "":
768
        self.new_ndparams["oob_program"] = \
769
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
770

    
771
    if self.op.hv_state:
772
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
773
                                           self.cluster.hv_state_static)
774
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
775
                               for hv, values in new_hv_state.items())
776

    
777
    if self.op.disk_state:
778
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
779
                                               self.cluster.disk_state_static)
780
      self.new_disk_state = \
781
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
782
                            for name, values in svalues.items()))
783
             for storage, svalues in new_disk_state.items())
784

    
785
    if self.op.ipolicy:
786
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
787
                                           group_policy=False)
788

    
789
      all_instances = self.cfg.GetAllInstancesInfo().values()
790
      violations = set()
791
      for group in self.cfg.GetAllNodeGroupsInfo().values():
792
        instances = frozenset([inst for inst in all_instances
793
                               if compat.any(node in group.members
794
                                             for node in inst.all_nodes)])
795
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
796
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
797
        new = ComputeNewInstanceViolations(ipol,
798
                                           new_ipolicy, instances, self.cfg)
799
        if new:
800
          violations.update(new)
801

    
802
      if violations:
803
        self.LogWarning("After the ipolicy change the following instances"
804
                        " violate them: %s",
805
                        utils.CommaJoin(utils.NiceSort(violations)))
806

    
807
    if self.op.nicparams:
808
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
809
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
810
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
811
      nic_errors = []
812

    
813
      # check all instances for consistency
814
      for instance in self.cfg.GetAllInstancesInfo().values():
815
        for nic_idx, nic in enumerate(instance.nics):
816
          params_copy = copy.deepcopy(nic.nicparams)
817
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
818

    
819
          # check parameter syntax
820
          try:
821
            objects.NIC.CheckParameterSyntax(params_filled)
822
          except errors.ConfigurationError, err:
823
            nic_errors.append("Instance %s, nic/%d: %s" %
824
                              (instance.name, nic_idx, err))
825

    
826
          # if we're moving instances to routed, check that they have an ip
827
          target_mode = params_filled[constants.NIC_MODE]
828
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
829
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
830
                              " address" % (instance.name, nic_idx))
831
      if nic_errors:
832
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
833
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
834

    
835
    # hypervisor list/parameters
836
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
837
    if self.op.hvparams:
838
      for hv_name, hv_dict in self.op.hvparams.items():
839
        if hv_name not in self.new_hvparams:
840
          self.new_hvparams[hv_name] = hv_dict
841
        else:
842
          self.new_hvparams[hv_name].update(hv_dict)
843

    
844
    # disk template parameters
845
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
846
    if self.op.diskparams:
847
      for dt_name, dt_params in self.op.diskparams.items():
848
        if dt_name not in self.op.diskparams:
849
          self.new_diskparams[dt_name] = dt_params
850
        else:
851
          self.new_diskparams[dt_name].update(dt_params)
852

    
853
    # os hypervisor parameters
854
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
855
    if self.op.os_hvp:
856
      for os_name, hvs in self.op.os_hvp.items():
857
        if os_name not in self.new_os_hvp:
858
          self.new_os_hvp[os_name] = hvs
859
        else:
860
          for hv_name, hv_dict in hvs.items():
861
            if hv_dict is None:
862
              # Delete if it exists
863
              self.new_os_hvp[os_name].pop(hv_name, None)
864
            elif hv_name not in self.new_os_hvp[os_name]:
865
              self.new_os_hvp[os_name][hv_name] = hv_dict
866
            else:
867
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
868

    
869
    # os parameters
870
    self.new_osp = objects.FillDict(cluster.osparams, {})
871
    if self.op.osparams:
872
      for os_name, osp in self.op.osparams.items():
873
        if os_name not in self.new_osp:
874
          self.new_osp[os_name] = {}
875

    
876
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
877
                                                 use_none=True)
878

    
879
        if not self.new_osp[os_name]:
880
          # we removed all parameters
881
          del self.new_osp[os_name]
882
        else:
883
          # check the parameter validity (remote check)
884
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
885
                        os_name, self.new_osp[os_name])
886

    
887
    # changes to the hypervisor list
888
    if self.op.enabled_hypervisors is not None:
889
      self.hv_list = self.op.enabled_hypervisors
890
      for hv in self.hv_list:
891
        # if the hypervisor doesn't already exist in the cluster
892
        # hvparams, we initialize it to empty, and then (in both
893
        # cases) we make sure to fill the defaults, as we might not
894
        # have a complete defaults list if the hypervisor wasn't
895
        # enabled before
896
        if hv not in new_hvp:
897
          new_hvp[hv] = {}
898
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
899
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
900
    else:
901
      self.hv_list = cluster.enabled_hypervisors
902

    
903
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
904
      # either the enabled list has changed, or the parameters have, validate
905
      for hv_name, hv_params in self.new_hvparams.items():
906
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
907
            (self.op.enabled_hypervisors and
908
             hv_name in self.op.enabled_hypervisors)):
909
          # either this is a new hypervisor, or its parameters have changed
910
          hv_class = hypervisor.GetHypervisorClass(hv_name)
911
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
912
          hv_class.CheckParameterSyntax(hv_params)
913
          CheckHVParams(self, node_list, hv_name, hv_params)
914

    
915
    self._CheckDiskTemplateConsistency()
916

    
917
    if self.op.os_hvp:
918
      # no need to check any newly-enabled hypervisors, since the
919
      # defaults have already been checked in the above code-block
920
      for os_name, os_hvp in self.new_os_hvp.items():
921
        for hv_name, hv_params in os_hvp.items():
922
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
923
          # we need to fill in the new os_hvp on top of the actual hv_p
924
          cluster_defaults = self.new_hvparams.get(hv_name, {})
925
          new_osp = objects.FillDict(cluster_defaults, hv_params)
926
          hv_class = hypervisor.GetHypervisorClass(hv_name)
927
          hv_class.CheckParameterSyntax(new_osp)
928
          CheckHVParams(self, node_list, hv_name, new_osp)
929

    
930
    if self.op.default_iallocator:
931
      alloc_script = utils.FindFile(self.op.default_iallocator,
932
                                    constants.IALLOCATOR_SEARCH_PATH,
933
                                    os.path.isfile)
934
      if alloc_script is None:
935
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
936
                                   " specified" % self.op.default_iallocator,
937
                                   errors.ECODE_INVAL)
938

    
939
  def _CheckDiskTemplateConsistency(self):
940
    """Check whether the disk templates that are going to be disabled
941
       are still in use by some instances.
942

943
    """
944
    if self.op.enabled_disk_templates:
945
      cluster = self.cfg.GetClusterInfo()
946
      instances = self.cfg.GetAllInstancesInfo()
947

    
948
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
949
        - set(self.op.enabled_disk_templates)
950
      for instance in instances.itervalues():
951
        if instance.disk_template in disk_templates_to_remove:
952
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
953
                                     " because instance '%s' is using it." %
954
                                     (instance.disk_template, instance.name))
955

    
956
  def _SetVgName(self, feedback_fn):
957
    """Determines and sets the new volume group name.
958

959
    """
960
    if self.op.vg_name is not None:
961
      if self.op.vg_name and not \
962
           utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
963
        feedback_fn("Note that you specified a volume group, but did not"
964
                    " enable any lvm disk template.")
965
      new_volume = self.op.vg_name
966
      if not new_volume:
967
        if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
968
          raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
969
                                     " disk templates are enabled.")
970
        new_volume = None
971
      if new_volume != self.cfg.GetVGName():
972
        self.cfg.SetVGName(new_volume)
973
      else:
974
        feedback_fn("Cluster LVM configuration already in desired"
975
                    " state, not changing")
976
    else:
977
      if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
978
          not self.cfg.GetVGName():
979
        raise errors.OpPrereqError("Please specify a volume group when"
980
                                   " enabling lvm-based disk-templates.")
981

    
982
  def Exec(self, feedback_fn):
983
    """Change the parameters of the cluster.
984

985
    """
986
    if self.op.enabled_disk_templates:
987
      self.cluster.enabled_disk_templates = \
988
        list(set(self.op.enabled_disk_templates))
989

    
990
    self._SetVgName(feedback_fn)
991

    
992
    if self.op.drbd_helper is not None:
993
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
994
        feedback_fn("Note that you specified a drbd user helper, but did"
995
                    " enabled the drbd disk template.")
996
      new_helper = self.op.drbd_helper
997
      if not new_helper:
998
        new_helper = None
999
      if new_helper != self.cfg.GetDRBDHelper():
1000
        self.cfg.SetDRBDHelper(new_helper)
1001
      else:
1002
        feedback_fn("Cluster DRBD helper already in desired state,"
1003
                    " not changing")
1004
    if self.op.hvparams:
1005
      self.cluster.hvparams = self.new_hvparams
1006
    if self.op.os_hvp:
1007
      self.cluster.os_hvp = self.new_os_hvp
1008
    if self.op.enabled_hypervisors is not None:
1009
      self.cluster.hvparams = self.new_hvparams
1010
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1011
    if self.op.beparams:
1012
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1013
    if self.op.nicparams:
1014
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1015
    if self.op.ipolicy:
1016
      self.cluster.ipolicy = self.new_ipolicy
1017
    if self.op.osparams:
1018
      self.cluster.osparams = self.new_osp
1019
    if self.op.ndparams:
1020
      self.cluster.ndparams = self.new_ndparams
1021
    if self.op.diskparams:
1022
      self.cluster.diskparams = self.new_diskparams
1023
    if self.op.hv_state:
1024
      self.cluster.hv_state_static = self.new_hv_state
1025
    if self.op.disk_state:
1026
      self.cluster.disk_state_static = self.new_disk_state
1027

    
1028
    if self.op.candidate_pool_size is not None:
1029
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1030
      # we need to update the pool size here, otherwise the save will fail
1031
      AdjustCandidatePool(self, [])
1032

    
1033
    if self.op.maintain_node_health is not None:
1034
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1035
        feedback_fn("Note: CONFD was disabled at build time, node health"
1036
                    " maintenance is not useful (still enabling it)")
1037
      self.cluster.maintain_node_health = self.op.maintain_node_health
1038

    
1039
    if self.op.prealloc_wipe_disks is not None:
1040
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1041

    
1042
    if self.op.add_uids is not None:
1043
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1044

    
1045
    if self.op.remove_uids is not None:
1046
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1047

    
1048
    if self.op.uid_pool is not None:
1049
      self.cluster.uid_pool = self.op.uid_pool
1050

    
1051
    if self.op.default_iallocator is not None:
1052
      self.cluster.default_iallocator = self.op.default_iallocator
1053

    
1054
    if self.op.reserved_lvs is not None:
1055
      self.cluster.reserved_lvs = self.op.reserved_lvs
1056

    
1057
    if self.op.use_external_mip_script is not None:
1058
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
1059

    
1060
    def helper_os(aname, mods, desc):
1061
      desc += " OS list"
1062
      lst = getattr(self.cluster, aname)
1063
      for key, val in mods:
1064
        if key == constants.DDM_ADD:
1065
          if val in lst:
1066
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1067
          else:
1068
            lst.append(val)
1069
        elif key == constants.DDM_REMOVE:
1070
          if val in lst:
1071
            lst.remove(val)
1072
          else:
1073
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1074
        else:
1075
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
1076

    
1077
    if self.op.hidden_os:
1078
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1079

    
1080
    if self.op.blacklisted_os:
1081
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1082

    
1083
    if self.op.master_netdev:
1084
      master_params = self.cfg.GetMasterNetworkParameters()
1085
      ems = self.cfg.GetUseExternalMipScript()
1086
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1087
                  self.cluster.master_netdev)
1088
      result = self.rpc.call_node_deactivate_master_ip(master_params.name,
1089
                                                       master_params, ems)
1090
      result.Raise("Could not disable the master ip")
1091
      feedback_fn("Changing master_netdev from %s to %s" %
1092
                  (master_params.netdev, self.op.master_netdev))
1093
      self.cluster.master_netdev = self.op.master_netdev
1094

    
1095
    if self.op.master_netmask:
1096
      master_params = self.cfg.GetMasterNetworkParameters()
1097
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1098
      result = self.rpc.call_node_change_master_netmask(master_params.name,
1099
                                                        master_params.netmask,
1100
                                                        self.op.master_netmask,
1101
                                                        master_params.ip,
1102
                                                        master_params.netdev)
1103
      if result.fail_msg:
1104
        msg = "Could not change the master IP netmask: %s" % result.fail_msg
1105
        feedback_fn(msg)
1106

    
1107
      self.cluster.master_netmask = self.op.master_netmask
1108

    
1109
    self.cfg.Update(self.cluster, feedback_fn)
1110

    
1111
    if self.op.master_netdev:
1112
      master_params = self.cfg.GetMasterNetworkParameters()
1113
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
1114
                  self.op.master_netdev)
1115
      ems = self.cfg.GetUseExternalMipScript()
1116
      result = self.rpc.call_node_activate_master_ip(master_params.name,
1117
                                                     master_params, ems)
1118
      if result.fail_msg:
1119
        self.LogWarning("Could not re-enable the master ip on"
1120
                        " the master, please restart manually: %s",
1121
                        result.fail_msg)
1122

    
1123

    
1124
class LUClusterVerify(NoHooksLU):
1125
  """Submits all jobs necessary to verify the cluster.
1126

1127
  """
1128
  REQ_BGL = False
1129

    
1130
  def ExpandNames(self):
1131
    self.needed_locks = {}
1132

    
1133
  def Exec(self, feedback_fn):
1134
    jobs = []
1135

    
1136
    if self.op.group_name:
1137
      groups = [self.op.group_name]
1138
      depends_fn = lambda: None
1139
    else:
1140
      groups = self.cfg.GetNodeGroupList()
1141

    
1142
      # Verify global configuration
1143
      jobs.append([
1144
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1145
        ])
1146

    
1147
      # Always depend on global verification
1148
      depends_fn = lambda: [(-len(jobs), [])]
1149

    
1150
    jobs.extend(
1151
      [opcodes.OpClusterVerifyGroup(group_name=group,
1152
                                    ignore_errors=self.op.ignore_errors,
1153
                                    depends=depends_fn())]
1154
      for group in groups)
1155

    
1156
    # Fix up all parameters
1157
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1158
      op.debug_simulate_errors = self.op.debug_simulate_errors
1159
      op.verbose = self.op.verbose
1160
      op.error_codes = self.op.error_codes
1161
      try:
1162
        op.skip_checks = self.op.skip_checks
1163
      except AttributeError:
1164
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1165

    
1166
    return ResultWithJobs(jobs)
1167

    
1168

    
1169
class _VerifyErrors(object):
1170
  """Mix-in for cluster/group verify LUs.
1171

1172
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1173
  self.op and self._feedback_fn to be available.)
1174

1175
  """
1176

    
1177
  ETYPE_FIELD = "code"
1178
  ETYPE_ERROR = "ERROR"
1179
  ETYPE_WARNING = "WARNING"
1180

    
1181
  def _Error(self, ecode, item, msg, *args, **kwargs):
1182
    """Format an error message.
1183

1184
    Based on the opcode's error_codes parameter, either format a
1185
    parseable error code, or a simpler error string.
1186

1187
    This must be called only from Exec and functions called from Exec.
1188

1189
    """
1190
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1191
    itype, etxt, _ = ecode
1192
    # If the error code is in the list of ignored errors, demote the error to a
1193
    # warning
1194
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1195
      ltype = self.ETYPE_WARNING
1196
    # first complete the msg
1197
    if args:
1198
      msg = msg % args
1199
    # then format the whole message
1200
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1201
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1202
    else:
1203
      if item:
1204
        item = " " + item
1205
      else:
1206
        item = ""
1207
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1208
    # and finally report it via the feedback_fn
1209
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1210
    # do not mark the operation as failed for WARN cases only
1211
    if ltype == self.ETYPE_ERROR:
1212
      self.bad = True
1213

    
1214
  def _ErrorIf(self, cond, *args, **kwargs):
1215
    """Log an error message if the passed condition is True.
1216

1217
    """
1218
    if (bool(cond)
1219
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1220
      self._Error(*args, **kwargs)
1221

    
1222

    
1223
def _VerifyCertificate(filename):
1224
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1225

1226
  @type filename: string
1227
  @param filename: Path to PEM file
1228

1229
  """
1230
  try:
1231
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1232
                                           utils.ReadFile(filename))
1233
  except Exception, err: # pylint: disable=W0703
1234
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1235
            "Failed to load X509 certificate %s: %s" % (filename, err))
1236

    
1237
  (errcode, msg) = \
1238
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1239
                                constants.SSL_CERT_EXPIRATION_ERROR)
1240

    
1241
  if msg:
1242
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1243
  else:
1244
    fnamemsg = None
1245

    
1246
  if errcode is None:
1247
    return (None, fnamemsg)
1248
  elif errcode == utils.CERT_WARNING:
1249
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1250
  elif errcode == utils.CERT_ERROR:
1251
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1252

    
1253
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1254

    
1255

    
1256
def _GetAllHypervisorParameters(cluster, instances):
1257
  """Compute the set of all hypervisor parameters.
1258

1259
  @type cluster: L{objects.Cluster}
1260
  @param cluster: the cluster object
1261
  @param instances: list of L{objects.Instance}
1262
  @param instances: additional instances from which to obtain parameters
1263
  @rtype: list of (origin, hypervisor, parameters)
1264
  @return: a list with all parameters found, indicating the hypervisor they
1265
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1266

1267
  """
1268
  hvp_data = []
1269

    
1270
  for hv_name in cluster.enabled_hypervisors:
1271
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1272

    
1273
  for os_name, os_hvp in cluster.os_hvp.items():
1274
    for hv_name, hv_params in os_hvp.items():
1275
      if hv_params:
1276
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1277
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1278

    
1279
  # TODO: collapse identical parameter values in a single one
1280
  for instance in instances:
1281
    if instance.hvparams:
1282
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1283
                       cluster.FillHV(instance)))
1284

    
1285
  return hvp_data
1286

    
1287

    
1288
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1289
  """Verifies the cluster config.
1290

1291
  """
1292
  REQ_BGL = False
1293

    
1294
  def _VerifyHVP(self, hvp_data):
1295
    """Verifies locally the syntax of the hypervisor parameters.
1296

1297
    """
1298
    for item, hv_name, hv_params in hvp_data:
1299
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1300
             (item, hv_name))
1301
      try:
1302
        hv_class = hypervisor.GetHypervisorClass(hv_name)
1303
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1304
        hv_class.CheckParameterSyntax(hv_params)
1305
      except errors.GenericError, err:
1306
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1307

    
1308
  def ExpandNames(self):
1309
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1310
    self.share_locks = ShareAll()
1311

    
1312
  def CheckPrereq(self):
1313
    """Check prerequisites.
1314

1315
    """
1316
    # Retrieve all information
1317
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1318
    self.all_node_info = self.cfg.GetAllNodesInfo()
1319
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1320

    
1321
  def Exec(self, feedback_fn):
1322
    """Verify integrity of cluster, performing various test on nodes.
1323

1324
    """
1325
    self.bad = False
1326
    self._feedback_fn = feedback_fn
1327

    
1328
    feedback_fn("* Verifying cluster config")
1329

    
1330
    for msg in self.cfg.VerifyConfig():
1331
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1332

    
1333
    feedback_fn("* Verifying cluster certificate files")
1334

    
1335
    for cert_filename in pathutils.ALL_CERT_FILES:
1336
      (errcode, msg) = _VerifyCertificate(cert_filename)
1337
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1338

    
1339
    feedback_fn("* Verifying hypervisor parameters")
1340

    
1341
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1342
                                                self.all_inst_info.values()))
1343

    
1344
    feedback_fn("* Verifying all nodes belong to an existing group")
1345

    
1346
    # We do this verification here because, should this bogus circumstance
1347
    # occur, it would never be caught by VerifyGroup, which only acts on
1348
    # nodes/instances reachable from existing node groups.
1349

    
1350
    dangling_nodes = set(node.name for node in self.all_node_info.values()
1351
                         if node.group not in self.all_group_info)
1352

    
1353
    dangling_instances = {}
1354
    no_node_instances = []
1355

    
1356
    for inst in self.all_inst_info.values():
1357
      if inst.primary_node in dangling_nodes:
1358
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1359
      elif inst.primary_node not in self.all_node_info:
1360
        no_node_instances.append(inst.name)
1361

    
1362
    pretty_dangling = [
1363
        "%s (%s)" %
1364
        (node.name,
1365
         utils.CommaJoin(dangling_instances.get(node.name,
1366
                                                ["no instances"])))
1367
        for node in dangling_nodes]
1368

    
1369
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1370
                  None,
1371
                  "the following nodes (and their instances) belong to a non"
1372
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1373

    
1374
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1375
                  None,
1376
                  "the following instances have a non-existing primary-node:"
1377
                  " %s", utils.CommaJoin(no_node_instances))
1378

    
1379
    return not self.bad
1380

    
1381

    
1382
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1383
  """Verifies the status of a node group.
1384

1385
  """
1386
  HPATH = "cluster-verify"
1387
  HTYPE = constants.HTYPE_CLUSTER
1388
  REQ_BGL = False
1389

    
1390
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1391

    
1392
  class NodeImage(object):
1393
    """A class representing the logical and physical status of a node.
1394

1395
    @type name: string
1396
    @ivar name: the node name to which this object refers
1397
    @ivar volumes: a structure as returned from
1398
        L{ganeti.backend.GetVolumeList} (runtime)
1399
    @ivar instances: a list of running instances (runtime)
1400
    @ivar pinst: list of configured primary instances (config)
1401
    @ivar sinst: list of configured secondary instances (config)
1402
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1403
        instances for which this node is secondary (config)
1404
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1405
    @ivar dfree: free disk, as reported by the node (runtime)
1406
    @ivar offline: the offline status (config)
1407
    @type rpc_fail: boolean
1408
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1409
        not whether the individual keys were correct) (runtime)
1410
    @type lvm_fail: boolean
1411
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1412
    @type hyp_fail: boolean
1413
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1414
    @type ghost: boolean
1415
    @ivar ghost: whether this is a known node or not (config)
1416
    @type os_fail: boolean
1417
    @ivar os_fail: whether the RPC call didn't return valid OS data
1418
    @type oslist: dict
1419
    @ivar oslist: dict of OS name to data, as diagnosed by DiagnoseOS
1420
    @type vm_capable: boolean
1421
    @ivar vm_capable: whether the node can host instances
1422
    @type pv_min: float
1423
    @ivar pv_min: size in MiB of the smallest PVs
1424
    @type pv_max: float
1425
    @ivar pv_max: size in MiB of the biggest PVs
1426

1427
    """
1428
    def __init__(self, offline=False, name=None, vm_capable=True):
1429
      self.name = name
1430
      self.volumes = {}
1431
      self.instances = []
1432
      self.pinst = []
1433
      self.sinst = []
1434
      self.sbp = {}
1435
      self.mfree = 0
1436
      self.dfree = 0
1437
      self.offline = offline
1438
      self.vm_capable = vm_capable
1439
      self.rpc_fail = False
1440
      self.lvm_fail = False
1441
      self.hyp_fail = False
1442
      self.ghost = False
1443
      self.os_fail = False
1444
      self.oslist = {}
1445
      self.pv_min = None
1446
      self.pv_max = None
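    # Illustrative sketch (comments only, not executed): Exec() below builds
    # one NodeImage per node in the verified group, roughly as
    #   nimg = self.NodeImage(offline=node.offline, name=node.name,
    #                         vm_capable=node.vm_capable)
    # and, for nodes that are only referenced by instances, a "ghost" image:
    #   gnode = self.NodeImage(name=nname)
    #   gnode.ghost = (nname not in self.all_node_info)
    # The runtime fields (volumes, instances, mfree, dfree, pv_min, ...) are
    # filled in later by the _UpdateNode* helpers from node_verify results.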
1447

    
1448
  def ExpandNames(self):
1449
    # This raises errors.OpPrereqError on its own:
1450
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1451

    
1452
    # Get instances in node group; this is unsafe and needs verification later
1453
    inst_names = \
1454
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1455

    
1456
    self.needed_locks = {
1457
      locking.LEVEL_INSTANCE: inst_names,
1458
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1459
      locking.LEVEL_NODE: [],
1460

    
1461
      # This opcode is run by watcher every five minutes and acquires all nodes
1462
      # for a group. It doesn't run for a long time, so it's better to acquire
1463
      # the node allocation lock as well.
1464
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1465
      }
1466

    
1467
    self.share_locks = ShareAll()
1468

    
1469
  def DeclareLocks(self, level):
1470
    if level == locking.LEVEL_NODE:
1471
      # Get members of node group; this is unsafe and needs verification later
1472
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1473

    
1474
      all_inst_info = self.cfg.GetAllInstancesInfo()
1475

    
1476
      # In Exec(), we warn about mirrored instances that have primary and
1477
      # secondary living in separate node groups. To fully verify that
1478
      # volumes for these instances are healthy, we will need to do an
1479
      # extra call to their secondaries. We ensure here those nodes will
1480
      # be locked.
1481
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
1482
        # Important: access only the instances whose lock is owned
1483
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
1484
          nodes.update(all_inst_info[inst].secondary_nodes)
1485

    
1486
      self.needed_locks[locking.LEVEL_NODE] = nodes
1487

    
1488
  def CheckPrereq(self):
1489
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1490
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1491

    
1492
    group_nodes = set(self.group_info.members)
1493
    group_instances = \
1494
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1495

    
1496
    unlocked_nodes = \
1497
        group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1498

    
1499
    unlocked_instances = \
1500
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))
1501

    
1502
    if unlocked_nodes:
1503
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
1504
                                 utils.CommaJoin(unlocked_nodes),
1505
                                 errors.ECODE_STATE)
1506

    
1507
    if unlocked_instances:
1508
      raise errors.OpPrereqError("Missing lock for instances: %s" %
1509
                                 utils.CommaJoin(unlocked_instances),
1510
                                 errors.ECODE_STATE)
1511

    
1512
    self.all_node_info = self.cfg.GetAllNodesInfo()
1513
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1514

    
1515
    self.my_node_names = utils.NiceSort(group_nodes)
1516
    self.my_inst_names = utils.NiceSort(group_instances)
1517

    
1518
    self.my_node_info = dict((name, self.all_node_info[name])
1519
                             for name in self.my_node_names)
1520

    
1521
    self.my_inst_info = dict((name, self.all_inst_info[name])
1522
                             for name in self.my_inst_names)
1523

    
1524
    # We detect here the nodes that will need the extra RPC calls for verifying
1525
    # split LV volumes; they should be locked.
1526
    extra_lv_nodes = set()
1527

    
1528
    for inst in self.my_inst_info.values():
1529
      if inst.disk_template in constants.DTS_INT_MIRROR:
1530
        for nname in inst.all_nodes:
1531
          if self.all_node_info[nname].group != self.group_uuid:
1532
            extra_lv_nodes.add(nname)
1533

    
1534
    unlocked_lv_nodes = \
1535
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1536

    
1537
    if unlocked_lv_nodes:
1538
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1539
                                 utils.CommaJoin(unlocked_lv_nodes),
1540
                                 errors.ECODE_STATE)
1541
    self.extra_lv_nodes = list(extra_lv_nodes)
1542

    
1543
  def _VerifyNode(self, ninfo, nresult):
1544
    """Perform some basic validation on data returned from a node.
1545

1546
      - check the result data structure is well formed and has all the
1547
        mandatory fields
1548
      - check ganeti version
1549

1550
    @type ninfo: L{objects.Node}
1551
    @param ninfo: the node to check
1552
    @param nresult: the results from the node
1553
    @rtype: boolean
1554
    @return: whether overall this call was successful (and we can expect
1555
         reasonable values in the response)
1556

1557
    """
1558
    node = ninfo.name
1559
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1560

    
1561
    # main result, nresult should be a non-empty dict
1562
    test = not nresult or not isinstance(nresult, dict)
1563
    _ErrorIf(test, constants.CV_ENODERPC, node,
1564
                  "unable to verify node: no data returned")
1565
    if test:
1566
      return False
1567

    
1568
    # compares ganeti version
1569
    local_version = constants.PROTOCOL_VERSION
1570
    remote_version = nresult.get("version", None)
1571
    test = not (remote_version and
1572
                isinstance(remote_version, (list, tuple)) and
1573
                len(remote_version) == 2)
1574
    _ErrorIf(test, constants.CV_ENODERPC, node,
1575
             "connection to node returned invalid data")
1576
    if test:
1577
      return False
1578

    
1579
    test = local_version != remote_version[0]
1580
    _ErrorIf(test, constants.CV_ENODEVERSION, node,
1581
             "incompatible protocol versions: master %s,"
1582
             " node %s", local_version, remote_version[0])
1583
    if test:
1584
      return False
1585

    
1586
    # node seems compatible, we can actually try to look into its results
1587

    
1588
    # full package version
1589
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1590
                  constants.CV_ENODEVERSION, node,
1591
                  "software version mismatch: master %s, node %s",
1592
                  constants.RELEASE_VERSION, remote_version[1],
1593
                  code=self.ETYPE_WARNING)
1594

    
1595
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1596
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1597
      for hv_name, hv_result in hyp_result.iteritems():
1598
        test = hv_result is not None
1599
        _ErrorIf(test, constants.CV_ENODEHV, node,
1600
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1601

    
1602
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1603
    if ninfo.vm_capable and isinstance(hvp_result, list):
1604
      for item, hv_name, hv_result in hvp_result:
1605
        _ErrorIf(True, constants.CV_ENODEHV, node,
1606
                 "hypervisor %s parameter verify failure (source %s): %s",
1607
                 hv_name, item, hv_result)
1608

    
1609
    test = nresult.get(constants.NV_NODESETUP,
1610
                       ["Missing NODESETUP results"])
1611
    _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
1612
             "; ".join(test))
1613

    
1614
    return True
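  # Illustrative sketch (comments only, hypothetical values): a healthy node
  # answers with a dict whose "version" entry is a (protocol, release) pair,
  # e.g.
  #   nresult = {"version": (constants.PROTOCOL_VERSION,
  #                          constants.RELEASE_VERSION), ...}
  # A protocol mismatch is a hard error and stops further checks for that
  # node, while a differing release version is only reported as a warning.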
1615

    
1616
  def _VerifyNodeTime(self, ninfo, nresult,
1617
                      nvinfo_starttime, nvinfo_endtime):
1618
    """Check the node time.
1619

1620
    @type ninfo: L{objects.Node}
1621
    @param ninfo: the node to check
1622
    @param nresult: the remote results for the node
1623
    @param nvinfo_starttime: the start time of the RPC call
1624
    @param nvinfo_endtime: the end time of the RPC call
1625

1626
    """
1627
    node = ninfo.name
1628
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1629

    
1630
    ntime = nresult.get(constants.NV_TIME, None)
1631
    try:
1632
      ntime_merged = utils.MergeTime(ntime)
1633
    except (ValueError, TypeError):
1634
      _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
1635
      return
1636

    
1637
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1638
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1639
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1640
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1641
    else:
1642
      ntime_diff = None
1643

    
1644
    _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
1645
             "Node time diverges by at least %s from master node time",
1646
             ntime_diff)
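  # Illustrative sketch (comments only, hypothetical numbers): if the allowed
  # skew were 150s and the verify RPC ran between nvinfo_starttime=1000.0 and
  # nvinfo_endtime=1002.0, any merged node time within [850.0, 1152.0] passes;
  # a node reporting 1200.0 would be flagged as diverging by "198.0s", i.e.
  # its distance from the end of the RPC window.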
1647

    
1648
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1649
    """Check the node LVM results and update info for cross-node checks.
1650

1651
    @type ninfo: L{objects.Node}
1652
    @param ninfo: the node to check
1653
    @param nresult: the remote results for the node
1654
    @param vg_name: the configured VG name
1655
    @type nimg: L{NodeImage}
1656
    @param nimg: node image
1657

1658
    """
1659
    if vg_name is None:
1660
      return
1661

    
1662
    node = ninfo.name
1663
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1664

    
1665
    # checks vg existence and size > 20G
1666
    vglist = nresult.get(constants.NV_VGLIST, None)
1667
    test = not vglist
1668
    _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
1669
    if not test:
1670
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1671
                                            constants.MIN_VG_SIZE)
1672
      _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)
1673

    
1674
    # Check PVs
1675
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1676
    for em in errmsgs:
1677
      self._Error(constants.CV_ENODELVM, node, em)
1678
    if pvminmax is not None:
1679
      (nimg.pv_min, nimg.pv_max) = pvminmax
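  # Note (comments only): CheckNodePVs returns a pair (error messages,
  # pvminmax); the second element is either None or a (smallest, biggest) PV
  # size pair in MiB, which is why pv_min/pv_max are only set conditionally.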
1680

    
1681
  def _VerifyGroupDRBDVersion(self, node_verify_infos):
1682
    """Check cross-node DRBD version consistency.
1683

1684
    @type node_verify_infos: dict
1685
    @param node_verify_infos: infos about nodes as returned from the
1686
      node_verify call.
1687

1688
    """
1689
    node_versions = {}
1690
    for node, ndata in node_verify_infos.items():
1691
      nresult = ndata.payload
1692
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1693
      node_versions[node] = version
1694

    
1695
    if len(set(node_versions.values())) > 1:
1696
      for node, version in sorted(node_versions.items()):
1697
        msg = "DRBD version mismatch: %s" % version
1698
        self._Error(constants.CV_ENODEDRBDHELPER, node, msg,
1699
                    code=self.ETYPE_WARNING)
1700

    
1701
  def _VerifyGroupLVM(self, node_image, vg_name):
1702
    """Check cross-node consistency in LVM.
1703

1704
    @type node_image: dict
1705
    @param node_image: info about nodes, mapping from node names to
1706
      L{NodeImage} objects
1707
    @param vg_name: the configured VG name
1708

1709
    """
1710
    if vg_name is None:
1711
      return
1712

    
1713
    # Only exclusive storage needs this kind of check
1714
    if not self._exclusive_storage:
1715
      return
1716

    
1717
    # exclusive_storage wants all PVs to have the same size (approximately),
1718
    # if the smallest and the biggest ones are okay, everything is fine.
1719
    # pv_min is None iff pv_max is None
1720
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1721
    if not vals:
1722
      return
1723
    (pvmin, minnode) = min((ni.pv_min, ni.name) for ni in vals)
1724
    (pvmax, maxnode) = max((ni.pv_max, ni.name) for ni in vals)
1725
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1726
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1727
                  "PV sizes differ too much in the group; smallest (%s MB) is"
1728
                  " on %s, biggest (%s MB) is on %s",
1729
                  pvmin, minnode, pvmax, maxnode)
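  # Illustrative sketch (comments only, hypothetical sizes): with node images
  # reporting (pv_min, pv_max) of (10240, 10240) on "n1", (10240, 20480) on
  # "n2" and (10240, 10240) on "n3",
  #   min((ni.pv_min, ni.name) for ni in vals) == (10240, "n1")
  #   max((ni.pv_max, ni.name) for ni in vals) == (20480, "n2")
  # and utils.LvmExclusiveTestBadPvSizes(10240, 20480) decides whether this
  # spread is large enough to raise CV_EGROUPDIFFERENTPVSIZE.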
1730

    
1731
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1732
    """Check the node bridges.
1733

1734
    @type ninfo: L{objects.Node}
1735
    @param ninfo: the node to check
1736
    @param nresult: the remote results for the node
1737
    @param bridges: the expected list of bridges
1738

1739
    """
1740
    if not bridges:
1741
      return
1742

    
1743
    node = ninfo.name
1744
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1745

    
1746
    missing = nresult.get(constants.NV_BRIDGES, None)
1747
    test = not isinstance(missing, list)
1748
    _ErrorIf(test, constants.CV_ENODENET, node,
1749
             "did not return valid bridge information")
1750
    if not test:
1751
      _ErrorIf(bool(missing), constants.CV_ENODENET, node,
1752
               "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1753

    
1754
  def _VerifyNodeUserScripts(self, ninfo, nresult):
1755
    """Check the results of user scripts presence and executability on the node
1756

1757
    @type ninfo: L{objects.Node}
1758
    @param ninfo: the node to check
1759
    @param nresult: the remote results for the node
1760

1761
    """
1762
    node = ninfo.name
1763

    
1764
    test = not constants.NV_USERSCRIPTS in nresult
1765
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
1766
                  "did not return user scripts information")
1767

    
1768
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1769
    if not test:
1770
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
1771
                    "user scripts not present or not executable: %s" %
1772
                    utils.CommaJoin(sorted(broken_scripts)))
1773

    
1774
  def _VerifyNodeNetwork(self, ninfo, nresult):
1775
    """Check the node network connectivity results.
1776

1777
    @type ninfo: L{objects.Node}
1778
    @param ninfo: the node to check
1779
    @param nresult: the remote results for the node
1780

1781
    """
1782
    node = ninfo.name
1783
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1784

    
1785
    test = constants.NV_NODELIST not in nresult
1786
    _ErrorIf(test, constants.CV_ENODESSH, node,
1787
             "node hasn't returned node ssh connectivity data")
1788
    if not test:
1789
      if nresult[constants.NV_NODELIST]:
1790
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1791
          _ErrorIf(True, constants.CV_ENODESSH, node,
1792
                   "ssh communication with node '%s': %s", a_node, a_msg)
1793

    
1794
    test = constants.NV_NODENETTEST not in nresult
1795
    _ErrorIf(test, constants.CV_ENODENET, node,
1796
             "node hasn't returned node tcp connectivity data")
1797
    if not test:
1798
      if nresult[constants.NV_NODENETTEST]:
1799
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1800
        for anode in nlist:
1801
          _ErrorIf(True, constants.CV_ENODENET, node,
1802
                   "tcp communication with node '%s': %s",
1803
                   anode, nresult[constants.NV_NODENETTEST][anode])
1804

    
1805
    test = constants.NV_MASTERIP not in nresult
1806
    _ErrorIf(test, constants.CV_ENODENET, node,
1807
             "node hasn't returned node master IP reachability data")
1808
    if not test:
1809
      if not nresult[constants.NV_MASTERIP]:
1810
        if node == self.master_node:
1811
          msg = "the master node cannot reach the master IP (not configured?)"
1812
        else:
1813
          msg = "cannot reach the master IP"
1814
        _ErrorIf(True, constants.CV_ENODENET, node, msg)
1815

    
1816
  def _VerifyInstance(self, instance, inst_config, node_image,
1817
                      diskstatus):
1818
    """Verify an instance.
1819

1820
    This function checks whether the required block devices are
1821
    available on the instance's nodes, and that the nodes are in the correct
1822
    state.
1823

1824
    """
1825
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
1826
    pnode = inst_config.primary_node
1827
    pnode_img = node_image[pnode]
1828
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
1829

    
1830
    node_vol_should = {}
1831
    inst_config.MapLVsByNode(node_vol_should)
1832

    
1833
    cluster = self.cfg.GetClusterInfo()
1834
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1835
                                                            self.group_info)
1836
    err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
1837
    _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err),
1838
             code=self.ETYPE_WARNING)
1839

    
1840
    for node in node_vol_should:
1841
      n_img = node_image[node]
1842
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1843
        # ignore missing volumes on offline or broken nodes
1844
        continue
1845
      for volume in node_vol_should[node]:
1846
        test = volume not in n_img.volumes
1847
        _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
1848
                 "volume %s missing on node %s", volume, node)
1849

    
1850
    if inst_config.admin_state == constants.ADMINST_UP:
1851
      test = instance not in pnode_img.instances and not pnode_img.offline
1852
      _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
1853
               "instance not running on its primary node %s",
1854
               pnode)
1855
      _ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
1856
               "instance is marked as running and lives on offline node %s",
1857
               pnode)
1858

    
1859
    diskdata = [(nname, success, status, idx)
1860
                for (nname, disks) in diskstatus.items()
1861
                for idx, (success, status) in enumerate(disks)]
1862

    
1863
    for nname, success, bdev_status, idx in diskdata:
1864
      # the 'ghost node' construction in Exec() ensures that we have a
1865
      # node here
1866
      snode = node_image[nname]
1867
      bad_snode = snode.ghost or snode.offline
1868
      _ErrorIf(inst_config.admin_state == constants.ADMINST_UP and
1869
               not success and not bad_snode,
1870
               constants.CV_EINSTANCEFAULTYDISK, instance,
1871
               "couldn't retrieve status for disk/%s on %s: %s",
1872
               idx, nname, bdev_status)
1873
      _ErrorIf((inst_config.admin_state == constants.ADMINST_UP and
1874
                success and bdev_status.ldisk_status == constants.LDS_FAULTY),
1875
               constants.CV_EINSTANCEFAULTYDISK, instance,
1876
               "disk/%s on %s is faulty", idx, nname)
1877

    
1878
    _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1879
             constants.CV_ENODERPC, pnode, "instance %s, connection to"
1880
             " primary node failed", instance)
1881

    
1882
    _ErrorIf(len(inst_config.secondary_nodes) > 1,
1883
             constants.CV_EINSTANCELAYOUT,
1884
             instance, "instance has multiple secondary nodes: %s",
1885
             utils.CommaJoin(inst_config.secondary_nodes),
1886
             code=self.ETYPE_WARNING)
1887

    
1888
    if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
1889
      # Disk template not compatible with exclusive_storage: no instance
1890
      # node should have the flag set
1891
      es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
1892
                                                     inst_config.all_nodes)
1893
      es_nodes = [n for (n, es) in es_flags.items()
1894
                  if es]
1895
      _ErrorIf(es_nodes, constants.CV_EINSTANCEUNSUITABLENODE, instance,
1896
               "instance has template %s, which is not supported on nodes"
1897
               " that have exclusive storage set: %s",
1898
               inst_config.disk_template, utils.CommaJoin(es_nodes))
1899

    
1900
    if inst_config.disk_template in constants.DTS_INT_MIRROR:
1901
      instance_nodes = utils.NiceSort(inst_config.all_nodes)
1902
      instance_groups = {}
1903

    
1904
      for node in instance_nodes:
1905
        instance_groups.setdefault(self.all_node_info[node].group,
1906
                                   []).append(node)
1907

    
1908
      pretty_list = [
1909
        "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
1910
        # Sort so that we always list the primary node first.
1911
        for group, nodes in sorted(instance_groups.items(),
1912
                                   key=lambda (_, nodes): pnode in nodes,
1913
                                   reverse=True)]
1914

    
1915
      self._ErrorIf(len(instance_groups) > 1,
1916
                    constants.CV_EINSTANCESPLITGROUPS,
1917
                    instance, "instance has primary and secondary nodes in"
1918
                    " different groups: %s", utils.CommaJoin(pretty_list),
1919
                    code=self.ETYPE_WARNING)
1920

    
1921
    inst_nodes_offline = []
1922
    for snode in inst_config.secondary_nodes:
1923
      s_img = node_image[snode]
1924
      _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
1925
               snode, "instance %s, connection to secondary node failed",
1926
               instance)
1927

    
1928
      if s_img.offline:
1929
        inst_nodes_offline.append(snode)
1930

    
1931
    # warn that the instance lives on offline nodes
1932
    _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
1933
             "instance has offline secondary node(s) %s",
1934
             utils.CommaJoin(inst_nodes_offline))
1935
    # ... or ghost/non-vm_capable nodes
1936
    for node in inst_config.all_nodes:
1937
      _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
1938
               instance, "instance lives on ghost node %s", node)
1939
      _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
1940
               instance, "instance lives on non-vm_capable node %s", node)
1941

    
1942
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1943
    """Verify if there are any unknown volumes in the cluster.
1944

1945
    The .os, .swap and backup volumes are ignored. All other volumes are
1946
    reported as unknown.
1947

1948
    @type reserved: L{ganeti.utils.FieldSet}
1949
    @param reserved: a FieldSet of reserved volume names
1950

1951
    """
1952
    for node, n_img in node_image.items():
1953
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
1954
          self.all_node_info[node].group != self.group_uuid):
1955
        # skip non-healthy nodes
1956
        continue
1957
      for volume in n_img.volumes:
1958
        test = ((node not in node_vol_should or
1959
                volume not in node_vol_should[node]) and
1960
                not reserved.Matches(volume))
1961
        self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
1962
                      "volume %s is unknown", volume)
1963

    
1964
  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
1965
    """Verify N+1 Memory Resilience.
1966

1967
    Check that if one single node dies we can still start all the
1968
    instances it was primary for.
1969

1970
    """
1971
    cluster_info = self.cfg.GetClusterInfo()
1972
    for node, n_img in node_image.items():
1973
      # This code checks that every node which is now listed as
1974
      # secondary has enough memory to host all instances it is
1975
      # supposed to, should a single other node in the cluster fail.
1976
      # FIXME: not ready for failover to an arbitrary node
1977
      # FIXME: does not support file-backed instances
1978
      # WARNING: we currently take into account down instances as well
1979
      # as up ones, considering that even if they're down someone
1980
      # might want to start them even in the event of a node failure.
1981
      if n_img.offline or self.all_node_info[node].group != self.group_uuid:
1982
        # we're skipping nodes marked offline and nodes in other groups from
1983
        # the N+1 warning, since most likely we don't have good memory
1984
        # information from them; we already list instances living on such
1985
        # nodes, and that's enough warning
1986
        continue
1987
      #TODO(dynmem): also consider ballooning out other instances
1988
      for prinode, instances in n_img.sbp.items():
1989
        needed_mem = 0
1990
        for instance in instances:
1991
          bep = cluster_info.FillBE(instance_cfg[instance])
1992
          if bep[constants.BE_AUTO_BALANCE]:
1993
            needed_mem += bep[constants.BE_MINMEM]
1994
        test = n_img.mfree < needed_mem
1995
        self._ErrorIf(test, constants.CV_ENODEN1, node,
1996
                      "not enough memory to accomodate instance failovers"
1997
                      " should node %s fail (%dMiB needed, %dMiB available)",
1998
                      prinode, needed_mem, n_img.mfree)
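  # Illustrative sketch (comments only, hypothetical values): if node "nodeB"
  # is secondary for two auto-balanced instances whose primary is "nodeA",
  # with BE_MINMEM of 1024 and 2048 MiB, then needed_mem is 3072; an mfree of
  # 2048 MiB on "nodeB" triggers CV_ENODEN1 for failovers from "nodeA".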
1999

    
2000
  @classmethod
2001
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
2002
                   (files_all, files_opt, files_mc, files_vm)):
2003
    """Verifies file checksums collected from all nodes.
2004

2005
    @param errorif: Callback for reporting errors
2006
    @param nodeinfo: List of L{objects.Node} objects
2007
    @param master_node: Name of master node
2008
    @param all_nvinfo: RPC results
2009

2010
    """
2011
    # Define functions determining which nodes to consider for a file
2012
    files2nodefn = [
2013
      (files_all, None),
2014
      (files_mc, lambda node: (node.master_candidate or
2015
                               node.name == master_node)),
2016
      (files_vm, lambda node: node.vm_capable),
2017
      ]
2018

    
2019
    # Build mapping from filename to list of nodes which should have the file
2020
    nodefiles = {}
2021
    for (files, fn) in files2nodefn:
2022
      if fn is None:
2023
        filenodes = nodeinfo
2024
      else:
2025
        filenodes = filter(fn, nodeinfo)
2026
      nodefiles.update((filename,
2027
                        frozenset(map(operator.attrgetter("name"), filenodes)))
2028
                       for filename in files)
2029

    
2030
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2031

    
2032
    fileinfo = dict((filename, {}) for filename in nodefiles)
2033
    ignore_nodes = set()
2034

    
2035
    for node in nodeinfo:
2036
      if node.offline:
2037
        ignore_nodes.add(node.name)
2038
        continue
2039

    
2040
      nresult = all_nvinfo[node.name]
2041

    
2042
      if nresult.fail_msg or not nresult.payload:
2043
        node_files = None
2044
      else:
2045
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2046
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2047
                          for (key, value) in fingerprints.items())
2048
        del fingerprints
2049

    
2050
      test = not (node_files and isinstance(node_files, dict))
2051
      errorif(test, constants.CV_ENODEFILECHECK, node.name,
2052
              "Node did not return file checksum data")
2053
      if test:
2054
        ignore_nodes.add(node.name)
2055
        continue
2056

    
2057
      # Build per-checksum mapping from filename to nodes having it
2058
      for (filename, checksum) in node_files.items():
2059
        assert filename in nodefiles
2060
        fileinfo[filename].setdefault(checksum, set()).add(node.name)
2061

    
2062
    for (filename, checksums) in fileinfo.items():
2063
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2064

    
2065
      # Nodes having the file
2066
      with_file = frozenset(node_name
2067
                            for nodes in fileinfo[filename].values()
2068
                            for node_name in nodes) - ignore_nodes
2069

    
2070
      expected_nodes = nodefiles[filename] - ignore_nodes
2071

    
2072
      # Nodes missing file
2073
      missing_file = expected_nodes - with_file
2074

    
2075
      if filename in files_opt:
2076
        # All or no nodes
2077
        errorif(missing_file and missing_file != expected_nodes,
2078
                constants.CV_ECLUSTERFILECHECK, None,
2079
                "File %s is optional, but it must exist on all or no"
2080
                " nodes (not found on %s)",
2081
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
2082
      else:
2083
        errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2084
                "File %s is missing from node(s) %s", filename,
2085
                utils.CommaJoin(utils.NiceSort(missing_file)))
2086

    
2087
        # Warn if a node has a file it shouldn't
2088
        unexpected = with_file - expected_nodes
2089
        errorif(unexpected,
2090
                constants.CV_ECLUSTERFILECHECK, None,
2091
                "File %s should not exist on node(s) %s",
2092
                filename, utils.CommaJoin(utils.NiceSort(unexpected)))
2093

    
2094
      # See if there are multiple versions of the file
2095
      test = len(checksums) > 1
2096
      if test:
2097
        variants = ["variant %s on %s" %
2098
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
2099
                    for (idx, (checksum, nodes)) in
2100
                      enumerate(sorted(checksums.items()))]
2101
      else:
2102
        variants = []
2103

    
2104
      errorif(test, constants.CV_ECLUSTERFILECHECK, None,
2105
              "File %s found with %s different checksums (%s)",
2106
              filename, len(checksums), "; ".join(variants))
2107

    
2108
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2109
                      drbd_map):
2110
    """Verifies and the node DRBD status.
2111

2112
    @type ninfo: L{objects.Node}
2113
    @param ninfo: the node to check
2114
    @param nresult: the remote results for the node
2115
    @param instanceinfo: the dict of instances
2116
    @param drbd_helper: the configured DRBD usermode helper
2117
    @param drbd_map: the DRBD map as returned by
2118
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2119

2120
    """
2121
    node = ninfo.name
2122
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2123

    
2124
    if drbd_helper:
2125
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2126
      test = (helper_result is None)
2127
      _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2128
               "no drbd usermode helper returned")
2129
      if helper_result:
2130
        status, payload = helper_result
2131
        test = not status
2132
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2133
                 "drbd usermode helper check unsuccessful: %s", payload)
2134
        test = status and (payload != drbd_helper)
2135
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
2136
                 "wrong drbd usermode helper: %s", payload)
2137

    
2138
    # compute the DRBD minors
2139
    node_drbd = {}
2140
    for minor, instance in drbd_map[node].items():
2141
      test = instance not in instanceinfo
2142
      _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2143
               "ghost instance '%s' in temporary DRBD map", instance)
2144
        # ghost instance should not be running, but otherwise we
2145
        # don't give double warnings (both ghost instance and
2146
        # unallocated minor in use)
2147
      if test:
2148
        node_drbd[minor] = (instance, False)
2149
      else:
2150
        instance = instanceinfo[instance]
2151
        node_drbd[minor] = (instance.name,
2152
                            instance.admin_state == constants.ADMINST_UP)
2153

    
2154
    # and now check them
2155
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2156
    test = not isinstance(used_minors, (tuple, list))
2157
    _ErrorIf(test, constants.CV_ENODEDRBD, node,
2158
             "cannot parse drbd status file: %s", str(used_minors))
2159
    if test:
2160
      # we cannot check drbd status
2161
      return
2162

    
2163
    for minor, (iname, must_exist) in node_drbd.items():
2164
      test = minor not in used_minors and must_exist
2165
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
2166
               "drbd minor %d of instance %s is not active", minor, iname)
2167
    for minor in used_minors:
2168
      test = minor not in node_drbd
2169
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
2170
               "unallocated drbd minor %d is in use", minor)
2171

    
2172
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2173
    """Builds the node OS structures.
2174

2175
    @type ninfo: L{objects.Node}
2176
    @param ninfo: the node to check
2177
    @param nresult: the remote results for the node
2178
    @param nimg: the node image object
2179

2180
    """
2181
    node = ninfo.name
2182
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2183

    
2184
    remote_os = nresult.get(constants.NV_OSLIST, None)
2185
    test = (not isinstance(remote_os, list) or
2186
            not compat.all(isinstance(v, list) and len(v) == 7
2187
                           for v in remote_os))
2188

    
2189
    _ErrorIf(test, constants.CV_ENODEOS, node,
2190
             "node hasn't returned valid OS data")
2191

    
2192
    nimg.os_fail = test
2193

    
2194
    if test:
2195
      return
2196

    
2197
    os_dict = {}
2198

    
2199
    for (name, os_path, status, diagnose,
2200
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2201

    
2202
      if name not in os_dict:
2203
        os_dict[name] = []
2204

    
2205
      # parameters is a list of lists instead of list of tuples due to
2206
      # JSON lacking a real tuple type, fix it:
2207
      parameters = [tuple(v) for v in parameters]
2208
      os_dict[name].append((os_path, status, diagnose,
2209
                            set(variants), set(parameters), set(api_ver)))
2210

    
2211
    nimg.oslist = os_dict
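  # Illustrative sketch (comments only, hypothetical data): after this call
  # nimg.oslist maps each OS name to a list of entries, e.g.
  #   nimg.oslist["debootstrap"] = [
  #     ("/srv/ganeti/os/debootstrap",        # path (placeholder)
  #      True, "",                            # status, diagnose message
  #      set(["default", "minimal"]),         # variants
  #      set([("os-param", "doc")]),          # (name, documentation) params
  #      set([20])),                          # declared API versions
  #   ]
  # More than one entry for a name means the OS exists in several paths.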
2212

    
2213
  def _VerifyNodeOS(self, ninfo, nimg, base):
2214
    """Verifies the node OS list.
2215

2216
    @type ninfo: L{objects.Node}
2217
    @param ninfo: the node to check
2218
    @param nimg: the node image object
2219
    @param base: the 'template' node we match against (e.g. from the master)
2220

2221
    """
2222
    node = ninfo.name
2223
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2224

    
2225
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2226

    
2227
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2228
    for os_name, os_data in nimg.oslist.items():
2229
      assert os_data, "Empty OS status for OS %s?!" % os_name
2230
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2231
      _ErrorIf(not f_status, constants.CV_ENODEOS, node,
2232
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
2233
      _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
2234
               "OS '%s' has multiple entries (first one shadows the rest): %s",
2235
               os_name, utils.CommaJoin([v[0] for v in os_data]))
2236
      # comparisons with the 'base' image
2237
      test = os_name not in base.oslist
2238
      _ErrorIf(test, constants.CV_ENODEOS, node,
2239
               "Extra OS %s not present on reference node (%s)",
2240
               os_name, base.name)
2241
      if test:
2242
        continue
2243
      assert base.oslist[os_name], "Base node has empty OS status?"
2244
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2245
      if not b_status:
2246
        # base OS is invalid, skipping
2247
        continue
2248
      for kind, a, b in [("API version", f_api, b_api),
2249
                         ("variants list", f_var, b_var),
2250
                         ("parameters", beautify_params(f_param),
2251
                          beautify_params(b_param))]:
2252
        _ErrorIf(a != b, constants.CV_ENODEOS, node,
2253
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
2254
                 kind, os_name, base.name,
2255
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2256

    
2257
    # check any missing OSes
2258
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2259
    _ErrorIf(missing, constants.CV_ENODEOS, node,
2260
             "OSes present on reference node %s but missing on this node: %s",
2261
             base.name, utils.CommaJoin(missing))
2262

    
2263
  def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
2264
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2265

2266
    @type ninfo: L{objects.Node}
2267
    @param ninfo: the node to check
2268
    @param nresult: the remote results for the node
2269
    @type is_master: bool
2270
    @param is_master: Whether node is the master node
2271

2272
    """
2273
    node = ninfo.name
2274

    
2275
    if (is_master and
2276
        (constants.ENABLE_FILE_STORAGE or
2277
         constants.ENABLE_SHARED_FILE_STORAGE)):
2278
      try:
2279
        fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
2280
      except KeyError:
2281
        # This should never happen
2282
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
2283
                      "Node did not return forbidden file storage paths")
2284
      else:
2285
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
2286
                      "Found forbidden file storage paths: %s",
2287
                      utils.CommaJoin(fspaths))
2288
    else:
2289
      self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
2290
                    constants.CV_ENODEFILESTORAGEPATHS, node,
2291
                    "Node should not have returned forbidden file storage"
2292
                    " paths")
2293

    
2294
  def _VerifyOob(self, ninfo, nresult):
2295
    """Verifies out of band functionality of a node.
2296

2297
    @type ninfo: L{objects.Node}
2298
    @param ninfo: the node to check
2299
    @param nresult: the remote results for the node
2300

2301
    """
2302
    node = ninfo.name
2303
    # We just have to verify the paths on master and/or master candidates
2304
    # as the oob helper is invoked on the master
2305
    if ((ninfo.master_candidate or ninfo.master_capable) and
2306
        constants.NV_OOB_PATHS in nresult):
2307
      for path_result in nresult[constants.NV_OOB_PATHS]:
2308
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)
2309

    
2310
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2311
    """Verifies and updates the node volume data.
2312

2313
    This function will update a L{NodeImage}'s internal structures
2314
    with data from the remote call.
2315

2316
    @type ninfo: L{objects.Node}
2317
    @param ninfo: the node to check
2318
    @param nresult: the remote results for the node
2319
    @param nimg: the node image object
2320
    @param vg_name: the configured VG name
2321

2322
    """
2323
    node = ninfo.name
2324
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2325

    
2326
    nimg.lvm_fail = True
2327
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2328
    if vg_name is None:
2329
      pass
2330
    elif isinstance(lvdata, basestring):
2331
      _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
2332
               utils.SafeEncode(lvdata))
2333
    elif not isinstance(lvdata, dict):
2334
      _ErrorIf(True, constants.CV_ENODELVM, node,
2335
               "rpc call to node failed (lvlist)")
2336
    else:
2337
      nimg.volumes = lvdata
2338
      nimg.lvm_fail = False
2339

    
2340
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2341
    """Verifies and updates the node instance list.
2342

2343
    If the listing was successful, then updates this node's instance
2344
    list. Otherwise, it marks the RPC call as failed for the instance
2345
    list key.
2346

2347
    @type ninfo: L{objects.Node}
2348
    @param ninfo: the node to check
2349
    @param nresult: the remote results for the node
2350
    @param nimg: the node image object
2351

2352
    """
2353
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2354
    test = not isinstance(idata, list)
2355
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2356
                  "rpc call to node failed (instancelist): %s",
2357
                  utils.SafeEncode(str(idata)))
2358
    if test:
2359
      nimg.hyp_fail = True
2360
    else:
2361
      nimg.instances = idata
2362

    
2363
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2364
    """Verifies and computes a node information map
2365

2366
    @type ninfo: L{objects.Node}
2367
    @param ninfo: the node to check
2368
    @param nresult: the remote results for the node
2369
    @param nimg: the node image object
2370
    @param vg_name: the configured VG name
2371

2372
    """
2373
    node = ninfo.name
2374
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2375

    
2376
    # try to read free memory (from the hypervisor)
2377
    hv_info = nresult.get(constants.NV_HVINFO, None)
2378
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2379
    _ErrorIf(test, constants.CV_ENODEHV, node,
2380
             "rpc call to node failed (hvinfo)")
2381
    if not test:
2382
      try:
2383
        nimg.mfree = int(hv_info["memory_free"])
2384
      except (ValueError, TypeError):
2385
        _ErrorIf(True, constants.CV_ENODERPC, node,
2386
                 "node returned invalid nodeinfo, check hypervisor")
2387

    
2388
    # FIXME: devise a free space model for file based instances as well
2389
    if vg_name is not None:
2390
      test = (constants.NV_VGLIST not in nresult or
2391
              vg_name not in nresult[constants.NV_VGLIST])
2392
      _ErrorIf(test, constants.CV_ENODELVM, node,
2393
               "node didn't return data for the volume group '%s'"
2394
               " - it is either missing or broken", vg_name)
2395
      if not test:
2396
        try:
2397
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2398
        except (ValueError, TypeError):
2399
          _ErrorIf(True, constants.CV_ENODERPC, node,
2400
                   "node returned invalid LVM info, check LVM status")
2401

    
2402
  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
2403
    """Gets per-disk status information for all instances.
2404

2405
    @type nodelist: list of strings
2406
    @param nodelist: Node names
2407
    @type node_image: dict of (name, L{NodeImage})
2408
    @param node_image: Node image objects
2409
    @type instanceinfo: dict of (name, L{objects.Instance})
2410
    @param instanceinfo: Instance objects
2411
    @rtype: {instance: {node: [(success, payload)]}}
2412
    @return: a dictionary of per-instance dictionaries with nodes as
2413
        keys and disk information as values; the disk information is a
2414
        list of tuples (success, payload)
2415

2416
    """
2417
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2418

    
2419
    node_disks = {}
2420
    node_disks_devonly = {}
2421
    diskless_instances = set()
2422
    diskless = constants.DT_DISKLESS
2423

    
2424
    for nname in nodelist:
2425
      node_instances = list(itertools.chain(node_image[nname].pinst,
2426
                                            node_image[nname].sinst))
2427
      diskless_instances.update(inst for inst in node_instances
2428
                                if instanceinfo[inst].disk_template == diskless)
2429
      disks = [(inst, disk)
2430
               for inst in node_instances
2431
               for disk in instanceinfo[inst].disks]
2432

    
2433
      if not disks:
2434
        # No need to collect data
2435
        continue
2436

    
2437
      node_disks[nname] = disks
2438

    
2439
      # _AnnotateDiskParams makes already copies of the disks
2440
      devonly = []
2441
      for (inst, dev) in disks:
2442
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
2443
        self.cfg.SetDiskID(anno_disk, nname)
2444
        devonly.append(anno_disk)
2445

    
2446
      node_disks_devonly[nname] = devonly
2447

    
2448
    assert len(node_disks) == len(node_disks_devonly)
2449

    
2450
    # Collect data from all nodes with disks
2451
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2452
                                                          node_disks_devonly)
2453

    
2454
    assert len(result) == len(node_disks)
2455

    
2456
    instdisk = {}
2457

    
2458
    for (nname, nres) in result.items():
2459
      disks = node_disks[nname]
2460

    
2461
      if nres.offline:
2462
        # No data from this node
2463
        data = len(disks) * [(False, "node offline")]
2464
      else:
2465
        msg = nres.fail_msg
2466
        _ErrorIf(msg, constants.CV_ENODERPC, nname,
2467
                 "while getting disk information: %s", msg)
2468
        if msg:
2469
          # No data from this node
2470
          data = len(disks) * [(False, msg)]
2471
        else:
2472
          data = []
2473
          for idx, i in enumerate(nres.payload):
2474
            if isinstance(i, (tuple, list)) and len(i) == 2:
2475
              data.append(i)
2476
            else:
2477
              logging.warning("Invalid result from node %s, entry %d: %s",
2478
                              nname, idx, i)
2479
              data.append((False, "Invalid result from the remote node"))
2480

    
2481
      for ((inst, _), status) in zip(disks, data):
2482
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)
2483

    
2484
    # Add empty entries for diskless instances.
2485
    for inst in diskless_instances:
2486
      assert inst not in instdisk
2487
      instdisk[inst] = {}
2488

    
2489
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2490
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
2491
                      compat.all(isinstance(s, (tuple, list)) and
2492
                                 len(s) == 2 for s in statuses)
2493
                      for inst, nnames in instdisk.items()
2494
                      for nname, statuses in nnames.items())
2495
    if __debug__:
2496
      instdisk_keys = set(instdisk)
2497
      instanceinfo_keys = set(instanceinfo)
2498
      assert instdisk_keys == instanceinfo_keys, \
2499
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2500
         (instdisk_keys, instanceinfo_keys))
2501

    
2502
    return instdisk
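  # Illustrative sketch (comments only, hypothetical data): for a two-disk
  # DRBD instance the returned structure looks like
  #   instdisk["inst1"] = {
  #     "node1": [(True, <status>), (True, <status>)],
  #     "node2": [(False, "node offline"), (False, "node offline")],
  #   }
  # i.e. one (success, payload) pair per disk for every node the instance
  # uses; diskless instances get an empty inner dictionary.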
2503

    
2504
  @staticmethod
2505
  def _SshNodeSelector(group_uuid, all_nodes):
2506
    """Create endless iterators for all potential SSH check hosts.
2507

2508
    """
2509
    nodes = [node for node in all_nodes
2510
             if (node.group != group_uuid and
2511
                 not node.offline)]
2512
    keyfunc = operator.attrgetter("group")
2513

    
2514
    return map(itertools.cycle,
2515
               [sorted(map(operator.attrgetter("name"), names))
2516
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2517
                                                  keyfunc)])
2518

    
2519
  @classmethod
2520
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2521
    """Choose which nodes should talk to which other nodes.
2522

2523
    We will make nodes contact all nodes in their group, and one node from
2524
    every other group.
2525

2526
    @warning: This algorithm has a known issue if one node group is much
2527
      smaller than others (e.g. just one node). In such a case all other
2528
      nodes will talk to the single node.
2529

2530
    """
2531
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2532
    sel = cls._SshNodeSelector(group_uuid, all_nodes)
2533

    
2534
    return (online_nodes,
2535
            dict((name, sorted([i.next() for i in sel]))
2536
                 for name in online_nodes))
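  # Illustrative sketch (comments only, hypothetical groups): when verifying a
  # group whose online nodes are "n1" and "n2" in a cluster with one other
  # group containing "n3" and "n4", the result has the form
  #   (["n1", "n2"], {"n1": ["n3"], "n2": ["n4"]})
  # so every verified node checks SSH connectivity to one node of each other
  # group, with the per-group iterators cycling to spread the load.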
2537

    
2538
  def BuildHooksEnv(self):
2539
    """Build hooks env.
2540

2541
    Cluster-Verify hooks are run only in the post phase; their failure causes
2542
    the output to be logged in the verify output and the verification to fail.
2543

2544
    """
2545
    env = {
2546
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2547
      }
2548

    
2549
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2550
               for node in self.my_node_info.values())
2551

    
2552
    return env
2553

    
2554
  def BuildHooksNodes(self):
2555
    """Build hooks nodes.
2556

2557
    """
2558
    return ([], self.my_node_names)
2559

    
2560
  def Exec(self, feedback_fn):
2561
    """Verify integrity of the node group, performing various test on nodes.
2562

2563
    """
2564
    # This method has too many local variables. pylint: disable=R0914
2565
    feedback_fn("* Verifying group '%s'" % self.group_info.name)
2566

    
2567
    if not self.my_node_names:
2568
      # empty node group
2569
      feedback_fn("* Empty node group, skipping verification")
2570
      return True
2571

    
2572
    self.bad = False
2573
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2574
    verbose = self.op.verbose
2575
    self._feedback_fn = feedback_fn
2576

    
2577
    vg_name = self.cfg.GetVGName()
2578
    drbd_helper = self.cfg.GetDRBDHelper()
2579
    cluster = self.cfg.GetClusterInfo()
2580
    hypervisors = cluster.enabled_hypervisors
2581
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2582

    
2583
    i_non_redundant = [] # Non redundant instances
2584
    i_non_a_balanced = [] # Non auto-balanced instances
2585
    i_offline = 0 # Count of offline instances
2586
    n_offline = 0 # Count of offline nodes
2587
    n_drained = 0 # Count of nodes being drained
2588
    node_vol_should = {}
2589

    
2590
    # FIXME: verify OS list
2591

    
2592
    # File verification
2593
    filemap = ComputeAncillaryFiles(cluster, False)
2594

    
2595
    # do local checksums
2596
    master_node = self.master_node = self.cfg.GetMasterNode()
2597
    master_ip = self.cfg.GetMasterIP()
2598

    
2599
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2600

    
2601
    user_scripts = []
2602
    if self.cfg.GetUseExternalMipScript():
2603
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2604

    
2605
    node_verify_param = {
2606
      constants.NV_FILELIST:
2607
        map(vcluster.MakeVirtualPath,
2608
            utils.UniqueSequence(filename
2609
                                 for files in filemap
2610
                                 for filename in files)),
2611
      constants.NV_NODELIST:
2612
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2613
                                  self.all_node_info.values()),
2614
      constants.NV_HYPERVISOR: hypervisors,
2615
      constants.NV_HVPARAMS:
2616
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2617
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2618
                                 for node in node_data_list
2619
                                 if not node.offline],
2620
      constants.NV_INSTANCELIST: hypervisors,
2621
      constants.NV_VERSION: None,
2622
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2623
      constants.NV_NODESETUP: None,
2624
      constants.NV_TIME: None,
2625
      constants.NV_MASTERIP: (master_node, master_ip),
2626
      constants.NV_OSLIST: None,
2627
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2628
      constants.NV_USERSCRIPTS: user_scripts,
2629
      }
2630

    
2631
    if vg_name is not None:
2632
      node_verify_param[constants.NV_VGLIST] = None
2633
      node_verify_param[constants.NV_LVLIST] = vg_name
2634
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2635

    
2636
    if drbd_helper:
2637
      node_verify_param[constants.NV_DRBDVERSION] = None
2638
      node_verify_param[constants.NV_DRBDLIST] = None
2639
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2640

    
2641
    if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
2642
      # Load file storage paths only from master node
2643
      node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node
2644

    
2645
    # bridge checks
2646
    # FIXME: this needs to be changed per node-group, not cluster-wide
2647
    bridges = set()
2648
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2649
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2650
      bridges.add(default_nicpp[constants.NIC_LINK])
2651
    for instance in self.my_inst_info.values():
2652
      for nic in instance.nics:
2653
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
2654
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2655
          bridges.add(full_nic[constants.NIC_LINK])
2656

    
2657
    if bridges:
2658
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2659

    
2660
    # Build our expected cluster state
2661
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
2662
                                                 name=node.name,
2663
                                                 vm_capable=node.vm_capable))
2664
                      for node in node_data_list)
2665

    
2666
    # Gather OOB paths
2667
    oob_paths = []
2668
    for node in self.all_node_info.values():
2669
      path = SupportsOob(self.cfg, node)
2670
      if path and path not in oob_paths:
2671
        oob_paths.append(path)
2672

    
2673
    if oob_paths:
2674
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2675

    
2676
    for instance in self.my_inst_names:
      inst_config = self.my_inst_info[instance]
      if inst_config.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nname in inst_config.all_nodes:
        if nname not in node_image:
          gnode = self.NodeImage(name=nname)
          gnode.ghost = (nname not in self.all_node_info)
          node_image[nname] = gnode

      inst_config.MapLVsByNode(node_vol_should)

      pnode = inst_config.primary_node
      node_image[pnode].pinst.append(instance)

      for snode in inst_config.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance)

    es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg, self.my_node_names)
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least a node, we act as if it were set for all the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
                                           node_verify_param,
                                           self.cfg.GetClusterName())
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName())
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_names))
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
    if absent_nodes:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_nodes = []
      if master_node not in self.my_node_info:
        additional_nodes.append(master_node)
        vf_node_info.append(self.all_node_info[master_node])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node in absent_nodes:
        nodeinfo = self.all_node_info[node]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node != master_node):
          additional_nodes.append(node)
          vf_node_info.append(self.all_node_info[node])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
                                                 {key: node_verify_param[key]},
                                                 self.cfg.GetClusterName()))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      node = node_i.name
      nimg = node_image[node]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline += 1
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
               msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyFileStoragePaths(node_i, nresult,
                                   node == master_node)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)

        for inst in non_primary_inst:
          test = inst in self.all_inst_info
          _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
                   "instance should not run on node %s", node_i.name)
          _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                   "node is running unknown instance %s", inst)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
                              node_image[node], vg_name)

    feedback_fn("* Verifying instance status")
2844
    for instance in self.my_inst_names:
2845
      if verbose:
2846
        feedback_fn("* Verifying instance %s" % instance)
2847
      inst_config = self.my_inst_info[instance]
2848
      self._VerifyInstance(instance, inst_config, node_image,
2849
                           instdisk[instance])
2850

    
2851
      # If the instance is non-redundant we cannot survive losing its primary
2852
      # node, so we are not N+1 compliant.
2853
      if inst_config.disk_template not in constants.DTS_MIRRORED:
2854
        i_non_redundant.append(instance)
2855

    
2856
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2857
        i_non_a_balanced.append(instance)
2858

    
2859
    feedback_fn("* Verifying orphan volumes")
2860
    reserved = utils.FieldSet(*cluster.reserved_lvs)
2861

    
2862
    # We will get spurious "unknown volume" warnings if any node of this group
2863
    # is secondary for an instance whose primary is in another group. To avoid
2864
    # them, we find these instances and add their volumes to node_vol_should.
2865
    for inst in self.all_inst_info.values():
2866
      for secondary in inst.secondary_nodes:
2867
        if (secondary in self.my_node_info
2868
            and inst.name not in self.my_inst_info):
2869
          inst.MapLVsByNode(node_vol_should)
2870
          break
2871

    
2872
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2873

    
2874
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2875
      feedback_fn("* Verifying N+1 Memory redundancy")
2876
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2877

    
2878
    feedback_fn("* Other Notes")
2879
    if i_non_redundant:
2880
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2881
                  % len(i_non_redundant))
2882

    
2883
    if i_non_a_balanced:
2884
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2885
                  % len(i_non_a_balanced))
2886

    
2887
    if i_offline:
2888
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
2889

    
2890
    if n_offline:
2891
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2892

    
2893
    if n_drained:
2894
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2895

    
2896
    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_names:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
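    # Disk verification is delegated to one job per node group (see Exec
    # below), so a shared lock on all node groups is sufficient here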
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])