lib/cmdlib/cluster.py @ d0d7d7cf


#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
from ganeti import rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob

import ganeti.masterd.instance


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Nothing to do.

    """
    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

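    # fields that were not requested are filled with NotImplemented as a
    # placeholder value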
    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      self.wanted_names = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfo(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.LD_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      newl = [v[2].Copy() for v in dskl]
      for dsk in newl:
        self.cfg.SetDiskID(dsk, node_uuid)
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
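        # the node reports the size in bytes; convert it to MiB, the unit
        # used for disk sizes in the cluster configuration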
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
      except errors.OpPrereqError, err:
641
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
642
                                   errors.ECODE_INVAL)
643

    
644
  def ExpandNames(self):
645
    # FIXME: in the future maybe other cluster params won't require checking on
646
    # all nodes to be modified.
647
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
648
    # resource locks the right thing, shouldn't it be the BGL instead?
649
    self.needed_locks = {
650
      locking.LEVEL_NODE: locking.ALL_SET,
651
      locking.LEVEL_INSTANCE: locking.ALL_SET,
652
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
653
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
654
    }
655
    self.share_locks = ShareAll()
656

    
657
  def BuildHooksEnv(self):
658
    """Build hooks env.
659

660
    """
661
    return {
662
      "OP_TARGET": self.cfg.GetClusterName(),
663
      "NEW_VG_NAME": self.op.vg_name,
664
      }
665

    
666
  def BuildHooksNodes(self):
667
    """Build hooks nodes.
668

669
    """
670
    mn = self.cfg.GetMasterNode()
671
    return ([mn], [mn])
672

    
673
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
674
                   new_enabled_disk_templates):
675
    """Check the consistency of the vg name on all nodes and in case it gets
676
       unset whether there are instances still using it.
677

678
    """
679
    if self.op.vg_name is not None and not self.op.vg_name:
680
      if self.cfg.HasAnyDiskOfType(constants.LD_LV):
681
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
682
                                   " instances exist", errors.ECODE_INVAL)
683

    
684
    if (self.op.vg_name is not None and
685
        utils.IsLvmEnabled(enabled_disk_templates)) or \
686
           (self.cfg.GetVGName() is not None and
687
            utils.LvmGetsEnabled(enabled_disk_templates,
688
                                 new_enabled_disk_templates)):
689
      self._CheckVgNameOnNodes(node_uuids)
690

    
691
  def _CheckVgNameOnNodes(self, node_uuids):
692
    """Check the status of the volume group on each node.
693

694
    """
695
    vglist = self.rpc.call_vg_list(node_uuids)
696
    for node_uuid in node_uuids:
697
      msg = vglist[node_uuid].fail_msg
698
      if msg:
699
        # ignoring down node
700
        self.LogWarning("Error while gathering data on node %s"
701
                        " (ignoring node): %s",
702
                        self.cfg.GetNodeName(node_uuid), msg)
703
        continue
704
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
705
                                            self.op.vg_name,
706
                                            constants.MIN_VG_SIZE)
707
      if vgstatus:
708
        raise errors.OpPrereqError("Error on node '%s': %s" %
709
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
710
                                   errors.ECODE_ENVIRON)
711

    
712
  def _GetEnabledDiskTemplates(self, cluster):
713
    """Determines the enabled disk templates and the subset of disk templates
714
       that are newly enabled by this operation.
715

716
    """
717
    enabled_disk_templates = None
718
    new_enabled_disk_templates = []
719
    if self.op.enabled_disk_templates:
720
      enabled_disk_templates = self.op.enabled_disk_templates
721
      new_enabled_disk_templates = \
722
        list(set(enabled_disk_templates)
723
             - set(cluster.enabled_disk_templates))
724
    else:
725
      enabled_disk_templates = cluster.enabled_disk_templates
726
    return (enabled_disk_templates, new_enabled_disk_templates)
727

    
728
  def CheckPrereq(self):
729
    """Check prerequisites.
730

731
    This checks whether the given params don't conflict and
732
    if the given volume group is valid.
733

734
    """
735
    if self.op.drbd_helper is not None and not self.op.drbd_helper:
736
      if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
737
        raise errors.OpPrereqError("Cannot disable drbd helper while"
738
                                   " drbd-based instances exist",
739
                                   errors.ECODE_INVAL)
740

    
741
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
742
    self.cluster = cluster = self.cfg.GetClusterInfo()
743

    
744
    vm_capable_node_uuids = [node.uuid
745
                             for node in self.cfg.GetAllNodesInfo().values()
746
                             if node.uuid in node_uuids and node.vm_capable]
747

    
748
    (enabled_disk_templates, new_enabled_disk_templates) = \
749
      self._GetEnabledDiskTemplates(cluster)
750

    
751
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
752
                      new_enabled_disk_templates)
753

    
754
    if self.op.drbd_helper:
755
      # checks given drbd helper on all nodes
756
      helpers = self.rpc.call_drbd_helper(node_uuids)
757
      for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
758
        if ninfo.offline:
759
          self.LogInfo("Not checking drbd helper on offline node %s",
760
                       ninfo.name)
761
          continue
762
        msg = helpers[ninfo.uuid].fail_msg
763
        if msg:
764
          raise errors.OpPrereqError("Error checking drbd helper on node"
765
                                     " '%s': %s" % (ninfo.name, msg),
766
                                     errors.ECODE_ENVIRON)
767
        node_helper = helpers[ninfo.uuid].payload
768
        if node_helper != self.op.drbd_helper:
769
          raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
770
                                     (ninfo.name, node_helper),
771
                                     errors.ECODE_ENVIRON)
772

    
773
    # validate params changes
774
    if self.op.beparams:
775
      objects.UpgradeBeParams(self.op.beparams)
776
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
777
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
778

    
779
    if self.op.ndparams:
780
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
781
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
782

    
783
      # TODO: we need a more general way to handle resetting
784
      # cluster-level parameters to default values
785
      if self.new_ndparams["oob_program"] == "":
786
        self.new_ndparams["oob_program"] = \
787
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
788

    
789
    if self.op.hv_state:
790
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
791
                                           self.cluster.hv_state_static)
792
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
793
                               for hv, values in new_hv_state.items())
794

    
795
    if self.op.disk_state:
796
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
797
                                               self.cluster.disk_state_static)
798
      self.new_disk_state = \
799
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
800
                            for name, values in svalues.items()))
801
             for storage, svalues in new_disk_state.items())
802

    
803
    if self.op.ipolicy:
804
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
805
                                           group_policy=False)
806

    
807
      all_instances = self.cfg.GetAllInstancesInfo().values()
808
      violations = set()
809
      for group in self.cfg.GetAllNodeGroupsInfo().values():
810
        instances = frozenset([inst for inst in all_instances
811
                               if compat.any(nuuid in group.members
812
                                             for nuuid in inst.all_nodes)])
813
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
814
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
815
        new = ComputeNewInstanceViolations(ipol,
816
                                           new_ipolicy, instances, self.cfg)
817
        if new:
818
          violations.update(new)
819

    
820
      if violations:
821
        self.LogWarning("After the ipolicy change the following instances"
822
                        " violate them: %s",
823
                        utils.CommaJoin(utils.NiceSort(violations)))
824

    
825
    if self.op.nicparams:
826
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
827
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
828
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
829
      nic_errors = []
830

    
831
      # check all instances for consistency
832
      for instance in self.cfg.GetAllInstancesInfo().values():
833
        for nic_idx, nic in enumerate(instance.nics):
834
          params_copy = copy.deepcopy(nic.nicparams)
835
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
836

    
837
          # check parameter syntax
838
          try:
839
            objects.NIC.CheckParameterSyntax(params_filled)
840
          except errors.ConfigurationError, err:
841
            nic_errors.append("Instance %s, nic/%d: %s" %
842
                              (instance.name, nic_idx, err))
843

    
844
          # if we're moving instances to routed, check that they have an ip
845
          target_mode = params_filled[constants.NIC_MODE]
846
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
847
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
848
                              " address" % (instance.name, nic_idx))
849
      if nic_errors:
850
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
851
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
852

    
853
    # hypervisor list/parameters
854
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
855
    if self.op.hvparams:
856
      for hv_name, hv_dict in self.op.hvparams.items():
857
        if hv_name not in self.new_hvparams:
858
          self.new_hvparams[hv_name] = hv_dict
859
        else:
860
          self.new_hvparams[hv_name].update(hv_dict)
861

    
862
    # disk template parameters
863
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
864
    if self.op.diskparams:
865
      for dt_name, dt_params in self.op.diskparams.items():
866
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_uuids, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_uuids, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      if self.op.vg_name and not \
           utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
        feedback_fn("Note that you specified a volume group, but did not"
                    " enable any lvm disk template.")
      new_volume = self.op.vg_name
      if not new_volume:
        if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
          raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                     " disk templates are enabled.")
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")
    else:
      if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
          not self.cfg.GetVGName():
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(set(self.op.enabled_disk_templates))

    self._SetVgName(feedback_fn)

    if self.op.drbd_helper is not None:
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did"
1013
                    " enabled the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      result.Raise("Could not disable the master ip")
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]
1161

    
1162
    jobs.extend(
1163
      [opcodes.OpClusterVerifyGroup(group_name=group,
1164
                                    ignore_errors=self.op.ignore_errors,
1165
                                    depends=depends_fn())]
1166
      for group in groups)
1167

    
1168
    # Fix up all parameters
1169
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1170
      op.debug_simulate_errors = self.op.debug_simulate_errors
1171
      op.verbose = self.op.verbose
1172
      op.error_codes = self.op.error_codes
1173
      try:
1174
        op.skip_checks = self.op.skip_checks
1175
      except AttributeError:
1176
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1177

    
1178
    return ResultWithJobs(jobs)
1179

    
1180

    
1181
class _VerifyErrors(object):
1182
  """Mix-in for cluster/group verify LUs.
1183

1184
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1185
  self.op and self._feedback_fn to be available.)
1186

1187
  """
1188

    
1189
  ETYPE_FIELD = "code"
1190
  ETYPE_ERROR = "ERROR"
1191
  ETYPE_WARNING = "WARNING"
1192

    
1193
  def _Error(self, ecode, item, msg, *args, **kwargs):
1194
    """Format an error message.
1195

1196
    Based on the opcode's error_codes parameter, either format a
1197
    parseable error code, or a simpler error string.
1198

1199
    This must be called only from Exec and functions called from Exec.
1200

1201
    """
1202
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1203
    itype, etxt, _ = ecode
1204
    # If the error code is in the list of ignored errors, demote the error to a
1205
    # warning
1206
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1207
      ltype = self.ETYPE_WARNING
1208
    # first complete the msg
1209
    if args:
1210
      msg = msg % args
1211
    # then format the whole message
1212
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1213
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1214
    else:
1215
      if item:
1216
        item = " " + item
1217
      else:
1218
        item = ""
1219
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1220
    # and finally report it via the feedback_fn
1221
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1222
    # do not mark the operation as failed for WARN cases only
1223
    if ltype == self.ETYPE_ERROR:
1224
      self.bad = True
1225

    
1226
  def _ErrorIf(self, cond, *args, **kwargs):
1227
    """Log an error message if the passed condition is True.
1228

1229
    """
1230
    if (bool(cond)
1231
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1232
      self._Error(*args, **kwargs)
1233

    
1234

    
1235
def _VerifyCertificate(filename):
1236
  """Verifies a certificate for L{LUClusterVerifyConfig}.
1237

1238
  @type filename: string
1239
  @param filename: Path to PEM file
1240

1241
  """
1242
  try:
1243
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1244
                                           utils.ReadFile(filename))
1245
  except Exception, err: # pylint: disable=W0703
1246
    return (LUClusterVerifyConfig.ETYPE_ERROR,
1247
            "Failed to load X509 certificate %s: %s" % (filename, err))
1248

    
1249
  (errcode, msg) = \
1250
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1251
                                constants.SSL_CERT_EXPIRATION_ERROR)
1252

    
1253
  if msg:
1254
    fnamemsg = "While verifying %s: %s" % (filename, msg)
1255
  else:
1256
    fnamemsg = None
1257

    
1258
  if errcode is None:
1259
    return (None, fnamemsg)
1260
  elif errcode == utils.CERT_WARNING:
1261
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1262
  elif errcode == utils.CERT_ERROR:
1263
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1264

    
1265
  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1266

    
1267

    
1268
def _GetAllHypervisorParameters(cluster, instances):
1269
  """Compute the set of all hypervisor parameters.
1270

1271
  @type cluster: L{objects.Cluster}
1272
  @param cluster: the cluster object
1273
  @param instances: list of L{objects.Instance}
1274
  @param instances: additional instances from which to obtain parameters
1275
  @rtype: list of (origin, hypervisor, parameters)
1276
  @return: a list with all parameters found, indicating the hypervisor they
1277
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1278

1279
  """
1280
  hvp_data = []
1281

    
1282
  for hv_name in cluster.enabled_hypervisors:
1283
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1284

    
1285
  for os_name, os_hvp in cluster.os_hvp.items():
1286
    for hv_name, hv_params in os_hvp.items():
1287
      if hv_params:
1288
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1289
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1290

    
1291
  # TODO: collapse identical parameter values in a single one
1292
  for instance in instances:
1293
    if instance.hvparams:
1294
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1295
                       cluster.FillHV(instance)))
1296

    
1297
  return hvp_data
1298

    
1299

    
1300
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1301
  """Verifies the cluster config.
1302

1303
  """
1304
  REQ_BGL = False
1305

    
1306
  def _VerifyHVP(self, hvp_data):
1307
    """Verifies locally the syntax of the hypervisor parameters.
1308

1309
    """
1310
    for item, hv_name, hv_params in hvp_data:
1311
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1312
             (item, hv_name))
1313
      try:
1314
        hv_class = hypervisor.GetHypervisorClass(hv_name)
1315
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1316
        hv_class.CheckParameterSyntax(hv_params)
1317
      except errors.GenericError, err:
1318
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1319

    
1320
  def ExpandNames(self):
1321
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1322
    self.share_locks = ShareAll()
1323

    
1324
  def CheckPrereq(self):
1325
    """Check prerequisites.
1326

1327
    """
1328
    # Retrieve all information
1329
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1330
    self.all_node_info = self.cfg.GetAllNodesInfo()
1331
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1332

    
1333
  def Exec(self, feedback_fn):
1334
    """Verify integrity of cluster, performing various test on nodes.
1335

1336
    """
1337
    self.bad = False
1338
    self._feedback_fn = feedback_fn
1339

    
1340
    feedback_fn("* Verifying cluster config")
1341

    
1342
    for msg in self.cfg.VerifyConfig():
1343
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1344

    
1345
    feedback_fn("* Verifying cluster certificate files")
1346

    
1347
    for cert_filename in pathutils.ALL_CERT_FILES:
1348
      (errcode, msg) = _VerifyCertificate(cert_filename)
1349
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1350

    
1351
    feedback_fn("* Verifying hypervisor parameters")
1352

    
1353
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1354
                                                self.all_inst_info.values()))
1355

    
1356
    feedback_fn("* Verifying all nodes belong to an existing group")
1357

    
1358
    # We do this verification here because, should this bogus circumstance
1359
    # occur, it would never be caught by VerifyGroup, which only acts on
1360
    # nodes/instances reachable from existing node groups.
1361

    
1362
    dangling_nodes = set(node for node in self.all_node_info.values()
1363
                         if node.group not in self.all_group_info)
1364

    
1365
    dangling_instances = {}
1366
    no_node_instances = []
1367

    
1368
    for inst in self.all_inst_info.values():
1369
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
1370
        dangling_instances.setdefault(inst.primary_node, []).append(inst.name)
1371
      elif inst.primary_node not in self.all_node_info:
1372
        no_node_instances.append(inst.name)
1373

    
1374
    pretty_dangling = [
1375
        "%s (%s)" %
1376
        (node.name,
1377
         utils.CommaJoin(dangling_instances.get(node.uuid,
1378
                                                ["no instances"])))
1379
        for node in dangling_nodes]
1380

    
1381
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1382
                  None,
1383
                  "the following nodes (and their instances) belong to a non"
1384
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1385

    
1386
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1387
                  None,
1388
                  "the following instances have a non-existing primary-node:"
1389
                  " %s", utils.CommaJoin(no_node_instances))
1390

    
1391
    return not self.bad
1392

    
1393

    
1394
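# Note: the checks above only inspect the cluster configuration itself
# (config consistency, certificates, hypervisor parameter syntax, dangling
# nodes/instances); runtime verification of nodes and instances is carried
# out by LUClusterVerifyGroup below, typically once per node group.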
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_names = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: inst_names,
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      all_inst_info = self.cfg.GetAllInstancesInfo()

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        if all_inst_info[inst].disk_template in constants.DTS_INT_MIRROR:
          nodes.update(all_inst_info[inst].secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_instances = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_instances = \
        group_instances.difference(self.owned_locks(locking.LEVEL_INSTANCE))

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_instances:
      raise errors.OpPrereqError("Missing lock for instances: %s" %
                                 utils.CommaJoin(unlocked_instances),
                                 errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_names = utils.NiceSort(group_instances)
    self.my_inst_info = dict((name, self.all_inst_info[name])
                             for name in self.my_inst_names)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nuuid in inst.all_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
                    "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
                  "Node time diverges by at least %s from master node time",
                  ntime_diff)

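  # Illustrative example (values assumed, not taken from the code above):
  # with NODE_MAX_CLOCK_SKEW = 150 and an RPC that ran between t=1000.0 and
  # t=1003.0, any node time inside [850.0, 1153.0] is accepted; a node
  # reporting t=1200.0 would be flagged as diverging by at least "197.0s".
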
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                  "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node_verify call.

    """
    node_versions = {}
    for node_uuid, ndata in node_verify_infos.items():
      nresult = ndata.payload
      version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
      node_versions[node_uuid] = version

    if len(set(node_versions.values())) > 1:
      for node_uuid, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
                    code=self.ETYPE_WARNING)

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping from node names to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of check
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode) = min((ni.pv_min, ni.name) for ni in vals)
    (pvmax, maxnode) = max((ni.pv_max, ni.name) for ni in vals)
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, minnode, pvmax, maxnode)

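  # Rough illustration (assumed figures; utils.LvmExclusiveTestBadPvSizes is
  # the authoritative rule): if the smallest PV in the group is 100000 MB and
  # the biggest 102400 MB the spread is tolerated, whereas something like
  # 50000 MB vs. 200000 MB would raise CV_EGROUPDIFFERENTPVSIZE.
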
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "did not return valid bridge information")
    if not test:
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = constants.NV_NODELIST not in nresult
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
                  "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
                        "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
                        "tcp communication with node '%s': %s",
                        anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if ninfo.uuid == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)

  def _VerifyInstance(self, instance, inst_config, node_image,
                      diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    pnode = inst_config.primary_node
    pnode_img = node_image[pnode]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    inst_config.MapLVsByNode(node_vol_should)

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = ComputeIPolicyInstanceViolation(ipolicy, inst_config, self.cfg)
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance,
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)

    for node in node_vol_should:
      n_img = node_image[node]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node]:
        test = volume not in n_img.volumes
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance,
                      "volume %s missing on node %s", volume,
                      self.cfg.GetNodeName(node))

    if inst_config.admin_state == constants.ADMINST_UP:
      test = instance not in pnode_img.instances and not pnode_img.offline
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance,
                    "instance not running on its primary node %s",
                     self.cfg.GetNodeName(pnode))
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE, instance,
                    "instance is marked as running and lives on"
                    " offline node %s", self.cfg.GetNodeName(pnode))

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      self._ErrorIf(inst_config.disks_active and
                    not success and not bad_snode,
                    constants.CV_EINSTANCEFAULTYDISK, instance,
                    "couldn't retrieve status for disk/%s on %s: %s",
                    idx, self.cfg.GetNodeName(nname), bdev_status)
      self._ErrorIf((inst_config.disks_active and
                     success and
                     bdev_status.ldisk_status == constants.LDS_FAULTY),
                    constants.CV_EINSTANCEFAULTYDISK, instance,
                    "disk/%s on %s is faulty", idx, self.cfg.GetNodeName(nname))

    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
                  constants.CV_ENODERPC, pnode, "instance %s, connection to"
                  " primary node failed", instance)

    self._ErrorIf(len(inst_config.secondary_nodes) > 1,
                  constants.CV_EINSTANCELAYOUT,
                  instance, "instance has multiple secondary nodes: %s",
                  utils.CommaJoin(inst_config.secondary_nodes),
                  code=self.ETYPE_WARNING)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               inst_config.all_nodes)
    if any(es_flags.values()):
      if inst_config.disk_template not in constants.DTS_EXCL_STORAGE:
        # Disk template not compatible with exclusive_storage: no instance
        # node should have the flag set
        es_nodes = [n
                    for (n, es) in es_flags.items()
                    if es]
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance,
                    "instance has template %s, which is not supported on nodes"
                    " that have exclusive storage set: %s",
                    inst_config.disk_template,
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
      for (idx, disk) in enumerate(inst_config.disks):
        self._ErrorIf(disk.spindles is None,
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance,
                      "number of spindles not configured for disk %s while"
                      " exclusive storage is enabled, try running"
                      " gnt-cluster repair-disk-sizes", idx)

    if inst_config.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(inst_config.all_nodes)
      instance_groups = {}

      for node in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node].group,
                                   []).append(node)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
                           groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in inst_config.secondary_nodes:
      s_img = node_image[snode]
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
                    snode, "instance %s, connection to secondary node failed",
                    instance)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE, instance,
                  "instance has offline secondary node(s) %s",
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
    # ... or ghost/non-vm_capable nodes
    for node in inst_config.all_nodes:
      self._ErrorIf(node_image[node].ghost, constants.CV_EINSTANCEBADNODE,
                    instance, "instance lives on ghost node %s",
                    self.cfg.GetNodeName(node))
      self._ErrorIf(not node_image[node].vm_capable,
                    constants.CV_EINSTANCEBADNODE, instance,
                    "instance lives on non-vm_capable node %s",
                    self.cfg.GetNodeName(node))

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node_uuid, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node_uuid].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node_uuid not in node_vol_should or
                volume not in node_vol_should[node_uuid]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
                      self.cfg.GetNodeName(node_uuid),
                      "volume %s is unknown", volume)

  def _VerifyNPlusOneMemory(self, node_image, instance_cfg):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node_uuid, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or \
         self.all_node_info[node_uuid].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
      for prinode, instances in n_img.sbp.items():
        needed_mem = 0
        for instance in instances:
          bep = cluster_info.FillBE(instance_cfg[instance])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1,
                      self.cfg.GetNodeName(node_uuid),
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)

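  # Worked example (illustrative figures only): if node B is secondary for
  # three auto-balanced instances whose primary is node A, with BE_MINMEM of
  # 1024, 2048 and 4096 MiB, then needed_mem for the (B, A) pair is 7168 MiB;
  # B must report at least that much free memory or CV_ENODEN1 is raised.
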
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param nodes: List of L{objects.Node} objects
    @param master_node_uuid: UUID of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.uuid == master_node_uuid)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodes
      else:
        filenodes = filter(fn, nodes)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodes:
      if node.offline:
        ignore_nodes.add(node.uuid)
        continue

      nresult = all_nvinfo[node.uuid]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
                    "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.uuid)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_uuid
                            for node_uuids in fileinfo[filename].values()
                            for node_uuid in node_uuids) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        self._ErrorIf(missing_file and missing_file != expected_nodes,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is optional, but it must exist on all or no"
                      " nodes (not found on %s)",
                      filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))
      else:
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is missing from node(s) %s", filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        self._ErrorIf(unexpected,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s should not exist on node(s) %s",
                      filename, utils.CommaJoin(
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1,
                     utils.CommaJoin(utils.NiceSort(
                       map(self.cfg.GetNodeName, node_uuids))))
                    for (idx, (checksum, node_uuids)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
                    "File %s found with %s different checksums (%s)",
                    filename, len(checksums), "; ".join(variants))

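  # Shape of the intermediate data (illustrative, made-up values):
  #   fileinfo = {"/var/lib/ganeti/known_hosts":
  #                 {"3f8a...e1": set(["node1-uuid", "node2-uuid"]),
  #                  "77bc...09": set(["node3-uuid"])}}
  # Two different checksums for the same file show up as two "variants" in
  # the CV_ECLUSTERFILECHECK error emitted above.
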
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                    "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "wrong drbd usermode helper: %s", payload)

    # compute the DRBD minors
    node_drbd = {}
    for minor, instance in drbd_map[ninfo.uuid].items():
      test = instance not in instanceinfo
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
                    "ghost instance '%s' in temporary DRBD map", instance)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
      if test:
        node_drbd[minor] = (instance, False)
      else:
        instance = instanceinfo[instance]
        node_drbd[minor] = (instance.name, instance.disks_active)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                  "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (iname, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "drbd minor %d of instance %s is not active", minor, iname)
    for minor in used_minors:
      test = minor not in node_drbd
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "unallocated drbd minor %d is in use", minor)

2189
    """Builds the node OS structures.
2190

2191
    @type ninfo: L{objects.Node}
2192
    @param ninfo: the node to check
2193
    @param nresult: the remote results for the node
2194
    @param nimg: the node image object
2195

2196
    """
2197
    remote_os = nresult.get(constants.NV_OSLIST, None)
2198
    test = (not isinstance(remote_os, list) or
2199
            not compat.all(isinstance(v, list) and len(v) == 7
2200
                           for v in remote_os))
2201

    
2202
    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2203
                  "node hasn't returned valid OS data")
2204

    
2205
    nimg.os_fail = test
2206

    
2207
    if test:
2208
      return
2209

    
2210
    os_dict = {}
2211

    
2212
    for (name, os_path, status, diagnose,
2213
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2214

    
2215
      if name not in os_dict:
2216
        os_dict[name] = []
2217

    
2218
      # parameters is a list of lists instead of list of tuples due to
2219
      # JSON lacking a real tuple type, fix it:
2220
      parameters = [tuple(v) for v in parameters]
2221
      os_dict[name].append((os_path, status, diagnose,
2222
                            set(variants), set(parameters), set(api_ver)))
2223

    
2224
    nimg.oslist = os_dict
2225

    
2226
  def _VerifyNodeOS(self, ninfo, nimg, base):
2227
    """Verifies the node OS list.
2228

2229
    @type ninfo: L{objects.Node}
2230
    @param ninfo: the node to check
2231
    @param nimg: the node image object
2232
    @param base: the 'template' node we match against (e.g. from the master)
2233

2234
    """
2235
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2236

    
2237
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2238
    for os_name, os_data in nimg.oslist.items():
2239
      assert os_data, "Empty OS status for OS %s?!" % os_name
2240
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2241
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2242
                    "Invalid OS %s (located at %s): %s",
2243
                    os_name, f_path, f_diag)
2244
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2245
                    "OS '%s' has multiple entries"
2246
                    " (first one shadows the rest): %s",
2247
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
2248
      # comparisons with the 'base' image
2249
      test = os_name not in base.oslist
2250
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2251
                    "Extra OS %s not present on reference node (%s)",
2252
                    os_name, self.cfg.GetNodeName(base.uuid))
2253
      if test:
2254
        continue
2255
      assert base.oslist[os_name], "Base node has empty OS status?"
2256
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2257
      if not b_status:
2258
        # base OS is invalid, skipping
2259
        continue
2260
      for kind, a, b in [("API version", f_api, b_api),
2261
                         ("variants list", f_var, b_var),
2262
                         ("parameters", beautify_params(f_param),
2263
                          beautify_params(b_param))]:
2264
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2265
                      "OS %s for %s differs from reference node %s:"
2266
                      " [%s] vs. [%s]", kind, os_name,
2267
                      self.cfg.GetNodeName(base.uuid),
2268
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2269

    
2270
    # check any missing OSes
2271
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2272
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2273
                  "OSes present on reference node %s"
2274
                  " but missing on this node: %s",
2275
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2276

    
2277
  def _VerifyFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    if (is_master and
        (constants.ENABLE_FILE_STORAGE or
         constants.ENABLE_SHARED_FILE_STORAGE)):
      try:
        fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = idata

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (name, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (name, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_devonly = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_instances = list(itertools.chain(node_image[nuuid].pinst,
                                            node_image[nuuid].sinst))
      diskless_instances.update(inst for inst in node_instances
                                if instanceinfo[inst].disk_template == diskless)
      disks = [(inst, disk)
               for inst in node_instances
               for disk in instanceinfo[inst].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams makes already copies of the disks
      devonly = []
      for (inst, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst], [dev], self.cfg)
        self.cfg.SetDiskID(anno_disk, nuuid)
        devonly.append(anno_disk)

      node_disks_devonly[nuuid] = devonly

    assert len(node_disks) == len(node_disks_devonly)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
                                                          node_disks_devonly)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst, _), status) in zip(disks, data):
        instdisk.setdefault(inst, {}).setdefault(node.uuid, []).append(status)

    # Add empty entries for diskless instances.
    for inst in diskless_instances:
      assert inst not in instdisk
      instdisk[inst] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

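  # Return value shape (illustrative, made-up identifiers): for a two-disk
  # DRBD instance the result looks roughly like
  #   {"inst1": {"node-a-uuid": [(True, status0), (True, status1)],
  #              "node-b-uuid": [(True, status0), (True, status1)]},
  #    "diskless-inst": {}}
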
  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

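  # Example of the returned structure (hypothetical node names): for a group
  # containing node1/node2 and one other group holding nodeX/nodeY, the result
  # is roughly (["node1", "node2"], {"node1": ["nodeX"], "node2": ["nodeY"]}),
  # i.e. the group's own online nodes plus, per node, one cycled pick from
  # each other group.
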
  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks just ran in the post phase and their failure makes
    the output be logged in the verify output and the verification to fail.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
2566
    """Verify integrity of the node group, performing various test on nodes.
2567

2568
    """
2569
    # This method has too many local variables. pylint: disable=R0914
2570
    feedback_fn("* Verifying group '%s'" % self.group_info.name)
2571

    
2572
    if not self.my_node_uuids:
2573
      # empty node group
2574
      feedback_fn("* Empty node group, skipping verification")
2575
      return True
2576

    
2577
    self.bad = False
2578
    verbose = self.op.verbose
2579
    self._feedback_fn = feedback_fn
2580

    
2581
    vg_name = self.cfg.GetVGName()
2582
    drbd_helper = self.cfg.GetDRBDHelper()
2583
    cluster = self.cfg.GetClusterInfo()
2584
    hypervisors = cluster.enabled_hypervisors
2585
    node_data_list = self.my_node_info.values()
2586

    
2587
    i_non_redundant = [] # Non redundant instances
2588
    i_non_a_balanced = [] # Non auto-balanced instances
2589
    i_offline = 0 # Count of offline instances
2590
    n_offline = 0 # Count of offline nodes
2591
    n_drained = 0 # Count of nodes being drained
2592
    node_vol_should = {}
2593

    
2594
    # FIXME: verify OS list
2595

    
2596
    # File verification
2597
    filemap = ComputeAncillaryFiles(cluster, False)
2598

    
2599
    # do local checksums
2600
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
2601
    master_ip = self.cfg.GetMasterIP()
2602

    
2603
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))
2604

    
2605
    user_scripts = []
2606
    if self.cfg.GetUseExternalMipScript():
2607
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2608

    
2609
    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if drbd_helper:
      node_verify_param[constants.NV_DRBDVERSION] = None
      node_verify_param[constants.NV_DRBDLIST] = None
      node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
      # Load file storage paths only from master node
      node_verify_param[constants.NV_FILE_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

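    # Count offline instances and record, for every node, the instances it is
    # primary or secondary for; nodes referenced by an instance but not yet in
    # node_image get placeholder images (marked as ghosts if they are unknown
    # to the configuration).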
    for instance in self.my_inst_names:
      inst_config = self.my_inst_info[instance]
      if inst_config.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in inst_config.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      inst_config.MapLVsByNode(node_vol_should)

      pnode = inst_config.primary_node
      node_image[pnode].pinst.append(instance)

      for snode in inst_config.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so
    # if it's True for at least one node, we act as if it were set for all
    # the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams)
    nvinfo_endtime = time.time()

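    # self.extra_lv_nodes are additional nodes whose LVs are relevant to the
    # instances being verified; query just their LV lists so those volumes
    # can be accounted for below.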
    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

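    # Check each node's verify results against the expected state built above;
    # offline nodes are skipped and RPC failures are reported per node.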
    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyFileStoragePaths(node_i, nresult,
                                   node_i.uuid == master_node_uuid)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst = set(nimg.instances).difference(nimg.pinst)

        for inst in non_primary_inst:
          test = inst in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE, inst,
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for instance in self.my_inst_names:
      if verbose:
        feedback_fn("* Verifying instance %s" % instance)
      inst_config = self.my_inst_info[instance]
      self._VerifyInstance(instance, inst_config, node_image,
                           instdisk[instance])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if inst_config.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(inst_config)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
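    # LV names matching cluster.reserved_lvs are not reported as orphans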
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for inst in self.all_inst_info.values():
      for secondary in inst.secondary_nodes:
        if (secondary in self.my_node_info
            and inst.name not in self.my_inst_info):
          inst.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

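      # Each node's payload is a list of (script, status, output) tuples as
      # returned by the hooks runner; any failed script fails the verification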
      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
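    # Shared locks on all node groups are sufficient here; the actual disk
    # verification is delegated to one OpGroupVerifyDisks job per group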
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])