#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disk sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

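    # Spindle information is only checked for nodes that use exclusive
    # storage, so look up each node's exclusive_storage flag first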
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node_uuid)
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
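        # the reported size is in bytes; shift by 20 bits to get MiB, the
        # unit in which disk sizes are stored in the configuration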
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_uuids, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the vg name on all nodes and, in case it gets
       unset, whether there are instances still using it.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and
        utils.IsLvmEnabled(enabled_disk_templates)) or \
           (self.cfg.GetVGName() is not None and
            utils.LvmGetsEnabled(enabled_disk_templates,
                                 new_enabled_disk_templates)):
      self._CheckVgNameOnNodes(node_uuids)

  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)

  def _GetEnabledDiskTemplates(self, cluster):
    """Determines the enabled disk templates and the subset of disk templates
       that are newly enabled by this operation.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    if self.op.enabled_disk_templates:
      enabled_disk_templates = self.op.enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(cluster.enabled_disk_templates))
    else:
      enabled_disk_templates = cluster.enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    self.cluster = cluster = self.cfg.GetClusterInfo()

    vm_capable_node_uuids = [node.uuid
                             for node in self.cfg.GetAllNodesInfo().values()
                             if node.uuid in node_uuids and node.vm_capable]

    (enabled_disk_templates, new_enabled_disk_templates) = \
      self._GetEnabledDiskTemplates(cluster)

    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
                      new_enabled_disk_templates)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_uuids)
      for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s",
                       ninfo.name)
          continue
        msg = helpers[ninfo.uuid].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (ninfo.name, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[ninfo.uuid].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (ninfo.name, node_helper),
                                     errors.ECODE_ENVIRON)

    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(nuuid in group.members
                                             for nuuid in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
                                           self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_uuids, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_uuids, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name and not \
           utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
        feedback_fn("Note that you specified a volume group, but did not"
                    " enable any lvm disk template.")
      new_volume = self.op.vg_name
      if not new_volume:
        if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
          raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                     " disk templates are enabled.")
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    else:
      if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
          not self.cfg.GetVGName():
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(set(self.op.enabled_disk_templates))

    self._SetVgName(feedback_fn)

    if self.op.drbd_helper is not None:
      if constants.DT_DRBD8 not in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did"
                    " not enable the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

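    # Helper applying DDM_ADD/DDM_REMOVE modifications from 'mods' to the
    # cluster OS list attribute named by 'aname'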
    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]

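    # One group-verification job per node group, each depending on the
    # global configuration check queued above (if any)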
    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.CONFD_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.CONFD_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(
           self.cfg.GetInstanceNames(
             dangling_instances.get(node.uuid, ["no instances"]))))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(
                           self.cfg.GetInstanceNames(no_node_instances)))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        instance = self.cfg.GetInstanceInfoByName(inst_name)
        if instance.disk_template in constants.DTS_INT_MIRROR:
          nodes.update(instance.secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_inst_uuids = \
        group_inst_uuids.difference(
          [self.cfg.GetInstanceInfoByName(name).uuid
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_inst_uuids:
      raise errors.OpPrereqError(
        "Missing lock for instances: %s" %
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
        errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_uuids = group_inst_uuids
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
                             for inst_uuid in group_inst_uuids)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nuuid in inst.all_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
                    "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
                  "Node time diverges by at least %s from master node time",
                  ntime_diff)

  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                  "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node_verify call.

    """
    node_versions = {}
    for node_uuid, ndata in node_verify_infos.items():
      nresult = ndata.payload
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
      node_versions[node_uuid] = version

    if len(set(node_versions.values())) > 1:
      for node_uuid, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
                    code=self.ETYPE_WARNING)

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping from node to names to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of checks
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "did not return valid bridge information")
    if not test:
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = constants.NV_NODELIST not in nresult
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
                  "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
                        "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
                        "tcp communication with node '%s': %s",
                        anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if ninfo.uuid == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)

  def _VerifyInstance(self, instance, node_image, diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    pnode_uuid = instance.primary_node
    pnode_img = node_image[pnode_uuid]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    instance.MapLVsByNode(node_vol_should)

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)

    for node_uuid in node_vol_should:
      n_img = node_image[node_uuid]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node_uuid]:
        test = volume not in n_img.volumes
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
                      "volume %s missing on node %s", volume,
                      self.cfg.GetNodeName(node_uuid))

    if instance.admin_state == constants.ADMINST_UP:
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
                    "instance not running on its primary node %s",
                     self.cfg.GetNodeName(pnode_uuid))
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance is marked as running and lives on"
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))

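    # Flatten the per-node disk status map into (node, success, status,
    # disk index) tuples for easier iteration below.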
    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      self._ErrorIf(instance.disks_active and
                    not success and not bad_snode,
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
                    "couldn't retrieve status for disk/%s on %s: %s",
                    idx, self.cfg.GetNodeName(nname), bdev_status)

      if instance.disks_active and success and \
         (bdev_status.is_degraded or
          bdev_status.ldisk_status != constants.LDS_OKAY):
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
        if bdev_status.is_degraded:
          msg += " is degraded"
        if bdev_status.ldisk_status != constants.LDS_OKAY:
          msg += "; state is '%s'" % \
                 constants.LDS_NAMES[bdev_status.ldisk_status]

        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)

    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
                  "instance %s, connection to primary node failed",
                  instance.name)

    self._ErrorIf(len(instance.secondary_nodes) > 1,
                  constants.CV_EINSTANCELAYOUT, instance.name,
                  "instance has multiple secondary nodes: %s",
                  utils.CommaJoin(instance.secondary_nodes),
                  code=self.ETYPE_WARNING)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
    if any(es_flags.values()):
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
        # Disk template not compatible with exclusive_storage: no instance
        # node should have the flag set
        es_nodes = [n
                    for (n, es) in es_flags.items()
                    if es]
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
                    "instance has template %s, which is not supported on nodes"
                    " that have exclusive storage set: %s",
                    instance.disk_template,
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
      for (idx, disk) in enumerate(instance.disks):
        self._ErrorIf(disk.spindles is None,
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
                      "number of spindles not configured for disk %s while"
                      " exclusive storage is enabled, try running"
                      " gnt-cluster repair-disk-sizes", idx)

    if instance.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(instance.all_nodes)
      instance_groups = {}

      for node_uuid in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
                                   []).append(node_uuid)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
                           groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode_uuid in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance.name, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in instance.secondary_nodes:
      s_img = node_image[snode]
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
                    self.cfg.GetNodeName(snode),
                    "instance %s, connection to secondary node failed",
                    instance.name)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
                  instance.name, "instance has offline secondary node(s) %s",
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
    # ... or ghost/non-vm_capable nodes
    for node_uuid in instance.all_nodes:
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance lives on ghost node %s",
                    self.cfg.GetNodeName(node_uuid))
      self._ErrorIf(not node_image[node_uuid].vm_capable,
                    constants.CV_EINSTANCEBADNODE, instance.name,
                    "instance lives on non-vm_capable node %s",
                    self.cfg.GetNodeName(node_uuid))

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node_uuid, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node_uuid].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node_uuid not in node_vol_should or
                volume not in node_vol_should[node_uuid]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
                      self.cfg.GetNodeName(node_uuid),
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, all_insts):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node_uuid, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or \
         self.all_node_info[node_uuid].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
      for prinode, inst_uuids in n_img.sbp.items():
        needed_mem = 0
        for inst_uuid in inst_uuids:
          bep = cluster_info.FillBE(all_insts[inst_uuid])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1,
                      self.cfg.GetNodeName(node_uuid),
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)

  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param nodes: List of L{objects.Node} objects
    @param master_node_uuid: UUID of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.uuid == master_node_uuid)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodes
      else:
        filenodes = filter(fn, nodes)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodes:
      if node.offline:
        ignore_nodes.add(node.uuid)
        continue

      nresult = all_nvinfo[node.uuid]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
                    "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.uuid)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_uuid
                            for node_uuids in fileinfo[filename].values()
                            for node_uuid in node_uuids) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        self._ErrorIf(missing_file and missing_file != expected_nodes,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is optional, but it must exist on all or no"
                      " nodes (not found on %s)",
                      filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))
      else:
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is missing from node(s) %s", filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        self._ErrorIf(unexpected,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s should not exist on node(s) %s",
                      filename, utils.CommaJoin(
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1,
                     utils.CommaJoin(utils.NiceSort(
                       map(self.cfg.GetNodeName, node_uuids))))
                    for (idx, (checksum, node_uuids)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
                    "File %s found with %s different checksums (%s)",
                    filename, len(checksums), "; ".join(variants))

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                    "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
    node_drbd = {}
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
      test = inst_uuid not in instanceinfo
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
      if test:
        node_drbd[minor] = (inst_uuid, False)
      else:
        instance = instanceinfo[inst_uuid]
        node_drbd[minor] = (inst_uuid, instance.disks_active)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                  "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (inst_uuid, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "drbd minor %d of instance %s is not active", minor,
                    self.cfg.GetInstanceName(inst_uuid))
    for minor in used_minors:
      test = minor not in node_drbd
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                  "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
                    "Invalid OS %s (located at %s): %s",
                    os_name, f_path, f_diag)
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
                    "OS '%s' has multiple entries"
                    " (first one shadows the rest): %s",
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                    "Extra OS %s not present on reference node (%s)",
                    os_name, self.cfg.GetNodeName(base.uuid))
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " [%s] vs. [%s]", kind, os_name,
                      self.cfg.GetNodeName(base.uuid),
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
                  "OSes present on reference node %s"
                  " but missing on this node: %s",
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))

  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    cluster = self.cfg.GetClusterInfo()
    if (is_master and
        (cluster.IsFileStorageEnabled() or
         cluster.IsSharedFileStorageEnabled())):
      try:
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    cluster = self.cfg.GetClusterInfo()
    if cluster.IsFileStorageEnabled():
      self._ErrorIf(
          constants.NV_FILE_STORAGE_PATH in nresult,
          constants.CV_ENODEFILESTORAGEPATHUNUSABLE, ninfo.name,
          "The configured file storage path is unusable: %s" %
          nresult.get(constants.NV_FILE_STORAGE_PATH))

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_devonly = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      devonly = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        self.cfg.SetDiskID(anno_disk, nuuid)
        devonly.append(anno_disk)

      node_disks_devonly[nuuid] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

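    # Return one endlessly cycling iterator of node names per foreign node
    # group, so callers can pick a different contact node on every use.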
    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run in the post phase only; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
2616
    """Verify integrity of the node group, performing various test on nodes.
2617

2618
    """
2619
    # This method has too many local variables. pylint: disable=R0914
2620
    feedback_fn("* Verifying group '%s'" % self.group_info.name)
2621

    
2622
    if not self.my_node_uuids:
2623
      # empty node group
2624
      feedback_fn("* Empty node group, skipping verification")
2625
      return True
2626

    
2627
    self.bad = False
2628
    verbose = self.op.verbose
2629
    self._feedback_fn = feedback_fn
2630

    
2631
    vg_name = self.cfg.GetVGName()
2632
    drbd_helper = self.cfg.GetDRBDHelper()
2633
    cluster = self.cfg.GetClusterInfo()
2634
    hypervisors = cluster.enabled_hypervisors
2635
    node_data_list = self.my_node_info.values()
2636

    
2637
    i_non_redundant = [] # Non redundant instances
2638
    i_non_a_balanced = [] # Non auto-balanced instances
2639
    i_offline = 0 # Count of offline instances
2640
    n_offline = 0 # Count of offline nodes
2641
    n_drained = 0 # Count of nodes being drained
2642
    node_vol_should = {}
2643

    
2644
    # FIXME: verify OS list
2645

    
2646
    # File verification
2647
    filemap = ComputeAncillaryFiles(cluster, False)
2648

    
2649
    # do local checksums
2650
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
2651
    master_ip = self.cfg.GetMasterIP()
2652

    
2653
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))
2654

    
2655
    user_scripts = []
2656
    if self.cfg.GetUseExternalMipScript():
2657
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2658

    
2659
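    # node_verify_param maps NV_* check names to their parameters; it is
    # sent to every node of the group through the node_verify RPC below, and
    # each node reports back one result entry per requested check.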
    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if drbd_helper:
      node_verify_param[constants.NV_DRBDVERSION] = None
      node_verify_param[constants.NV_DRBDLIST] = None
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

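    # Record the expected per-instance state in the node images: count
    # offline instances, register the LVs every node should hold
    # (node_vol_should), and fill in the primary (pinst) and secondary
    # (sinst) instance lists plus the secondaries-by-primary map (sbp).
    # Nodes referenced by an instance but not part of this group get a
    # placeholder NodeImage, marked as "ghost" when the cluster does not
    # know them at all.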
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so
    # if it's True for at least one node, we act as if it were set for all
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams)
    nvinfo_endtime = time.time()

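    # Any extra LV-holding nodes tracked in self.extra_lv_nodes only need to
    # report their LV list, so they get a reduced verify request.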
    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

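    # Walk the per-node RPC results: offline nodes are skipped, RPC failures
    # are reported and flag the node image (rpc_fail), and for reachable
    # nodes the _Verify*/_Update* helpers fill the NodeImage with the
    # runtime data used by the instance checks below.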
    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

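    # Merge the LV lists reported by the extra LV-holding nodes into their
    # node images so the orphan-volume check below can take them into account.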
    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.uuid not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

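    # The N+1 memory check can be skipped via the opcode's skip_checks field;
    # it verifies that each node would have enough free memory to start the
    # instances that would fail over to it if their primary node died.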
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

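      # Every node's hook payload is a list of (script, status, output)
      # tuples; any script reporting HKR_FAIL has its output echoed and
      # makes the overall verification result fail.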
      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])