lib / cmdlib / cluster.py @ 9b0e86e2

#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.
440

441
  """
442
  REQ_BGL = False
443

    
444
  def ExpandNames(self):
445
    if self.op.instances:
446
      self.wanted_names = GetWantedInstances(self, self.op.instances)
447
      # Not getting the node allocation lock as only a specific set of
448
      # instances (and their nodes) is going to be acquired
449
      self.needed_locks = {
450
        locking.LEVEL_NODE_RES: [],
451
        locking.LEVEL_INSTANCE: self.wanted_names,
452
        }
453
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
454
    else:
455
      self.wanted_names = None
456
      self.needed_locks = {
457
        locking.LEVEL_NODE_RES: locking.ALL_SET,
458
        locking.LEVEL_INSTANCE: locking.ALL_SET,
459

    
460
        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node_uuid)
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
544
                        " result.payload=%s", node_name, len(dskl),
545
                        result.payload)
546
        self.LogWarning("Invalid result from node %s, ignoring node results",
547
                        node_name)
548
        continue
549
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
550
        if dimensions is None:
551
          self.LogWarning("Disk %d of instance %s did not return size"
552
                          " information, ignoring", idx, instance.name)
553
          continue
554
        if not isinstance(dimensions, (tuple, list)):
555
          self.LogWarning("Disk %d of instance %s did not return valid"
556
                          " dimension information, ignoring", idx,
557
                          instance.name)
558
          continue
559
        (size, spindles) = dimensions
560
        if not isinstance(size, (int, long)):
561
          self.LogWarning("Disk %d of instance %s did not return valid"
562
                          " size information, ignoring", idx, instance.name)
563
          continue
564
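        # the reported size is in bytes; convert to MiB to match disk.size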
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
642

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_uuids, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the vg name on all nodes and in case it gets
676
       unset whether there are instances still using it.
677

678
    """
679
    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and
        utils.IsLvmEnabled(enabled_disk_templates)) or \
           (self.cfg.GetVGName() is not None and
            utils.LvmGetsEnabled(enabled_disk_templates,
                                 new_enabled_disk_templates)):
      self._CheckVgNameOnNodes(node_uuids)

  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)

  def _GetEnabledDiskTemplates(self, cluster):
    """Determines the enabled disk templates and the subset of disk templates
       that are newly enabled by this operation.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    if self.op.enabled_disk_templates:
      enabled_disk_templates = self.op.enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(cluster.enabled_disk_templates))
    else:
      enabled_disk_templates = cluster.enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    self.cluster = cluster = self.cfg.GetClusterInfo()

    vm_capable_node_uuids = [node.uuid
                             for node in self.cfg.GetAllNodesInfo().values()
                             if node.uuid in node_uuids and node.vm_capable]

    (enabled_disk_templates, new_enabled_disk_templates) = \
      self._GetEnabledDiskTemplates(cluster)

    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
                      new_enabled_disk_templates)

    if self.op.drbd_helper:
      # checks given drbd helper on all nodes
      helpers = self.rpc.call_drbd_helper(node_uuids)
      for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
        if ninfo.offline:
          self.LogInfo("Not checking drbd helper on offline node %s",
                       ninfo.name)
          continue
        msg = helpers[ninfo.uuid].fail_msg
        if msg:
          raise errors.OpPrereqError("Error checking drbd helper on node"
                                     " '%s': %s" % (ninfo.name, msg),
                                     errors.ECODE_ENVIRON)
        node_helper = helpers[ninfo.uuid].payload
        if node_helper != self.op.drbd_helper:
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                     (ninfo.name, node_helper),
                                     errors.ECODE_ENVIRON)

    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(nuuid in group.members
                                             for nuuid in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol,
                                           new_ipolicy, instances, self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_uuids, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_uuids, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name and not \
           utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
        feedback_fn("Note that you specified a volume group, but did not"
                    " enable any lvm disk template.")
      new_volume = self.op.vg_name
      if not new_volume:
        if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
          raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                     " disk templates are enabled.")
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    else:
      if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
          not self.cfg.GetVGName():
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(set(self.op.enabled_disk_templates))

    self._SetVgName(feedback_fn)

    if self.op.drbd_helper is not None:
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did"
1013
                    " enabled the drbd disk template.")
1014
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

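    # helper to apply DDM_ADD/DDM_REMOVE changes to an OS status list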
    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]
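      # (-len(jobs) is a relative job ID, always pointing at the config job)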

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1318
             (item, hv_name))
1319
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various test on nodes.
1341

1342
    """
1343
    self.bad = False
1344
    self._feedback_fn = feedback_fn
1345

    
1346
    feedback_fn("* Verifying cluster config")
1347

    
1348
    for msg in self.cfg.VerifyConfig():
1349
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1350

    
1351
    feedback_fn("* Verifying cluster certificate files")
1352

    
1353
    for cert_filename in pathutils.ALL_CERT_FILES:
1354
      (errcode, msg) = _VerifyCertificate(cert_filename)
1355
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1356

    
1357
    feedback_fn("* Verifying hypervisor parameters")
1358

    
1359
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1360
                                                self.all_inst_info.values()))
1361

    
1362
    feedback_fn("* Verifying all nodes belong to an existing group")
1363

    
1364
    # We do this verification here because, should this bogus circumstance
1365
    # occur, it would never be caught by VerifyGroup, which only acts on
1366
    # nodes/instances reachable from existing node groups.
1367

    
1368
    dangling_nodes = set(node for node in self.all_node_info.values()
1369
                         if node.group not in self.all_group_info)
1370

    
1371
    dangling_instances = {}
1372
    no_node_instances = []
1373

    
1374
    for inst in self.all_inst_info.values():
1375
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
1376
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1377
      elif inst.primary_node not in self.all_node_info:
1378
        no_node_instances.append(inst.name)
1379

    
1380
    pretty_dangling = [
1381
        "%s (%s)" %
1382
        (node.name,
1383
         utils.CommaJoin(dangling_instances.get(node.uuid,
1384
                                                ["no instances"])))
1385
        for node in dangling_nodes]
1386

    
1387
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1388
                  None,
1389
                  "the following nodes (and their instances) belong to a non"
1390
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1391

    
1392
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1393
                  None,
1394
                  "the following instances have a non-existing primary-node:"
1395
                  " %s", utils.CommaJoin(no_node_instances))
1396

    
1397
    return not self.bad
1398

    
1399

    
1400
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_names = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: inst_names,
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      all_inst_info = self.cfg.GetAllInstancesInfo()

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
          nodes.update(all_inst_info[inst].secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_instances = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_instances = \
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_instances:
      raise errors.OpPrereqError("Missing lock for instances: %s" %
                                 utils.CommaJoin(unlocked_instances),
                                 errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_names = utils.NiceSort(group_instances)
    self.my_inst_info = dict((name, self.all_inst_info[name])
                             for name in self.my_inst_names)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nuuid in inst.all_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
                    "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
                  "Node time diverges by at least %s from master node time",
                  ntime_diff)

  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                  "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node_verify call.

    """
    node_versions = {}
    for node_uuid, ndata in node_verify_infos.items():
      nresult = ndata.payload
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
      node_versions[node_uuid] = version

    if len(set(node_versions.values())) > 1:
      for node_uuid, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
                    code=self.ETYPE_WARNING)

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping from node names to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of check
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately);
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "did not return valid bridge information")
    if not test:
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = constants.NV_NODELIST not in nresult
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
                  "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
                        "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
                        "tcp communication with node '%s': %s",
                        anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if ninfo.uuid == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)

  def _VerifyInstance(self, instance, inst_config, node_image,
                      diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    pnode = inst_config.primary_node
    pnode_img = node_image[pnode]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    inst_config.MapLVsByNode(node_vol_should)
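    # node_vol_should now maps node UUIDs to the LVs this instance is
    # expected to have on each of them; missing volumes are flagged below.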

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance,
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
                      "volume %s missing on node %s", volume,
                      self.cfg.GetNodeName(node))

    if inst_config.admin_state == constants.ADMINST_UP:
      test = instance not in pnode_img.instances and not pnode_img.offline
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
                    "instance not running on its primary node %s",
                     self.cfg.GetNodeName(pnode))
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
                    "instance is marked as running and lives on"
                    " offline node %s", self.cfg.GetNodeName(pnode))

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      self._ErrorIf(inst_config.disks_active and
                    not success and not bad_snode,
                    constants.CV_EINSTANCEFAULTYDISK, instance,
                    "couldn't retrieve status for disk/%s on %s: %s",
                    idx, self.cfg.GetNodeName(nname), bdev_status)

      if inst_config.disks_active and success and \
         (bdev_status.is_degraded or
          bdev_status.ldisk_status != constants.LDS_OKAY):
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
        if bdev_status.is_degraded:
          msg += " is degraded"
        if bdev_status.ldisk_status != constants.LDS_OKAY:
          msg += "; state is '%s'" % \
                 constants.LDS_NAMES[bdev_status.ldisk_status]

        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance, msg)

    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
                  constants.CV_ENODERPC, pnode, "instance %s, connection to"
                  " primary node failed", instance)

    self._ErrorIf(len(inst_config.secondary_nodes) > 1,
                  constants.CV_EINSTANCELAYOUT,
                  instance, "instance has multiple secondary nodes: %s",
                  utils.CommaJoin(inst_config.secondary_nodes),
                  code=self.ETYPE_WARNING)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               inst_config.all_nodes)
    if any(es_flags.values()):
      if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
        # Disk template not compatible with exclusive_storage: no instance
        # node should have the flag set
        es_nodes = [n
                    for (n, es) in es_flags.items()
                    if es]
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance,
                    "instance has template %s, which is not supported on nodes"
                    " that have exclusive storage set: %s",
                    inst_config.disk_template,
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
      for (idx, disk) in enumerate(inst_config.disks):
        self._ErrorIf(disk.spindles is None,
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance,
                      "number of spindles not configured for disk %s while"
                      " exclusive storage is enabled, try running"
                      " gnt-cluster repair-disk-sizes", idx)

    if inst_config.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(inst_config.all_nodes)
      instance_groups = {}

      for node in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node].group,
                                   []).append(node)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
                           groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in inst_config.secondary_nodes:
      s_img = node_image[snode]
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
                    snode, "instance %s, connection to secondary node failed",
                    instance)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
                  "instance has offline secondary node(s) %s",
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
    # ... or ghost/non-vm_capable nodes
    for node in inst_config.all_nodes:
      self._ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
                    instance, "instance lives on ghost node %s",
                    self.cfg.GetNodeName(node))
      self._ErrorIf(not node_image[node].vm_capable,
                    constants.CV_EINSTANCEBADNODE, instance,
                    "instance lives on non-vm_capable node %s",
                    self.cfg.GetNodeName(node))

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node_uuid, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node_uuid].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node_uuid not in node_vol_should or
                volume not in node_vol_should[node_uuid]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
                      self.cfg.GetNodeName(node_uuid),
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node_uuid, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or \
         self.all_node_info[node_uuid].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = cluster_info.FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1,
                      self.cfg.GetNodeName(node_uuid),
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)

  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param nodes: List of L{objects.Node} objects
    @param master_node_uuid: UUID of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.uuid == master_node_uuid)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodes
      else:
        filenodes = filter(fn, nodes)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()
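    # fileinfo maps each filename to {checksum: set of node UUIDs reporting
    # it}; ignore_nodes collects nodes (offline or without usable results)
    # that are excluded from the consistency checks below.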

    for node in nodes:
      if node.offline:
        ignore_nodes.add(node.uuid)
        continue

      nresult = all_nvinfo[node.uuid]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
                    "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.uuid)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_uuid
                            for node_uuids in fileinfo[filename].values()
                            for node_uuid in node_uuids) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        self._ErrorIf(missing_file and missing_file != expected_nodes,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is optional, but it must exist on all or no"
                      " nodes (not found on %s)",
                      filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))
      else:
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is missing from node(s) %s", filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        self._ErrorIf(unexpected,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s should not exist on node(s) %s",
                      filename, utils.CommaJoin(
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1,
                     utils.CommaJoin(utils.NiceSort(
                       map(self.cfg.GetNodeName, node_uuids))))
                    for (idx, (checksum, node_uuids)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
                    "File %s found with %s different checksums (%s)",
                    filename, len(checksums), "; ".join(variants))

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                    "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
    node_drbd = {}
    for minor, instance in drbd_map[ninfo.uuid].items():
      test = instance not in instanceinfo
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
                    "ghost instance '%s' in temporary DRBD map", instance)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
      if test:
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.disks_active)
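
    # node_drbd now maps each DRBD minor to (instance name, whether its
    # disks should be active); the minors the node actually reports are
    # checked against this mapping below.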

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                  "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (iname, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "drbd minor %d of instance %s is not active", minor, iname)
    for minor in used_minors:
      test = minor not in node_drbd
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                  "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
                    "Invalid OS %s (located at %s): %s",
                    os_name, f_path, f_diag)
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
                    "OS '%s' has multiple entries"
                    " (first one shadows the rest): %s",
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                    "Extra OS %s not present on reference node (%s)",
                    os_name, self.cfg.GetNodeName(base.uuid))
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " [%s] vs. [%s]", kind, os_name,
                      self.cfg.GetNodeName(base.uuid),
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
                  "OSes present on reference node %s"
                  " but missing on this node: %s",
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))

  def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    if (is_master and
        (constants.ENABLE_FILE_STORAGE or
         constants.ENABLE_SHARED_FILE_STORAGE)):
      try:
        fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = idata

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (name, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (name, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_devonly = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_instances = list(itertools.chain(node_image[nuuid].pinst,
                                            node_image[nuuid].sinst))
      diskless_instances.update(inst for inst in node_instances
                                if instanceinfo[inst].disk_template == diskless)
      disks = [(inst, disk)
               for inst in node_instances
               for disk in instanceinfo[inst].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams makes already copies of the disks
      devonly = []
      for (inst, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
        self.cfg.SetDiskID(anno_disk, nuuid)
        devonly.append(anno_disk)

      node_disks_devonly[nuuid] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst, _), status) in zip(disks, data):
        instdisk.setdefault(inst, {}).setdefault(node.uuid, []).append(status)

    # Add empty entries for diskless instances.
    for inst in diskless_instances:
      assert inst not in instdisk
      instdisk[inst] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just ran in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }
2648

    
2649
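    # LVM-related checks are only requested when the cluster has a volume
    # group configured.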
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

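    # Likewise, DRBD checks are only requested when a DRBD usermode helper
    # is configured.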
    if drbd_helper:
      node_verify_param[constants.NV_DRBDVERSION] = None
      node_verify_param[constants.NV_DRBDLIST] = None
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
      # Load file storage paths only from master node
      node_verify_param[constants.NV_FILE_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

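    # Fill in the expected per-node state: register every instance of this
    # group on its primary and secondary node images, record the LVs it
    # should have on each node, and count administratively offline instances.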
    for instance in self.my_inst_names:
      inst_config = self.my_inst_info[instance]
      if inst_config.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in inst_config.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      inst_config.MapLVsByNode(node_vol_should)

      pnode = inst_config.primary_node
      node_image[pnode].pinst.append(instance)

      for snode in inst_config.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so
    # if it's True for at least one node, we act as if it were set for all.
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams)
    nvinfo_endtime = time.time()

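    # If nodes outside this group hold LVs for instances of this group, query
    # just their LV lists as well.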
    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

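    # refos_img holds the first node image with valid OS data; it is used as
    # the reference against which the other nodes' OS lists are compared.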
    refos_img = None

    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyFileStoragePaths(node_i, nresult,
                                   node_i.uuid == master_node_uuid)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)

        for inst in non_primary_inst:
          test = inst in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for instance in self.my_inst_names:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.my_inst_info[instance]
      self._VerifyInstance(instance, inst_config, node_image,
                           instdisk[instance])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if inst_config.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for inst in self.all_inst_info.values():
      for secondary in inst.secondary_nodes:
        if (secondary in self.my_node_info
            and inst.name not in self.my_inst_info):
          inst.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
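    # Verifying disks spans all node groups, so acquire shared locks on every
    # group; one verification job per group is submitted in Exec below.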
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])