#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
440

441
  """
442
  REQ_BGL = False
443

    
444
  def ExpandNames(self):
445
    if self.op.instances:
446
      self.wanted_names = GetWantedInstances(self, self.op.instances)
447
      # Not getting the node allocation lock as only a specific set of
448
      # instances (and their nodes) is going to be acquired
449
      self.needed_locks = {
450
        locking.LEVEL_NODE_RES: [],
451
        locking.LEVEL_INSTANCE: self.wanted_names,
452
        }
453
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
454
    else:
455
      self.wanted_names = None
456
      self.needed_locks = {
457
        locking.LEVEL_NODE_RES: locking.ALL_SET,
458
        locking.LEVEL_INSTANCE: locking.ALL_SET,
459

    
460
        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False
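
  # A worked example for _EnsureChildSizes (illustrative numbers): for a DRBD8
  # disk whose configuration records size 10240 MiB while its first (data)
  # child records only 10200 MiB, the child's recorded size is bumped to 10240
  # and True is returned, telling the caller that the instance configuration
  # needs to be written back.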

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node_uuid)
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)
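
# Usage sketch for _ValidateNetmask (illustrative values): on a cluster whose
# primary IP family is IPv4, _ValidateNetmask(self.cfg, 24) passes silently,
# while an out-of-range value such as 99 makes ipcls.ValidateNetmask() fail
# and therefore raises errors.OpPrereqError with ECODE_INVAL.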


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
642
                                   errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_uuids, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the vg name on all nodes and in case it gets
676
       unset whether there are instances still using it.

    """
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and
        utils.IsLvmEnabled(enabled_disk_templates)) or \
           (self.cfg.GetVGName() is not None and
            utils.LvmGetsEnabled(enabled_disk_templates,
                                 new_enabled_disk_templates)):
      self._CheckVgNameOnNodes(node_uuids)

  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)

  def _GetEnabledDiskTemplates(self, cluster):
    """Determines the enabled disk templates and the subset of disk templates
       that are newly enabled by this operation.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    if self.op.enabled_disk_templates:
      enabled_disk_templates = self.op.enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(cluster.enabled_disk_templates))
    else:
      enabled_disk_templates = cluster.enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates)
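
  # Example for _GetEnabledDiskTemplates (illustrative values): with
  # cluster.enabled_disk_templates == ["drbd", "plain"] and
  # self.op.enabled_disk_templates == ["plain", "file"], the result is
  # (["plain", "file"], ["file"]): the full new list plus the templates that
  # were not enabled before.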

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    self.cluster = cluster = self.cfg.GetClusterInfo()

    vm_capable_node_uuids = [node.uuid
                             for node in self.cfg.GetAllNodesInfo().values()
                             if node.uuid in node_uuids and node.vm_capable]

    (enabled_disk_templates, new_enabled_disk_templates) = \
      self._GetEnabledDiskTemplates(cluster)

    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
                      new_enabled_disk_templates)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_uuids)
      for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s",
                       ninfo.name)
          continue
        msg = helpers[ninfo.uuid].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (ninfo.name, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[ninfo.uuid].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (ninfo.name, node_helper),
                                     errors.ECODE_ENVIRON)

    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(nuuid in group.members
                                             for nuuid in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol,
                                           new_ipolicy, instances, self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
866
        if dt_name not in self.op.diskparams:
867
          self.new_diskparams[dt_name] = dt_params
868
        else:
869
          self.new_diskparams[dt_name].update(dt_params)
870

    
871
    # os hypervisor parameters
872
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
873
    if self.op.os_hvp:
874
      for os_name, hvs in self.op.os_hvp.items():
875
        if os_name not in self.new_os_hvp:
876
          self.new_os_hvp[os_name] = hvs
877
        else:
878
          for hv_name, hv_dict in hvs.items():
879
            if hv_dict is None:
880
              # Delete if it exists
881
              self.new_os_hvp[os_name].pop(hv_name, None)
882
            elif hv_name not in self.new_os_hvp[os_name]:
883
              self.new_os_hvp[os_name][hv_name] = hv_dict
884
            else:
885
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
886

    
887
    # os parameters
888
    self.new_osp = objects.FillDict(cluster.osparams, {})
889
    if self.op.osparams:
890
      for os_name, osp in self.op.osparams.items():
891
        if os_name not in self.new_osp:
892
          self.new_osp[os_name] = {}
893

    
894
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
895
                                                 use_none=True)
896

    
897
        if not self.new_osp[os_name]:
898
          # we removed all parameters
899
          del self.new_osp[os_name]
900
        else:
901
          # check the parameter validity (remote check)
902
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
903
                        os_name, self.new_osp[os_name])
904

    
905
    # changes to the hypervisor list
906
    if self.op.enabled_hypervisors is not None:
907
      self.hv_list = self.op.enabled_hypervisors
908
      for hv in self.hv_list:
909
        # if the hypervisor doesn't already exist in the cluster
910
        # hvparams, we initialize it to empty, and then (in both
911
        # cases) we make sure to fill the defaults, as we might not
912
        # have a complete defaults list if the hypervisor wasn't
913
        # enabled before
914
        if hv not in new_hvp:
915
          new_hvp[hv] = {}
916
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
917
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
918
    else:
919
      self.hv_list = cluster.enabled_hypervisors
920

    
921
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
922
      # either the enabled list has changed, or the parameters have, validate
923
      for hv_name, hv_params in self.new_hvparams.items():
924
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
925
            (self.op.enabled_hypervisors and
926
             hv_name in self.op.enabled_hypervisors)):
927
          # either this is a new hypervisor, or its parameters have changed
928
          hv_class = hypervisor.GetHypervisorClass(hv_name)
929
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
930
          hv_class.CheckParameterSyntax(hv_params)
931
          CheckHVParams(self, node_uuids, hv_name, hv_params)
932

    
933
    self._CheckDiskTemplateConsistency()
934

    
935
    if self.op.os_hvp:
936
      # no need to check any newly-enabled hypervisors, since the
937
      # defaults have already been checked in the above code-block
938
      for os_name, os_hvp in self.new_os_hvp.items():
939
        for hv_name, hv_params in os_hvp.items():
940
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
941
          # we need to fill in the new os_hvp on top of the actual hv_p
942
          cluster_defaults = self.new_hvparams.get(hv_name, {})
943
          new_osp = objects.FillDict(cluster_defaults, hv_params)
944
          hv_class = hypervisor.GetHypervisorClass(hv_name)
945
          hv_class.CheckParameterSyntax(new_osp)
946
          CheckHVParams(self, node_uuids, hv_name, new_osp)
947

    
948
    if self.op.default_iallocator:
949
      alloc_script = utils.FindFile(self.op.default_iallocator,
950
                                    constants.IALLOCATOR_SEARCH_PATH,
951
                                    os.path.isfile)
952
      if alloc_script is None:
953
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
954
                                   " specified" % self.op.default_iallocator,
955
                                   errors.ECODE_INVAL)
956

    
957
  def _CheckDiskTemplateConsistency(self):
958
    """Check whether the disk templates that are going to be disabled
959
       are still in use by some instances.
960

961
    """
962
    if self.op.enabled_disk_templates:
963
      cluster = self.cfg.GetClusterInfo()
964
      instances = self.cfg.GetAllInstancesInfo()
965

    
966
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
967
        - set(self.op.enabled_disk_templates)
968
      for instance in instances.itervalues():
969
        if instance.disk_template in disk_templates_to_remove:
970
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
971
                                     " because instance '%s' is using it." %
972
                                     (instance.disk_template, instance.name))
973

    
974
  def _SetVgName(self, feedback_fn):
975
    """Determines and sets the new volume group name.
976

977
    """
978
    if self.op.vg_name is not None:
979
      if self.op.vg_name and not \
980
           utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
981
        feedback_fn("Note that you specified a volume group, but did not"
982
                    " enable any lvm disk template.")
983
      new_volume = self.op.vg_name
984
      if not new_volume:
985
        if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
986
          raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
987
                                     " disk templates are enabled.")
988
        new_volume = None
989
      if new_volume != self.cfg.GetVGName():
990
        self.cfg.SetVGName(new_volume)
991
      else:
992
        feedback_fn("Cluster LVM configuration already in desired"
993
                    " state, not changing")
994
    else:
995
      if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
996
          not self.cfg.GetVGName():
997
        raise errors.OpPrereqError("Please specify a volume group when"
998
                                   " enabling lvm-based disk-templates.")
999

    
1000
  def Exec(self, feedback_fn):
1001
    """Change the parameters of the cluster.
1002

1003
    """
1004
    if self.op.enabled_disk_templates:
1005
      self.cluster.enabled_disk_templates = \
1006
        list(set(self.op.enabled_disk_templates))
1007

    
1008
    self._SetVgName(feedback_fn)
1009

    
1010
    if self.op.drbd_helper is not None:
1011
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1012
        feedback_fn("Note that you specified a drbd user helper, but did"
1013
                    " enabled the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)
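
# Note on the jobs built by LUClusterVerify.Exec: when no group is named, the
# OpClusterVerifyConfig job is queued first and each per-group
# OpClusterVerifyGroup job gets depends=[(-len(jobs), [])], a dependency
# expressed relative to its own position in the submission, which always
# resolves back to that initial config-verification job.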


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True
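
  # The formats produced above (illustrative): with self.op.error_codes set, a
  # machine-parseable line such as "ERROR:ECLUSTERCFG:cluster:None:<message>"
  # (type, error code, item type, item, message) is emitted; otherwise the
  # friendlier "ERROR: cluster: <message>" form is used.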

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data
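
# Shape of the list returned above (illustrative values): each entry is an
# (origin, hypervisor, parameters) triple, e.g.
#   [("cluster", "kvm", {...}),
#    ("os debian-image", "kvm", {...}),
#    ("instance inst1.example.com", "kvm", {...})]
# so the verification code can report where a bad parameter set came from.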


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
1341

1342
    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst.name)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(dangling_instances.get(node.uuid,
                                                ["no instances"])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_names = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: inst_names,
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      all_inst_info = self.cfg.GetAllInstancesInfo()

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
          nodes.update(all_inst_info[inst].secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_instances = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_instances = \
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_instances:
      raise errors.OpPrereqError("Missing lock for instances: %s" %
                                 utils.CommaJoin(unlocked_instances),
                                 errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_names = utils.NiceSort(group_instances)
    self.my_inst_info = dict((name, self.all_inst_info[name])
                             for name in self.my_inst_names)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nuuid in inst.all_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
                    "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
                  "Node time diverges by at least %s from master node time",
                  ntime_diff)

  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                  "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node_verify call.

    """
    node_versions = {}
    for node_uuid, ndata in node_verify_infos.items():
      nresult = ndata.payload
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
      node_versions[node_uuid] = version

    if len(set(node_versions.values())) > 1:
      for node_uuid, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
                    code=self.ETYPE_WARNING)

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping from node names to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of check
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "did not return valid bridge information")
    if not test:
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = constants.NV_NODELIST not in nresult
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
                  "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
                        "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
                        "tcp communication with node '%s': %s",
                        anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if ninfo.uuid == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)

  def _VerifyInstance(self, instance, inst_config, node_image,
                      diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    pnode = inst_config.primary_node
    pnode_img = node_image[pnode]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    inst_config.MapLVsByNode(node_vol_should)

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance,
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
                      "volume %s missing on node %s", volume,
                      self.cfg.GetNodeName(node))

    if inst_config.admin_state == constants.ADMINST_UP:
      test = instance not in pnode_img.instances and not pnode_img.offline
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
                    "instance not running on its primary node %s",
                     self.cfg.GetNodeName(pnode))
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
                    "instance is marked as running and lives on"
                    " offline node %s", self.cfg.GetNodeName(pnode))

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      self._ErrorIf(inst_config.disks_active and
                    not success and not bad_snode,
                    constants.CV_EINSTANCEFAULTYDISK, instance,
                    "couldn't retrieve status for disk/%s on %s: %s",
                    idx, self.cfg.GetNodeName(nname), bdev_status)
      self._ErrorIf((inst_config.disks_active and
                     success and
                     bdev_status.ldisk_status == constants.LDS_FAULTY),
                    constants.CV_EINSTANCEFAULTYDISK, instance,
                    "disk/%s on %s is faulty", idx, self.cfg.GetNodeName(nname))

    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
                  constants.CV_ENODERPC, pnode, "instance %s, connection to"
                  " primary node failed", instance)

    self._ErrorIf(len(inst_config.secondary_nodes) > 1,
                  constants.CV_EINSTANCELAYOUT,
                  instance, "instance has multiple secondary nodes: %s",
                  utils.CommaJoin(inst_config.secondary_nodes),
                  code=self.ETYPE_WARNING)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               inst_config.all_nodes)
    if any(es_flags.values()):
      if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
        # Disk template not compatible with exclusive_storage: no instance
        # node should have the flag set
        es_nodes = [n
                    for (n, es) in es_flags.items()
                    if es]
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance,
                    "instance has template %s, which is not supported on nodes"
                    " that have exclusive storage set: %s",
                    inst_config.disk_template,
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
      for (idx, disk) in enumerate(inst_config.disks):
        self._ErrorIf(disk.spindles is None,
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance,
                      "number of spindles not configured for disk %s while"
                      " exclusive storage is enabled, try running"
                      " gnt-cluster repair-disk-sizes", idx)

    if inst_config.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(inst_config.all_nodes)
      instance_groups = {}

      for node in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node].group,
                                   []).append(node)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
                           groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in inst_config.secondary_nodes:
      s_img = node_image[snode]
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
                    snode, "instance %s, connection to secondary node failed",
                    instance)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
                  "instance has offline secondary node(s) %s",
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
    # ... or ghost/non-vm_capable nodes
    for node in inst_config.all_nodes:
      self._ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
                    instance, "instance lives on ghost node %s",
                    self.cfg.GetNodeName(node))
      self._ErrorIf(not node_image[node].vm_capable,
                    constants.CV_EINSTANCEBADNODE, instance,
                    "instance lives on non-vm_capable node %s",
                    self.cfg.GetNodeName(node))

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node_uuid, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node_uuid].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node_uuid not in node_vol_should or
                volume not in node_vol_should[node_uuid]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
                      self.cfg.GetNodeName(node_uuid),
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node_uuid, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or \
         self.all_node_info[node_uuid].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = cluster_info.FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1,
                      self.cfg.GetNodeName(node_uuid),
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)

  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param nodes: List of L{objects.Node} objects
    @param master_node_uuid: UUID of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.uuid == master_node_uuid)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodes
      else:
        filenodes = filter(fn, nodes)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodes:
      if node.offline:
        ignore_nodes.add(node.uuid)
        continue

      nresult = all_nvinfo[node.uuid]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
                    "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.uuid)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_uuid
                            for node_uuids in fileinfo[filename].values()
                            for node_uuid in node_uuids) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        self._ErrorIf(missing_file and missing_file != expected_nodes,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is optional, but it must exist on all or no"
                      " nodes (not found on %s)",
                      filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))
      else:
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is missing from node(s) %s", filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        self._ErrorIf(unexpected,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s should not exist on node(s) %s",
                      filename, utils.CommaJoin(
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1,
                     utils.CommaJoin(utils.NiceSort(
                       map(self.cfg.GetNodeName, node_uuids))))
                    for (idx, (checksum, node_uuids)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
                    "File %s found with %s different checksums (%s)",
                    filename, len(checksums), "; ".join(variants))

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                    "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
    node_drbd = {}
    for minor, instance in drbd_map[ninfo.uuid].items():
      test = instance not in instanceinfo
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
                    "ghost instance '%s' in temporary DRBD map", instance)
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
      if test:
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.disks_active)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                  "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (iname, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "drbd minor %d of instance %s is not active", minor, iname)
    for minor in used_minors:
      test = minor not in node_drbd
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                  "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
                    "Invalid OS %s (located at %s): %s",
                    os_name, f_path, f_diag)
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
                    "OS '%s' has multiple entries"
                    " (first one shadows the rest): %s",
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                    "Extra OS %s not present on reference node (%s)",
                    os_name, self.cfg.GetNodeName(base.uuid))
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " [%s] vs. [%s]", kind, os_name,
                      self.cfg.GetNodeName(base.uuid),
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
                  "OSes present on reference node %s"
                  " but missing on this node: %s",
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))

  def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    if (is_master and
        (constants.ENABLE_FILE_STORAGE or
         constants.ENABLE_SHARED_FILE_STORAGE)):
      try:
        fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = idata

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (name, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (name, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_devonly = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_instances = list(itertools.chain(node_image[nuuid].pinst,
                                            node_image[nuuid].sinst))
      diskless_instances.update(inst for inst in node_instances
                                if instanceinfo[inst].disk_template == diskless)
      disks = [(inst, disk)
               for inst in node_instances
               for disk in instanceinfo[inst].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      devonly = []
      for (inst, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
        self.cfg.SetDiskID(anno_disk, nuuid)
        devonly.append(anno_disk)

      node_disks_devonly[nuuid] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst, _), status) in zip(disks, data):
        instdisk.setdefault(inst, {}).setdefault(node.uuid, []).append(status)

    # Add empty entries for diskless instances.
    for inst in diskless_instances:
      assert inst not in instdisk
      instdisk[inst] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
2543
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2544
    sel = cls._SshNodeSelector(group_uuid, all_nodes)
2545

    
2546
    return (online_nodes,
2547
            dict((name, sorted([i.next() for i in sel]))
2548
                 for name in online_nodes))
2549

    
2550
  def BuildHooksEnv(self):
2551
    """Build hooks env.
2552

2553
    Cluster-Verify hooks just ran in the post phase and their failure makes
2554
    the output be logged in the verify output and the verification to fail.
2555

2556
    """
2557
    env = {
2558
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2559
      }
2560

    
2561
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2562
               for node in self.my_node_info.values())
2563

    
2564
    return env
2565

    
2566
  def BuildHooksNodes(self):
2567
    """Build hooks nodes.
2568

2569
    """
2570
    return ([], list(self.my_node_info.keys()))
2571

    
2572
  def Exec(self, feedback_fn):
2573
    """Verify integrity of the node group, performing various test on nodes.
2574

2575
    """
2576
    # This method has too many local variables. pylint: disable=R0914
2577
    feedback_fn("* Verifying group '%s'" % self.group_info.name)
2578

    
2579
    if not self.my_node_uuids:
2580
      # empty node group
2581
      feedback_fn("* Empty node group, skipping verification")
2582
      return True
2583

    
2584
    self.bad = False
2585
    verbose = self.op.verbose
2586
    self._feedback_fn = feedback_fn
2587

    
2588
    vg_name = self.cfg.GetVGName()
2589
    drbd_helper = self.cfg.GetDRBDHelper()
2590
    cluster = self.cfg.GetClusterInfo()
2591
    hypervisors = cluster.enabled_hypervisors
2592
    node_data_list = self.my_node_info.values()
2593

    
2594
    i_non_redundant = [] # Non redundant instances
2595
    i_non_a_balanced = [] # Non auto-balanced instances
2596
    i_offline = 0 # Count of offline instances
2597
    n_offline = 0 # Count of offline nodes
2598
    n_drained = 0 # Count of nodes being drained
2599
    node_vol_should = {}
2600

    
2601
    # FIXME: verify OS list
2602

    
2603
    # File verification
2604
    filemap = ComputeAncillaryFiles(cluster, False)
2605

    
2606
    # do local checksums
2607
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
2608
    master_ip = self.cfg.GetMasterIP()
2609

    
2610
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))
2611

    
2612
    user_scripts = []
2613
    if self.cfg.GetUseExternalMipScript():
2614
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2615

    
2616

    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }
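
    # Each NV_* key above selects one check for the node daemon to run; the
    # value carries that check's arguments (None where no argument is needed).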

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if drbd_helper:
      node_verify_param[constants.NV_DRBDVERSION] = None
      node_verify_param[constants.NV_DRBDLIST] = None
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
      # Load file storage paths only from master node
      node_verify_param[constants.NV_FILE_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)
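
    # node_image maps each node UUID to a NodeImage object; the loops below
    # fill in the expected instances and volumes per node, to be compared
    # later against what the nodes actually report.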

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for instance in self.my_inst_names:
      inst_config = self.my_inst_info[instance]
      if inst_config.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in inst_config.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode
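
      # Nodes referenced by an instance but outside this group get a bare
      # NodeImage here; those missing from the configuration entirely are
      # flagged as "ghost" nodes.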

      inst_config.MapLVsByNode(node_vol_should)

      pnode = inst_config.primary_node
      node_image[pnode].pinst.append(instance)

      for snode in inst_config.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance)
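
      # In each NodeImage, pinst/sinst list the instances for which the node
      # is primary/secondary, and sbp groups the node's secondary instances
      # by their primary node ("secondary-by-primary").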

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least one node, we act as if it were set for all nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams)
    nvinfo_endtime = time.time()
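
    # all_nvinfo maps node UUIDs to RPC results; .fail_msg signals RPC-level
    # failures, while .payload holds the per-check results keyed by the same
    # NV_* names used in node_verify_param.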

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()
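
    # The call below compares the file checksums collected under NV_FILELIST
    # across vf_node_info; the master node and one extra vm_capable node were
    # added above when this group does not cover the whole cluster.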

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyFileStoragePaths(node_i, nresult,
                                   node_i.uuid == master_node_uuid)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)

        for inst in non_primary_inst:
          test = inst in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for instance in self.my_inst_names:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.my_inst_info[instance]
      self._VerifyInstance(instance, inst_config, node_image,
                           instdisk[instance])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if inst_config.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for inst in self.all_inst_info.values():
      for secondary in inst.secondary_nodes:
        if (secondary in self.my_node_info
            and inst.name not in self.my_inst_info):
          inst.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
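
    # N+1 memory redundancy: every node should have enough free memory to
    # host the instances that would fail over to it if one of their primary
    # nodes went down; the check is optional via self.op.skip_checks.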

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
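        # res.payload is expected to be a list of (script, status, output)
        # tuples, one per hook script run on this node.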
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False
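  # REQ_BGL = False: this LU does not need the Big Ganeti Lock exclusively;
  # it declares its own (shared) node group locks in ExpandNames below.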

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])