#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, master_params.name)

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    if result.fail_msg:
      self.LogWarning("Error disabling the master IP address: %s",
                      result.fail_msg)

    return master_params.name


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
    else:
      cluster = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_name = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_name)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   master_name)

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.name)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
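    # Re-activation of the master IP is attempted in all cases, even if the
    # configuration update or the known_hosts distribution above failed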
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                     master_params, ems)
      msg = result.fail_msg
      if msg:
        self.LogWarning("Could not re-enable the master role on"
                        " the master, please restart manually: %s", msg)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disk sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      self.wanted_names = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
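      # The node resource locks are recalculated later from the instance
      # locks (see DeclareLocks/_LockInstancesNodes below)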
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

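    # Node resource and node allocation locks are taken shared (1), instance
    # locks exclusive (0)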
    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

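    # Per-node exclusive_storage flags; spindle counts are only checked and
    # corrected below on nodes that have exclusive storage enabled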
    es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
                                                   per_node_disks.keys())

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getdimensions(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node, len(dskl), result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
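        # The node reports the size in bytes; convert it to MiB to match the
        # unit used for disk.size in the configuration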
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_list, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the vg name on all nodes and, in case it gets
       unset, whether there are instances still using it.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and
        utils.IsLvmEnabled(enabled_disk_templates)) or \
           (self.cfg.GetVGName() is not None and
            utils.LvmGetsEnabled(enabled_disk_templates,
                                 new_enabled_disk_templates)):
      self._CheckVgNameOnNodes(node_list)

  def _CheckVgNameOnNodes(self, node_list):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_list)
    for node in node_list:
      msg = vglist[node].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s", node, msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (node, vgstatus), errors.ECODE_ENVIRON)

  def _GetEnabledDiskTemplates(self, cluster):
    """Determines the enabled disk templates and the subset of disk templates
       that are newly enabled by this operation.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    if self.op.enabled_disk_templates:
      enabled_disk_templates = self.op.enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(cluster.enabled_disk_templates))
    else:
      enabled_disk_templates = cluster.enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_list = self.owned_locks(locking.LEVEL_NODE)
    self.cluster = cluster = self.cfg.GetClusterInfo()

    vm_capable_nodes = [node.name
                        for node in self.cfg.GetAllNodesInfo().values()
                        if node.name in node_list and node.vm_capable]

    (enabled_disk_templates, new_enabled_disk_templates) = \
      self._GetEnabledDiskTemplates(cluster)

    self._CheckVgName(vm_capable_nodes, enabled_disk_templates,
                      new_enabled_disk_templates)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_list)
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s", node)
          continue
        msg = helpers[node].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (node, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[node].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (node, node_helper), errors.ECODE_ENVIRON)

    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

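      # Warn about instances that would start violating the policy once the
      # new cluster-wide ipolicy is combined with each group's own overrides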
      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(node in group.members
                                             for node in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol,
                                           new_ipolicy, instances, self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate it: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_list, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

955
    """Check whether the disk templates that are going to be disabled
956
       are still in use by some instances.
957

958
    """
959
    if self.op.enabled_disk_templates:
960
      cluster = self.cfg.GetClusterInfo()
961
      instances = self.cfg.GetAllInstancesInfo()
962

    
963
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
964
        - set(self.op.enabled_disk_templates)
965
      for instance in instances.itervalues():
966
        if instance.disk_template in disk_templates_to_remove:
967
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
968
                                     " because instance '%s' is using it." %
969
                                     (instance.disk_template, instance.name))
970

    
971
  def _SetVgName(self, feedback_fn):
972
    """Determines and sets the new volume group name.
973

974
    """
975
    if self.op.vg_name is not None:
976
      if self.op.vg_name and not \
977
           utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
978
        feedback_fn("Note that you specified a volume group, but did not"
979
                    " enable any lvm disk template.")
980
      new_volume = self.op.vg_name
981
      if not new_volume:
982
        if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
983
          raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
984
                                     " disk templates are enabled.")
985
        new_volume = None
986
      if new_volume != self.cfg.GetVGName():
987
        self.cfg.SetVGName(new_volume)
988
      else:
989
        feedback_fn("Cluster LVM configuration already in desired"
990
                    " state, not changing")
991
    else:
992
      if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
993
          not self.cfg.GetVGName():
994
        raise errors.OpPrereqError("Please specify a volume group when"
995
                                   " enabling lvm-based disk-templates.")
996

    
997
  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(set(self.op.enabled_disk_templates))

    self._SetVgName(feedback_fn)

    if self.op.drbd_helper is not None:
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did"
                    " not enable the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                       master_params, ems)
      result.Raise("Could not disable the master ip")
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(master_params.name,
                                                        master_params.netmask,
                                                        self.op.master_netmask,
                                                        master_params.ip,
                                                        master_params.netdev)
      if result.fail_msg:
        msg = "Could not change the master IP netmask: %s" % result.fail_msg
        feedback_fn(msg)

      self.cluster.master_netmask = self.op.master_netmask

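    # Write all accumulated changes to the cluster configuration in one update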
    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                     master_params, ems)
      if result.fail_msg:
        self.LogWarning("Could not re-enable the master ip on"
                        " the master, please restart manually: %s",
                        result.fail_msg)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
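      # (-len(jobs) is a relative job dependency: counting backwards from each
      # group job, it always resolves to the config verification job above)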
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1304
  """Verifies the cluster config.
1305

1306
  """
1307
  REQ_BGL = False
1308

    
1309
  def _VerifyHVP(self, hvp_data):
1310
    """Verifies locally the syntax of the hypervisor parameters.
1311

1312
    """
1313
    for item, hv_name, hv_params in hvp_data:
1314
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1315
             (item, hv_name))
1316
      try:
1317
        hv_class = hypervisor.GetHypervisorClass(hv_name)
1318
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1319
        hv_class.CheckParameterSyntax(hv_params)
1320
      except errors.GenericError, err:
1321
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1322

    
1323
  def ExpandNames(self):
1324
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1325
    self.share_locks = ShareAll()
1326

    
1327
  def CheckPrereq(self):
1328
    """Check prerequisites.
1329

1330
    """
1331
    # Retrieve all information
1332
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1333
    self.all_node_info = self.cfg.GetAllNodesInfo()
1334
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1335

    
1336
  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node.name for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in dangling_nodes:
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst.name)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(dangling_instances.get(node.name,
                                                ["no instances"])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(no_node_instances))

    return not self.bad


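# Illustrative note on LUClusterVerify.Exec above (not part of the original
# module; names are hypothetical): with a configuration where node
# "node3.example.com" references a group UUID no longer present in
# all_group_info and is primary for instance "web1", the checks would yield
#   dangling_nodes == set(["node3.example.com"])
#   dangling_instances == {"node3.example.com": ["web1"]}
# and a CV_ECLUSTERDANGLINGNODES error listing "node3.example.com (web1)".
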
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type name: string
    @ivar name: the node name to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by the hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, name=None, vm_capable=True):
      self.name = name
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_names = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: inst_names,
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      all_inst_info = self.cfg.GetAllInstancesInfo()

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
          nodes.update(all_inst_info[inst].secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_nodes = set(self.group_info.members)
    group_instances = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_nodes = \
        group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_instances = \
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))

    if unlocked_nodes:
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
                                 utils.CommaJoin(unlocked_nodes),
                                 errors.ECODE_STATE)

    if unlocked_instances:
      raise errors.OpPrereqError("Missing lock for instances: %s" %
                                 utils.CommaJoin(unlocked_instances),
                                 errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_names = utils.NiceSort(group_nodes)
    self.my_inst_names = utils.NiceSort(group_instances)

    self.my_node_info = dict((name, self.all_node_info[name])
                             for name in self.my_node_names)

    self.my_inst_info = dict((name, self.all_inst_info[name])
                             for name in self.my_inst_names)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nname in inst.all_nodes:
          if self.all_node_info[nname].group != self.group_uuid:
            extra_lv_nodes.add(nname)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    _ErrorIf(test, constants.CV_ENODERPC, node,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, constants.CV_ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    _ErrorIf(test, constants.CV_ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, constants.CV_ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        _ErrorIf(True, constants.CV_ENODEHV, node,
                 "hypervisor %s parameter verify failure (source %s): %s",
                 hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
             "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
             "Node time diverges by at least %s from master node time",
             ntime_diff)

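  # Illustrative note on _VerifyNodeTime above (not part of the original
  # module; timestamps are hypothetical): with nvinfo_starttime=1000.0 and
  # nvinfo_endtime=1000.4, a node passes as long as its merged time lies in
  #   [1000.0 - constants.NODE_MAX_CLOCK_SKEW,
  #    1000.4 + constants.NODE_MAX_CLOCK_SKEW].
  # A node reporting ntime_merged = 1000.4 + NODE_MAX_CLOCK_SKEW + 9.6 is
  # flagged, with ntime_diff computed from nvinfo_endtime, i.e.
  # NODE_MAX_CLOCK_SKEW + 9.6 seconds.
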
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, node, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node_verify call.

    """
    node_versions = {}
    for node, ndata in node_verify_infos.items():
      nresult = ndata.payload
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
      node_versions[node] = version

    if len(set(node_versions.values())) > 1:
      for node, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node, msg,
                    code=self.ETYPE_WARNING)

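  # Illustrative note on _VerifyGroupDRBDVersion above (not part of the
  # original module; versions are hypothetical): if node_versions ends up as
  #   {"node1": "8.4.11", "node2": "8.4.11", "node3": "8.9.2"}
  # the set of values has more than one element, so every node gets a
  # CV_ENODEDRBDHELPER warning listing its own reported version.
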
  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping from node names to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of check
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode) = min((ni.pv_min, ni.name) for ni in vals)
    (pvmax, maxnode) = max((ni.pv_max, ni.name) for ni in vals)
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, minnode, pvmax, maxnode)

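  # Illustrative note on _VerifyGroupLVM above (not part of the original
  # module; sizes and names are hypothetical): with exclusive storage enabled
  # and node images reporting (pv_min, pv_max) of (10240, 10240) on "node1"
  # and (10240, 20480) on "node2", the group-wide extremes are pvmin=10240 on
  # node1 and pvmax=20480 on node2; utils.LvmExclusiveTestBadPvSizes decides
  # whether that spread is large enough to raise CV_EGROUPDIFFERENTPVSIZE.
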
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    _ErrorIf(test, constants.CV_ENODENET, node,
             "did not return valid bridge information")
    if not test:
      _ErrorIf(bool(missing), constants.CV_ENODENET, node,
               "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name

    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    test = constants.NV_NODELIST not in nresult
    _ErrorIf(test, constants.CV_ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          _ErrorIf(True, constants.CV_ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    _ErrorIf(test, constants.CV_ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, constants.CV_ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    _ErrorIf(test, constants.CV_ENODENET, node,
             "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if node == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        _ErrorIf(True, constants.CV_ENODENET, node, msg)

  def _VerifyInstance(self, instance, inst_config, node_image,
                      diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
    pnode = inst_config.primary_node
    pnode_img = node_image[pnode]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    inst_config.MapLVsByNode(node_vol_should)

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
    _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err),
             code=self.ETYPE_WARNING)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if inst_config.admin_state == constants.ADMINST_UP:
      test = instance not in pnode_img.instances and not pnode_img.offline
      _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               pnode)
      _ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
               "instance is marked as running and lives on offline node %s",
               pnode)

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      _ErrorIf(inst_config.admin_state == constants.ADMINST_UP and
               not success and not bad_snode,
               constants.CV_EINSTANCEFAULTYDISK, instance,
               "couldn't retrieve status for disk/%s on %s: %s",
               idx, nname, bdev_status)
      _ErrorIf((inst_config.admin_state == constants.ADMINST_UP and
                success and bdev_status.ldisk_status == constants.LDS_FAULTY),
               constants.CV_EINSTANCEFAULTYDISK, instance,
               "disk/%s on %s is faulty", idx, nname)

    _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
             constants.CV_ENODERPC, pnode, "instance %s, connection to"
             " primary node failed", instance)

    _ErrorIf(len(inst_config.secondary_nodes) > 1,
             constants.CV_EINSTANCELAYOUT,
             instance, "instance has multiple secondary nodes: %s",
             utils.CommaJoin(inst_config.secondary_nodes),
             code=self.ETYPE_WARNING)

    if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
      # Disk template not compatible with exclusive_storage: no instance
      # node should have the flag set
      es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
                                                     inst_config.all_nodes)
      es_nodes = [n for (n, es) in es_flags.items()
                  if es]
      _ErrorIf(es_nodes, constants.CV_EINSTANCEUNSUITABLENODE, instance,
               "instance has template %s, which is not supported on nodes"
               " that have exclusive storage set: %s",
               inst_config.disk_template, utils.CommaJoin(es_nodes))

    if inst_config.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(inst_config.all_nodes)
      instance_groups = {}

      for node in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node].group,
                                   []).append(node)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in inst_config.secondary_nodes:
      s_img = node_image[snode]
      _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
               snode, "instance %s, connection to secondary node failed",
               instance)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
             "instance has offline secondary node(s) %s",
             utils.CommaJoin(inst_nodes_offline))
    # ... or ghost/non-vm_capable nodes
    for node in inst_config.all_nodes:
      _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
               instance, "instance lives on ghost node %s", node)
      _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
               instance, "instance lives on non-vm_capable node %s", node)

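  # Illustrative note on _VerifyInstance above (not part of the original
  # module; values are hypothetical): a diskstatus mapping such as
  #   {"node1": [(True, status0), (False, "cannot connect")]}
  # is flattened into
  #   diskdata == [("node1", True, status0, 0),
  #                ("node1", False, "cannot connect", 1)]
  # so that each (node, disk index) pair can be checked individually.
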
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node not in node_vol_should or
                volume not in node_vol_should[node]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or self.all_node_info[node].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = cluster_info.FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1, node,
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      prinode, needed_mem, n_img.mfree)

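  # Illustrative note on _VerifyNPlusOneMemory above (not part of the
  # original module; numbers are hypothetical): if a node is secondary for
  # two auto-balanced instances whose primary is "node1", with BE_MINMEM of
  # 2048 and 4096 MiB, then needed_mem == 6144; with n_img.mfree == 4096 the
  # check fails and a CV_ENODEN1 error is reported
  # ("6144MiB needed, 4096MiB available").
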
  @classmethod
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param errorif: Callback for reporting errors
    @param nodeinfo: List of L{objects.Node} objects
    @param master_node: Name of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.name == master_node)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodeinfo
      else:
        filenodes = filter(fn, nodeinfo)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("name"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodeinfo:
      if node.offline:
        ignore_nodes.add(node.name)
        continue

      nresult = all_nvinfo[node.name]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      errorif(test, constants.CV_ENODEFILECHECK, node.name,
              "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.name)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.name)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_name
                            for nodes in fileinfo[filename].values()
                            for node_name in nodes) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        errorif(missing_file and missing_file != expected_nodes,
                constants.CV_ECLUSTERFILECHECK, None,
                "File %s is optional, but it must exist on all or no"
                " nodes (not found on %s)",
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
      else:
        errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                "File %s is missing from node(s) %s", filename,
                utils.CommaJoin(utils.NiceSort(missing_file)))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        errorif(unexpected,
                constants.CV_ECLUSTERFILECHECK, None,
                "File %s should not exist on node(s) %s",
                filename, utils.CommaJoin(utils.NiceSort(unexpected)))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
                    for (idx, (checksum, nodes)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      errorif(test, constants.CV_ECLUSTERFILECHECK, None,
              "File %s found with %s different checksums (%s)",
              filename, len(checksums), "; ".join(variants))

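  # Illustrative note on _VerifyFiles above (not part of the original module;
  # the file name and checksums are hypothetical): after the per-node loop,
  # fileinfo could look like
  #   {"/var/lib/ganeti/config.data":
  #      {"0123456789abcdef": set(["node1", "node2"]),
  #       "fedcba9876543210": set(["node3"])}}
  # i.e. two different checksums for the same file, which is then reported
  # as "File ... found with 2 different checksums (...)".
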
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
               "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
                 "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
                 "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
    node_drbd = {}
    for minor, instance in drbd_map[node].items():
      test = instance not in instanceinfo
      _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
               "ghost instance '%s' in temporary DRBD map", instance)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
      if test:
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name,
                            instance.admin_state == constants.ADMINST_UP)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    _ErrorIf(test, constants.CV_ENODEDRBD, node,
             "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (iname, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
               "drbd minor %d of instance %s is not active", minor, iname)
    for minor in used_minors:
      test = minor not in node_drbd
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
               "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    _ErrorIf(test, constants.CV_ENODEOS, node,
             "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

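  # Illustrative note on _UpdateNodeOS above (not part of the original
  # module; the OS entry is hypothetical): an NV_OSLIST element such as
  #   ["debootstrap", "/srv/ganeti/os/debootstrap", True, "",
  #    ["default"], [], [20]]
  # becomes, after the tuple/set normalisation above,
  #   nimg.oslist["debootstrap"] ==
  #     [("/srv/ganeti/os/debootstrap", True, "",
  #       set(["default"]), set([]), set([20]))]
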
  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      _ErrorIf(not f_status, constants.CV_ENODEOS, node,
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
      _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
               "OS '%s' has multiple entries (first one shadows the rest): %s",
               os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      _ErrorIf(test, constants.CV_ENODEOS, node,
               "Extra OS %s not present on reference node (%s)",
               os_name, base.name)
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        _ErrorIf(a != b, constants.CV_ENODEOS, node,
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
                 kind, os_name, base.name,
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    _ErrorIf(missing, constants.CV_ENODEOS, node,
             "OSes present on reference node %s but missing on this node: %s",
             base.name, utils.CommaJoin(missing))

  def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    node = ninfo.name

    if (is_master and
        (constants.ENABLE_FILE_STORAGE or
         constants.ENABLE_SHARED_FILE_STORAGE)):
      try:
        fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, node,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
               utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      _ErrorIf(True, constants.CV_ENODELVM, node,
               "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = idata

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    _ErrorIf(test, constants.CV_ENODEHV, node,
             "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        _ErrorIf(True, constants.CV_ENODERPC, node,
                 "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      _ErrorIf(test, constants.CV_ENODELVM, node,
               "node didn't return data for the volume group '%s'"
               " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          _ErrorIf(True, constants.CV_ENODERPC, node,
                   "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type nodelist: list of strings
    @param nodelist: Node names
    @type node_image: dict of (name, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (name, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    node_disks = {}
    node_disks_devonly = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nname in nodelist:
      node_instances = list(itertools.chain(node_image[nname].pinst,
                                            node_image[nname].sinst))
      diskless_instances.update(inst for inst in node_instances
                                if instanceinfo[inst].disk_template == diskless)
      disks = [(inst, disk)
               for inst in node_instances
               for disk in instanceinfo[inst].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nname] = disks

      # _AnnotateDiskParams already makes copies of the disks
      devonly = []
      for (inst, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
        self.cfg.SetDiskID(anno_disk, nname)
        devonly.append(anno_disk)

      node_disks_devonly[nname] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nname, nres) in result.items():
      disks = node_disks[nname]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        _ErrorIf(msg, constants.CV_ENODERPC, nname,
                 "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              nname, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst, _), status) in zip(disks, data):
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)

    # Add empty entries for diskless instances.
    for inst in diskless_instances:
      assert inst not in instdisk
      instdisk[inst] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nnames in instdisk.items()
                      for nname, statuses in nnames.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

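  # Illustrative note on _CollectDiskInfo above (not part of the original
  # module; names are hypothetical): for a DRBD instance "web1" with one disk
  # on nodes "node1"/"node2", the returned structure could look like
  #   instdisk == {"web1": {"node1": [(True, bdev_status)],
  #                         "node2": [(True, bdev_status)]}}
  # while a diskless instance gets an empty inner dictionary.
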
  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

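  # Illustrative note on _SelectSshCheckNodes above (not part of the original
  # module; node and group names are hypothetical): when verifying a group
  # containing "node1" and "node2" in a cluster that also has groups with
  # nodes "node3"/"node4" and "node5", the result could be
  #   (["node1", "node2"],
  #    {"node1": ["node3", "node5"], "node2": ["node4", "node5"]})
  # i.e. each online node of the group is told to contact one (cycling)
  # node from every other group.
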
  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; their failure causes
    their output to be logged in the verify output and the verification to
    fail.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], self.my_node_names)

  def Exec(self, feedback_fn):
2576
    """Verify integrity of the node group, performing various test on nodes.
2577

2578
    """
2579
    # This method has too many local variables. pylint: disable=R0914
2580
    feedback_fn("* Verifying group '%s'" % self.group_info.name)
2581

    
2582
    if not self.my_node_names:
2583
      # empty node group
2584
      feedback_fn("* Empty node group, skipping verification")
2585
      return True
2586

    
2587
    self.bad = False
2588
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
2589
    verbose = self.op.verbose
2590
    self._feedback_fn = feedback_fn
2591

    
2592
    vg_name = self.cfg.GetVGName()
2593
    drbd_helper = self.cfg.GetDRBDHelper()
2594
    cluster = self.cfg.GetClusterInfo()
2595
    hypervisors = cluster.enabled_hypervisors
2596
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]
2597

    
2598
    i_non_redundant = [] # Non redundant instances
2599
    i_non_a_balanced = [] # Non auto-balanced instances
2600
    i_offline = 0 # Count of offline instances
2601
    n_offline = 0 # Count of offline nodes
2602
    n_drained = 0 # Count of nodes being drained
2603
    node_vol_should = {}
2604

    
2605
    # FIXME: verify OS list
2606

    
2607
    # File verification
2608
    filemap = ComputeAncillaryFiles(cluster, False)
2609

    
2610
    # do local checksums
2611
    master_node = self.master_node = self.cfg.GetMasterNode()
2612
    master_ip = self.cfg.GetMasterIP()
2613

    
2614
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))
2615

    
2616
    user_scripts = []
2617
    if self.cfg.GetUseExternalMipScript():
2618
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2619

    
2620
    node_verify_param = {
2621
      constants.NV_FILELIST:
2622
        map(vcluster.MakeVirtualPath,
2623
            utils.UniqueSequence(filename
2624
                                 for files in filemap
2625
                                 for filename in files)),
2626
      constants.NV_NODELIST:
2627
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2628
                                  self.all_node_info.values()),
2629
      constants.NV_HYPERVISOR: hypervisors,
2630
      constants.NV_HVPARAMS:
2631
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2632
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2633
                                 for node in node_data_list
2634
                                 if not node.offline],
2635
      constants.NV_INSTANCELIST: hypervisors,
2636
      constants.NV_VERSION: None,
2637
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2638
      constants.NV_NODESETUP: None,
2639
      constants.NV_TIME: None,
2640
      constants.NV_MASTERIP: (master_node, master_ip),
2641
      constants.NV_OSLIST: None,
2642
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2643
      constants.NV_USERSCRIPTS: user_scripts,
2644
      }
2645

    
2646
    if vg_name is not None:
2647
      node_verify_param[constants.NV_VGLIST] = None
2648
      node_verify_param[constants.NV_LVLIST] = vg_name
2649
      node_verify_param[constants.NV_PVLIST] = [vg_name]
2650

    
2651
    if drbd_helper:
2652
      node_verify_param[constants.NV_DRBDVERSION] = None
2653
      node_verify_param[constants.NV_DRBDLIST] = None
2654
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2655

    
2656
    if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
2657
      # Load file storage paths only from master node
2658
      node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node
2659

    
2660
    # bridge checks
2661
    # FIXME: this needs to be changed per node-group, not cluster-wide
2662
    bridges = set()
2663
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2664
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2665
      bridges.add(default_nicpp[constants.NIC_LINK])
2666
    for instance in self.my_inst_info.values():
2667
      for nic in instance.nics:
2668
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
2669
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2670
          bridges.add(full_nic[constants.NIC_LINK])
2671

    
2672
    if bridges:
2673
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
2674

    
2675
    # Build our expected cluster state
2676
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
                                                 name=node.name,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

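    # Record instance placement in the node images; nodes referenced by an
    # instance but not yet known get a fresh image, marked as "ghost" if the
    # node is not in the cluster configuration at all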
    for instance in self.my_inst_names:
      inst_config = self.my_inst_info[instance]
      if inst_config.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nname in inst_config.all_nodes:
        if nname not in node_image:
          gnode = self.NodeImage(name=nname)
          gnode.ghost = (nname not in self.all_node_info)
          node_image[nname] = gnode

      inst_config.MapLVsByNode(node_vol_should)

      pnode = inst_config.primary_node
      node_image[pnode].pinst.append(instance)

      for snode in inst_config.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance)

    es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg, self.my_node_names)
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least a node, we act as if it were set for all the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
                                           node_verify_param,
                                           self.cfg.GetClusterName())
    nvinfo_endtime = time.time()

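    # Nodes flagged earlier as needing only a volume listing are queried with
    # just NV_LVLIST instead of the full parameter set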
    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName())
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_names))
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
    if absent_nodes:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_nodes = []
      if master_node not in self.my_node_info:
        additional_nodes.append(master_node)
        vf_node_info.append(self.all_node_info[master_node])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node in absent_nodes:
        nodeinfo = self.all_node_info[node]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node != master_node):
          additional_nodes.append(node)
          vf_node_info.append(self.all_node_info[node])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
                                                 {key: node_verify_param[key]},
                                                 self.cfg.GetClusterName()))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

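    # Evaluate the gathered verification results node by node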
    for node_i in node_data_list:
      node = node_i.name
      nimg = node_image[node]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline += 1
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
               msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyFileStoragePaths(node_i, nresult,
                                   node == master_node)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)

        for inst in non_primary_inst:
          test = inst in self.all_inst_info
          _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
                   "instance should not run on node %s", node_i.name)
          _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                   "node is running unknown instance %s", inst)

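    # Group-wide checks that need the results from all nodes at once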
    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
                              node_image[node], vg_name)

    feedback_fn("* Verifying instance status")
2859
    for instance in self.my_inst_names:
2860
      if verbose:
2861
        feedback_fn("* Verifying instance %s" % instance)
2862
      inst_config = self.my_inst_info[instance]
2863
      self._VerifyInstance(instance, inst_config, node_image,
2864
                           instdisk[instance])
2865

    
2866
      # If the instance is non-redundant we cannot survive losing its primary
2867
      # node, so we are not N+1 compliant.
2868
      if inst_config.disk_template not in constants.DTS_MIRRORED:
2869
        i_non_redundant.append(instance)
2870

    
2871
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2872
        i_non_a_balanced.append(instance)
2873

    
2874
    feedback_fn("* Verifying orphan volumes")
2875
    reserved = utils.FieldSet(*cluster.reserved_lvs)
2876

    
2877
    # We will get spurious "unknown volume" warnings if any node of this group
2878
    # is secondary for an instance whose primary is in another group. To avoid
2879
    # them, we find these instances and add their volumes to node_vol_should.
2880
    for inst in self.all_inst_info.values():
2881
      for secondary in inst.secondary_nodes:
2882
        if (secondary in self.my_node_info
2883
            and inst.name not in self.my_inst_info):
2884
          inst.MapLVsByNode(node_vol_should)
2885
          break
2886

    
2887
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2888

    
2889
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2890
      feedback_fn("* Verifying N+1 Memory redundancy")
2891
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2892

    
2893
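    # Summarize the counters collected during the checks above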
    feedback_fn("* Other Notes")
2894
    if i_non_redundant:
2895
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2896
                  % len(i_non_redundant))
2897

    
2898
    if i_non_a_balanced:
2899
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2900
                  % len(i_non_a_balanced))
2901

    
2902
    if i_offline:
2903
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
2904

    
2905
    if n_offline:
2906
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2907

    
2908
    if n_drained:
2909
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2910

    
2911
    return not self.bad
2912

    
2913
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyzes the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_names:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the status of the cluster disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])