# root / lib / cmdlib / cluster.py @ c1410048
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
import ganeti.rpc.node as rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
  CheckDiskAccessModeConsistency, CreateNewClientCert

import ganeti.masterd.instance


def _UpdateMasterClientCert(
    lu, master_uuid, cluster, feedback_fn,
    client_cert=pathutils.NODED_CLIENT_CERT_FILE,
    client_cert_tmp=pathutils.NODED_CLIENT_CERT_FILE_TMP):
  """Renews the master's client certificate and propagates the config.

  @type lu: C{LogicalUnit}
  @param lu: the logical unit holding the config
  @type master_uuid: string
  @param master_uuid: the master node's UUID
  @type cluster: C{objects.Cluster}
  @param cluster: the cluster's configuration
  @type feedback_fn: function
  @param feedback_fn: feedback functions for config updates
  @type client_cert: string
  @param client_cert: the path of the client certificate
  @type client_cert_tmp: string
  @param client_cert_tmp: the temporary path of the client certificate
  @rtype: string
  @return: the digest of the newly created client certificate

  """
  client_digest = CreateNewClientCert(lu, master_uuid, filename=client_cert_tmp)
  utils.AddNodeToCandidateCerts(master_uuid, client_digest,
                                cluster.candidate_certs)
  # This triggers an update of the config and distribution of it with the old
  # SSL certificate
  lu.cfg.Update(cluster, feedback_fn)

  utils.RemoveFile(client_cert)
  utils.RenameFile(client_cert_tmp, client_cert)
  return client_digest


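# Illustrative usage sketch of the helper above (mirroring how
# LUClusterRenewCrypto and LUClusterPostInit call it further down; the
# "digest" variable name is arbitrary):
#
#   digest = _UpdateMasterClientCert(self, self.cfg.GetMasterNode(),
#                                    self.cfg.GetClusterInfo(), feedback_fn)
#
# The sequence is: write a new certificate to the temporary path, register its
# digest in cluster.candidate_certs, distribute the configuration while the
# old certificate is still active, then rename the temporary file over the
# real client certificate.
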
class LUClusterRenewCrypto(NoHooksLU):
  """Renew the cluster's crypto tokens.

  Note that most of this operation is done in gnt_cluster.py; this LU only
  takes care of the renewal of the client SSL certificates.

  """
  def Exec(self, feedback_fn):
    master_uuid = self.cfg.GetMasterNode()
    cluster = self.cfg.GetClusterInfo()

    server_digest = utils.GetCertificateDigest(
      cert_filename=pathutils.NODED_CERT_FILE)
    utils.AddNodeToCandidateCerts("%s-SERVER" % master_uuid,
                                  server_digest,
                                  cluster.candidate_certs)
    new_master_digest = _UpdateMasterClientCert(self, master_uuid, cluster,
                                                feedback_fn)

    cluster.candidate_certs = {master_uuid: new_master_digest}
    nodes = self.cfg.GetAllNodesInfo()
    for (node_uuid, node_info) in nodes.items():
      if node_uuid != master_uuid:
        new_digest = CreateNewClientCert(self, node_uuid)
        if node_info.master_candidate:
          cluster.candidate_certs[node_uuid] = new_digest
    # Trigger another update of the config now with the new master cert
    self.cfg.Update(cluster, feedback_fn)


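# After LUClusterRenewCrypto.Exec, cluster.candidate_certs maps the UUID of
# the master and of every master candidate to its new client certificate
# digest; the temporary "%s-SERVER" entry only lives until the mapping is
# reset to {master_uuid: new_master_digest}. A sketch with made-up values:
#
#   cluster.candidate_certs == {
#     "master-node-uuid": "1a2b3c...",       # hypothetical digests
#     "candidate-node-uuid": "4d5e6f...",
#   }
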
class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def CheckArguments(self):
    self.master_uuid = self.cfg.GetMasterNode()
    self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo())

    # TODO: When Issue 584 is solved, and None is properly parsed when used
    # as a default value, ndparams.get(.., None) can be changed to
    # ndparams[..] to access the values directly

    # OpenvSwitch: Warn user if link is missing
    if (self.master_ndparams[constants.ND_OVS] and not
        self.master_ndparams.get(constants.ND_OVS_LINK, None)):
      self.LogInfo("No physical interface for OpenvSwitch was given."
                   " OpenvSwitch will not have an outside connection. This"
                   " might not be what you want.")

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Create and configure Open vSwitch

    """
    if self.master_ndparams[constants.ND_OVS]:
      result = self.rpc.call_node_configure_ovs(
                 self.master_uuid,
                 self.master_ndparams[constants.ND_OVS_NAME],
                 self.master_ndparams.get(constants.ND_OVS_LINK, None))
      result.Raise("Could not successfully configure Open vSwitch")

    cluster = self.cfg.GetClusterInfo()
    _UpdateMasterClientCert(self, self.master_uuid, cluster, feedback_fn)

    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)


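# The CQ_* flags in ClusterQuery._GetQueryData above decide what gets
# collected; anything not requested is passed on as the NotImplemented
# sentinel so the query layer can distinguish "not fetched" from "empty":
#
#   query.CQ_CONFIG        -> cluster object and node dict from the config
#   query.CQ_QUEUE_DRAINED -> existence of pathutils.JOB_QUEUE_DRAIN_FILE
#   query.CQ_WATCHER_PAUSE -> call_get_watcher_pause RPC against the master
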
class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "vcs_version": constants.VCS_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "default_iallocator_params": cluster.default_iallocator_params,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      }

    return result


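# Trimmed, illustrative excerpt of the dictionary returned by
# LUClusterQuery.Exec above (keys are the real ones, values are made up):
#
#   {
#     "software_version": "2.x.y",
#     "name": "cluster.example.com",
#     "master": "node1.example.com",
#     "enabled_hypervisors": ["kvm"],
#     "primary_ip_version": 4,
#     ...
#   }
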
class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.DT_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

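  # Worked example for _EnsureChildSizes (made-up sizes, in MiB): for a DRBD8
  # disk of size 10240 whose data child reports 10200, the child is bumped to
  # 10240, the method recurses into that child only (not into the metadata
  # device) and returns True, telling the caller to write the configuration
  # back.
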
  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      if not dskl:
        # no disks on the node
        continue

      newl = [([v[2].Copy()], v[0]) for v in dskl]
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed


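# Note on the conversion in LUClusterRepairDiskSizes.Exec above: the reported
# size is in bytes while the configuration stores MiB, so "size >> 20" shifts
# right by 20 bits, i.e. divides by 2**20 (10737418240 bytes >> 20 == 10240).
# The returned "changed" list holds (instance_name, disk_index, field, value)
# tuples, e.g. ("instance1.example.com", 0, "size", 10240) -- example values.
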

def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)


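# Illustrative behaviour of _ValidateNetmask above (values made up): on an
# IPv4 cluster a CIDR prefix length such as 24 passes, while one outside the
# valid IPv4 range raises OpPrereqError; on an IPv6 cluster larger prefix
# lengths (e.g. 64) are accepted.
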
def CheckFileBasedStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
    file_disk_template):
  """Checks whether the given file-based storage directory is acceptable.

  Note: This function is public, because it is also used in bootstrap.py.

  @type logging_warn_fn: function
  @param logging_warn_fn: function which accepts a string and logs it
  @type file_storage_dir: string
  @param file_storage_dir: the directory to be used for file-based instances
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of enabled disk templates
  @type file_disk_template: string
  @param file_disk_template: the file-based disk template for which the
      path should be checked

  """
  assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
            constants.ST_FILE, constants.ST_SHARED_FILE
         ))
  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_dir is not None:
    if file_storage_dir == "":
      if file_storage_enabled:
        raise errors.OpPrereqError(
            "Unsetting the '%s' storage directory while having '%s' storage"
            " enabled is not permitted." %
            (file_disk_template, file_disk_template))
    else:
      if not file_storage_enabled:
        logging_warn_fn(
            "Specified a %s storage directory, although %s storage is not"
            " enabled." % (file_disk_template, file_disk_template))
  else:
    raise errors.ProgrammerError("Received %s storage dir with value"
                                 " 'None'." % file_disk_template)


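# Summary of the check above (directory value vs. whether the corresponding
# file-based disk template is enabled):
#
#   dir == ""  and template enabled  -> OpPrereqError (cannot unset the path)
#   dir != ""  and template disabled -> warning via logging_warn_fn
#   dir is None                      -> ProgrammerError (caller bug)
#   otherwise                        -> accepted silently
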
def CheckFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_FILE)


def CheckSharedFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given shared file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_SHARED_FILE)


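# Illustrative call of the wrappers above, as issued from
# LUClusterSetParams.CheckPrereq further down (the shared-file variant is
# used the same way):
#
#   CheckFileStoragePathVsEnabledDiskTemplates(
#       self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
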
class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
        CheckDiskAccessModeValidity(self.op.diskparams)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s" %
                                   err, errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_uuids, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the vg name on all nodes and, in case it gets
       unset, whether there are instances still using it.

    """
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
                                            new_enabled_disk_templates)
    current_vg_name = self.cfg.GetVGName()

    if self.op.vg_name == '':
      if lvm_is_enabled:
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                   " disk templates are or get enabled.")

    if self.op.vg_name is None:
      if current_vg_name is None and lvm_is_enabled:
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and lvm_is_enabled) or \
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
      self._CheckVgNameOnNodes(node_uuids)

  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)

  @staticmethod
  def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
                                old_enabled_disk_templates):
    """Computes three sets of disk templates.

    @see: C{_GetDiskTemplateSets} for more details.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    disabled_disk_templates = []
    if op_enabled_disk_templates:
      enabled_disk_templates = op_enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(old_enabled_disk_templates))
      disabled_disk_templates = \
        list(set(old_enabled_disk_templates)
             - set(enabled_disk_templates))
    else:
      enabled_disk_templates = old_enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates,
            disabled_disk_templates)

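  # Worked example for _GetDiskTemplateSetsInner above (hypothetical input):
  # with op_enabled_disk_templates = ["plain", "drbd"] and
  # old_enabled_disk_templates = ["plain", "file"], the result is
  # (["plain", "drbd"], ["drbd"], ["file"]): templates enabled afterwards,
  # templates newly enabled, and templates being disabled, respectively.
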
  def _GetDiskTemplateSets(self, cluster):
    """Computes three sets of disk templates.

    The three sets are:
      - disk templates that will be enabled after this operation (no matter if
        they were enabled before or not)
      - disk templates that get enabled by this operation (thus haven't been
        enabled before.)
      - disk templates that get disabled by this operation

    """
    return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
                                          cluster.enabled_disk_templates)

  def _CheckIpolicy(self, cluster, enabled_disk_templates):
    """Checks the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration
    @type enabled_disk_templates: list of string
    @param enabled_disk_templates: list of (possibly newly) enabled disk
      templates

    """
    # FIXME: write unit tests for this
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
                                  enabled_disk_templates)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(nuuid in group.members
                                             for nuuid in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
                                           self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))
    else:
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
                                  enabled_disk_templates)

  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
    """Checks whether the set DRBD helper actually exists on the nodes.

    @type drbd_helper: string
    @param drbd_helper: path of the drbd usermode helper binary
    @type node_uuids: list of strings
    @param node_uuids: list of node UUIDs to check for the helper

    """
    # checks given drbd helper on all nodes
    helpers = self.rpc.call_drbd_helper(node_uuids)
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
      if ninfo.offline:
        self.LogInfo("Not checking drbd helper on offline node %s",
                     ninfo.name)
        continue
      msg = helpers[ninfo.uuid].fail_msg
      if msg:
        raise errors.OpPrereqError("Error checking drbd helper on node"
                                   " '%s': %s" % (ninfo.name, msg),
                                   errors.ECODE_ENVIRON)
      node_helper = helpers[ninfo.uuid].payload
      if node_helper != drbd_helper:
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                   (ninfo.name, node_helper),
                                   errors.ECODE_ENVIRON)

  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
    """Check the DRBD usermode helper.

    @type node_uuids: list of strings
    @param node_uuids: a list of nodes' UUIDs
    @type drbd_enabled: boolean
    @param drbd_enabled: whether DRBD will be enabled after this operation
      (no matter if it was disabled before or not)
    @type drbd_gets_enabled: boolean
    @param drbd_gets_enabled: true if DRBD was disabled before this
      operation, but will be enabled afterwards

    """
    if self.op.drbd_helper == '':
      if drbd_enabled:
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " DRBD is enabled.")
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    else:
      if self.op.drbd_helper is not None and drbd_enabled:
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
      else:
        if drbd_gets_enabled:
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
          if current_drbd_helper is not None:
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
          else:
            raise errors.OpPrereqError("Cannot enable DRBD without a"
                                       " DRBD usermode helper set.")

  def _CheckInstancesOfDisabledDiskTemplates(
      self, disabled_disk_templates):
    """Check whether we try to disable a disk template that is in use.

    @type disabled_disk_templates: list of string
    @param disabled_disk_templates: list of disk templates that are going to
      be disabled by this operation

    """
    for disk_template in disabled_disk_templates:
      if self.cfg.HasAnyDiskOfType(disk_template):
        raise errors.OpPrereqError(
            "Cannot disable disk template '%s', because there is at least one"
            " instance using it." % disk_template)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    self.cluster = cluster = self.cfg.GetClusterInfo()

    vm_capable_node_uuids = [node.uuid
                             for node in self.cfg.GetAllNodesInfo().values()
                             if node.uuid in node_uuids and node.vm_capable]

    (enabled_disk_templates, new_enabled_disk_templates,
      disabled_disk_templates) = self._GetDiskTemplateSets(cluster)
    self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates)

    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
                      new_enabled_disk_templates)

    if self.op.file_storage_dir is not None:
      CheckFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)

    if self.op.shared_file_storage_dir is not None:
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.shared_file_storage_dir,
          enabled_disk_templates)

    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)

    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    self._CheckIpolicy(cluster, enabled_disk_templates)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self.new_osp = objects.FillDict(cluster.osparams, {})
    if self.op.osparams:
      for os_name, osp in self.op.osparams.items():
        if os_name not in self.new_osp:
          self.new_osp[os_name] = {}

        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
                                                 use_none=True)

        if not self.new_osp[os_name]:
          # we removed all parameters
          del self.new_osp[os_name]
        else:
          # check the parameter validity (remote check)
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                        os_name, self.new_osp[os_name])

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_uuids, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_uuids, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")

  def _SetFileStorageDir(self, feedback_fn):
    """Set the file storage directory.

    """
    if self.op.file_storage_dir is not None:
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
        feedback_fn("Global file storage dir already set to value '%s'"
                    % self.cluster.file_storage_dir)
      else:
        self.cluster.file_storage_dir = self.op.file_storage_dir

  def _SetDrbdHelper(self, feedback_fn):
    """Set the DRBD usermode helper.

    """
    if self.op.drbd_helper is not None:
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did not"
                    " enable the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(self.op.enabled_disk_templates)

    self._SetVgName(feedback_fn)
    self._SetFileStorageDir(feedback_fn)
    self._SetDrbdHelper(feedback_fn)

    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [], feedback_fn)

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.default_iallocator_params is not None:
      self.cluster.default_iallocator_params = self.op.default_iallocator_params

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


class LUClusterVerify(NoHooksLU):
1432
  """Submits all jobs necessary to verify the cluster.
1433

1434
  """
1435
  REQ_BGL = False
1436

    
1437
  def ExpandNames(self):
1438
    self.needed_locks = {}
1439

    
1440
  def Exec(self, feedback_fn):
1441
    jobs = []
1442

    
1443
    if self.op.group_name:
1444
      groups = [self.op.group_name]
1445
      depends_fn = lambda: None
1446
    else:
1447
      groups = self.cfg.GetNodeGroupList()
1448

    
1449
      # Verify global configuration
1450
      jobs.append([
1451
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1452
        ])
1453

    
1454
      # Always depend on global verification
1455
      depends_fn = lambda: [(-len(jobs), [])]
1456

    
1457
    jobs.extend(
1458
      [opcodes.OpClusterVerifyGroup(group_name=group,
1459
                                    ignore_errors=self.op.ignore_errors,
1460
                                    depends=depends_fn())]
1461
      for group in groups)
1462

    
1463
    # Fix up all parameters
1464
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1465
      op.debug_simulate_errors = self.op.debug_simulate_errors
1466
      op.verbose = self.op.verbose
1467
      op.error_codes = self.op.error_codes
1468
      try:
1469
        op.skip_checks = self.op.skip_checks
1470
      except AttributeError:
1471
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1472

    
1473
    return ResultWithJobs(jobs)
1474

    
1475

    
1476
class _VerifyErrors(object):
1477
  """Mix-in for cluster/group verify LUs.
1478

1479
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1480
  self.op and self._feedback_fn to be available.)
1481

1482
  """
1483

    
1484
  ETYPE_FIELD = "code"
1485
  ETYPE_ERROR = constants.CV_ERROR
1486
  ETYPE_WARNING = constants.CV_WARNING
1487

    
1488
  def _Error(self, ecode, item, msg, *args, **kwargs):
1489
    """Format an error message.
1490

1491
    Based on the opcode's error_codes parameter, either format a
1492
    parseable error code, or a simpler error string.
1493

1494
    This must be called only from Exec and functions called from Exec.
1495

1496
    """
1497
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1498
    itype, etxt, _ = ecode
1499
    # If the error code is in the list of ignored errors, demote the error to a
1500
    # warning
1501
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1502
      ltype = self.ETYPE_WARNING
1503
    # first complete the msg
1504
    if args:
1505
      msg = msg % args
1506
    # then format the whole message
1507
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1508
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1509
    else:
1510
      if item:
1511
        item = " " + item
1512
      else:
1513
        item = ""
1514
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1515
    # and finally report it via the feedback_fn
1516
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1517
    # do not mark the operation as failed for WARN cases only
1518
    if ltype == self.ETYPE_ERROR:
1519
      self.bad = True
1520

    
1521
  def _ErrorIf(self, cond, *args, **kwargs):
1522
    """Log an error message if the passed condition is True.
1523

1524
    """
1525
    if (bool(cond)
1526
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1527
      self._Error(*args, **kwargs)
1528

    
1529

    
1530
def _GetAllHypervisorParameters(cluster, instances):
1531
  """Compute the set of all hypervisor parameters.
1532

1533
  @type cluster: L{objects.Cluster}
1534
  @param cluster: the cluster object
1535
  @param instances: list of L{objects.Instance}
1536
  @param instances: additional instances from which to obtain parameters
1537
  @rtype: list of (origin, hypervisor, parameters)
1538
  @return: a list with all parameters found, indicating the hypervisor they
1539
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1540

1541
  """
1542
  hvp_data = []
1543

    
1544
  for hv_name in cluster.enabled_hypervisors:
1545
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1546

    
1547
  for os_name, os_hvp in cluster.os_hvp.items():
1548
    for hv_name, hv_params in os_hvp.items():
1549
      if hv_params:
1550
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1551
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1552

    
1553
  # TODO: collapse identical parameter values in a single one
1554
  for instance in instances:
1555
    if instance.hvparams:
1556
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1557
                       cluster.FillHV(instance)))
1558

    
1559
  return hvp_data
1560

    
1561

    
1562
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1563
  """Verifies the cluster config.
1564

1565
  """
1566
  REQ_BGL = False
1567

    
1568
  def _VerifyHVP(self, hvp_data):
1569
    """Verifies locally the syntax of the hypervisor parameters.
1570

1571
    """
1572
    for item, hv_name, hv_params in hvp_data:
1573
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1574
             (item, hv_name))
1575
      try:
1576
        hv_class = hypervisor.GetHypervisorClass(hv_name)
1577
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1578
        hv_class.CheckParameterSyntax(hv_params)
1579
      except errors.GenericError, err:
1580
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1581

    
1582
  def ExpandNames(self):
1583
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1584
    self.share_locks = ShareAll()
1585

    
1586
  def CheckPrereq(self):
1587
    """Check prerequisites.
1588

1589
    """
1590
    # Retrieve all information
1591
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1592
    self.all_node_info = self.cfg.GetAllNodesInfo()
1593
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1594

    
1595
  def Exec(self, feedback_fn):
1596
    """Verify integrity of cluster, performing various test on nodes.
1597

1598
    """
1599
    self.bad = False
1600
    self._feedback_fn = feedback_fn
1601

    
1602
    feedback_fn("* Verifying cluster config")
1603

    
1604
    for msg in self.cfg.VerifyConfig():
1605
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1606

    
1607
    feedback_fn("* Verifying cluster certificate files")
1608

    
1609
    for cert_filename in pathutils.ALL_CERT_FILES:
1610
      (errcode, msg) = utils.VerifyCertificate(cert_filename)
1611
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1612

    
1613
    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
1614
                                    pathutils.NODED_CERT_FILE),
1615
                  constants.CV_ECLUSTERCERT,
1616
                  None,
1617
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
1618
                    constants.LUXID_USER + " user")
1619

    
1620
    feedback_fn("* Verifying hypervisor parameters")
1621

    
1622
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1623
                                                self.all_inst_info.values()))
1624

    
1625
    feedback_fn("* Verifying all nodes belong to an existing group")
1626

    
1627
    # We do this verification here because, should this bogus circumstance
1628
    # occur, it would never be caught by VerifyGroup, which only acts on
1629
    # nodes/instances reachable from existing node groups.
1630

    
1631
    dangling_nodes = set(node for node in self.all_node_info.values()
1632
                         if node.group not in self.all_group_info)
1633

    
1634
    dangling_instances = {}
1635
    no_node_instances = []
1636

    
1637
    for inst in self.all_inst_info.values():
1638
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
1639
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
1640
      elif inst.primary_node not in self.all_node_info:
1641
        no_node_instances.append(inst)
1642

    
1643
    pretty_dangling = [
1644
        "%s (%s)" %
1645
        (node.name,
1646
         utils.CommaJoin(inst.name for
1647
                         inst in dangling_instances.get(node.uuid, [])))
1648
        for node in dangling_nodes]
1649

    
1650
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1651
                  None,
1652
                  "the following nodes (and their instances) belong to a non"
1653
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1654

    
1655
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1656
                  None,
1657
                  "the following instances have a non-existing primary-node:"
1658
                  " %s", utils.CommaJoin(inst.name for
1659
                                         inst in no_node_instances))
1660

    
1661
    return not self.bad
1662

    
1663

    
1664
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1665
  """Verifies the status of a node group.
1666

1667
  """
1668
  HPATH = "cluster-verify"
1669
  HTYPE = constants.HTYPE_CLUSTER
1670
  REQ_BGL = False
1671

    
1672
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1673

    
1674
  class NodeImage(object):
1675
    """A class representing the logical and physical status of a node.
1676

1677
    @type uuid: string
1678
    @ivar uuid: the node UUID to which this object refers
1679
    @ivar volumes: a structure as returned from
1680
        L{ganeti.backend.GetVolumeList} (runtime)
1681
    @ivar instances: a list of running instances (runtime)
1682
    @ivar pinst: list of configured primary instances (config)
1683
    @ivar sinst: list of configured secondary instances (config)
1684
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1685
        instances for which this node is secondary (config)
1686
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1687
    @ivar dfree: free disk, as reported by the node (runtime)
1688
    @ivar offline: the offline status (config)
1689
    @type rpc_fail: boolean
1690
    @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1691
        not whether the individual keys were correct) (runtime)
1692
    @type lvm_fail: boolean
1693
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1694
    @type hyp_fail: boolean
1695
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1696
    @type ghost: boolean
1697
    @ivar ghost: whether this is a known node or not (config)
1698
    @type os_fail: boolean
1699
    @ivar os_fail: whether the RPC call didn't return valid OS data
1700
    @type oslist: list
1701
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1702
    @type vm_capable: boolean
1703
    @ivar vm_capable: whether the node can host instances
1704
    @type pv_min: float
1705
    @ivar pv_min: size in MiB of the smallest PVs
1706
    @type pv_max: float
1707
    @ivar pv_max: size in MiB of the biggest PVs
1708

1709
    """
1710
    def __init__(self, offline=False, uuid=None, vm_capable=True):
1711
      self.uuid = uuid
1712
      self.volumes = {}
1713
      self.instances = []
1714
      self.pinst = []
1715
      self.sinst = []
1716
      self.sbp = {}
1717
      self.mfree = 0
1718
      self.dfree = 0
1719
      self.offline = offline
1720
      self.vm_capable = vm_capable
1721
      self.rpc_fail = False
1722
      self.lvm_fail = False
1723
      self.hyp_fail = False
1724
      self.ghost = False
1725
      self.os_fail = False
1726
      self.oslist = {}
1727
      self.pv_min = None
1728
      self.pv_max = None
1729

    
1730
  def ExpandNames(self):
1731
    # This raises errors.OpPrereqError on its own:
1732
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1733

    
1734
    # Get instances in node group; this is unsafe and needs verification later
1735
    inst_uuids = \
1736
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1737

    
1738
    self.needed_locks = {
1739
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1740
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1741
      locking.LEVEL_NODE: [],
1742

    
1743
      # This opcode is run by watcher every five minutes and acquires all nodes
1744
      # for a group. It doesn't run for a long time, so it's better to acquire
1745
      # the node allocation lock as well.
1746
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1747
      }
1748

    
1749
    self.share_locks = ShareAll()
1750

    
1751
  def DeclareLocks(self, level):
1752
    if level == locking.LEVEL_NODE:
1753
      # Get members of node group; this is unsafe and needs verification later
1754
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1755

    
1756
      # In Exec(), we warn about mirrored instances that have primary and
1757
      # secondary living in separate node groups. To fully verify that
1758
      # volumes for these instances are healthy, we will need to do an
1759
      # extra call to their secondaries. We ensure here those nodes will
1760
      # be locked.
1761
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1762
        # Important: access only the instances whose lock is owned
1763
        instance = self.cfg.GetInstanceInfoByName(inst_name)
1764
        if instance.disk_template in constants.DTS_INT_MIRROR:
1765
          nodes.update(instance.secondary_nodes)
1766

    
1767
      self.needed_locks[locking.LEVEL_NODE] = nodes
1768

    
1769
  def CheckPrereq(self):
1770
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1771
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1772

    
1773
    group_node_uuids = set(self.group_info.members)
1774
    group_inst_uuids = \
1775
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1776

    
1777
    unlocked_node_uuids = \
1778
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1779

    
1780
    unlocked_inst_uuids = \
1781
        group_inst_uuids.difference(
1782
          [self.cfg.GetInstanceInfoByName(name).uuid
1783
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1784

    
1785
    if unlocked_node_uuids:
1786
      raise errors.OpPrereqError(
1787
        "Missing lock for nodes: %s" %
1788
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1789
        errors.ECODE_STATE)
1790

    
1791
    if unlocked_inst_uuids:
1792
      raise errors.OpPrereqError(
1793
        "Missing lock for instances: %s" %
1794
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1795
        errors.ECODE_STATE)
1796

    
1797
    self.all_node_info = self.cfg.GetAllNodesInfo()
1798
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1799

    
1800
    self.my_node_uuids = group_node_uuids
1801
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1802
                             for node_uuid in group_node_uuids)
1803

    
1804
    self.my_inst_uuids = group_inst_uuids
1805
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1806
                             for inst_uuid in group_inst_uuids)
1807

    
1808
    # We detect here the nodes that will need the extra RPC calls for verifying
1809
    # split LV volumes; they should be locked.
1810
    extra_lv_nodes = set()
1811

    
1812
    for inst in self.my_inst_info.values():
1813
      if inst.disk_template in constants.DTS_INT_MIRROR:
1814
        for nuuid in inst.all_nodes:
1815
          if self.all_node_info[nuuid].group != self.group_uuid:
1816
            extra_lv_nodes.add(nuuid)
1817

    
1818
    unlocked_lv_nodes = \
1819
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1820

    
1821
    if unlocked_lv_nodes:
1822
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1823
                                 utils.CommaJoin(unlocked_lv_nodes),
1824
                                 errors.ECODE_STATE)
1825
    self.extra_lv_nodes = list(extra_lv_nodes)
1826

    
1827
  def _VerifyNode(self, ninfo, nresult):
1828
    """Perform some basic validation on data returned from a node.
1829

1830
      - check the result data structure is well formed and has all the
1831
        mandatory fields
1832
      - check ganeti version
1833

1834
    @type ninfo: L{objects.Node}
1835
    @param ninfo: the node to check
1836
    @param nresult: the results from the node
1837
    @rtype: boolean
1838
    @return: whether overall this call was successful (and we can expect
1839
         reasonable values in the respose)
1840

1841
    """
1842
    # main result, nresult should be a non-empty dict
1843
    test = not nresult or not isinstance(nresult, dict)
1844
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1845
                  "unable to verify node: no data returned")
1846
    if test:
1847
      return False
1848

    
1849
    # compares ganeti version
1850
    local_version = constants.PROTOCOL_VERSION
1851
    remote_version = nresult.get("version", None)
1852
    test = not (remote_version and
1853
                isinstance(remote_version, (list, tuple)) and
1854
                len(remote_version) == 2)
1855
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1856
                  "connection to node returned invalid data")
1857
    if test:
1858
      return False
1859

    
1860
    test = local_version != remote_version[0]
1861
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1862
                  "incompatible protocol versions: master %s,"
1863
                  " node %s", local_version, remote_version[0])
1864
    if test:
1865
      return False
1866

    
1867
    # node seems compatible, we can actually try to look into its results
1868

    
1869
    # full package version
1870
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1871
                  constants.CV_ENODEVERSION, ninfo.name,
1872
                  "software version mismatch: master %s, node %s",
1873
                  constants.RELEASE_VERSION, remote_version[1],
1874
                  code=self.ETYPE_WARNING)
1875

    
1876
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1877
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1878
      for hv_name, hv_result in hyp_result.iteritems():
1879
        test = hv_result is not None
1880
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1881
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1882

    
1883
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1884
    if ninfo.vm_capable and isinstance(hvp_result, list):
1885
      for item, hv_name, hv_result in hvp_result:
1886
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1887
                      "hypervisor %s parameter verify failure (source %s): %s",
1888
                      hv_name, item, hv_result)
1889

    
1890
    test = nresult.get(constants.NV_NODESETUP,
1891
                       ["Missing NODESETUP results"])
1892
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1893
                  "node setup error: %s", "; ".join(test))
1894

    
1895
    return True
1896

    
1897
  def _VerifyNodeTime(self, ninfo, nresult,
1898
                      nvinfo_starttime, nvinfo_endtime):
1899
    """Check the node time.
1900

1901
    @type ninfo: L{objects.Node}
1902
    @param ninfo: the node to check
1903
    @param nresult: the remote results for the node
1904
    @param nvinfo_starttime: the start time of the RPC call
1905
    @param nvinfo_endtime: the end time of the RPC call
1906

1907
    """
1908
    ntime = nresult.get(constants.NV_TIME, None)
1909
    try:
1910
      ntime_merged = utils.MergeTime(ntime)
1911
    except (ValueError, TypeError):
1912
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1913
                    "Node returned invalid time")
1914
      return
1915

    
1916
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1917
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1918
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1919
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1920
    else:
1921
      ntime_diff = None
1922

    
1923
    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1924
                  "Node time diverges by at least %s from master node time",
1925
                  ntime_diff)
1926

    
1927
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1928
    """Check the node LVM results and update info for cross-node checks.
1929

1930
    @type ninfo: L{objects.Node}
1931
    @param ninfo: the node to check
1932
    @param nresult: the remote results for the node
1933
    @param vg_name: the configured VG name
1934
    @type nimg: L{NodeImage}
1935
    @param nimg: node image
1936

1937
    """
1938
    if vg_name is None:
1939
      return
1940

    
1941
    # checks vg existence and size > 20G
1942
    vglist = nresult.get(constants.NV_VGLIST, None)
1943
    test = not vglist
1944
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1945
                  "unable to check volume groups")
1946
    if not test:
1947
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1948
                                            constants.MIN_VG_SIZE)
1949
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1950

    
1951
    # Check PVs
1952
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1953
    for em in errmsgs:
1954
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
1955
    if pvminmax is not None:
1956
      (nimg.pv_min, nimg.pv_max) = pvminmax
1957

    
1958
  def _VerifyGroupDRBDVersion(self, node_verify_infos):
1959
    """Check cross-node DRBD version consistency.
1960

1961
    @type node_verify_infos: dict
1962
    @param node_verify_infos: infos about nodes as returned from the
1963
      node_verify call.
1964

1965
    """
1966
    node_versions = {}
1967
    for node_uuid, ndata in node_verify_infos.items():
1968
      nresult = ndata.payload
1969
      if nresult:
1970
        version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1971
        node_versions[node_uuid] = version
1972

    
1973
    if len(set(node_versions.values())) > 1:
1974
      for node_uuid, version in sorted(node_versions.items()):
1975
        msg = "DRBD version mismatch: %s" % version
1976
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1977
                    code=self.ETYPE_WARNING)
1978

    
1979
  def _VerifyGroupLVM(self, node_image, vg_name):
1980
    """Check cross-node consistency in LVM.
1981

1982
    @type node_image: dict
1983
    @param node_image: info about nodes, mapping from node to names to
1984
      L{NodeImage} objects
1985
    @param vg_name: the configured VG name
1986

1987
    """
1988
    if vg_name is None:
1989
      return
1990

    
1991
    # Only exclusive storage needs this kind of checks
1992
    if not self._exclusive_storage:
1993
      return
1994

    
1995
    # exclusive_storage wants all PVs to have the same size (approximately),
1996
    # if the smallest and the biggest ones are okay, everything is fine.
1997
    # pv_min is None iff pv_max is None
1998
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1999
    if not vals:
2000
      return
2001
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
2002
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
2003
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
2004
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
2005
                  "PV sizes differ too much in the group; smallest (%s MB) is"
2006
                  " on %s, biggest (%s MB) is on %s",
2007
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
2008
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))
2009

    
2010
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
2011
    """Check the node bridges.
2012

2013
    @type ninfo: L{objects.Node}
2014
    @param ninfo: the node to check
2015
    @param nresult: the remote results for the node
2016
    @param bridges: the expected list of bridges
2017

2018
    """
2019
    if not bridges:
2020
      return
2021

    
2022
    missing = nresult.get(constants.NV_BRIDGES, None)
2023
    test = not isinstance(missing, list)
2024
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2025
                  "did not return valid bridge information")
2026
    if not test:
2027
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
2028
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
2029

    
2030
  def _VerifyNodeUserScripts(self, ninfo, nresult):
2031
    """Check the results of user scripts presence and executability on the node
2032

2033
    @type ninfo: L{objects.Node}
2034
    @param ninfo: the node to check
2035
    @param nresult: the remote results for the node
2036

2037
    """
2038
    test = not constants.NV_USERSCRIPTS in nresult
2039
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2040
                  "did not return user scripts information")
2041

    
2042
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
2043
    if not test:
2044
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2045
                    "user scripts not present or not executable: %s" %
2046
                    utils.CommaJoin(sorted(broken_scripts)))
2047

    
2048
  def _VerifyNodeNetwork(self, ninfo, nresult):
2049
    """Check the node network connectivity results.
2050

2051
    @type ninfo: L{objects.Node}
2052
    @param ninfo: the node to check
2053
    @param nresult: the remote results for the node
2054

2055
    """
2056
    test = constants.NV_NODELIST not in nresult
2057
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
2058
                  "node hasn't returned node ssh connectivity data")
2059
    if not test:
2060
      if nresult[constants.NV_NODELIST]:
2061
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
2062
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
2063
                        "ssh communication with node '%s': %s", a_node, a_msg)
2064

    
2065
    test = constants.NV_NODENETTEST not in nresult
2066
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2067
                  "node hasn't returned node tcp connectivity data")
2068
    if not test:
2069
      if nresult[constants.NV_NODENETTEST]:
2070
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
2071
        for anode in nlist:
2072
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
2073
                        "tcp communication with node '%s': %s",
2074
                        anode, nresult[constants.NV_NODENETTEST][anode])
2075

    
2076
    test = constants.NV_MASTERIP not in nresult
2077
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2078
                  "node hasn't returned node master IP reachability data")
2079
    if not test:
2080
      if not nresult[constants.NV_MASTERIP]:
2081
        if ninfo.uuid == self.master_node:
2082
          msg = "the master node cannot reach the master IP (not configured?)"
2083
        else:
2084
          msg = "cannot reach the master IP"
2085
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
2086

    
2087
  def _VerifyInstance(self, instance, node_image, diskstatus):
2088
    """Verify an instance.
2089

2090
    This function checks to see if the required block devices are
2091
    available on the instance's node, and that the nodes are in the correct
2092
    state.
2093

2094
    """
2095
    pnode_uuid = instance.primary_node
2096
    pnode_img = node_image[pnode_uuid]
2097
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2098

    
2099
    node_vol_should = {}
2100
    instance.MapLVsByNode(node_vol_should)
2101

    
2102
    cluster = self.cfg.GetClusterInfo()
2103
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2104
                                                            self.group_info)
2105
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
2106
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
2107
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)
2108

    
2109
    for node_uuid in node_vol_should:
2110
      n_img = node_image[node_uuid]
2111
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2112
        # ignore missing volumes on offline or broken nodes
2113
        continue
2114
      for volume in node_vol_should[node_uuid]:
2115
        test = volume not in n_img.volumes
2116
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
2117
                      "volume %s missing on node %s", volume,
2118
                      self.cfg.GetNodeName(node_uuid))
2119

    
2120
    if instance.admin_state == constants.ADMINST_UP:
2121
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
2122
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
2123
                    "instance not running on its primary node %s",
2124
                     self.cfg.GetNodeName(pnode_uuid))
2125
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
2126
                    instance.name, "instance is marked as running and lives on"
2127
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))
2128

    
2129
    diskdata = [(nname, success, status, idx)
2130
                for (nname, disks) in diskstatus.items()
2131
                for idx, (success, status) in enumerate(disks)]
2132

    
2133
    for nname, success, bdev_status, idx in diskdata:
2134
      # the 'ghost node' construction in Exec() ensures that we have a
2135
      # node here
2136
      snode = node_image[nname]
2137
      bad_snode = snode.ghost or snode.offline
2138
      self._ErrorIf(instance.disks_active and
2139
                    not success and not bad_snode,
2140
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
2141
                    "couldn't retrieve status for disk/%s on %s: %s",
2142
                    idx, self.cfg.GetNodeName(nname), bdev_status)
2143

    
2144
      if instance.disks_active and success and \
2145
         (bdev_status.is_degraded or
2146
          bdev_status.ldisk_status != constants.LDS_OKAY):
2147
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2148
        if bdev_status.is_degraded:
2149
          msg += " is degraded"
2150
        if bdev_status.ldisk_status != constants.LDS_OKAY:
2151
          msg += "; state is '%s'" % \
2152
                 constants.LDS_NAMES[bdev_status.ldisk_status]
2153

    
2154
        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2155

    
2156
    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2157
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2158
                  "instance %s, connection to primary node failed",
2159
                  instance.name)
2160

    
2161
    self._ErrorIf(len(instance.secondary_nodes) > 1,
2162
                  constants.CV_EINSTANCELAYOUT, instance.name,
2163
                  "instance has multiple secondary nodes: %s",
2164
                  utils.CommaJoin(instance.secondary_nodes),
2165
                  code=self.ETYPE_WARNING)
2166

    
2167
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2168
    if any(es_flags.values()):
2169
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2170
        # Disk template not compatible with exclusive_storage: no instance
2171
        # node should have the flag set
2172
        es_nodes = [n
2173
                    for (n, es) in es_flags.items()
2174
                    if es]
2175
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2176
                    "instance has template %s, which is not supported on nodes"
2177
                    " that have exclusive storage set: %s",
2178
                    instance.disk_template,
2179
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2180
      for (idx, disk) in enumerate(instance.disks):
2181
        self._ErrorIf(disk.spindles is None,
2182
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2183
                      "number of spindles not configured for disk %s while"
2184
                      " exclusive storage is enabled, try running"
2185
                      " gnt-cluster repair-disk-sizes", idx)
2186

    
2187
    if instance.disk_template in constants.DTS_INT_MIRROR:
2188
      instance_nodes = utils.NiceSort(instance.all_nodes)
2189
      instance_groups = {}
2190

    
2191
      for node_uuid in instance_nodes:
2192
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
2193
                                   []).append(node_uuid)
2194

    
2195
      pretty_list = [
2196
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2197
                           groupinfo[group].name)
2198
        # Sort so that we always list the primary node first.
2199
        for group, nodes in sorted(instance_groups.items(),
2200
                                   key=lambda (_, nodes): pnode_uuid in nodes,
2201
                                   reverse=True)]
2202

    
2203
      self._ErrorIf(len(instance_groups) > 1,
2204
                    constants.CV_EINSTANCESPLITGROUPS,
2205
                    instance.name, "instance has primary and secondary nodes in"
2206
                    " different groups: %s", utils.CommaJoin(pretty_list),
2207
                    code=self.ETYPE_WARNING)
2208

    
2209
    inst_nodes_offline = []
2210
    for snode in instance.secondary_nodes:
2211
      s_img = node_image[snode]
2212
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2213
                    self.cfg.GetNodeName(snode),
2214
                    "instance %s, connection to secondary node failed",
2215
                    instance.name)
2216

    
2217
      if s_img.offline:
2218
        inst_nodes_offline.append(snode)
2219

    
2220
    # warn that the instance lives on offline nodes
2221
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2222
                  instance.name, "instance has offline secondary node(s) %s",
2223
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2224
    # ... or ghost/non-vm_capable nodes
2225
    for node_uuid in instance.all_nodes:
2226
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2227
                    instance.name, "instance lives on ghost node %s",
2228
                    self.cfg.GetNodeName(node_uuid))
2229
      self._ErrorIf(not node_image[node_uuid].vm_capable,
2230
                    constants.CV_EINSTANCEBADNODE, instance.name,
2231
                    "instance lives on non-vm_capable node %s",
2232
                    self.cfg.GetNodeName(node_uuid))
2233

    
2234
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2235
    """Verify if there are any unknown volumes in the cluster.
2236

2237
    The .os, .swap and backup volumes are ignored. All other volumes are
2238
    reported as unknown.
2239

2240
    @type reserved: L{ganeti.utils.FieldSet}
2241
    @param reserved: a FieldSet of reserved volume names
2242

2243
    """
2244
    for node_uuid, n_img in node_image.items():
2245
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2246
          self.all_node_info[node_uuid].group != self.group_uuid):
2247
        # skip non-healthy nodes
2248
        continue
2249
      for volume in n_img.volumes:
2250
        test = ((node_uuid not in node_vol_should or
2251
                volume not in node_vol_should[node_uuid]) and
2252
                not reserved.Matches(volume))
2253
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2254
                      self.cfg.GetNodeName(node_uuid),
2255
                      "volume %s is unknown", volume,
2256
                      code=_VerifyErrors.ETYPE_WARNING)
2257

    
2258
  def _VerifyNPlusOneMemory(self, node_image, all_insts):
2259
    """Verify N+1 Memory Resilience.
2260

2261
    Check that if one single node dies we can still start all the
2262
    instances it was primary for.
2263

2264
    """
2265
    cluster_info = self.cfg.GetClusterInfo()
2266
    for node_uuid, n_img in node_image.items():
2267
      # This code checks that every node which is now listed as
2268
      # secondary has enough memory to host all instances it is
2269
      # supposed to should a single other node in the cluster fail.
2270
      # FIXME: not ready for failover to an arbitrary node
2271
      # FIXME: does not support file-backed instances
2272
      # WARNING: we currently take into account down instances as well
2273
      # as up ones, considering that even if they're down someone
2274
      # might want to start them even in the event of a node failure.
2275
      if n_img.offline or \
2276
         self.all_node_info[node_uuid].group != self.group_uuid:
2277
        # we're skipping nodes marked offline and nodes in other groups from
2278
        # the N+1 warning, since most likely we don't have good memory
2279
        # information from them; we already list instances living on such
2280
        # nodes, and that's enough warning
2281
        continue
2282
      #TODO(dynmem): also consider ballooning out other instances
2283
      for prinode, inst_uuids in n_img.sbp.items():
2284
        needed_mem = 0
2285
        for inst_uuid in inst_uuids:
2286
          bep = cluster_info.FillBE(all_insts[inst_uuid])
2287
          if bep[constants.BE_AUTO_BALANCE]:
2288
            needed_mem += bep[constants.BE_MINMEM]
2289
        test = n_img.mfree < needed_mem
2290
        self._ErrorIf(test, constants.CV_ENODEN1,
2291
                      self.cfg.GetNodeName(node_uuid),
2292
                      "not enough memory to accomodate instance failovers"
2293
                      " should node %s fail (%dMiB needed, %dMiB available)",
2294
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2295

    
2296
  def _VerifyClientCertificates(self, nodes, all_nvinfo):
2297
    """Verifies the consistency of the client certificates.
2298

2299
    This includes several aspects:
2300
      - the individual validation of all nodes' certificates
2301
      - the consistency of the master candidate certificate map
2302
      - the consistency of the master candidate certificate map with the
2303
        certificates that the master candidates are actually using.
2304

2305
    @param nodes: the list of nodes to consider in this verification
2306
    @param all_nvinfo: the map of results of the verify_node call to
2307
      all nodes
2308

2309
    """
2310
    candidate_certs = self.cfg.GetClusterInfo().candidate_certs
2311
    if candidate_certs is None or len(candidate_certs) == 0:
2312
      self._ErrorIf(
2313
        True, constants.CV_ECLUSTERCLIENTCERT, None,
2314
        "The cluster's list of master candidate certificates is empty."
2315
        "If you just updated the cluster, please run"
2316
        " 'gnt-cluster renew-crypto --new-node-certificates'.")
2317
      return
2318

    
2319
    self._ErrorIf(
2320
      len(candidate_certs) != len(set(candidate_certs.values())),
2321
      constants.CV_ECLUSTERCLIENTCERT, None,
2322
      "There are at least two master candidates configured to use the same"
2323
      " certificate.")
2324

    
2325
    # collect the client certificate
2326
    for node in nodes:
2327
      if node.offline:
2328
        continue
2329

    
2330
      nresult = all_nvinfo[node.uuid]
2331
      if nresult.fail_msg or not nresult.payload:
2332
        continue
2333

    
2334
      (errcode, msg) = nresult.payload.get(constants.NV_CLIENT_CERT, None)
2335

    
2336
      self._ErrorIf(
2337
        errcode is not None, constants.CV_ECLUSTERCLIENTCERT, None,
2338
        "Client certificate of node '%s' failed validation: %s (code '%s')",
2339
        node.uuid, msg, errcode)
2340

    
2341
      if not errcode:
2342
        digest = msg
2343
        if node.master_candidate:
2344
          if node.uuid in candidate_certs:
2345
            self._ErrorIf(
2346
              digest != candidate_certs[node.uuid],
2347
              constants.CV_ECLUSTERCLIENTCERT, None,
2348
              "Client certificate digest of master candidate '%s' does not"
2349
              " match its entry in the cluster's map of master candidate"
2350
              " certificates. Expected: %s Got: %s", node.uuid,
2351
              digest, candidate_certs[node.uuid])
2352
          else:
2353
            self._ErrorIf(
2354
              True, constants.CV_ECLUSTERCLIENTCERT, None,
2355
              "The master candidate '%s' does not have an entry in the"
2356
              " map of candidate certificates.", node.uuid)
2357
            self._ErrorIf(
2358
              digest in candidate_certs.values(),
2359
              constants.CV_ECLUSTERCLIENTCERT, None,
2360
              "Master candidate '%s' is using a certificate of another node.",
2361
              node.uuid)
2362
        else:
2363
          self._ErrorIf(
2364
            node.uuid in candidate_certs,
2365
            constants.CV_ECLUSTERCLIENTCERT, None,
2366
            "Node '%s' is not a master candidate, but still listed in the"
2367
            " map of master candidate certificates.", node.uuid)
2368
          self._ErrorIf(
2369
            (node.uuid not in candidate_certs) and
2370
              (digest in candidate_certs.values()),
2371
            constants.CV_ECLUSTERCLIENTCERT, None,
2372
            "Node '%s' is not a master candidate and is incorrectly using a"
2373
            " certificate of another node which is master candidate.",
2374
            node.uuid)
2375

    
2376
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2377
                   (files_all, files_opt, files_mc, files_vm)):
2378
    """Verifies file checksums collected from all nodes.
2379

2380
    @param nodes: List of L{objects.Node} objects
2381
    @param master_node_uuid: UUID of master node
2382
    @param all_nvinfo: RPC results
2383

2384
    """
2385
    # Define functions determining which nodes to consider for a file
2386
    files2nodefn = [
2387
      (files_all, None),
2388
      (files_mc, lambda node: (node.master_candidate or
2389
                               node.uuid == master_node_uuid)),
2390
      (files_vm, lambda node: node.vm_capable),
2391
      ]
2392

    
2393
    # Build mapping from filename to list of nodes which should have the file
2394
    nodefiles = {}
2395
    for (files, fn) in files2nodefn:
2396
      if fn is None:
2397
        filenodes = nodes
2398
      else:
2399
        filenodes = filter(fn, nodes)
2400
      nodefiles.update((filename,
2401
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
2402
                       for filename in files)
2403

    
2404
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2405

    
2406
    fileinfo = dict((filename, {}) for filename in nodefiles)
2407
    ignore_nodes = set()
2408

    
2409
    for node in nodes:
2410
      if node.offline:
2411
        ignore_nodes.add(node.uuid)
2412
        continue
2413

    
2414
      nresult = all_nvinfo[node.uuid]
2415

    
2416
      if nresult.fail_msg or not nresult.payload:
2417
        node_files = None
2418
      else:
2419
        fingerprints = nresult.payload.get(constants.NV_FILELIST, {})
2420
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2421
                          for (key, value) in fingerprints.items())
2422
        del fingerprints
2423

    
2424
      test = not (node_files and isinstance(node_files, dict))
2425
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2426
                    "Node did not return file checksum data")
2427
      if test:
2428
        ignore_nodes.add(node.uuid)
2429
        continue
2430

    
2431
      # Build per-checksum mapping from filename to nodes having it
2432
      for (filename, checksum) in node_files.items():
2433
        assert filename in nodefiles
2434
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2435

    
2436
    for (filename, checksums) in fileinfo.items():
2437
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2438

    
2439
      # Nodes having the file
2440
      with_file = frozenset(node_uuid
2441
                            for node_uuids in fileinfo[filename].values()
2442
                            for node_uuid in node_uuids) - ignore_nodes
2443

    
2444
      expected_nodes = nodefiles[filename] - ignore_nodes
2445

    
2446
      # Nodes missing file
2447
      missing_file = expected_nodes - with_file
2448

    
2449
      if filename in files_opt:
2450
        # All or no nodes
2451
        self._ErrorIf(missing_file and missing_file != expected_nodes,
2452
                      constants.CV_ECLUSTERFILECHECK, None,
2453
                      "File %s is optional, but it must exist on all or no"
2454
                      " nodes (not found on %s)",
2455
                      filename,
2456
                      utils.CommaJoin(
2457
                        utils.NiceSort(
2458
                          map(self.cfg.GetNodeName, missing_file))))
2459
      else:
2460
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2461
                      "File %s is missing from node(s) %s", filename,
2462
                      utils.CommaJoin(
2463
                        utils.NiceSort(
2464
                          map(self.cfg.GetNodeName, missing_file))))
2465

    
2466
        # Warn if a node has a file it shouldn't
2467
        unexpected = with_file - expected_nodes
2468
        self._ErrorIf(unexpected,
2469
                      constants.CV_ECLUSTERFILECHECK, None,
2470
                      "File %s should not exist on node(s) %s",
2471
                      filename, utils.CommaJoin(
2472
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2473

    
2474
      # See if there are multiple versions of the file
2475
      test = len(checksums) > 1
2476
      if test:
2477
        variants = ["variant %s on %s" %
2478
                    (idx + 1,
2479
                     utils.CommaJoin(utils.NiceSort(
2480
                       map(self.cfg.GetNodeName, node_uuids))))
2481
                    for (idx, (checksum, node_uuids)) in
2482
                      enumerate(sorted(checksums.items()))]
2483
      else:
2484
        variants = []
2485

    
2486
      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2487
                    "File %s found with %s different checksums (%s)",
2488
                    filename, len(checksums), "; ".join(variants))
2489

    
2490
  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
2491
    """Verify the drbd helper.
2492

2493
    """
2494
    if drbd_helper:
2495
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2496
      test = (helper_result is None)
2497
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2498
                    "no drbd usermode helper returned")
2499
      if helper_result:
2500
        status, payload = helper_result
2501
        test = not status
2502
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2503
                      "drbd usermode helper check unsuccessful: %s", payload)
2504
        test = status and (payload != drbd_helper)
2505
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2506
                      "wrong drbd usermode helper: %s", payload)
2507

    
2508
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2509
                      drbd_map):
2510
    """Verifies and the node DRBD status.
2511

2512
    @type ninfo: L{objects.Node}
2513
    @param ninfo: the node to check
2514
    @param nresult: the remote results for the node
2515
    @param instanceinfo: the dict of instances
2516
    @param drbd_helper: the configured DRBD usermode helper
2517
    @param drbd_map: the DRBD map as returned by
2518
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2519

2520
    """
2521
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)
2522

    
2523
    # compute the DRBD minors
2524
    node_drbd = {}
2525
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2526
      test = inst_uuid not in instanceinfo
2527
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2528
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
2529
        # ghost instance should not be running, but otherwise we
2530
        # don't give double warnings (both ghost instance and
2531
        # unallocated minor in use)
2532
      if test:
2533
        node_drbd[minor] = (inst_uuid, False)
2534
      else:
2535
        instance = instanceinfo[inst_uuid]
2536
        node_drbd[minor] = (inst_uuid, instance.disks_active)
2537

    
2538
    # and now check them
2539
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2540
    test = not isinstance(used_minors, (tuple, list))
2541
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2542
                  "cannot parse drbd status file: %s", str(used_minors))
2543
    if test:
2544
      # we cannot check drbd status
2545
      return
2546

    
2547
    for minor, (inst_uuid, must_exist) in node_drbd.items():
2548
      test = minor not in used_minors and must_exist
2549
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2550
                    "drbd minor %d of instance %s is not active", minor,
2551
                    self.cfg.GetInstanceName(inst_uuid))
2552
    for minor in used_minors:
2553
      test = minor not in node_drbd
2554
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2555
                    "unallocated drbd minor %d is in use", minor)
2556

    
2557
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2558
    """Builds the node OS structures.
2559

2560
    @type ninfo: L{objects.Node}
2561
    @param ninfo: the node to check
2562
    @param nresult: the remote results for the node
2563
    @param nimg: the node image object
2564

2565
    """
2566
    remote_os = nresult.get(constants.NV_OSLIST, None)
2567
    test = (not isinstance(remote_os, list) or
2568
            not compat.all(isinstance(v, list) and len(v) == 7
2569
                           for v in remote_os))
2570

    
2571
    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2572
                  "node hasn't returned valid OS data")
2573

    
2574
    nimg.os_fail = test
2575

    
2576
    if test:
2577
      return
2578

    
2579
    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
                    "Invalid OS %s (located at %s): %s",
                    os_name, f_path, f_diag)
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
                    "OS '%s' has multiple entries"
                    " (first one shadows the rest): %s",
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                    "Extra OS %s not present on reference node (%s)",
                    os_name, self.cfg.GetNodeName(base.uuid))
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
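      # Only the first (authoritative) entry of each OS is compared against
      # the reference node; additional entries were already flagged above as
      # shadowed duplicates.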
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " [%s] vs. [%s]", kind, os_name,
                      self.cfg.GetNodeName(base.uuid),
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
                  "OSes present on reference node %s"
                  " but missing on this node: %s",
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))

  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    cluster = self.cfg.GetClusterInfo()
    if (is_master and
        (cluster.IsFileStorageEnabled() or
         cluster.IsSharedFileStorageEnabled())):
      try:
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
                          verify_key, error_key):
    """Verifies (file) storage paths.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type file_disk_template: string
    @param file_disk_template: file-based disk template, whose directory
        is supposed to be verified
    @type verify_key: string
    @param verify_key: key for the verification map of this file
        verification step
    @param error_key: error key to be added to the verification results
        in case something goes wrong in this verification step

    """
    assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
              constants.ST_FILE, constants.ST_SHARED_FILE
           ))

    cluster = self.cfg.GetClusterInfo()
    if cluster.IsDiskTemplateEnabled(file_disk_template):
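      # The presence of verify_key in the node's reply is treated as a failed
      # path check; its value is reported as the reason.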
      self._ErrorIf(
          verify_key in nresult,
          error_key, ninfo.name,
          "The configured %s storage path is unusable: %s" %
          (file_disk_template, nresult.get(verify_key)))

  def _VerifyFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_FILE,
        constants.NV_FILE_STORAGE_PATH,
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)

  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_SHARED_FILE,
        constants.NV_SHARED_FILE_STORAGE_PATH,
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_dev_inst_only = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      dev_inst_only = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))

      node_disks_dev_inst_only[nuuid] = dev_inst_only

    assert len(node_disks) == len(node_disks_dev_inst_only)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
               node_disks.keys(), node_disks_dev_inst_only)

    assert len(result) == len(node_disks)

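    # instdisk maps instance UUID -> node UUID -> list of (success, payload)
    # tuples, one per disk and in the same order as the disks passed to the
    # RPC call above.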
    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

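    # Each online node of this group gets one peer name from every other
    # group's cycling iterator; because the iterators rotate, consecutive
    # nodes are assigned different peers, spreading the SSH checks.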
    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase, and their failure
    causes their output to be logged in the verify output and the
    verification to fail.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

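    # node_verify_param describes the checks the nodes are asked to run: each
    # NV_* key names a check and its value carries that check's arguments
    # (None where no arguments are needed).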
    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      constants.NV_CLIENT_CERT: None,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for inst_uuid in self.my_inst_info.values():
      for nic in inst_uuid.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

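    # Fold the instances into the expected cluster state: count offline
    # instances, add "ghost" images for nodes outside this group, and record
    # each instance on its primary and secondary node images.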
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so
    # if it's True for at least one node, we act as if it were set for all
    # the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    node_group_uuids = dict(map(lambda n: (n.name, n.group),
                                self.cfg.GetAllNodesInfo().values()))
    groups_config = self.cfg.GetAllNodeGroupsInfoDict()

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams,
                                           node_group_uuids,
                                           groups_config)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams,
                                    node_group_uuids,
                                    groups_config)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    self._VerifyClientCertificates(self.my_node_info.values(), all_nvinfo)
    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams,
         node_group_uuids,
         groups_config))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

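    # Main per-node verification loop: offline nodes are only counted, all
    # other nodes get the node-level checks, and vm_capable nodes additionally
    # have their LVM, DRBD, volume, instance and OS data verified and updated.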
    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.name not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])