
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with the cluster."""
23

    
24
import OpenSSL
25

    
26
import copy
27
import itertools
28
import logging
29
import operator
30
import os
31
import re
32
import time
33

    
34
from ganeti import compat
35
from ganeti import constants
36
from ganeti import errors
37
from ganeti import hypervisor
38
from ganeti import locking
39
from ganeti import masterd
40
from ganeti import netutils
41
from ganeti import objects
42
from ganeti import opcodes
43
from ganeti import pathutils
44
from ganeti import query
45
from ganeti import rpc
46
from ganeti import runtime
47
from ganeti import ssh
48
from ganeti import uidpool
49
from ganeti import utils
50
from ganeti import vcluster
51

    
52
from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53
  ResultWithJobs
54
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob
60

    
61
import ganeti.masterd.instance
62

    
63

    
64
class LUClusterActivateMasterIp(NoHooksLU):
65
  """Activate the master IP on the master node.
66

67
  """
68
  def Exec(self, feedback_fn):
69
    """Activate the master IP.
70

71
    """
72
    master_params = self.cfg.GetMasterNetworkParameters()
73
    ems = self.cfg.GetUseExternalMipScript()
74
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
75
                                                   master_params, ems)
76
    result.Raise("Could not activate the master IP")
77

    
78

    
79
class LUClusterDeactivateMasterIp(NoHooksLU):
80
  """Deactivate the master IP on the master node.
81

82
  """
83
  def Exec(self, feedback_fn):
84
    """Deactivate the master IP.
85

86
    """
87
    master_params = self.cfg.GetMasterNetworkParameters()
88
    ems = self.cfg.GetUseExternalMipScript()
89
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
90
                                                     master_params, ems)
91
    result.Raise("Could not deactivate the master IP")
92

    
93

    
94
class LUClusterConfigQuery(NoHooksLU):
95
  """Return configuration values.
96

97
  """
98
  REQ_BGL = False
99

    
100
  def CheckArguments(self):
101
    self.cq = ClusterQuery(None, self.op.output_fields, False)
102

    
103
  def ExpandNames(self):
104
    self.cq.ExpandNames(self)
105

    
106
  def DeclareLocks(self, level):
107
    self.cq.DeclareLocks(self, level)
108

    
109
  def Exec(self, feedback_fn):
110
    result = self.cq.OldStyleQuery(self)
111

    
112
    assert len(result) == 1
113

    
114
    return result[0]
115

    
116

    
117
class LUClusterDestroy(LogicalUnit):
118
  """Logical unit for destroying the cluster.
119

120
  """
121
  HPATH = "cluster-destroy"
122
  HTYPE = constants.HTYPE_CLUSTER
123

    
124
  def BuildHooksEnv(self):
125
    """Build hooks env.
126

127
    """
128
    return {
129
      "OP_TARGET": self.cfg.GetClusterName(),
130
      }
131

    
132
  def BuildHooksNodes(self):
133
    """Build hooks nodes.
134

135
    """
136
    return ([], [])
137

    
138
  def CheckPrereq(self):
139
    """Check prerequisites.
140

141
    This checks whether the cluster is empty.
142

143
    Any errors are signaled by raising errors.OpPrereqError.
144

145
    """
146
    master = self.cfg.GetMasterNode()
147

    
148
    nodelist = self.cfg.GetNodeList()
149
    if len(nodelist) != 1 or nodelist[0] != master:
150
      raise errors.OpPrereqError("There are still %d node(s) in"
151
                                 " this cluster." % (len(nodelist) - 1),
152
                                 errors.ECODE_INVAL)
153
    instancelist = self.cfg.GetInstanceList()
154
    if instancelist:
155
      raise errors.OpPrereqError("There are still %d instance(s) in"
156
                                 " this cluster." % len(instancelist),
157
                                 errors.ECODE_INVAL)
158

    
159
  def Exec(self, feedback_fn):
160
    """Destroys the cluster.
161

162
    """
163
    master_params = self.cfg.GetMasterNetworkParameters()
164

    
165
    # Run post hooks on master node before it's removed
166
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
167

    
168
    ems = self.cfg.GetUseExternalMipScript()
169
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
170
                                                     master_params, ems)
171
    result.Warn("Error disabling the master IP address", self.LogWarning)
172
    return master_params.uuid
173

    
174

    
175
class LUClusterPostInit(LogicalUnit):
176
  """Logical unit for running hooks after cluster initialization.
177

178
  """
179
  HPATH = "cluster-init"
180
  HTYPE = constants.HTYPE_CLUSTER
181

    
182
  def BuildHooksEnv(self):
183
    """Build hooks env.
184

185
    """
186
    return {
187
      "OP_TARGET": self.cfg.GetClusterName(),
188
      }
189

    
190
  def BuildHooksNodes(self):
191
    """Build hooks nodes.
192

193
    """
194
    return ([], [self.cfg.GetMasterNode()])
195

    
196
  def Exec(self, feedback_fn):
197
    """Nothing to do.
198

199
    """
200
    return True
201

    
202

    
203
class ClusterQuery(QueryBase):
204
  FIELDS = query.CLUSTER_FIELDS
205

    
206
  #: Do not sort (there is only one item)
207
  SORT_FIELD = None
208

    
209
  def ExpandNames(self, lu):
210
    lu.needed_locks = {}
211

    
212
    # The following variables interact with _QueryBase._GetNames
213
    self.wanted = locking.ALL_SET
214
    self.do_locking = self.use_locking
215

    
216
    if self.do_locking:
217
      raise errors.OpPrereqError("Can not use locking for cluster queries",
218
                                 errors.ECODE_INVAL)
219

    
220
  def DeclareLocks(self, lu, level):
221
    pass
222

    
223
  def _GetQueryData(self, lu):
224
    """Computes the list of nodes and their attributes.
225

226
    """
227
    # Locking is not used
228
    assert not (compat.any(lu.glm.is_owned(level)
229
                           for level in locking.LEVELS
230
                           if level != locking.LEVEL_CLUSTER) or
231
                self.do_locking or self.use_locking)
232

    
233
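    # Only the data kinds needed for the requested output fields (collected
    # in self.requested_data by the query machinery) are fetched below;
    # anything else is left as the NotImplemented placeholder.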
    if query.CQ_CONFIG in self.requested_data:
234
      cluster = lu.cfg.GetClusterInfo()
235
      nodes = lu.cfg.GetAllNodesInfo()
236
    else:
237
      cluster = NotImplemented
238
      nodes = NotImplemented
239

    
240
    if query.CQ_QUEUE_DRAINED in self.requested_data:
241
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
242
    else:
243
      drain_flag = NotImplemented
244

    
245
    if query.CQ_WATCHER_PAUSE in self.requested_data:
246
      master_node_uuid = lu.cfg.GetMasterNode()
247

    
248
      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
249
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
250
                   lu.cfg.GetMasterNodeName())
251

    
252
      watcher_pause = result.payload
253
    else:
254
      watcher_pause = NotImplemented
255

    
256
    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
257

    
258

    
259
class LUClusterQuery(NoHooksLU):
260
  """Query cluster configuration.
261

262
  """
263
  REQ_BGL = False
264

    
265
  def ExpandNames(self):
266
    self.needed_locks = {}
267

    
268
  def Exec(self, feedback_fn):
269
    """Return cluster config.
270

271
    """
272
    cluster = self.cfg.GetClusterInfo()
273
    os_hvp = {}
274

    
275
    # Filter just for enabled hypervisors
276
    for os_name, hv_dict in cluster.os_hvp.items():
277
      os_hvp[os_name] = {}
278
      for hv_name, hv_params in hv_dict.items():
279
        if hv_name in cluster.enabled_hypervisors:
280
          os_hvp[os_name][hv_name] = hv_params
281

    
282
    # Convert ip_family to ip_version
283
    primary_ip_version = constants.IP4_VERSION
284
    if cluster.primary_ip_family == netutils.IP6Address.family:
285
      primary_ip_version = constants.IP6_VERSION
286

    
287
    result = {
288
      "software_version": constants.RELEASE_VERSION,
289
      "protocol_version": constants.PROTOCOL_VERSION,
290
      "config_version": constants.CONFIG_VERSION,
291
      "os_api_version": max(constants.OS_API_VERSIONS),
292
      "export_version": constants.EXPORT_VERSION,
293
      "architecture": runtime.GetArchInfo(),
294
      "name": cluster.cluster_name,
295
      "master": self.cfg.GetMasterNodeName(),
296
      "default_hypervisor": cluster.primary_hypervisor,
297
      "enabled_hypervisors": cluster.enabled_hypervisors,
298
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
299
                        for hypervisor_name in cluster.enabled_hypervisors]),
300
      "os_hvp": os_hvp,
301
      "beparams": cluster.beparams,
302
      "osparams": cluster.osparams,
303
      "ipolicy": cluster.ipolicy,
304
      "nicparams": cluster.nicparams,
305
      "ndparams": cluster.ndparams,
306
      "diskparams": cluster.diskparams,
307
      "candidate_pool_size": cluster.candidate_pool_size,
308
      "master_netdev": cluster.master_netdev,
309
      "master_netmask": cluster.master_netmask,
310
      "use_external_mip_script": cluster.use_external_mip_script,
311
      "volume_group_name": cluster.volume_group_name,
312
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
313
      "file_storage_dir": cluster.file_storage_dir,
314
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
315
      "maintain_node_health": cluster.maintain_node_health,
316
      "ctime": cluster.ctime,
317
      "mtime": cluster.mtime,
318
      "uuid": cluster.uuid,
319
      "tags": list(cluster.GetTags()),
320
      "uid_pool": cluster.uid_pool,
321
      "default_iallocator": cluster.default_iallocator,
322
      "reserved_lvs": cluster.reserved_lvs,
323
      "primary_ip_version": primary_ip_version,
324
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
325
      "hidden_os": cluster.hidden_os,
326
      "blacklisted_os": cluster.blacklisted_os,
327
      "enabled_disk_templates": cluster.enabled_disk_templates,
328
      }
329

    
330
    return result
331

    
332

    
333
class LUClusterRedistConf(NoHooksLU):
334
  """Force the redistribution of cluster configuration.
335

336
  This is a very simple LU.
337

338
  """
339
  REQ_BGL = False
340

    
341
  def ExpandNames(self):
342
    self.needed_locks = {
343
      locking.LEVEL_NODE: locking.ALL_SET,
344
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
345
    }
346
    self.share_locks = ShareAll()
347

    
348
  def Exec(self, feedback_fn):
349
    """Redistribute the configuration.
350

351
    """
352
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
353
    RedistributeAncillaryFiles(self)
354

    
355

    
356
class LUClusterRename(LogicalUnit):
357
  """Rename the cluster.
358

359
  """
360
  HPATH = "cluster-rename"
361
  HTYPE = constants.HTYPE_CLUSTER
362

    
363
  def BuildHooksEnv(self):
364
    """Build hooks env.
365

366
    """
367
    return {
368
      "OP_TARGET": self.cfg.GetClusterName(),
369
      "NEW_NAME": self.op.name,
370
      }
371

    
372
  def BuildHooksNodes(self):
373
    """Build hooks nodes.
374

375
    """
376
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
377

    
378
  def CheckPrereq(self):
379
    """Verify that the passed name is a valid one.
380

381
    """
382
    hostname = netutils.GetHostname(name=self.op.name,
383
                                    family=self.cfg.GetPrimaryIPFamily())
384

    
385
    new_name = hostname.name
386
    self.ip = new_ip = hostname.ip
387
    old_name = self.cfg.GetClusterName()
388
    old_ip = self.cfg.GetMasterIP()
389
    if new_name == old_name and new_ip == old_ip:
390
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
391
                                 " cluster has changed",
392
                                 errors.ECODE_INVAL)
393
    if new_ip != old_ip:
394
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
395
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
396
                                   " reachable on the network" %
397
                                   new_ip, errors.ECODE_NOTUNIQUE)
398

    
399
    self.op.name = new_name
400

    
401
  def Exec(self, feedback_fn):
402
    """Rename the cluster.
403

404
    """
405
    clustername = self.op.name
406
    new_ip = self.ip
407

    
408
    # shutdown the master IP
409
    master_params = self.cfg.GetMasterNetworkParameters()
410
    ems = self.cfg.GetUseExternalMipScript()
411
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
412
                                                     master_params, ems)
413
    result.Raise("Could not disable the master role")
414

    
415
    try:
416
      cluster = self.cfg.GetClusterInfo()
417
      cluster.cluster_name = clustername
418
      cluster.master_ip = new_ip
419
      self.cfg.Update(cluster, feedback_fn)
420

    
421
      # update the known hosts file
422
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
423
      node_list = self.cfg.GetOnlineNodeList()
424
      try:
425
        node_list.remove(master_params.uuid)
426
      except ValueError:
427
        pass
428
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
429
    finally:
430
      master_params.ip = new_ip
431
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
432
                                                     master_params, ems)
433
      result.Warn("Could not re-enable the master role on the master,"
434
                  " please restart manually", self.LogWarning)
435

    
436
    return clustername
437

    
438

    
439
class LUClusterRepairDiskSizes(NoHooksLU):
440
  """Verifies the cluster disks sizes.
441

442
  """
443
  REQ_BGL = False
444

    
445
  def ExpandNames(self):
446
    if self.op.instances:
447
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
448
      # Not getting the node allocation lock as only a specific set of
449
      # instances (and their nodes) is going to be acquired
450
      self.needed_locks = {
451
        locking.LEVEL_NODE_RES: [],
452
        locking.LEVEL_INSTANCE: self.wanted_names,
453
        }
454
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
455
    else:
456
      self.wanted_names = None
457
      self.needed_locks = {
458
        locking.LEVEL_NODE_RES: locking.ALL_SET,
459
        locking.LEVEL_INSTANCE: locking.ALL_SET,
460

    
461
        # This opcode acquires the node locks for all instances
462
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
463
        }
464

    
465
    self.share_locks = {
466
      locking.LEVEL_NODE_RES: 1,
467
      locking.LEVEL_INSTANCE: 0,
468
      locking.LEVEL_NODE_ALLOC: 1,
469
      }
470

    
471
  def DeclareLocks(self, level):
472
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
473
      self._LockInstancesNodes(primary_only=True, level=level)
474

    
475
  def CheckPrereq(self):
476
    """Check prerequisites.
477

478
    This only checks the optional instance list against the existing names.
479

480
    """
481
    if self.wanted_names is None:
482
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
483

    
484
    self.wanted_instances = \
485
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
486

    
487
  def _EnsureChildSizes(self, disk):
488
    """Ensure children of the disk have the needed disk size.
489

490
    This is valid mainly for DRBD8 and fixes an issue where the
491
    children have a smaller disk size.
492

493
    @param disk: an L{ganeti.objects.Disk} object
494

495
    """
496
    if disk.dev_type == constants.LD_DRBD8:
497
      assert disk.children, "Empty children for DRBD8?"
498
      fchild = disk.children[0]
499
      mismatch = fchild.size < disk.size
500
      if mismatch:
501
        self.LogInfo("Child disk has size %d, parent %d, fixing",
502
                     fchild.size, disk.size)
503
        fchild.size = disk.size
504

    
505
      # and we recurse on this child only, not on the metadev
506
      return self._EnsureChildSizes(fchild) or mismatch
507
    else:
508
      return False
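  # Note: a DRBD8 disk's children are [data device, metadata device]; only
  # the data child has to be grown to the parent's size, which is why the
  # recursion above deliberately skips the metadata device.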
509

    
510
  def Exec(self, feedback_fn):
511
    """Verify the size of cluster disks.
512

513
    """
514
    # TODO: check child disks too
515
    # TODO: check differences in size between primary/secondary nodes
516
    per_node_disks = {}
517
    for instance in self.wanted_instances:
518
      pnode = instance.primary_node
519
      if pnode not in per_node_disks:
520
        per_node_disks[pnode] = []
521
      for idx, disk in enumerate(instance.disks):
522
        per_node_disks[pnode].append((instance, idx, disk))
523

    
524
    assert not (frozenset(per_node_disks.keys()) -
525
                self.owned_locks(locking.LEVEL_NODE_RES)), \
526
      "Not owning correct locks"
527
    assert not self.owned_locks(locking.LEVEL_NODE)
528

    
529
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
530
                                               per_node_disks.keys())
531

    
532
    changed = []
533
    for node_uuid, dskl in per_node_disks.items():
534
      newl = [v[2].Copy() for v in dskl]
535
      for dsk in newl:
536
        self.cfg.SetDiskID(dsk, node_uuid)
537
      node_name = self.cfg.GetNodeName(node_uuid)
538
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
539
      if result.fail_msg:
540
        self.LogWarning("Failure in blockdev_getdimensions call to node"
541
                        " %s, ignoring", node_name)
542
        continue
543
      if len(result.payload) != len(dskl):
544
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
545
                        " result.payload=%s", node_name, len(dskl),
546
                        result.payload)
547
        self.LogWarning("Invalid result from node %s, ignoring node results",
548
                        node_name)
549
        continue
550
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
551
        if dimensions is None:
552
          self.LogWarning("Disk %d of instance %s did not return size"
553
                          " information, ignoring", idx, instance.name)
554
          continue
555
        if not isinstance(dimensions, (tuple, list)):
556
          self.LogWarning("Disk %d of instance %s did not return valid"
557
                          " dimension information, ignoring", idx,
558
                          instance.name)
559
          continue
560
        (size, spindles) = dimensions
561
        if not isinstance(size, (int, long)):
562
          self.LogWarning("Disk %d of instance %s did not return valid"
563
                          " size information, ignoring", idx, instance.name)
564
          continue
565
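        # the node reports dimensions in bytes while the configuration
        # stores MiB, hence the shift by 20 bits before comparing below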
        size = size >> 20
566
        if size != disk.size:
567
          self.LogInfo("Disk %d of instance %s has mismatched size,"
568
                       " correcting: recorded %d, actual %d", idx,
569
                       instance.name, disk.size, size)
570
          disk.size = size
571
          self.cfg.Update(instance, feedback_fn)
572
          changed.append((instance.name, idx, "size", size))
573
        if es_flags[node_uuid]:
574
          if spindles is None:
575
            self.LogWarning("Disk %d of instance %s did not return valid"
576
                            " spindles information, ignoring", idx,
577
                            instance.name)
578
          elif disk.spindles is None or disk.spindles != spindles:
579
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
580
                         " correcting: recorded %s, actual %s",
581
                         idx, instance.name, disk.spindles, spindles)
582
            disk.spindles = spindles
583
            self.cfg.Update(instance, feedback_fn)
584
            changed.append((instance.name, idx, "spindles", disk.spindles))
585
        if self._EnsureChildSizes(disk):
586
          self.cfg.Update(instance, feedback_fn)
587
          changed.append((instance.name, idx, "size", disk.size))
588
    return changed
589

    
590

    
591
def _ValidateNetmask(cfg, netmask):
592
  """Checks if a netmask is valid.
593

594
  @type cfg: L{config.ConfigWriter}
595
  @param cfg: The cluster configuration
596
  @type netmask: int
597
  @param netmask: the netmask to be verified
598
  @raise errors.OpPrereqError: if the validation fails
599

600
  """
601
  ip_family = cfg.GetPrimaryIPFamily()
602
  try:
603
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
604
  except errors.ProgrammerError:
605
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
606
                               ip_family, errors.ECODE_INVAL)
607
  if not ipcls.ValidateNetmask(netmask):
608
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
609
                               (netmask), errors.ECODE_INVAL)
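# Illustrative example: the master netmask is a prefix length, so on an IPv4
# cluster _ValidateNetmask(cfg, 24) is accepted while _ValidateNetmask(cfg, 99)
# raises errors.OpPrereqError (the concrete values here are made up).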
610

    
611

    
612
class LUClusterSetParams(LogicalUnit):
613
  """Change the parameters of the cluster.
614

615
  """
616
  HPATH = "cluster-modify"
617
  HTYPE = constants.HTYPE_CLUSTER
618
  REQ_BGL = False
619

    
620
  def CheckArguments(self):
621
    """Check parameters
622

623
    """
624
    if self.op.uid_pool:
625
      uidpool.CheckUidPool(self.op.uid_pool)
626

    
627
    if self.op.add_uids:
628
      uidpool.CheckUidPool(self.op.add_uids)
629

    
630
    if self.op.remove_uids:
631
      uidpool.CheckUidPool(self.op.remove_uids)
632

    
633
    if self.op.master_netmask is not None:
634
      _ValidateNetmask(self.cfg, self.op.master_netmask)
635

    
636
    if self.op.diskparams:
637
      for dt_params in self.op.diskparams.values():
638
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
639
      try:
640
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
641
      except errors.OpPrereqError, err:
642
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
643
                                   errors.ECODE_INVAL)
644

    
645
  def ExpandNames(self):
646
    # FIXME: in the future maybe other cluster params won't require checking on
647
    # all nodes to be modified.
648
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
649
    # resource locks the right thing, shouldn't it be the BGL instead?
650
    self.needed_locks = {
651
      locking.LEVEL_NODE: locking.ALL_SET,
652
      locking.LEVEL_INSTANCE: locking.ALL_SET,
653
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
654
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
655
    }
656
    self.share_locks = ShareAll()
657

    
658
  def BuildHooksEnv(self):
659
    """Build hooks env.
660

661
    """
662
    return {
663
      "OP_TARGET": self.cfg.GetClusterName(),
664
      "NEW_VG_NAME": self.op.vg_name,
665
      }
666

    
667
  def BuildHooksNodes(self):
668
    """Build hooks nodes.
669

670
    """
671
    mn = self.cfg.GetMasterNode()
672
    return ([mn], [mn])
673

    
674
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
675
                   new_enabled_disk_templates):
676
    """Check the consistency of the vg name on all nodes and in case it gets
677
       unset whether there are instances still using it.
678

679
    """
680
    if self.op.vg_name is not None and not self.op.vg_name:
681
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
682
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
683
                                   " instances exist", errors.ECODE_INVAL)
684

    
685
    if (self.op.vg_name is not None and
686
        utils.IsLvmEnabled(enabled_disk_templates)) or \
687
           (self.cfg.GetVGName() is not None and
688
            utils.LvmGetsEnabled(enabled_disk_templates,
689
                                 new_enabled_disk_templates)):
690
      self._CheckVgNameOnNodes(node_uuids)
691

    
692
  def _CheckVgNameOnNodes(self, node_uuids):
693
    """Check the status of the volume group on each node.
694

695
    """
696
    vglist = self.rpc.call_vg_list(node_uuids)
697
    for node_uuid in node_uuids:
698
      msg = vglist[node_uuid].fail_msg
699
      if msg:
700
        # ignoring down node
701
        self.LogWarning("Error while gathering data on node %s"
702
                        " (ignoring node): %s",
703
                        self.cfg.GetNodeName(node_uuid), msg)
704
        continue
705
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
706
                                            self.op.vg_name,
707
                                            constants.MIN_VG_SIZE)
708
      if vgstatus:
709
        raise errors.OpPrereqError("Error on node '%s': %s" %
710
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
711
                                   errors.ECODE_ENVIRON)
712

    
713
  def _GetEnabledDiskTemplates(self, cluster):
714
    """Determines the enabled disk templates and the subset of disk templates
715
       that are newly enabled by this operation.
716

717
    """
718
    enabled_disk_templates = None
719
    new_enabled_disk_templates = []
720
    if self.op.enabled_disk_templates:
721
      enabled_disk_templates = self.op.enabled_disk_templates
722
      new_enabled_disk_templates = \
723
        list(set(enabled_disk_templates)
724
             - set(cluster.enabled_disk_templates))
725
    else:
726
      enabled_disk_templates = cluster.enabled_disk_templates
727
    return (enabled_disk_templates, new_enabled_disk_templates)
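  # Worked example: with cluster.enabled_disk_templates == ["drbd", "plain"]
  # and self.op.enabled_disk_templates == ["plain", "file"], this returns
  # (["plain", "file"], ["file"]), "file" being the only newly enabled
  # template.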
728

    
729
  def CheckPrereq(self):
730
    """Check prerequisites.
731

732
    This checks whether the given params don't conflict and
733
    if the given volume group is valid.
734

735
    """
736
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
737
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
738
        raise errors.OpPrereqError("Cannot disable drbd helper while"
739
                                   " drbd-based instances exist",
740
                                   errors.ECODE_INVAL)
741

    
742
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
743
    self.cluster = cluster = self.cfg.GetClusterInfo()
744

    
745
    vm_capable_node_uuids = [node.uuid
746
                             for node in self.cfg.GetAllNodesInfo().values()
747
                             if node.uuid in node_uuids and node.vm_capable]
748

    
749
    (enabled_disk_templates, new_enabled_disk_templates) = \
750
      self._GetEnabledDiskTemplates(cluster)
751

    
752
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
753
                      new_enabled_disk_templates)
754

    
755
    if self.op.drbd_helper:
756
      # checks given drbd helper on all nodes
757
      helpers = self.rpc.call_drbd_helper(node_uuids)
758
      for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
759
        if ninfo.offline:
760
          self.LogInfo("Not checking drbd helper on offline node %s",
761
                       ninfo.name)
762
          continue
763
        msg = helpers[ninfo.uuid].fail_msg
764
        if msg:
765
          raise errors.OpPrereqError("Error checking drbd helper on node"
766
                                     " '%s': %s" % (ninfo.name, msg),
767
                                     errors.ECODE_ENVIRON)
768
        node_helper = helpers[ninfo.uuid].payload
769
        if node_helper != self.op.drbd_helper:
770
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
771
                                     (ninfo.name, node_helper),
772
                                     errors.ECODE_ENVIRON)
773

    
774
    # validate params changes
775
    if self.op.beparams:
776
      objects.UpgradeBeParams(self.op.beparams)
777
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
778
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
779

    
780
    if self.op.ndparams:
781
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
782
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
783

    
784
      # TODO: we need a more general way to handle resetting
785
      # cluster-level parameters to default values
786
      if self.new_ndparams["oob_program"] == "":
787
        self.new_ndparams["oob_program"] = \
788
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
789

    
790
    if self.op.hv_state:
791
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
792
                                           self.cluster.hv_state_static)
793
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
794
                               for hv, values in new_hv_state.items())
795

    
796
    if self.op.disk_state:
797
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
798
                                               self.cluster.disk_state_static)
799
      self.new_disk_state = \
800
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
801
                            for name, values in svalues.items()))
802
             for storage, svalues in new_disk_state.items())
803

    
804
    if self.op.ipolicy:
805
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
806
                                           group_policy=False)
807

    
808
      all_instances = self.cfg.GetAllInstancesInfo().values()
809
      violations = set()
810
      for group in self.cfg.GetAllNodeGroupsInfo().values():
811
        instances = frozenset([inst for inst in all_instances
812
                               if compat.any(nuuid in group.members
813
                                             for nuuid in inst.all_nodes)])
814
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
815
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
816
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
817
                                           self.cfg)
818
        if new:
819
          violations.update(new)
820

    
821
      if violations:
822
        self.LogWarning("After the ipolicy change the following instances"
823
                        " violate them: %s",
824
                        utils.CommaJoin(utils.NiceSort(violations)))
825

    
826
    if self.op.nicparams:
827
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
828
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
829
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
830
      nic_errors = []
831

    
832
      # check all instances for consistency
833
      for instance in self.cfg.GetAllInstancesInfo().values():
834
        for nic_idx, nic in enumerate(instance.nics):
835
          params_copy = copy.deepcopy(nic.nicparams)
836
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
837

    
838
          # check parameter syntax
839
          try:
840
            objects.NIC.CheckParameterSyntax(params_filled)
841
          except errors.ConfigurationError, err:
842
            nic_errors.append("Instance %s, nic/%d: %s" %
843
                              (instance.name, nic_idx, err))
844

    
845
          # if we're moving instances to routed, check that they have an ip
846
          target_mode = params_filled[constants.NIC_MODE]
847
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
848
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
849
                              " address" % (instance.name, nic_idx))
850
      if nic_errors:
851
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
852
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
853

    
854
    # hypervisor list/parameters
855
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
856
    if self.op.hvparams:
857
      for hv_name, hv_dict in self.op.hvparams.items():
858
        if hv_name not in self.new_hvparams:
859
          self.new_hvparams[hv_name] = hv_dict
860
        else:
861
          self.new_hvparams[hv_name].update(hv_dict)
862

    
863
    # disk template parameters
864
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
865
    if self.op.diskparams:
866
      for dt_name, dt_params in self.op.diskparams.items():
867
        if dt_name not in self.new_diskparams:
868
          self.new_diskparams[dt_name] = dt_params
869
        else:
870
          self.new_diskparams[dt_name].update(dt_params)
871

    
872
    # os hypervisor parameters
873
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
874
    if self.op.os_hvp:
875
      for os_name, hvs in self.op.os_hvp.items():
876
        if os_name not in self.new_os_hvp:
877
          self.new_os_hvp[os_name] = hvs
878
        else:
879
          for hv_name, hv_dict in hvs.items():
880
            if hv_dict is None:
881
              # Delete if it exists
882
              self.new_os_hvp[os_name].pop(hv_name, None)
883
            elif hv_name not in self.new_os_hvp[os_name]:
884
              self.new_os_hvp[os_name][hv_name] = hv_dict
885
            else:
886
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
887

    
888
    # os parameters
889
    self.new_osp = objects.FillDict(cluster.osparams, {})
890
    if self.op.osparams:
891
      for os_name, osp in self.op.osparams.items():
892
        if os_name not in self.new_osp:
893
          self.new_osp[os_name] = {}
894

    
895
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
896
                                                 use_none=True)
897

    
898
        if not self.new_osp[os_name]:
899
          # we removed all parameters
900
          del self.new_osp[os_name]
901
        else:
902
          # check the parameter validity (remote check)
903
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
904
                        os_name, self.new_osp[os_name])
905

    
906
    # changes to the hypervisor list
907
    if self.op.enabled_hypervisors is not None:
908
      self.hv_list = self.op.enabled_hypervisors
909
      for hv in self.hv_list:
910
        # if the hypervisor doesn't already exist in the cluster
911
        # hvparams, we initialize it to empty, and then (in both
912
        # cases) we make sure to fill the defaults, as we might not
913
        # have a complete defaults list if the hypervisor wasn't
914
        # enabled before
915
        if hv not in new_hvp:
916
          new_hvp[hv] = {}
917
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
918
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
919
    else:
920
      self.hv_list = cluster.enabled_hypervisors
921

    
922
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
923
      # either the enabled list has changed, or the parameters have, validate
924
      for hv_name, hv_params in self.new_hvparams.items():
925
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
926
            (self.op.enabled_hypervisors and
927
             hv_name in self.op.enabled_hypervisors)):
928
          # either this is a new hypervisor, or its parameters have changed
929
          hv_class = hypervisor.GetHypervisorClass(hv_name)
930
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
931
          hv_class.CheckParameterSyntax(hv_params)
932
          CheckHVParams(self, node_uuids, hv_name, hv_params)
933

    
934
    self._CheckDiskTemplateConsistency()
935

    
936
    if self.op.os_hvp:
937
      # no need to check any newly-enabled hypervisors, since the
938
      # defaults have already been checked in the above code-block
939
      for os_name, os_hvp in self.new_os_hvp.items():
940
        for hv_name, hv_params in os_hvp.items():
941
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
942
          # we need to fill in the new os_hvp on top of the actual hv_p
943
          cluster_defaults = self.new_hvparams.get(hv_name, {})
944
          new_osp = objects.FillDict(cluster_defaults, hv_params)
945
          hv_class = hypervisor.GetHypervisorClass(hv_name)
946
          hv_class.CheckParameterSyntax(new_osp)
947
          CheckHVParams(self, node_uuids, hv_name, new_osp)
948

    
949
    if self.op.default_iallocator:
950
      alloc_script = utils.FindFile(self.op.default_iallocator,
951
                                    constants.IALLOCATOR_SEARCH_PATH,
952
                                    os.path.isfile)
953
      if alloc_script is None:
954
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
955
                                   " specified" % self.op.default_iallocator,
956
                                   errors.ECODE_INVAL)
957

    
958
  def _CheckDiskTemplateConsistency(self):
959
    """Check whether the disk templates that are going to be disabled
960
       are still in use by some instances.
961

962
    """
963
    if self.op.enabled_disk_templates:
964
      cluster = self.cfg.GetClusterInfo()
965
      instances = self.cfg.GetAllInstancesInfo()
966

    
967
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
968
        - set(self.op.enabled_disk_templates)
969
      for instance in instances.itervalues():
970
        if instance.disk_template in disk_templates_to_remove:
971
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
972
                                     " because instance '%s' is using it." %
973
                                     (instance.disk_template, instance.name))
974

    
975
  def _SetVgName(self, feedback_fn):
976
    """Determines and sets the new volume group name.
977

978
    """
979
    if self.op.vg_name is not None:
980
      if self.op.vg_name and not \
981
           utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
982
        feedback_fn("Note that you specified a volume group, but did not"
983
                    " enable any lvm disk template.")
984
      new_volume = self.op.vg_name
985
      if not new_volume:
986
        if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
987
          raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
988
                                     " disk templates are enabled.")
989
        new_volume = None
990
      if new_volume != self.cfg.GetVGName():
991
        self.cfg.SetVGName(new_volume)
992
      else:
993
        feedback_fn("Cluster LVM configuration already in desired"
994
                    " state, not changing")
995
    else:
996
      if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
997
          not self.cfg.GetVGName():
998
        raise errors.OpPrereqError("Please specify a volume group when"
999
                                   " enabling lvm-based disk-templates.")
1000

    
1001
  def Exec(self, feedback_fn):
1002
    """Change the parameters of the cluster.
1003

1004
    """
1005
    if self.op.enabled_disk_templates:
1006
      self.cluster.enabled_disk_templates = \
1007
        list(set(self.op.enabled_disk_templates))
1008

    
1009
    self._SetVgName(feedback_fn)
1010

    
1011
    if self.op.drbd_helper is not None:
1012
      if constants.DT_DRBD8 not in self.cluster.enabled_disk_templates:
1013
        feedback_fn("Note that you specified a drbd user helper, but did"
1014
                    " enabled the drbd disk template.")
1015
      new_helper = self.op.drbd_helper
1016
      if not new_helper:
1017
        new_helper = None
1018
      if new_helper != self.cfg.GetDRBDHelper():
1019
        self.cfg.SetDRBDHelper(new_helper)
1020
      else:
1021
        feedback_fn("Cluster DRBD helper already in desired state,"
1022
                    " not changing")
1023
    if self.op.hvparams:
1024
      self.cluster.hvparams = self.new_hvparams
1025
    if self.op.os_hvp:
1026
      self.cluster.os_hvp = self.new_os_hvp
1027
    if self.op.enabled_hypervisors is not None:
1028
      self.cluster.hvparams = self.new_hvparams
1029
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1030
    if self.op.beparams:
1031
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1032
    if self.op.nicparams:
1033
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1034
    if self.op.ipolicy:
1035
      self.cluster.ipolicy = self.new_ipolicy
1036
    if self.op.osparams:
1037
      self.cluster.osparams = self.new_osp
1038
    if self.op.ndparams:
1039
      self.cluster.ndparams = self.new_ndparams
1040
    if self.op.diskparams:
1041
      self.cluster.diskparams = self.new_diskparams
1042
    if self.op.hv_state:
1043
      self.cluster.hv_state_static = self.new_hv_state
1044
    if self.op.disk_state:
1045
      self.cluster.disk_state_static = self.new_disk_state
1046

    
1047
    if self.op.candidate_pool_size is not None:
1048
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1049
      # we need to update the pool size here, otherwise the save will fail
1050
      AdjustCandidatePool(self, [])
1051

    
1052
    if self.op.maintain_node_health is not None:
1053
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1054
        feedback_fn("Note: CONFD was disabled at build time, node health"
1055
                    " maintenance is not useful (still enabling it)")
1056
      self.cluster.maintain_node_health = self.op.maintain_node_health
1057

    
1058
    if self.op.prealloc_wipe_disks is not None:
1059
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1060

    
1061
    if self.op.add_uids is not None:
1062
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1063

    
1064
    if self.op.remove_uids is not None:
1065
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1066

    
1067
    if self.op.uid_pool is not None:
1068
      self.cluster.uid_pool = self.op.uid_pool
1069

    
1070
    if self.op.default_iallocator is not None:
1071
      self.cluster.default_iallocator = self.op.default_iallocator
1072

    
1073
    if self.op.reserved_lvs is not None:
1074
      self.cluster.reserved_lvs = self.op.reserved_lvs
1075

    
1076
    if self.op.use_external_mip_script is not None:
1077
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
1078

    
1079
    def helper_os(aname, mods, desc):
1080
      desc += " OS list"
1081
      lst = getattr(self.cluster, aname)
1082
      for key, val in mods:
1083
        if key == constants.DDM_ADD:
1084
          if val in lst:
1085
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1086
          else:
1087
            lst.append(val)
1088
        elif key == constants.DDM_REMOVE:
1089
          if val in lst:
1090
            lst.remove(val)
1091
          else:
1092
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1093
        else:
1094
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
1095

    
1096
    if self.op.hidden_os:
1097
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1098

    
1099
    if self.op.blacklisted_os:
1100
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1101

    
1102
    if self.op.master_netdev:
1103
      master_params = self.cfg.GetMasterNetworkParameters()
1104
      ems = self.cfg.GetUseExternalMipScript()
1105
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1106
                  self.cluster.master_netdev)
1107
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1108
                                                       master_params, ems)
1109
      if not self.op.force:
1110
        result.Raise("Could not disable the master ip")
1111
      else:
1112
        if result.fail_msg:
1113
          msg = ("Could not disable the master ip (continuing anyway): %s" %
1114
                 result.fail_msg)
1115
          feedback_fn(msg)
1116
      feedback_fn("Changing master_netdev from %s to %s" %
1117
                  (master_params.netdev, self.op.master_netdev))
1118
      self.cluster.master_netdev = self.op.master_netdev
1119

    
1120
    if self.op.master_netmask:
1121
      master_params = self.cfg.GetMasterNetworkParameters()
1122
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1123
      result = self.rpc.call_node_change_master_netmask(
1124
                 master_params.uuid, master_params.netmask,
1125
                 self.op.master_netmask, master_params.ip,
1126
                 master_params.netdev)
1127
      result.Warn("Could not change the master IP netmask", feedback_fn)
1128
      self.cluster.master_netmask = self.op.master_netmask
1129

    
1130
    self.cfg.Update(self.cluster, feedback_fn)
1131

    
1132
    if self.op.master_netdev:
1133
      master_params = self.cfg.GetMasterNetworkParameters()
1134
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
1135
                  self.op.master_netdev)
1136
      ems = self.cfg.GetUseExternalMipScript()
1137
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1138
                                                     master_params, ems)
1139
      result.Warn("Could not re-enable the master ip on the master,"
1140
                  " please restart manually", self.LogWarning)
1141

    
1142

    
1143
class LUClusterVerify(NoHooksLU):
1144
  """Submits all jobs necessary to verify the cluster.
1145

1146
  """
1147
  REQ_BGL = False
1148

    
1149
  def ExpandNames(self):
1150
    self.needed_locks = {}
1151

    
1152
  def Exec(self, feedback_fn):
1153
    jobs = []
1154

    
1155
    if self.op.group_name:
1156
      groups = [self.op.group_name]
1157
      depends_fn = lambda: None
1158
    else:
1159
      groups = self.cfg.GetNodeGroupList()
1160

    
1161
      # Verify global configuration
1162
      jobs.append([
1163
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1164
        ])
1165

    
1166
      # Always depend on global verification
1167
      depends_fn = lambda: [(-len(jobs), [])]
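      # The lambda is only evaluated while the per-group jobs are appended
      # below, so -len(jobs) is a relative dependency that always points back
      # at the OpClusterVerifyConfig job submitted above.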
1168

    
1169
    jobs.extend(
1170
      [opcodes.OpClusterVerifyGroup(group_name=group,
1171
                                    ignore_errors=self.op.ignore_errors,
1172
                                    depends=depends_fn())]
1173
      for group in groups)
1174

    
1175
    # Fix up all parameters
1176
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1177
      op.debug_simulate_errors = self.op.debug_simulate_errors
1178
      op.verbose = self.op.verbose
1179
      op.error_codes = self.op.error_codes
1180
      try:
1181
        op.skip_checks = self.op.skip_checks
1182
      except AttributeError:
1183
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1184

    
1185
    return ResultWithJobs(jobs)
1186

    
1187

    
1188
class _VerifyErrors(object):
1189
  """Mix-in for cluster/group verify LUs.
1190

1191
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1192
  self.op and self._feedback_fn to be available.)
1193

1194
  """
1195

    
1196
  ETYPE_FIELD = "code"
1197
  ETYPE_ERROR = "ERROR"
1198
  ETYPE_WARNING = "WARNING"
1199

    
1200
  def _Error(self, ecode, item, msg, *args, **kwargs):
1201
    """Format an error message.
1202

1203
    Based on the opcode's error_codes parameter, either format a
1204
    parseable error code, or a simpler error string.
1205

1206
    This must be called only from Exec and functions called from Exec.
1207

1208
    """
1209
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1210
    itype, etxt, _ = ecode
1211
    # If the error code is in the list of ignored errors, demote the error to a
1212
    # warning
1213
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1214
      ltype = self.ETYPE_WARNING
1215
    # first complete the msg
1216
    if args:
1217
      msg = msg % args
1218
    # then format the whole message
1219
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1220
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1221
    else:
1222
      if item:
1223
        item = " " + item
1224
      else:
1225
        item = ""
1226
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1227
    # and finally report it via the feedback_fn
1228
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1229
    # do not mark the operation as failed for WARN cases only
1230
    if ltype == self.ETYPE_ERROR:
1231
      self.bad = True
1232

    
1233
  def _ErrorIf(self, cond, *args, **kwargs):
1234
    """Log an error message if the passed condition is True.
1235

1236
    """
1237
    if (bool(cond)
1238
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1239
      self._Error(*args, **kwargs)
1240

    
1241

    
1242
def _VerifyCertificate(filename):
1243
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1244

1245
  @type filename: string
1246
  @param filename: Path to PEM file
1247

1248
  """
1249
  try:
1250
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1251
                                           utils.ReadFile(filename))
1252
  except Exception, err: # pylint: disable=W0703
1253
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1254
            "Failed to load X509 certificate %s: %s" % (filename, err))
1255

    
1256
  (errcode, msg) = \
1257
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1258
                                constants.SSL_CERT_EXPIRATION_ERROR)
1259

    
1260
  if msg:
1261
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1262
  else:
1263
    fnamemsg = None
1264

    
1265
  if errcode is None:
1266
    return (None, fnamemsg)
1267
  elif errcode == utils.CERT_WARNING:
1268
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1269
  elif errcode == utils.CERT_ERROR:
1270
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1271

    
1272
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1273

    
1274

    
1275
def _GetAllHypervisorParameters(cluster, instances):
1276
  """Compute the set of all hypervisor parameters.
1277

1278
  @type cluster: L{objects.Cluster}
1279
  @param cluster: the cluster object
1280
  @type instances: list of L{objects.Instance}
1281
  @param instances: additional instances from which to obtain parameters
1282
  @rtype: list of (origin, hypervisor, parameters)
1283
  @return: a list with all parameters found, indicating the hypervisor they
1284
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1285

1286
  """
1287
  hvp_data = []
1288

    
1289
  for hv_name in cluster.enabled_hypervisors:
1290
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1291

    
1292
  for os_name, os_hvp in cluster.os_hvp.items():
1293
    for hv_name, hv_params in os_hvp.items():
1294
      if hv_params:
1295
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1296
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1297

    
1298
  # TODO: collapse identical parameter values in a single one
1299
  for instance in instances:
1300
    if instance.hvparams:
1301
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1302
                       cluster.FillHV(instance)))
1303

    
1304
  return hvp_data
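# The resulting list has entries of the form (origin, hypervisor, params),
# for instance (with made-up names):
#   [("cluster", "kvm", {...}),
#    ("os debian-image", "kvm", {...}),
#    ("instance inst1.example.com", "kvm", {...})]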
1305

    
1306

    
1307
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1308
  """Verifies the cluster config.
1309

1310
  """
1311
  REQ_BGL = False
1312

    
1313
  def _VerifyHVP(self, hvp_data):
1314
    """Verifies locally the syntax of the hypervisor parameters.
1315

1316
    """
1317
    for item, hv_name, hv_params in hvp_data:
1318
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1319
             (item, hv_name))
1320
      try:
1321
        hv_class = hypervisor.GetHypervisorClass(hv_name)
1322
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1323
        hv_class.CheckParameterSyntax(hv_params)
1324
      except errors.GenericError, err:
1325
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1326

    
1327
  def ExpandNames(self):
1328
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1329
    self.share_locks = ShareAll()
1330

    
1331
  def CheckPrereq(self):
1332
    """Check prerequisites.
1333

1334
    """
1335
    # Retrieve all information
1336
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1337
    self.all_node_info = self.cfg.GetAllNodesInfo()
1338
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1339

    
1340
  def Exec(self, feedback_fn):
1341
    """Verify integrity of cluster, performing various test on nodes.
1342

1343
    """
1344
    self.bad = False
1345
    self._feedback_fn = feedback_fn
1346

    
1347
    feedback_fn("* Verifying cluster config")
1348

    
1349
    for msg in self.cfg.VerifyConfig():
1350
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1351

    
1352
    feedback_fn("* Verifying cluster certificate files")
1353

    
1354
    for cert_filename in pathutils.ALL_CERT_FILES:
1355
      (errcode, msg) = _VerifyCertificate(cert_filename)
1356
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1357

    
1358
    feedback_fn("* Verifying hypervisor parameters")
1359

    
1360
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1361
                                                self.all_inst_info.values()))
1362

    
1363
    feedback_fn("* Verifying all nodes belong to an existing group")
1364

    
1365
    # We do this verification here because, should this bogus circumstance
1366
    # occur, it would never be caught by VerifyGroup, which only acts on
1367
    # nodes/instances reachable from existing node groups.
1368

    
1369
    dangling_nodes = set(node for node in self.all_node_info.values()
1370
                         if node.group not in self.all_group_info)
1371

    
1372
    dangling_instances = {}
1373
    no_node_instances = []
1374

    
1375
    for inst in self.all_inst_info.values():
1376
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
1377
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
1378
      elif inst.primary_node not in self.all_node_info:
1379
        no_node_instances.append(inst)
1380

    
1381
    pretty_dangling = [
1382
        "%s (%s)" %
1383
        (node.name,
1384
         utils.CommaJoin(
1385
           self.cfg.GetInstanceNames(
1386
             dangling_instances.get(node.uuid, ["no instances"]))))
1387
        for node in dangling_nodes]
1388

    
1389
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1390
                  None,
1391
                  "the following nodes (and their instances) belong to a non"
1392
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1393

    
1394
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1395
                  None,
1396
                  "the following instances have a non-existing primary-node:"
1397
                  " %s", utils.CommaJoin(
1398
                           self.cfg.GetInstanceNames(no_node_instances)))
1399

    
1400
    return not self.bad
1401

    
1402

    
1403
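# Editor's note, not part of the original module: an illustrative sketch of
# the data shape that LUClusterVerifyConfig._VerifyHVP() above consumes from
# _GetAllHypervisorParameters(): a list of (source, hypervisor-name,
# parameter-dict) triples.  The concrete values are hypothetical.
#
#   hvp_data = [
#     ("cluster", constants.HT_XEN_PVM, {"kernel_path": "/boot/vmlinuz"}),
#     ("os debootstrap", constants.HT_XEN_PVM, {}),
#   ]
#   self._VerifyHVP(hvp_data)   # reports syntax problems via _ErrorIf

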
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

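  # Editor's sketch (not part of the original module): NodeImage defaults
  # describe "no runtime data yet"; the _Update*/_Verify* helpers below only
  # overwrite these fields when the node's RPC answer is usable.  A
  # hypothetical, illustrative construction:
  #
  #   nimg = self.NodeImage(offline=False, uuid="0123-abcd", vm_capable=True)
  #   assert nimg.mfree == 0 and nimg.volumes == {} and not nimg.rpc_fail
  #   # after a failed verify RPC the code sets nimg.rpc_fail = True instead
  #   # of guessing values
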
  def ExpandNames(self):
1470
    # This raises errors.OpPrereqError on its own:
1471
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1472

    
1473
    # Get instances in node group; this is unsafe and needs verification later
1474
    inst_uuids = \
1475
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1476

    
1477
    self.needed_locks = {
1478
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1479
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1480
      locking.LEVEL_NODE: [],
1481

    
1482
      # This opcode is run by watcher every five minutes and acquires all nodes
1483
      # for a group. It doesn't run for a long time, so it's better to acquire
1484
      # the node allocation lock as well.
1485
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1486
      }
1487

    
1488
    self.share_locks = ShareAll()
1489

    
1490
  def DeclareLocks(self, level):
1491
    if level == locking.LEVEL_NODE:
1492
      # Get members of node group; this is unsafe and needs verification later
1493
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1494

    
1495
      # In Exec(), we warn about mirrored instances that have primary and
1496
      # secondary living in separate node groups. To fully verify that
1497
      # volumes for these instances are healthy, we will need to do an
1498
      # extra call to their secondaries. We ensure here those nodes will
1499
      # be locked.
1500
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1501
        # Important: access only the instances whose lock is owned
1502
        instance = self.cfg.GetInstanceInfoByName(inst_name)
1503
        if instance.disk_template in constants.DTS_INT_MIRROR:
1504
          nodes.update(instance.secondary_nodes)
1505

    
1506
      self.needed_locks[locking.LEVEL_NODE] = nodes
1507

    
1508
  def CheckPrereq(self):
1509
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1510
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1511

    
1512
    group_node_uuids = set(self.group_info.members)
1513
    group_inst_uuids = \
1514
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1515

    
1516
    unlocked_node_uuids = \
1517
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1518

    
1519
    unlocked_inst_uuids = \
1520
        group_inst_uuids.difference(
1521
          [self.cfg.GetInstanceInfoByName(name).uuid
1522
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1523

    
1524
    if unlocked_node_uuids:
1525
      raise errors.OpPrereqError(
1526
        "Missing lock for nodes: %s" %
1527
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1528
        errors.ECODE_STATE)
1529

    
1530
    if unlocked_inst_uuids:
1531
      raise errors.OpPrereqError(
1532
        "Missing lock for instances: %s" %
1533
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1534
        errors.ECODE_STATE)
1535

    
1536
    self.all_node_info = self.cfg.GetAllNodesInfo()
1537
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1538

    
1539
    self.my_node_uuids = group_node_uuids
1540
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1541
                             for node_uuid in group_node_uuids)
1542

    
1543
    self.my_inst_uuids = group_inst_uuids
1544
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1545
                             for inst_uuid in group_inst_uuids)
1546

    
1547
    # We detect here the nodes that will need the extra RPC calls for verifying
1548
    # split LV volumes; they should be locked.
1549
    extra_lv_nodes = set()
1550

    
1551
    for inst in self.my_inst_info.values():
1552
      if inst.disk_template in constants.DTS_INT_MIRROR:
1553
        for nuuid in inst.all_nodes:
1554
          if self.all_node_info[nuuid].group != self.group_uuid:
1555
            extra_lv_nodes.add(nuuid)
1556

    
1557
    unlocked_lv_nodes = \
1558
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1559

    
1560
    if unlocked_lv_nodes:
1561
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1562
                                 utils.CommaJoin(unlocked_lv_nodes),
1563
                                 errors.ECODE_STATE)
1564
    self.extra_lv_nodes = list(extra_lv_nodes)
1565

    
1566
  def _VerifyNode(self, ninfo, nresult):
1567
    """Perform some basic validation on data returned from a node.
1568

1569
      - check the result data structure is well formed and has all the
1570
        mandatory fields
1571
      - check ganeti version
1572

1573
    @type ninfo: L{objects.Node}
1574
    @param ninfo: the node to check
1575
    @param nresult: the results from the node
1576
    @rtype: boolean
1577
    @return: whether overall this call was successful (and we can expect
1578
         reasonable values in the response)
1579

1580
    """
1581
    # main result, nresult should be a non-empty dict
1582
    test = not nresult or not isinstance(nresult, dict)
1583
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1584
                  "unable to verify node: no data returned")
1585
    if test:
1586
      return False
1587

    
1588
    # compares ganeti version
1589
    local_version = constants.PROTOCOL_VERSION
1590
    remote_version = nresult.get("version", None)
1591
    test = not (remote_version and
1592
                isinstance(remote_version, (list, tuple)) and
1593
                len(remote_version) == 2)
1594
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1595
                  "connection to node returned invalid data")
1596
    if test:
1597
      return False
1598

    
1599
    test = local_version != remote_version[0]
1600
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1601
                  "incompatible protocol versions: master %s,"
1602
                  " node %s", local_version, remote_version[0])
1603
    if test:
1604
      return False
1605

    
1606
    # node seems compatible, we can actually try to look into its results
1607

    
1608
    # full package version
1609
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1610
                  constants.CV_ENODEVERSION, ninfo.name,
1611
                  "software version mismatch: master %s, node %s",
1612
                  constants.RELEASE_VERSION, remote_version[1],
1613
                  code=self.ETYPE_WARNING)
1614

    
1615
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1616
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1617
      for hv_name, hv_result in hyp_result.iteritems():
1618
        test = hv_result is not None
1619
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1620
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1621

    
1622
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1623
    if ninfo.vm_capable and isinstance(hvp_result, list):
1624
      for item, hv_name, hv_result in hvp_result:
1625
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1626
                      "hypervisor %s parameter verify failure (source %s): %s",
1627
                      hv_name, item, hv_result)
1628

    
1629
    test = nresult.get(constants.NV_NODESETUP,
1630
                       ["Missing NODESETUP results"])
1631
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1632
                  "node setup error: %s", "; ".join(test))
1633

    
1634
    return True
1635

    
1636
  def _VerifyNodeTime(self, ninfo, nresult,
1637
                      nvinfo_starttime, nvinfo_endtime):
1638
    """Check the node time.
1639

1640
    @type ninfo: L{objects.Node}
1641
    @param ninfo: the node to check
1642
    @param nresult: the remote results for the node
1643
    @param nvinfo_starttime: the start time of the RPC call
1644
    @param nvinfo_endtime: the end time of the RPC call
1645

1646
    """
1647
    ntime = nresult.get(constants.NV_TIME, None)
1648
    try:
1649
      ntime_merged = utils.MergeTime(ntime)
1650
    except (ValueError, TypeError):
1651
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1652
                    "Node returned invalid time")
1653
      return
1654

    
1655
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1656
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1657
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1658
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1659
    else:
1660
      ntime_diff = None
1661

    
1662
    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1663
                  "Node time diverges by at least %s from master node time",
1664
                  ntime_diff)
1665
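
  # Editor's sketch (not part of the original): the skew check above only
  # flags a node when its reported time falls outside the RPC window widened
  # by NODE_MAX_CLOCK_SKEW on both sides.  With hypothetical numbers:
  #
  #   start, end = 1000.0, 1002.5      # master-side RPC start/end timestamps
  #   skew = constants.NODE_MAX_CLOCK_SKEW
  #   ntime_merged = 1000.8            # node time via utils.MergeTime()
  #   in_sync = (start - skew) <= ntime_merged <= (end + skew)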

    
1666
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1667
    """Check the node LVM results and update info for cross-node checks.
1668

1669
    @type ninfo: L{objects.Node}
1670
    @param ninfo: the node to check
1671
    @param nresult: the remote results for the node
1672
    @param vg_name: the configured VG name
1673
    @type nimg: L{NodeImage}
1674
    @param nimg: node image
1675

1676
    """
1677
    if vg_name is None:
1678
      return
1679

    
1680
    # checks vg existence and size > 20G
1681
    vglist = nresult.get(constants.NV_VGLIST, None)
1682
    test = not vglist
1683
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1684
                  "unable to check volume groups")
1685
    if not test:
1686
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1687
                                            constants.MIN_VG_SIZE)
1688
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1689

    
1690
    # Check PVs
1691
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1692
    for em in errmsgs:
1693
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
1694
    if pvminmax is not None:
1695
      (nimg.pv_min, nimg.pv_max) = pvminmax
1696

    
1697
  def _VerifyGroupDRBDVersion(self, node_verify_infos):
1698
    """Check cross-node DRBD version consistency.
1699

1700
    @type node_verify_infos: dict
1701
    @param node_verify_infos: infos about nodes as returned from the
1702
      node_verify call.
1703

1704
    """
1705
    node_versions = {}
1706
    for node_uuid, ndata in node_verify_infos.items():
1707
      nresult = ndata.payload
1708
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1709
      node_versions[node_uuid] = version
1710

    
1711
    if len(set(node_versions.values())) > 1:
1712
      for node_uuid, version in sorted(node_versions.items()):
1713
        msg = "DRBD version mismatch: %s" % version
1714
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1715
                    code=self.ETYPE_WARNING)
1716

    
1717
  def _VerifyGroupLVM(self, node_image, vg_name):
1718
    """Check cross-node consistency in LVM.
1719

1720
    @type node_image: dict
1721
    @param node_image: info about nodes, mapping from node to names to
1722
      L{NodeImage} objects
1723
    @param vg_name: the configured VG name
1724

1725
    """
1726
    if vg_name is None:
1727
      return
1728

    
1729
    # Only exclusive storage needs this kind of checks
1730
    if not self._exclusive_storage:
1731
      return
1732

    
1733
    # exclusive_storage wants all PVs to have the same size (approximately),
1734
    # if the smallest and the biggest ones are okay, everything is fine.
1735
    # pv_min is None iff pv_max is None
1736
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1737
    if not vals:
1738
      return
1739
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1740
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1741
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1742
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1743
                  "PV sizes differ too much in the group; smallest (%s MB) is"
1744
                  " on %s, biggest (%s MB) is on %s",
1745
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
1746
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))
1747
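
  # Editor's sketch (not in the original): with exclusive storage, the group
  # check reduces to comparing the smallest pv_min and the largest pv_max
  # over all node images that reported PV data, e.g.:
  #
  #   imgs = [ni for ni in node_image.values() if ni.pv_min is not None]
  #   pvmin = min(ni.pv_min for ni in imgs)
  #   pvmax = max(ni.pv_max for ni in imgs)
  #   bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)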

    
1748
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1749
    """Check the node bridges.
1750

1751
    @type ninfo: L{objects.Node}
1752
    @param ninfo: the node to check
1753
    @param nresult: the remote results for the node
1754
    @param bridges: the expected list of bridges
1755

1756
    """
1757
    if not bridges:
1758
      return
1759

    
1760
    missing = nresult.get(constants.NV_BRIDGES, None)
1761
    test = not isinstance(missing, list)
1762
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1763
                  "did not return valid bridge information")
1764
    if not test:
1765
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1766
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1767

    
1768
  def _VerifyNodeUserScripts(self, ninfo, nresult):
1769
    """Check the results of user scripts presence and executability on the node
1770

1771
    @type ninfo: L{objects.Node}
1772
    @param ninfo: the node to check
1773
    @param nresult: the remote results for the node
1774

1775
    """
1776
    test = not constants.NV_USERSCRIPTS in nresult
1777
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1778
                  "did not return user scripts information")
1779

    
1780
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1781
    if not test:
1782
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1783
                    "user scripts not present or not executable: %s" %
1784
                    utils.CommaJoin(sorted(broken_scripts)))
1785

    
1786
  def _VerifyNodeNetwork(self, ninfo, nresult):
1787
    """Check the node network connectivity results.
1788

1789
    @type ninfo: L{objects.Node}
1790
    @param ninfo: the node to check
1791
    @param nresult: the remote results for the node
1792

1793
    """
1794
    test = constants.NV_NODELIST not in nresult
1795
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1796
                  "node hasn't returned node ssh connectivity data")
1797
    if not test:
1798
      if nresult[constants.NV_NODELIST]:
1799
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1800
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
1801
                        "ssh communication with node '%s': %s", a_node, a_msg)
1802

    
1803
    test = constants.NV_NODENETTEST not in nresult
1804
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1805
                  "node hasn't returned node tcp connectivity data")
1806
    if not test:
1807
      if nresult[constants.NV_NODENETTEST]:
1808
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1809
        for anode in nlist:
1810
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
1811
                        "tcp communication with node '%s': %s",
1812
                        anode, nresult[constants.NV_NODENETTEST][anode])
1813

    
1814
    test = constants.NV_MASTERIP not in nresult
1815
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1816
                  "node hasn't returned node master IP reachability data")
1817
    if not test:
1818
      if not nresult[constants.NV_MASTERIP]:
1819
        if ninfo.uuid == self.master_node:
1820
          msg = "the master node cannot reach the master IP (not configured?)"
1821
        else:
1822
          msg = "cannot reach the master IP"
1823
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
1824
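
  # Editor's sketch (not part of the original): NV_MASTERIP is a plain
  # boolean per node; the code above only refines the error message depending
  # on whether the failing node is the master itself.  Hypothetical check:
  #
  #   reachable = nresult.get(constants.NV_MASTERIP)
  #   is_master = (ninfo.uuid == self.master_node)
  #   problem = (reachable is not None) and not reachable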

    
1825
  def _VerifyInstance(self, instance, node_image, diskstatus):
1826
    """Verify an instance.
1827

1828
    This function checks to see if the required block devices are
1829
    available on the instance's node, and that the nodes are in the correct
1830
    state.
1831

1832
    """
1833
    pnode_uuid = instance.primary_node
1834
    pnode_img = node_image[pnode_uuid]
1835
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
1836

    
1837
    node_vol_should = {}
1838
    instance.MapLVsByNode(node_vol_should)
1839

    
1840
    cluster = self.cfg.GetClusterInfo()
1841
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1842
                                                            self.group_info)
1843
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
1844
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
1845
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)
1846

    
1847
    for node_uuid in node_vol_should:
1848
      n_img = node_image[node_uuid]
1849
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1850
        # ignore missing volumes on offline or broken nodes
1851
        continue
1852
      for volume in node_vol_should[node_uuid]:
1853
        test = volume not in n_img.volumes
1854
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
1855
                      "volume %s missing on node %s", volume,
1856
                      self.cfg.GetNodeName(node_uuid))
1857

    
1858
    if instance.admin_state == constants.ADMINST_UP:
1859
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
1860
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
1861
                    "instance not running on its primary node %s",
1862
                     self.cfg.GetNodeName(pnode_uuid))
1863
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
1864
                    instance.name, "instance is marked as running and lives on"
1865
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))
1866

    
1867
    diskdata = [(nname, success, status, idx)
1868
                for (nname, disks) in diskstatus.items()
1869
                for idx, (success, status) in enumerate(disks)]
1870

    
1871
    for nname, success, bdev_status, idx in diskdata:
1872
      # the 'ghost node' construction in Exec() ensures that we have a
1873
      # node here
1874
      snode = node_image[nname]
1875
      bad_snode = snode.ghost or snode.offline
1876
      self._ErrorIf(instance.disks_active and
1877
                    not success and not bad_snode,
1878
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
1879
                    "couldn't retrieve status for disk/%s on %s: %s",
1880
                    idx, self.cfg.GetNodeName(nname), bdev_status)
1881

    
1882
      if instance.disks_active and success and \
1883
         (bdev_status.is_degraded or
1884
          bdev_status.ldisk_status != constants.LDS_OKAY):
1885
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
1886
        if bdev_status.is_degraded:
1887
          msg += " is degraded"
1888
        if bdev_status.ldisk_status != constants.LDS_OKAY:
1889
          msg += "; state is '%s'" % \
1890
                 constants.LDS_NAMES[bdev_status.ldisk_status]
1891

    
1892
        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
1893

    
1894
    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1895
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
1896
                  "instance %s, connection to primary node failed",
1897
                  instance.name)
1898

    
1899
    self._ErrorIf(len(instance.secondary_nodes) > 1,
1900
                  constants.CV_EINSTANCELAYOUT, instance.name,
1901
                  "instance has multiple secondary nodes: %s",
1902
                  utils.CommaJoin(instance.secondary_nodes),
1903
                  code=self.ETYPE_WARNING)
1904

    
1905
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
1906
    if any(es_flags.values()):
1907
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
1908
        # Disk template not compatible with exclusive_storage: no instance
1909
        # node should have the flag set
1910
        es_nodes = [n
1911
                    for (n, es) in es_flags.items()
1912
                    if es]
1913
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
1914
                    "instance has template %s, which is not supported on nodes"
1915
                    " that have exclusive storage set: %s",
1916
                    instance.disk_template,
1917
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
1918
      for (idx, disk) in enumerate(instance.disks):
1919
        self._ErrorIf(disk.spindles is None,
1920
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
1921
                      "number of spindles not configured for disk %s while"
1922
                      " exclusive storage is enabled, try running"
1923
                      " gnt-cluster repair-disk-sizes", idx)
1924

    
1925
    if instance.disk_template in constants.DTS_INT_MIRROR:
1926
      instance_nodes = utils.NiceSort(instance.all_nodes)
1927
      instance_groups = {}
1928

    
1929
      for node_uuid in instance_nodes:
1930
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
1931
                                   []).append(node_uuid)
1932

    
1933
      pretty_list = [
1934
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
1935
                           groupinfo[group].name)
1936
        # Sort so that we always list the primary node first.
1937
        for group, nodes in sorted(instance_groups.items(),
1938
                                   key=lambda (_, nodes): pnode_uuid in nodes,
1939
                                   reverse=True)]
1940

    
1941
      self._ErrorIf(len(instance_groups) > 1,
1942
                    constants.CV_EINSTANCESPLITGROUPS,
1943
                    instance.name, "instance has primary and secondary nodes in"
1944
                    " different groups: %s", utils.CommaJoin(pretty_list),
1945
                    code=self.ETYPE_WARNING)
1946

    
1947
    inst_nodes_offline = []
1948
    for snode in instance.secondary_nodes:
1949
      s_img = node_image[snode]
1950
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
1951
                    self.cfg.GetNodeName(snode),
1952
                    "instance %s, connection to secondary node failed",
1953
                    instance.name)
1954

    
1955
      if s_img.offline:
1956
        inst_nodes_offline.append(snode)
1957

    
1958
    # warn that the instance lives on offline nodes
1959
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
1960
                  instance.name, "instance has offline secondary node(s) %s",
1961
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
1962
    # ... or ghost/non-vm_capable nodes
1963
    for node_uuid in instance.all_nodes:
1964
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
1965
                    instance.name, "instance lives on ghost node %s",
1966
                    self.cfg.GetNodeName(node_uuid))
1967
      self._ErrorIf(not node_image[node_uuid].vm_capable,
1968
                    constants.CV_EINSTANCEBADNODE, instance.name,
1969
                    "instance lives on non-vm_capable node %s",
1970
                    self.cfg.GetNodeName(node_uuid))
1971

    
1972
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1973
    """Verify if there are any unknown volumes in the cluster.
1974

1975
    The .os, .swap and backup volumes are ignored. All other volumes are
1976
    reported as unknown.
1977

1978
    @type reserved: L{ganeti.utils.FieldSet}
1979
    @param reserved: a FieldSet of reserved volume names
1980

1981
    """
1982
    for node_uuid, n_img in node_image.items():
1983
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
1984
          self.all_node_info[node_uuid].group != self.group_uuid):
1985
        # skip non-healthy nodes
1986
        continue
1987
      for volume in n_img.volumes:
1988
        test = ((node_uuid not in node_vol_should or
1989
                volume not in node_vol_should[node_uuid]) and
1990
                not reserved.Matches(volume))
1991
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
1992
                      self.cfg.GetNodeName(node_uuid),
1993
                      "volume %s is unknown", volume)
1994

    
1995
  def _VerifyNPlusOneMemory(self, node_image, all_insts):
1996
    """Verify N+1 Memory Resilience.
1997

1998
    Check that if one single node dies we can still start all the
1999
    instances it was primary for.
2000

2001
    """
2002
    cluster_info = self.cfg.GetClusterInfo()
2003
    for node_uuid, n_img in node_image.items():
2004
      # This code checks that every node which is now listed as
2005
      # secondary has enough memory to host all instances it is
2006
      # supposed to, should a single other node in the cluster fail.
2007
      # FIXME: not ready for failover to an arbitrary node
2008
      # FIXME: does not support file-backed instances
2009
      # WARNING: we currently take into account down instances as well
2010
      # as up ones, considering that even if they're down someone
2011
      # might want to start them even in the event of a node failure.
2012
      if n_img.offline or \
2013
         self.all_node_info[node_uuid].group != self.group_uuid:
2014
        # we're skipping nodes marked offline and nodes in other groups from
2015
        # the N+1 warning, since most likely we don't have good memory
2016
      # information from them; we already list instances living on such
2017
        # nodes, and that's enough warning
2018
        continue
2019
      #TODO(dynmem): also consider ballooning out other instances
2020
      for prinode, inst_uuids in n_img.sbp.items():
2021
        needed_mem = 0
2022
        for inst_uuid in inst_uuids:
2023
          bep = cluster_info.FillBE(all_insts[inst_uuid])
2024
          if bep[constants.BE_AUTO_BALANCE]:
2025
            needed_mem += bep[constants.BE_MINMEM]
2026
        test = n_img.mfree < needed_mem
2027
        self._ErrorIf(test, constants.CV_ENODEN1,
2028
                      self.cfg.GetNodeName(node_uuid),
2029
                      "not enough memory to accomodate instance failovers"
2030
                      " should node %s fail (%dMiB needed, %dMiB available)",
2031
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2032
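
  # Editor's sketch (not part of the original): for every node acting as a
  # secondary, the N+1 check sums BE_MINMEM over the auto-balanced instances
  # of each primary it backs and compares the total with the node's free
  # memory, e.g.:
  #
  #   bep = cluster_info.FillBE(all_insts[inst_uuid])
  #   if bep[constants.BE_AUTO_BALANCE]:
  #     needed_mem += bep[constants.BE_MINMEM]
  #   failure = n_img.mfree < needed_mem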

    
2033
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2034
                   (files_all, files_opt, files_mc, files_vm)):
2035
    """Verifies file checksums collected from all nodes.
2036

2037
    @param nodes: List of L{objects.Node} objects
2038
    @param master_node_uuid: UUID of master node
2039
    @param all_nvinfo: RPC results
2040

2041
    """
2042
    # Define functions determining which nodes to consider for a file
2043
    files2nodefn = [
2044
      (files_all, None),
2045
      (files_mc, lambda node: (node.master_candidate or
2046
                               node.uuid == master_node_uuid)),
2047
      (files_vm, lambda node: node.vm_capable),
2048
      ]
2049

    
2050
    # Build mapping from filename to list of nodes which should have the file
2051
    nodefiles = {}
2052
    for (files, fn) in files2nodefn:
2053
      if fn is None:
2054
        filenodes = nodes
2055
      else:
2056
        filenodes = filter(fn, nodes)
2057
      nodefiles.update((filename,
2058
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
2059
                       for filename in files)
2060

    
2061
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2062

    
2063
    fileinfo = dict((filename, {}) for filename in nodefiles)
2064
    ignore_nodes = set()
2065

    
2066
    for node in nodes:
2067
      if node.offline:
2068
        ignore_nodes.add(node.uuid)
2069
        continue
2070

    
2071
      nresult = all_nvinfo[node.uuid]
2072

    
2073
      if nresult.fail_msg or not nresult.payload:
2074
        node_files = None
2075
      else:
2076
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2077
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2078
                          for (key, value) in fingerprints.items())
2079
        del fingerprints
2080

    
2081
      test = not (node_files and isinstance(node_files, dict))
2082
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2083
                    "Node did not return file checksum data")
2084
      if test:
2085
        ignore_nodes.add(node.uuid)
2086
        continue
2087

    
2088
      # Build per-checksum mapping from filename to nodes having it
2089
      for (filename, checksum) in node_files.items():
2090
        assert filename in nodefiles
2091
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2092

    
2093
    for (filename, checksums) in fileinfo.items():
2094
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2095

    
2096
      # Nodes having the file
2097
      with_file = frozenset(node_uuid
2098
                            for node_uuids in fileinfo[filename].values()
2099
                            for node_uuid in node_uuids) - ignore_nodes
2100

    
2101
      expected_nodes = nodefiles[filename] - ignore_nodes
2102

    
2103
      # Nodes missing file
2104
      missing_file = expected_nodes - with_file
2105

    
2106
      if filename in files_opt:
2107
        # All or no nodes
2108
        self._ErrorIf(missing_file and missing_file != expected_nodes,
2109
                      constants.CV_ECLUSTERFILECHECK, None,
2110
                      "File %s is optional, but it must exist on all or no"
2111
                      " nodes (not found on %s)",
2112
                      filename,
2113
                      utils.CommaJoin(
2114
                        utils.NiceSort(
2115
                          map(self.cfg.GetNodeName, missing_file))))
2116
      else:
2117
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2118
                      "File %s is missing from node(s) %s", filename,
2119
                      utils.CommaJoin(
2120
                        utils.NiceSort(
2121
                          map(self.cfg.GetNodeName, missing_file))))
2122

    
2123
        # Warn if a node has a file it shouldn't
2124
        unexpected = with_file - expected_nodes
2125
        self._ErrorIf(unexpected,
2126
                      constants.CV_ECLUSTERFILECHECK, None,
2127
                      "File %s should not exist on node(s) %s",
2128
                      filename, utils.CommaJoin(
2129
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2130

    
2131
      # See if there are multiple versions of the file
2132
      test = len(checksums) > 1
2133
      if test:
2134
        variants = ["variant %s on %s" %
2135
                    (idx + 1,
2136
                     utils.CommaJoin(utils.NiceSort(
2137
                       map(self.cfg.GetNodeName, node_uuids))))
2138
                    for (idx, (checksum, node_uuids)) in
2139
                      enumerate(sorted(checksums.items()))]
2140
      else:
2141
        variants = []
2142

    
2143
      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2144
                    "File %s found with %s different checksums (%s)",
2145
                    filename, len(checksums), "; ".join(variants))
2146
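
  # Editor's sketch (not in the original): optional files (files_opt) must be
  # present on all nodes or on none, so the check above only fires when the
  # set of nodes missing the file is non-empty and differs from the full
  # expected set.  Hypothetical sets:
  #
  #   expected_nodes = frozenset(["node1", "node2", "node3"])
  #   missing_file = frozenset(["node2"])
  #   violation = missing_file and missing_file != expected_nodes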

    
2147
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2148
                      drbd_map):
2149
    """Verifies and the node DRBD status.
2150

2151
    @type ninfo: L{objects.Node}
2152
    @param ninfo: the node to check
2153
    @param nresult: the remote results for the node
2154
    @param instanceinfo: the dict of instances
2155
    @param drbd_helper: the configured DRBD usermode helper
2156
    @param drbd_map: the DRBD map as returned by
2157
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2158

2159
    """
2160
    if drbd_helper:
2161
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2162
      test = (helper_result is None)
2163
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2164
                    "no drbd usermode helper returned")
2165
      if helper_result:
2166
        status, payload = helper_result
2167
        test = not status
2168
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2169
                      "drbd usermode helper check unsuccessful: %s", payload)
2170
        test = status and (payload != drbd_helper)
2171
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2172
                      "wrong drbd usermode helper: %s", payload)
2173

    
2174
    # compute the DRBD minors
2175
    node_drbd = {}
2176
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2177
      test = inst_uuid not in instanceinfo
2178
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2179
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
2180
        # ghost instance should not be running, but otherwise we
2181
        # don't give double warnings (both ghost instance and
2182
        # unallocated minor in use)
2183
      if test:
2184
        node_drbd[minor] = (inst_uuid, False)
2185
      else:
2186
        instance = instanceinfo[inst_uuid]
2187
        node_drbd[minor] = (inst_uuid, instance.disks_active)
2188

    
2189
    # and now check them
2190
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2191
    test = not isinstance(used_minors, (tuple, list))
2192
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2193
                  "cannot parse drbd status file: %s", str(used_minors))
2194
    if test:
2195
      # we cannot check drbd status
2196
      return
2197

    
2198
    for minor, (inst_uuid, must_exist) in node_drbd.items():
2199
      test = minor not in used_minors and must_exist
2200
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2201
                    "drbd minor %d of instance %s is not active", minor,
2202
                    self.cfg.GetInstanceName(inst_uuid))
2203
    for minor in used_minors:
2204
      test = minor not in node_drbd
2205
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2206
                    "unallocated drbd minor %d is in use", minor)
2207

    
2208
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2209
    """Builds the node OS structures.
2210

2211
    @type ninfo: L{objects.Node}
2212
    @param ninfo: the node to check
2213
    @param nresult: the remote results for the node
2214
    @param nimg: the node image object
2215

2216
    """
2217
    remote_os = nresult.get(constants.NV_OSLIST, None)
2218
    test = (not isinstance(remote_os, list) or
2219
            not compat.all(isinstance(v, list) and len(v) == 7
2220
                           for v in remote_os))
2221

    
2222
    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2223
                  "node hasn't returned valid OS data")
2224

    
2225
    nimg.os_fail = test
2226

    
2227
    if test:
2228
      return
2229

    
2230
    os_dict = {}
2231

    
2232
    for (name, os_path, status, diagnose,
2233
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2234

    
2235
      if name not in os_dict:
2236
        os_dict[name] = []
2237

    
2238
      # parameters is a list of lists instead of list of tuples due to
2239
      # JSON lacking a real tuple type, fix it:
2240
      parameters = [tuple(v) for v in parameters]
2241
      os_dict[name].append((os_path, status, diagnose,
2242
                            set(variants), set(parameters), set(api_ver)))
2243

    
2244
    nimg.oslist = os_dict
2245

    
2246
  def _VerifyNodeOS(self, ninfo, nimg, base):
2247
    """Verifies the node OS list.
2248

2249
    @type ninfo: L{objects.Node}
2250
    @param ninfo: the node to check
2251
    @param nimg: the node image object
2252
    @param base: the 'template' node we match against (e.g. from the master)
2253

2254
    """
2255
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2256

    
2257
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2258
    for os_name, os_data in nimg.oslist.items():
2259
      assert os_data, "Empty OS status for OS %s?!" % os_name
2260
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2261
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2262
                    "Invalid OS %s (located at %s): %s",
2263
                    os_name, f_path, f_diag)
2264
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2265
                    "OS '%s' has multiple entries"
2266
                    " (first one shadows the rest): %s",
2267
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
2268
      # comparisons with the 'base' image
2269
      test = os_name not in base.oslist
2270
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2271
                    "Extra OS %s not present on reference node (%s)",
2272
                    os_name, self.cfg.GetNodeName(base.uuid))
2273
      if test:
2274
        continue
2275
      assert base.oslist[os_name], "Base node has empty OS status?"
2276
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2277
      if not b_status:
2278
        # base OS is invalid, skipping
2279
        continue
2280
      for kind, a, b in [("API version", f_api, b_api),
2281
                         ("variants list", f_var, b_var),
2282
                         ("parameters", beautify_params(f_param),
2283
                          beautify_params(b_param))]:
2284
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2285
                      "OS %s for %s differs from reference node %s:"
2286
                      " [%s] vs. [%s]", kind, os_name,
2287
                      self.cfg.GetNodeName(base.uuid),
2288
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2289

    
2290
    # check any missing OSes
2291
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2292
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2293
                  "OSes present on reference node %s"
2294
                  " but missing on this node: %s",
2295
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2296

    
2297
  def _VerifyFileStoragePaths(self, ninfo, nresult, is_master,
2298
                              enabled_disk_templates):
2299
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2300

2301
    @type ninfo: L{objects.Node}
2302
    @param ninfo: the node to check
2303
    @param nresult: the remote results for the node
2304
    @type is_master: bool
2305
    @param is_master: Whether node is the master node
2306

2307
    """
2308
    if (is_master and
2309
        (utils.storage.IsFileStorageEnabled(enabled_disk_templates) or
2310
         utils.storage.IsSharedFileStorageEnabled(enabled_disk_templates))):
2311
      try:
2312
        fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
2313
      except KeyError:
2314
        # This should never happen
2315
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2316
                      "Node did not return forbidden file storage paths")
2317
      else:
2318
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2319
                      "Found forbidden file storage paths: %s",
2320
                      utils.CommaJoin(fspaths))
2321
    else:
2322
      self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
2323
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2324
                    "Node should not have returned forbidden file storage"
2325
                    " paths")
2326

    
2327
  def _VerifyOob(self, ninfo, nresult):
2328
    """Verifies out of band functionality of a node.
2329

2330
    @type ninfo: L{objects.Node}
2331
    @param ninfo: the node to check
2332
    @param nresult: the remote results for the node
2333

2334
    """
2335
    # We just have to verify the paths on master and/or master candidates
2336
    # as the oob helper is invoked on the master
2337
    if ((ninfo.master_candidate or ninfo.master_capable) and
2338
        constants.NV_OOB_PATHS in nresult):
2339
      for path_result in nresult[constants.NV_OOB_PATHS]:
2340
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2341
                      ninfo.name, path_result)
2342

    
2343
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2344
    """Verifies and updates the node volume data.
2345

2346
    This function will update a L{NodeImage}'s internal structures
2347
    with data from the remote call.
2348

2349
    @type ninfo: L{objects.Node}
2350
    @param ninfo: the node to check
2351
    @param nresult: the remote results for the node
2352
    @param nimg: the node image object
2353
    @param vg_name: the configured VG name
2354

2355
    """
2356
    nimg.lvm_fail = True
2357
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2358
    if vg_name is None:
2359
      pass
2360
    elif isinstance(lvdata, basestring):
2361
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2362
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
2363
    elif not isinstance(lvdata, dict):
2364
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2365
                    "rpc call to node failed (lvlist)")
2366
    else:
2367
      nimg.volumes = lvdata
2368
      nimg.lvm_fail = False
2369

    
2370
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2371
    """Verifies and updates the node instance list.
2372

2373
    If the listing was successful, then updates this node's instance
2374
    list. Otherwise, it marks the RPC call as failed for the instance
2375
    list key.
2376

2377
    @type ninfo: L{objects.Node}
2378
    @param ninfo: the node to check
2379
    @param nresult: the remote results for the node
2380
    @param nimg: the node image object
2381

2382
    """
2383
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2384
    test = not isinstance(idata, list)
2385
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2386
                  "rpc call to node failed (instancelist): %s",
2387
                  utils.SafeEncode(str(idata)))
2388
    if test:
2389
      nimg.hyp_fail = True
2390
    else:
2391
      nimg.instances = [inst.uuid for (_, inst) in
2392
                        self.cfg.GetMultiInstanceInfoByName(idata)]
2393

    
2394
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2395
    """Verifies and computes a node information map
2396

2397
    @type ninfo: L{objects.Node}
2398
    @param ninfo: the node to check
2399
    @param nresult: the remote results for the node
2400
    @param nimg: the node image object
2401
    @param vg_name: the configured VG name
2402

2403
    """
2404
    # try to read free memory (from the hypervisor)
2405
    hv_info = nresult.get(constants.NV_HVINFO, None)
2406
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2407
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2408
                  "rpc call to node failed (hvinfo)")
2409
    if not test:
2410
      try:
2411
        nimg.mfree = int(hv_info["memory_free"])
2412
      except (ValueError, TypeError):
2413
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2414
                      "node returned invalid nodeinfo, check hypervisor")
2415

    
2416
    # FIXME: devise a free space model for file based instances as well
2417
    if vg_name is not None:
2418
      test = (constants.NV_VGLIST not in nresult or
2419
              vg_name not in nresult[constants.NV_VGLIST])
2420
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2421
                    "node didn't return data for the volume group '%s'"
2422
                    " - it is either missing or broken", vg_name)
2423
      if not test:
2424
        try:
2425
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2426
        except (ValueError, TypeError):
2427
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2428
                        "node returned invalid LVM info, check LVM status")
2429

    
2430
  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2431
    """Gets per-disk status information for all instances.
2432

2433
    @type node_uuids: list of strings
2434
    @param node_uuids: Node UUIDs
2435
    @type node_image: dict of (UUID, L{objects.Node})
2436
    @param node_image: Node objects
2437
    @type instanceinfo: dict of (UUID, L{objects.Instance})
2438
    @param instanceinfo: Instance objects
2439
    @rtype: {instance: {node: [(success, payload)]}}
2440
    @return: a dictionary of per-instance dictionaries with nodes as
2441
        keys and disk information as values; the disk information is a
2442
        list of tuples (success, payload)
2443

2444
    """
2445
    node_disks = {}
2446
    node_disks_devonly = {}
2447
    diskless_instances = set()
2448
    diskless = constants.DT_DISKLESS
2449

    
2450
    for nuuid in node_uuids:
2451
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
2452
                                             node_image[nuuid].sinst))
2453
      diskless_instances.update(uuid for uuid in node_inst_uuids
2454
                                if instanceinfo[uuid].disk_template == diskless)
2455
      disks = [(inst_uuid, disk)
2456
               for inst_uuid in node_inst_uuids
2457
               for disk in instanceinfo[inst_uuid].disks]
2458

    
2459
      if not disks:
2460
        # No need to collect data
2461
        continue
2462

    
2463
      node_disks[nuuid] = disks
2464

    
2465
      # _AnnotateDiskParams makes already copies of the disks
2466
      devonly = []
2467
      for (inst_uuid, dev) in disks:
2468
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
2469
                                          self.cfg)
2470
        self.cfg.SetDiskID(anno_disk, nuuid)
2471
        devonly.append(anno_disk)
2472

    
2473
      node_disks_devonly[nuuid] = devonly
2474

    
2475
    assert len(node_disks) == len(node_disks_devonly)
2476

    
2477
    # Collect data from all nodes with disks
2478
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2479
                                                          node_disks_devonly)
2480

    
2481
    assert len(result) == len(node_disks)
2482

    
2483
    instdisk = {}
2484

    
2485
    for (nuuid, nres) in result.items():
2486
      node = self.cfg.GetNodeInfo(nuuid)
2487
      disks = node_disks[node.uuid]
2488

    
2489
      if nres.offline:
2490
        # No data from this node
2491
        data = len(disks) * [(False, "node offline")]
2492
      else:
2493
        msg = nres.fail_msg
2494
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
2495
                      "while getting disk information: %s", msg)
2496
        if msg:
2497
          # No data from this node
2498
          data = len(disks) * [(False, msg)]
2499
        else:
2500
          data = []
2501
          for idx, i in enumerate(nres.payload):
2502
            if isinstance(i, (tuple, list)) and len(i) == 2:
2503
              data.append(i)
2504
            else:
2505
              logging.warning("Invalid result from node %s, entry %d: %s",
2506
                              node.name, idx, i)
2507
              data.append((False, "Invalid result from the remote node"))
2508

    
2509
      for ((inst_uuid, _), status) in zip(disks, data):
2510
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
2511
          .append(status)
2512

    
2513
    # Add empty entries for diskless instances.
2514
    for inst_uuid in diskless_instances:
2515
      assert inst_uuid not in instdisk
2516
      instdisk[inst_uuid] = {}
2517

    
2518
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2519
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
2520
                      compat.all(isinstance(s, (tuple, list)) and
2521
                                 len(s) == 2 for s in statuses)
2522
                      for inst, nuuids in instdisk.items()
2523
                      for nuuid, statuses in nuuids.items())
2524
    if __debug__:
2525
      instdisk_keys = set(instdisk)
2526
      instanceinfo_keys = set(instanceinfo)
2527
      assert instdisk_keys == instanceinfo_keys, \
2528
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2529
         (instdisk_keys, instanceinfo_keys))
2530

    
2531
    return instdisk
2532
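
  # Editor's sketch (not part of the original): the returned mapping is keyed
  # by instance UUID, then node UUID, with one (success, payload) tuple per
  # disk; diskless instances map to an empty dict.  Hypothetical shape:
  #
  #   instdisk = {
  #     "inst-uuid-1": {
  #       "node-uuid-a": [(True, bdev_status), (True, bdev_status)],
  #       "node-uuid-b": [(False, "node offline"), (False, "node offline")],
  #     },
  #     "diskless-inst-uuid": {},
  #   }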

    
2533
  @staticmethod
2534
  def _SshNodeSelector(group_uuid, all_nodes):
2535
    """Create endless iterators for all potential SSH check hosts.
2536

2537
    """
2538
    nodes = [node for node in all_nodes
2539
             if (node.group != group_uuid and
2540
                 not node.offline)]
2541
    keyfunc = operator.attrgetter("group")
2542

    
2543
    return map(itertools.cycle,
2544
               [sorted(map(operator.attrgetter("name"), names))
2545
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2546
                                                  keyfunc)])
2547

    
2548
  @classmethod
2549
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2550
    """Choose which nodes should talk to which other nodes.
2551

2552
    We will make nodes contact all nodes in their group, and one node from
2553
    every other group.
2554

2555
    @warning: This algorithm has a known issue if one node group is much
2556
      smaller than others (e.g. just one node). In such a case all other
2557
      nodes will talk to the single node.
2558

2559
    """
2560
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2561
    sel = cls._SshNodeSelector(group_uuid, all_nodes)
2562

    
2563
    return (online_nodes,
2564
            dict((name, sorted([i.next() for i in sel]))
2565
                 for name in online_nodes))
2566
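
  # Editor's sketch (not part of the original): _SshNodeSelector() returns
  # one endless itertools.cycle per foreign node group, and the cycles are
  # shared across all online nodes of this group, so SSH targets are handed
  # out round-robin.  Hypothetical illustration:
  #
  #   cycles = [itertools.cycle(["g1-n1", "g1-n2"]),
  #             itertools.cycle(["g2-n1"])]
  #   targets = dict((name, sorted([c.next() for c in cycles]))
  #                  for name in ["a", "b", "c"])
  #   # "a" -> ["g1-n1", "g2-n1"], "b" -> ["g1-n2", "g2-n1"], "c" -> ...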

    
2567
  def BuildHooksEnv(self):
2568
    """Build hooks env.
2569

2570
    Cluster-Verify hooks just ran in the post phase and their failure makes
2571
    the output be logged in the verify output and the verification to fail.
2572

2573
    """
2574
    env = {
2575
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2576
      }
2577

    
2578
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2579
               for node in self.my_node_info.values())
2580

    
2581
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }
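    # node_verify_param is the request payload for the node_verify RPC below:
    # each NV_* key asks the node daemon for one class of check, and its value
    # carries that check's parameters (None where none are needed). The
    # optional checks below (LVM, DRBD, bridges, ...) extend this dict.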
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if drbd_helper:
      node_verify_param[constants.NV_DRBDVERSION] = None
      node_verify_param[constants.NV_DRBDLIST] = None
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
      # Load file storage paths only from master node
      node_verify_param[constants.NV_FILE_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])
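    # With a default bridged NIC configuration this typically ends up as
    # something like set(["xen-br0"]) (illustrative; the actual links come
    # from the nicparams above).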
    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)
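    # Each NodeImage starts out as the expected per-node state; the checks
    # below fill in what the node actually reported (instances, volumes, OS
    # list, RPC failures, ...) so that the two views can be compared.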
    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode
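      # MapLVsByNode extends node_vol_should in place; the result is roughly
      # {node_uuid: [lv_name, ...]} for the LVs this instance should have on
      # each of its nodes, and it feeds the orphan-volume check further down.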
      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)
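      # Record the reverse mapping on each secondary node: sinst lists the
      # instances for which that node is a secondary, and sbp ("secondaries
      # by primary") groups those instance UUIDs by their primary node.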
      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least one node, we act as if it were set for all of them
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams)
    nvinfo_endtime = time.time()
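    # all_nvinfo maps node UUID -> RPC result object; for nodes that answered,
    # result.payload should be a dict keyed by the NV_* constants requested
    # above (that is how it is consumed below).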
    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)
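    # A rough sketch of instdisk, as consumed by _VerifyInstance below:
    #   instdisk[inst_uuid][node_uuid] = [(success, status), ...]
    # with one (success, status) pair per disk of that instance on that node.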
    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyFileStoragePaths(node_i, nresult,
                                   node_i.uuid == master_node_uuid,
                                   cluster.enabled_disk_templates)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)
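        # nimg.instances is what the node reported as actually running there
        # (filled in by _UpdateNodeInstances), so anything not in nimg.pinst
        # is either misplaced or entirely unknown to the cluster.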
        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
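      # (DTS_MIRRORED covers DRBD and the externally mirrored disk templates;
      # e.g. "plain"- or "file"-backed instances would be counted here.)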
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)
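    # cluster.reserved_lvs holds LV name patterns that the orphan-volume
    # check below deliberately ignores (matched via the FieldSet above).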
    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.name not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyzes the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"
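      # Each res below is the hooks RPC result for one node; its payload is
      # expected to be a list of (script, hook_status, output) tuples, which
      # is what the inner loop unpacks.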
      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the status of the cluster's disks.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
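    # (ResultWithJobs should make the master daemon submit these opcodes as
    # separate per-group jobs and return their job IDs as this LU's result.)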
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])