# lib/cmdlib/cluster.py @ c7dd65be

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, master_params.name)

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.name


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
    else:
      cluster = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_name = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_name)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   master_name)

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": cluster.master_node,
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.name)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      self.wanted_names = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
                                                   per_node_disks.keys())

    changed = []
    for node, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node)
      result = self.rpc.call_blockdev_getdimensions(node, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node, len(dskl), result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
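        # reported size is in bytes; convert to MiB as stored in the config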
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s"
                                   % err, errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_list, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the vg name on all nodes and in case it gets
       unset whether there are instances still using it.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and
        utils.IsLvmEnabled(enabled_disk_templates)) or \
           (self.cfg.GetVGName() is not None and
            utils.LvmGetsEnabled(enabled_disk_templates,
                                 new_enabled_disk_templates)):
      self._CheckVgNameOnNodes(node_list)

  def _CheckVgNameOnNodes(self, node_list):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_list)
    for node in node_list:
      msg = vglist[node].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s", node, msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (node, vgstatus), errors.ECODE_ENVIRON)

  def _GetEnabledDiskTemplates(self, cluster):
    """Determines the enabled disk templates and the subset of disk templates
       that are newly enabled by this operation.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    if self.op.enabled_disk_templates:
      enabled_disk_templates = self.op.enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(cluster.enabled_disk_templates))
    else:
      enabled_disk_templates = cluster.enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_list = self.owned_locks(locking.LEVEL_NODE)
    self.cluster = cluster = self.cfg.GetClusterInfo()

    vm_capable_nodes = [node.name
                        for node in self.cfg.GetAllNodesInfo().values()
                        if node.name in node_list and node.vm_capable]

    (enabled_disk_templates, new_enabled_disk_templates) = \
      self._GetEnabledDiskTemplates(cluster)

    self._CheckVgName(vm_capable_nodes, enabled_disk_templates,
                      new_enabled_disk_templates)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_list)
      for (node, ninfo) in self.cfg.GetMultiNodeInfo(node_list):
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s", node)
          continue
        msg = helpers[node].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (node, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[node].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (node, node_helper), errors.ECODE_ENVIRON)

    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(node in group.members
                                             for node in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol,
                                           new_ipolicy, instances, self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_list, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_list, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name and not \
           utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
        feedback_fn("Note that you specified a volume group, but did not"
                    " enable any lvm disk template.")
      new_volume = self.op.vg_name
      if not new_volume:
        if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
          raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                     " disk templates are enabled.")
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    else:
      if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
          not self.cfg.GetVGName():
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(set(self.op.enabled_disk_templates))

    self._SetVgName(feedback_fn)

    if self.op.drbd_helper is not None:
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did not"
                    " enable the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
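      # apply DDM_ADD/DDM_REMOVE changes to the cluster OS list named 'aname'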
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.name,
                                                       master_params, ems)
      result.Raise("Could not disable the master ip")
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(master_params.name,
                                                        master_params.netmask,
                                                        self.op.master_netmask,
                                                        master_params.ip,
                                                        master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.name,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
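      # -len(jobs) is a relative job ID: the config-verify job queued above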
1153
      depends_fn = lambda: [(-len(jobs), [])]
1154

    
1155
    jobs.extend(
1156
      [opcodes.OpClusterVerifyGroup(group_name=group,
1157
                                    ignore_errors=self.op.ignore_errors,
1158
                                    depends=depends_fn())]
1159
      for group in groups)
1160

    
1161
    # Fix up all parameters
1162
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1163
      op.debug_simulate_errors = self.op.debug_simulate_errors
1164
      op.verbose = self.op.verbose
1165
      op.error_codes = self.op.error_codes
1166
      try:
1167
        op.skip_checks = self.op.skip_checks
1168
      except AttributeError:
1169
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1170

    
1171
    return ResultWithJobs(jobs)
1172

    
1173

    
1174
class _VerifyErrors(object):
1175
  """Mix-in for cluster/group verify LUs.
1176

1177
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1178
  self.op and self._feedback_fn to be available.)
1179

1180
  """
1181

    
1182
  ETYPE_FIELD = "code"
1183
  ETYPE_ERROR = "ERROR"
1184
  ETYPE_WARNING = "WARNING"
1185

    
1186
  def _Error(self, ecode, item, msg, *args, **kwargs):
1187
    """Format an error message.
1188

1189
    Based on the opcode's error_codes parameter, either format a
1190
    parseable error code, or a simpler error string.
1191

1192
    This must be called only from Exec and functions called from Exec.
1193

1194
    """
1195
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1196
    itype, etxt, _ = ecode
1197
    # If the error code is in the list of ignored errors, demote the error to a
1198
    # warning
1199
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1200
      ltype = self.ETYPE_WARNING
1201
    # first complete the msg
1202
    if args:
1203
      msg = msg % args
1204
    # then format the whole message
1205
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1206
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1207
    else:
1208
      if item:
1209
        item = " " + item
1210
      else:
1211
        item = ""
1212
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1213
    # and finally report it via the feedback_fn
1214
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1215
    # do not mark the operation as failed for WARN cases only
1216
    if ltype == self.ETYPE_ERROR:
1217
      self.bad = True
1218

    
1219
  def _ErrorIf(self, cond, *args, **kwargs):
1220
    """Log an error message if the passed condition is True.
1221

1222
    """
1223
    if (bool(cond)
1224
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1225
      self._Error(*args, **kwargs)
1226

    
1227

    
1228
def _VerifyCertificate(filename):
1229
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1230

1231
  @type filename: string
1232
  @param filename: Path to PEM file
1233

1234
  """
1235
  try:
1236
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1237
                                           utils.ReadFile(filename))
1238
  except Exception, err: # pylint: disable=W0703
1239
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1240
            "Failed to load X509 certificate %s: %s" % (filename, err))
1241

    
1242
  (errcode, msg) = \
1243
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1244
                                constants.SSL_CERT_EXPIRATION_ERROR)
1245

    
1246
  if msg:
1247
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1248
  else:
1249
    fnamemsg = None
1250

    
1251
  if errcode is None:
1252
    return (None, fnamemsg)
1253
  elif errcode == utils.CERT_WARNING:
1254
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1255
  elif errcode == utils.CERT_ERROR:
1256
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1257

    
1258
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1259

    
1260

    
1261
def _GetAllHypervisorParameters(cluster, instances):
1262
  """Compute the set of all hypervisor parameters.
1263

1264
  @type cluster: L{objects.Cluster}
1265
  @param cluster: the cluster object
1266
  @param instances: list of L{objects.Instance}
1267
  @param instances: additional instances from which to obtain parameters
1268
  @rtype: list of (origin, hypervisor, parameters)
1269
  @return: a list with all parameters found, indicating the hypervisor they
1270
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1271

1272
  """
1273
  hvp_data = []
1274

    
1275
  for hv_name in cluster.enabled_hypervisors:
1276
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1277

    
1278
  for os_name, os_hvp in cluster.os_hvp.items():
1279
    for hv_name, hv_params in os_hvp.items():
1280
      if hv_params:
1281
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1282
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1283

    
1284
  # TODO: collapse identical parameter values in a single one
1285
  for instance in instances:
1286
    if instance.hvparams:
1287
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1288
                       cluster.FillHV(instance)))
1289

    
1290
  return hvp_data
1291

    
1292

    
1293
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1294
  """Verifies the cluster config.
1295

1296
  """
1297
  REQ_BGL = False
1298

    
1299
  def _VerifyHVP(self, hvp_data):
1300
    """Verifies locally the syntax of the hypervisor parameters.
1301

1302
    """
1303
    for item, hv_name, hv_params in hvp_data:
1304
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1305
             (item, hv_name))
1306
      try:
1307
        hv_class = hypervisor.GetHypervisorClass(hv_name)
1308
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1309
        hv_class.CheckParameterSyntax(hv_params)
1310
      except errors.GenericError, err:
1311
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1312

    
1313
  def ExpandNames(self):
1314
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1315
    self.share_locks = ShareAll()
1316

    
1317
  def CheckPrereq(self):
1318
    """Check prerequisites.
1319

1320
    """
1321
    # Retrieve all information
1322
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1323
    self.all_node_info = self.cfg.GetAllNodesInfo()
1324
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1325

    
1326
  def Exec(self, feedback_fn):
1327
    """Verify integrity of cluster, performing various test on nodes.
1328

1329
    """
1330
    self.bad = False
1331
    self._feedback_fn = feedback_fn
1332

    
1333
    feedback_fn("* Verifying cluster config")
1334

    
1335
    for msg in self.cfg.VerifyConfig():
1336
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1337

    
1338
    feedback_fn("* Verifying cluster certificate files")
1339

    
1340
    for cert_filename in pathutils.ALL_CERT_FILES:
1341
      (errcode, msg) = _VerifyCertificate(cert_filename)
1342
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1343

    
1344
    feedback_fn("* Verifying hypervisor parameters")
1345

    
1346
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1347
                                                self.all_inst_info.values()))
1348

    
1349
    feedback_fn("* Verifying all nodes belong to an existing group")
1350

    
1351
    # We do this verification here because, should this bogus circumstance
1352
    # occur, it would never be caught by VerifyGroup, which only acts on
1353
    # nodes/instances reachable from existing node groups.
1354

    
1355
    dangling_nodes = set(node.name for node in self.all_node_info.values()
1356
                         if node.group not in self.all_group_info)
1357

    
1358
    dangling_instances = {}
1359
    no_node_instances = []
1360

    
1361
    for inst in self.all_inst_info.values():
1362
      if inst.primary_node in dangling_nodes:
1363
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1364
      elif inst.primary_node not in self.all_node_info:
1365
        no_node_instances.append(inst.name)
1366

    
1367
    pretty_dangling = [
1368
        "%s (%s)" %
1369
        (node.name,
1370
         utils.CommaJoin(dangling_instances.get(node.name,
1371
                                                ["no instances"])))
1372
        for node in dangling_nodes]
1373

    
1374
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1375
                  None,
1376
                  "the following nodes (and their instances) belong to a non"
1377
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1378

    
1379
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1380
                  None,
1381
                  "the following instances have a non-existing primary-node:"
1382
                  " %s", utils.CommaJoin(no_node_instances))
1383

    
1384
    return not self.bad
1385

    
1386

    
1387
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type name: string
    @ivar name: the node name to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, name=None, vm_capable=True):
      self.name = name
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_names = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: inst_names,
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      all_inst_info = self.cfg.GetAllInstancesInfo()

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
          nodes.update(all_inst_info[inst].secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_nodes = set(self.group_info.members)
    group_instances = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_nodes = \
        group_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_instances = \
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))

    if unlocked_nodes:
      raise errors.OpPrereqError("Missing lock for nodes: %s" %
                                 utils.CommaJoin(unlocked_nodes),
                                 errors.ECODE_STATE)

    if unlocked_instances:
      raise errors.OpPrereqError("Missing lock for instances: %s" %
                                 utils.CommaJoin(unlocked_instances),
                                 errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_names = utils.NiceSort(group_nodes)
    self.my_inst_names = utils.NiceSort(group_instances)

    self.my_node_info = dict((name, self.all_node_info[name])
                             for name in self.my_node_names)

    self.my_inst_info = dict((name, self.all_inst_info[name])
                             for name in self.my_inst_names)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nname in inst.all_nodes:
          if self.all_node_info[nname].group != self.group_uuid:
            extra_lv_nodes.add(nname)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    _ErrorIf(test, constants.CV_ENODERPC, node,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    _ErrorIf(test, constants.CV_ENODERPC, node,
             "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    _ErrorIf(test, constants.CV_ENODEVERSION, node,
             "incompatible protocol versions: master %s,"
             " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, node,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        _ErrorIf(test, constants.CV_ENODEHV, node,
                 "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        _ErrorIf(True, constants.CV_ENODEHV, node,
                 "hypervisor %s parameter verify failure (source %s): %s",
                 hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    _ErrorIf(test, constants.CV_ENODESETUP, node, "node setup error: %s",
             "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      _ErrorIf(True, constants.CV_ENODETIME, node, "Node returned invalid time")
      return

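    # nvinfo_starttime and nvinfo_endtime bracket the verify RPC on the
    # master, so a node clock outside this window by more than
    # NODE_MAX_CLOCK_SKEW is reported as divergent.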
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    _ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, node,
             "Node time diverges by at least %s from master node time",
             ntime_diff)

  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    _ErrorIf(test, constants.CV_ENODELVM, node, "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      _ErrorIf(vgstatus, constants.CV_ENODELVM, node, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, node, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node_verify call.

    """
    node_versions = {}
    for node, ndata in node_verify_infos.items():
      nresult = ndata.payload
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
      node_versions[node] = version

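    # With more than one distinct version in the group, report every
    # node's version as a warning.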
    if len(set(node_versions.values())) > 1:
      for node, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node, msg,
                    code=self.ETYPE_WARNING)

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping from node names to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of check
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode) = min((ni.pv_min, ni.name) for ni in vals)
    (pvmax, maxnode) = max((ni.pv_max, ni.name) for ni in vals)
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, minnode, pvmax, maxnode)

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    _ErrorIf(test, constants.CV_ENODENET, node,
             "did not return valid bridge information")
    if not test:
      _ErrorIf(bool(missing), constants.CV_ENODENET, node,
               "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name

    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, node,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, node,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    test = constants.NV_NODELIST not in nresult
    _ErrorIf(test, constants.CV_ENODESSH, node,
             "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          _ErrorIf(True, constants.CV_ENODESSH, node,
                   "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    _ErrorIf(test, constants.CV_ENODENET, node,
             "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          _ErrorIf(True, constants.CV_ENODENET, node,
                   "tcp communication with node '%s': %s",
                   anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    _ErrorIf(test, constants.CV_ENODENET, node,
             "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if node == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        _ErrorIf(True, constants.CV_ENODENET, node, msg)

  def _VerifyInstance(self, instance, inst_config, node_image,
                      diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
    pnode = inst_config.primary_node
    pnode_img = node_image[pnode]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    inst_config.MapLVsByNode(node_vol_should)

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
    _ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance, utils.CommaJoin(err),
             code=self.ETYPE_WARNING)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        _ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
                 "volume %s missing on node %s", volume, node)

    if inst_config.admin_state == constants.ADMINST_UP:
      test = instance not in pnode_img.instances and not pnode_img.offline
      _ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
               "instance not running on its primary node %s",
               pnode)
      _ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
               "instance is marked as running and lives on offline node %s",
               pnode)

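    # Flatten the per-node disk status into (node, success, status, index)
    # tuples to simplify the checks below.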
    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      _ErrorIf(inst_config.disks_active and
               not success and not bad_snode,
               constants.CV_EINSTANCEFAULTYDISK, instance,
               "couldn't retrieve status for disk/%s on %s: %s",
               idx, nname, bdev_status)
      _ErrorIf((inst_config.disks_active and
                success and bdev_status.ldisk_status == constants.LDS_FAULTY),
               constants.CV_EINSTANCEFAULTYDISK, instance,
               "disk/%s on %s is faulty", idx, nname)

    _ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
             constants.CV_ENODERPC, pnode, "instance %s, connection to"
             " primary node failed", instance)

    _ErrorIf(len(inst_config.secondary_nodes) > 1,
             constants.CV_EINSTANCELAYOUT,
             instance, "instance has multiple secondary nodes: %s",
             utils.CommaJoin(inst_config.secondary_nodes),
             code=self.ETYPE_WARNING)

    es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg,
                                                   inst_config.all_nodes)
    if any(es_flags.values()):
      if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
        # Disk template not compatible with exclusive_storage: no instance
        # node should have the flag set
        es_nodes = [n
                    for (n, es) in es_flags.items()
                    if es]
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance,
                    "instance has template %s, which is not supported on nodes"
                    " that have exclusive storage set: %s",
                    inst_config.disk_template, utils.CommaJoin(es_nodes))
      for (idx, disk) in enumerate(inst_config.disks):
        _ErrorIf(disk.spindles is None,
                 constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance,
                 "number of spindles not configured for disk %s while"
                 " exclusive storage is enabled, try running"
                 " gnt-cluster repair-disk-sizes",
                 idx)

    if inst_config.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(inst_config.all_nodes)
      instance_groups = {}

      for node in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node].group,
                                   []).append(node)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(nodes), groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in inst_config.secondary_nodes:
      s_img = node_image[snode]
      _ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
               snode, "instance %s, connection to secondary node failed",
               instance)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    _ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
             "instance has offline secondary node(s) %s",
             utils.CommaJoin(inst_nodes_offline))
    # ... or ghost/non-vm_capable nodes
    for node in inst_config.all_nodes:
      _ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
               instance, "instance lives on ghost node %s", node)
      _ErrorIf(not node_image[node].vm_capable, constants.CV_EINSTANCEBADNODE,
               instance, "instance lives on non-vm_capable node %s", node)

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node not in node_vol_should or
                volume not in node_vol_should[node]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV, node,
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or self.all_node_info[node].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = cluster_info.FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1, node,
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      prinode, needed_mem, n_img.mfree)

  @classmethod
  def _VerifyFiles(cls, errorif, nodeinfo, master_node, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param errorif: Callback for reporting errors
    @param nodeinfo: List of L{objects.Node} objects
    @param master_node: Name of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
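    # files_all must exist on every node, files_mc only on master
    # candidates (and the master itself), files_vm only on vm_capable
    # nodes; files_opt entries are optional but checked for consistency
    # further down.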
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.name == master_node)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodeinfo
      else:
        filenodes = filter(fn, nodeinfo)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("name"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodeinfo:
      if node.offline:
        ignore_nodes.add(node.name)
        continue

      nresult = all_nvinfo[node.name]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      errorif(test, constants.CV_ENODEFILECHECK, node.name,
              "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.name)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.name)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_name
                            for nodes in fileinfo[filename].values()
                            for node_name in nodes) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        errorif(missing_file and missing_file != expected_nodes,
                constants.CV_ECLUSTERFILECHECK, None,
                "File %s is optional, but it must exist on all or no"
                " nodes (not found on %s)",
                filename, utils.CommaJoin(utils.NiceSort(missing_file)))
      else:
        errorif(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                "File %s is missing from node(s) %s", filename,
                utils.CommaJoin(utils.NiceSort(missing_file)))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        errorif(unexpected,
                constants.CV_ECLUSTERFILECHECK, None,
                "File %s should not exist on node(s) %s",
                filename, utils.CommaJoin(utils.NiceSort(unexpected)))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1, utils.CommaJoin(utils.NiceSort(nodes)))
                    for (idx, (checksum, nodes)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      errorif(test, constants.CV_ECLUSTERFILECHECK, None,
              "File %s found with %s different checksums (%s)",
              filename, len(checksums), "; ".join(variants))

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
               "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
                 "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        _ErrorIf(test, constants.CV_ENODEDRBDHELPER, node,
                 "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
    node_drbd = {}
    for minor, instance in drbd_map[node].items():
      test = instance not in instanceinfo
      _ErrorIf(test, constants.CV_ECLUSTERCFG, None,
               "ghost instance '%s' in temporary DRBD map", instance)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
      if test:
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.disks_active)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    _ErrorIf(test, constants.CV_ENODEDRBD, node,
             "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (iname, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
               "drbd minor %d of instance %s is not active", minor, iname)
    for minor in used_minors:
      test = minor not in node_drbd
      _ErrorIf(test, constants.CV_ENODEDRBD, node,
               "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    _ErrorIf(test, constants.CV_ENODEOS, node,
             "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      _ErrorIf(not f_status, constants.CV_ENODEOS, node,
               "Invalid OS %s (located at %s): %s", os_name, f_path, f_diag)
      _ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, node,
               "OS '%s' has multiple entries (first one shadows the rest): %s",
               os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      _ErrorIf(test, constants.CV_ENODEOS, node,
               "Extra OS %s not present on reference node (%s)",
               os_name, base.name)
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        _ErrorIf(a != b, constants.CV_ENODEOS, node,
                 "OS %s for %s differs from reference node %s: [%s] vs. [%s]",
                 kind, os_name, base.name,
                 utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    _ErrorIf(missing, constants.CV_ENODEOS, node,
             "OSes present on reference node %s but missing on this node: %s",
             base.name, utils.CommaJoin(missing))

  def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    node = ninfo.name

    if (is_master and
        (constants.ENABLE_FILE_STORAGE or
         constants.ENABLE_SHARED_FILE_STORAGE)):
      try:
        fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, node,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, node,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, node,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    node = ninfo.name
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH, node, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

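    # Assume failure until the LV list below parses correctly.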
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      _ErrorIf(True, constants.CV_ENODELVM, node, "LVM problem on node: %s",
               utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      _ErrorIf(True, constants.CV_ENODELVM, node,
               "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = idata

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    node = ninfo.name
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    _ErrorIf(test, constants.CV_ENODEHV, node,
             "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        _ErrorIf(True, constants.CV_ENODERPC, node,
                 "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      _ErrorIf(test, constants.CV_ENODELVM, node,
               "node didn't return data for the volume group '%s'"
               " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          _ErrorIf(True, constants.CV_ENODERPC, node,
                   "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, nodelist, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type nodelist: list of strings
    @param nodelist: Node names
    @type node_image: dict of (name, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (name, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    _ErrorIf = self._ErrorIf # pylint: disable=C0103

    node_disks = {}
    node_disks_devonly = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nname in nodelist:
      node_instances = list(itertools.chain(node_image[nname].pinst,
                                            node_image[nname].sinst))
      diskless_instances.update(inst for inst in node_instances
                                if instanceinfo[inst].disk_template == diskless)
      disks = [(inst, disk)
               for inst in node_instances
               for disk in instanceinfo[inst].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nname] = disks

      # _AnnotateDiskParams already makes copies of the disks
      devonly = []
      for (inst, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
        self.cfg.SetDiskID(anno_disk, nname)
        devonly.append(anno_disk)

      node_disks_devonly[nname] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nname, nres) in result.items():
      disks = node_disks[nname]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        _ErrorIf(msg, constants.CV_ENODERPC, nname,
                 "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              nname, idx, i)
              data.append((False, "Invalid result from the remote node"))

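      # Pair each (instance, disk) query with its result, in order, and
      # collect the statuses per instance and per node.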
      for ((inst, _), status) in zip(disks, data):
        instdisk.setdefault(inst, {}).setdefault(nname, []).append(status)

    # Add empty entries for diskless instances.
    for inst in diskless_instances:
      assert inst not in instdisk
      instdisk[inst] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nnames) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nnames in instdisk.items()
                      for nname, statuses in nnames.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

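    # Group the foreign nodes by node group and return one endless
    # round-robin iterator of node names per group.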
    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just ran in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], self.my_node_names)

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_names:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    _ErrorIf = self._ErrorIf # pylint: disable=C0103
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = [self.my_node_info[name] for name in self.my_node_names]

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_names))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

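    # Parameters for the node_verify RPC: each key selects a check to run
    # on the nodes and carries the data needed for that check.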
    node_verify_param = {
2619
      constants.NV_FILELIST:
2620
        map(vcluster.MakeVirtualPath,
2621
            utils.UniqueSequence(filename
2622
                                 for files in filemap
2623
                                 for filename in files)),
2624
      constants.NV_NODELIST:
2625
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2626
                                  self.all_node_info.values()),
2627
      constants.NV_HYPERVISOR: hypervisors,
2628
      constants.NV_HVPARAMS:
2629
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2630
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2631
                                 for node in node_data_list
2632
                                 if not node.offline],
2633
      constants.NV_INSTANCELIST: hypervisors,
2634
      constants.NV_VERSION: None,
2635
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2636
      constants.NV_NODESETUP: None,
2637
      constants.NV_TIME: None,
2638
      constants.NV_MASTERIP: (master_node, master_ip),
2639
      constants.NV_OSLIST: None,
2640
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2641
      constants.NV_USERSCRIPTS: user_scripts,
2642
      }
2643

    
2644
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if drbd_helper:
      node_verify_param[constants.NV_DRBDVERSION] = None
      node_verify_param[constants.NV_DRBDLIST] = None
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
      # Load file storage paths only from master node
      node_verify_param[constants.NV_FILE_STORAGE_PATHS] = master_node

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.name, self.NodeImage(offline=node.offline,
                                                 name=node.name,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

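    # Fold this group's instances into the expected cluster state: count
    # administratively offline instances, add "ghost" entries for nodes that
    # are referenced by instances but unknown to the configuration, record
    # the LVs expected on each node, and link every instance to its primary
    # and secondary nodes.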
    for instance in self.my_inst_names:
      inst_config = self.my_inst_info[instance]
      if inst_config.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nname in inst_config.all_nodes:
        if nname not in node_image:
          gnode = self.NodeImage(name=nname)
          gnode.ghost = (nname not in self.all_node_info)
          node_image[nname] = gnode

      inst_config.MapLVsByNode(node_vol_should)

      pnode = inst_config.primary_node
      node_image[pnode].pinst.append(instance)

      for snode in inst_config.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance)

    es_flags = rpc.GetExclusiveStorageForNodeNames(self.cfg, self.my_node_names)
    # The value of exclusive_storage should be the same across the group, so
    # if it's True for at least one node, we act as if it were set for all
    # the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_names,
                                           node_verify_param,
                                           self.cfg.GetClusterName())
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName())
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_names))
    instdisk = self._CollectDiskInfo(self.my_node_names, node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_nodes = set(self.all_node_info).difference(self.my_node_info)
    if absent_nodes:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_nodes = []
      if master_node not in self.my_node_info:
        additional_nodes.append(master_node)
        vf_node_info.append(self.all_node_info[master_node])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node in absent_nodes:
        nodeinfo = self.all_node_info[node]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node != master_node):
          additional_nodes.append(node)
          vf_node_info.append(self.all_node_info[node])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(additional_nodes,
                                                 {key: node_verify_param[key]},
                                                 self.cfg.GetClusterName()))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(_ErrorIf, vf_node_info, master_node, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

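    # Per-node verification: walk every node of the group, skip offline ones,
    # flag nodes whose verify RPC failed and run the individual checks on the
    # payload returned by the others.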
    for node_i in node_data_list:
      node = node_i.name
      nimg = node_image[node]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node,))
        n_offline += 1
        continue

      if node == master_node:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node, ntype))

      msg = all_nvinfo[node].fail_msg
      _ErrorIf(msg, constants.CV_ENODERPC, node, "while contacting node: %s",
               msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyFileStoragePaths(node_i, nresult,
                                   node == master_node)

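      # Storage, hypervisor and OS checks only make sense on nodes that can
      # actually host instances.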
      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)

        for inst in non_primary_inst:
          test = inst in self.all_inst_info
          _ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
                   "instance should not run on node %s", node_i.name)
          _ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                   "node is running unknown instance %s", inst)

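    # Checks that work on data aggregated from the whole group rather than
    # from a single node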
    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node], result.payload,
                              node_image[node], vg_name)

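    # Per-instance verification; also collect the instances that are not
    # redundant or not auto-balanced for the summary notes printed below.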
    feedback_fn("* Verifying instance status")
2857
    for instance in self.my_inst_names:
2858
      if verbose:
2859
        feedback_fn("* Verifying instance %s" % instance)
2860
      inst_config = self.my_inst_info[instance]
2861
      self._VerifyInstance(instance, inst_config, node_image,
2862
                           instdisk[instance])
2863

    
2864
      # If the instance is non-redundant we cannot survive losing its primary
2865
      # node, so we are not N+1 compliant.
2866
      if inst_config.disk_template not in constants.DTS_MIRRORED:
2867
        i_non_redundant.append(instance)
2868

    
2869
      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
2870
        i_non_a_balanced.append(instance)
2871

    
2872
    feedback_fn("* Verifying orphan volumes")
2873
    reserved = utils.FieldSet(*cluster.reserved_lvs)
2874

    
2875
    # We will get spurious "unknown volume" warnings if any node of this group
2876
    # is secondary for an instance whose primary is in another group. To avoid
2877
    # them, we find these instances and add their volumes to node_vol_should.
2878
    for inst in self.all_inst_info.values():
2879
      for secondary in inst.secondary_nodes:
2880
        if (secondary in self.my_node_info
2881
            and inst.name not in self.my_inst_info):
2882
          inst.MapLVsByNode(node_vol_should)
2883
          break
2884

    
2885
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2886

    
2887
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2888
      feedback_fn("* Verifying N+1 Memory redundancy")
2889
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2890

    
2891
    feedback_fn("* Other Notes")
2892
    if i_non_redundant:
2893
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2894
                  % len(i_non_redundant))
2895

    
2896
    if i_non_a_balanced:
2897
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2898
                  % len(i_non_a_balanced))
2899

    
2900
    if i_offline:
2901
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
2902

    
2903
    if n_offline:
2904
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2905

    
2906
    if n_drained:
2907
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2908

    
2909
    return not self.bad
2910

    
2911
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_names:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

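      # Report each node's hook results; any script returning HKR_FAIL turns
      # the overall verification result into a failure.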
      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])