
root / lib / cmdlib / cluster.py @ ad756c77


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with the cluster."""
23

    
24
import copy
25
import itertools
26
import logging
27
import operator
28
import os
29
import re
30
import time
31

    
32
from ganeti import compat
33
from ganeti import constants
34
from ganeti import errors
35
from ganeti import hypervisor
36
from ganeti import locking
37
from ganeti import masterd
38
from ganeti import netutils
39
from ganeti import objects
40
from ganeti import opcodes
41
from ganeti import pathutils
42
from ganeti import query
43
import ganeti.rpc.node as rpc
44
from ganeti import runtime
45
from ganeti import ssh
46
from ganeti import uidpool
47
from ganeti import utils
48
from ganeti import vcluster
49

    
50
from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
51
  ResultWithJobs
52
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
53
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
54
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
55
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
56
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
57
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
58
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
59
  CheckDiskAccessModeConsistency, CreateNewClientCert
60

    
61
import ganeti.masterd.instance
62

    
63

    
64
def _UpdateMasterClientCert(
65
    lu, master_uuid, cluster, feedback_fn,
66
    client_cert=pathutils.NODED_CLIENT_CERT_FILE,
67
    client_cert_tmp=pathutils.NODED_CLIENT_CERT_FILE_TMP):
68
  """Renews the master's client certificate and propagates the config.
69

70
  @type lu: C{LogicalUnit}
71
  @param lu: the logical unit holding the config
72
  @type master_uuid: string
73
  @param master_uuid: the master node's UUID
74
  @type cluster: C{objects.Cluster}
75
  @param cluster: the cluster's configuration
76
  @type feedback_fn: function
77
  @param feedback_fn: feedback functions for config updates
78
  @type client_cert: string
79
  @param client_cert: the path of the client certificate
80
  @type client_cert_tmp: string
81
  @param client_cert_tmp: the temporary path of the client certificate
82
  @rtype: string
83
  @return: the digest of the newly created client certificate
84

85
  """
86
  client_digest = CreateNewClientCert(lu, master_uuid, filename=client_cert_tmp)
87
  utils.AddNodeToCandidateCerts(master_uuid, client_digest,
88
                                cluster.candidate_certs)
89
  # This triggers an update of the config and distribution of it with the old
90
  # SSL certificate
91
  lu.cfg.Update(cluster, feedback_fn)
92

    
93
  utils.RemoveFile(client_cert)
94
  utils.RenameFile(client_cert_tmp, client_cert)
95
  return client_digest
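
# Illustrative usage sketch, assuming an LU instance with .cfg available (this
# mirrors the callers below, e.g. LUClusterRenewCrypto and LUClusterPostInit):
#
#   cluster = self.cfg.GetClusterInfo()
#   master_uuid = self.cfg.GetMasterNode()
#   digest = _UpdateMasterClientCert(self, master_uuid, cluster, feedback_fn)
#   cluster.candidate_certs = {master_uuid: digest}
#   self.cfg.Update(cluster, feedback_fn)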
96

    
97

    
98
class LUClusterRenewCrypto(NoHooksLU):
99
  """Renew the cluster's crypto tokens.
100

101
  Note that most of this operation is done in gnt_cluster.py, this LU only
102
  takes care of the renewal of the client SSL certificates.
103

104
  """
105
  def Exec(self, feedback_fn):
106
    master_uuid = self.cfg.GetMasterNode()
107
    cluster = self.cfg.GetClusterInfo()
108

    
109
    server_digest = utils.GetCertificateDigest(
110
      cert_filename=pathutils.NODED_CERT_FILE)
111
    utils.AddNodeToCandidateCerts("%s-SERVER" % master_uuid,
112
                                  server_digest,
113
                                  cluster.candidate_certs)
114
    new_master_digest = _UpdateMasterClientCert(self, master_uuid, cluster,
115
                                                feedback_fn)
116

    
117
    cluster.candidate_certs = {master_uuid: new_master_digest}
118
    nodes = self.cfg.GetAllNodesInfo()
119
    for (node_uuid, node_info) in nodes.items():
120
      if node_uuid != master_uuid:
121
        new_digest = CreateNewClientCert(self, node_uuid)
122
        if node_info.master_candidate:
123
          cluster.candidate_certs[node_uuid] = new_digest
124
    # Trigger another update of the config now with the new master cert
125
    self.cfg.Update(cluster, feedback_fn)
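
  # Illustrative shape of the result (placeholder values only): after Exec()
  # the candidate certificate map contains the master plus every master
  # candidate, e.g.
  #
  #   cluster.candidate_certs == {
  #     "<master-uuid>": "<new master client-cert digest>",
  #     "<candidate-uuid>": "<new candidate client-cert digest>",
  #   }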
126

    
127

    
128
class LUClusterActivateMasterIp(NoHooksLU):
129
  """Activate the master IP on the master node.
130

131
  """
132
  def Exec(self, feedback_fn):
133
    """Activate the master IP.
134

135
    """
136
    master_params = self.cfg.GetMasterNetworkParameters()
137
    ems = self.cfg.GetUseExternalMipScript()
138
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
139
                                                   master_params, ems)
140
    result.Raise("Could not activate the master IP")
141

    
142

    
143
class LUClusterDeactivateMasterIp(NoHooksLU):
144
  """Deactivate the master IP on the master node.
145

146
  """
147
  def Exec(self, feedback_fn):
148
    """Deactivate the master IP.
149

150
    """
151
    master_params = self.cfg.GetMasterNetworkParameters()
152
    ems = self.cfg.GetUseExternalMipScript()
153
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
154
                                                     master_params, ems)
155
    result.Raise("Could not deactivate the master IP")
156

    
157

    
158
class LUClusterConfigQuery(NoHooksLU):
159
  """Return configuration values.
160

161
  """
162
  REQ_BGL = False
163

    
164
  def CheckArguments(self):
165
    self.cq = ClusterQuery(None, self.op.output_fields, False)
166

    
167
  def ExpandNames(self):
168
    self.cq.ExpandNames(self)
169

    
170
  def DeclareLocks(self, level):
171
    self.cq.DeclareLocks(self, level)
172

    
173
  def Exec(self, feedback_fn):
174
    result = self.cq.OldStyleQuery(self)
175

    
176
    assert len(result) == 1
177

    
178
    return result[0]
179

    
180

    
181
class LUClusterDestroy(LogicalUnit):
182
  """Logical unit for destroying the cluster.
183

184
  """
185
  HPATH = "cluster-destroy"
186
  HTYPE = constants.HTYPE_CLUSTER
187

    
188
  def BuildHooksEnv(self):
189
    """Build hooks env.
190

191
    """
192
    return {
193
      "OP_TARGET": self.cfg.GetClusterName(),
194
      }
195

    
196
  def BuildHooksNodes(self):
197
    """Build hooks nodes.
198

199
    """
200
    return ([], [])
201

    
202
  def CheckPrereq(self):
203
    """Check prerequisites.
204

205
    This checks whether the cluster is empty.
206

207
    Any errors are signaled by raising errors.OpPrereqError.
208

209
    """
210
    master = self.cfg.GetMasterNode()
211

    
212
    nodelist = self.cfg.GetNodeList()
213
    if len(nodelist) != 1 or nodelist[0] != master:
214
      raise errors.OpPrereqError("There are still %d node(s) in"
215
                                 " this cluster." % (len(nodelist) - 1),
216
                                 errors.ECODE_INVAL)
217
    instancelist = self.cfg.GetInstanceList()
218
    if instancelist:
219
      raise errors.OpPrereqError("There are still %d instance(s) in"
220
                                 " this cluster." % len(instancelist),
221
                                 errors.ECODE_INVAL)
222

    
223
  def Exec(self, feedback_fn):
224
    """Destroys the cluster.
225

226
    """
227
    master_params = self.cfg.GetMasterNetworkParameters()
228

    
229
    # Run post hooks on master node before it's removed
230
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
231

    
232
    ems = self.cfg.GetUseExternalMipScript()
233
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
234
                                                     master_params, ems)
235
    result.Warn("Error disabling the master IP address", self.LogWarning)
236
    return master_params.uuid
237

    
238

    
239
class LUClusterPostInit(LogicalUnit):
240
  """Logical unit for running hooks after cluster initialization.
241

242
  """
243
  HPATH = "cluster-init"
244
  HTYPE = constants.HTYPE_CLUSTER
245

    
246
  def CheckArguments(self):
247
    self.master_uuid = self.cfg.GetMasterNode()
248
    self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo())
249

    
250
    # TODO: When Issue 584 is solved, and None is properly parsed when used
251
    # as a default value, ndparams.get(.., None) can be changed to
252
    # ndparams[..] to access the values directly
253

    
254
    # OpenvSwitch: Warn user if link is missing
255
    if (self.master_ndparams[constants.ND_OVS] and not
256
        self.master_ndparams.get(constants.ND_OVS_LINK, None)):
257
      self.LogInfo("No physical interface for OpenvSwitch was given."
258
                   " OpenvSwitch will not have an outside connection. This"
259
                   " might not be what you want.")
260

    
261
  def BuildHooksEnv(self):
262
    """Build hooks env.
263

264
    """
265
    return {
266
      "OP_TARGET": self.cfg.GetClusterName(),
267
      }
268

    
269
  def BuildHooksNodes(self):
270
    """Build hooks nodes.
271

272
    """
273
    return ([], [self.cfg.GetMasterNode()])
274

    
275
  def Exec(self, feedback_fn):
276
    """Create and configure Open vSwitch
277

278
    """
279
    if self.master_ndparams[constants.ND_OVS]:
280
      result = self.rpc.call_node_configure_ovs(
281
                 self.master_uuid,
282
                 self.master_ndparams[constants.ND_OVS_NAME],
283
                 self.master_ndparams.get(constants.ND_OVS_LINK, None))
284
      result.Raise("Could not successully configure Open vSwitch")
285

    
286
    cluster = self.cfg.GetClusterInfo()
287
    _UpdateMasterClientCert(self, self.master_uuid, cluster, feedback_fn)
288

    
289
    return True
290

    
291

    
292
class ClusterQuery(QueryBase):
293
  FIELDS = query.CLUSTER_FIELDS
294

    
295
  #: Do not sort (there is only one item)
296
  SORT_FIELD = None
297

    
298
  def ExpandNames(self, lu):
299
    lu.needed_locks = {}
300

    
301
    # The following variables interact with _QueryBase._GetNames
302
    self.wanted = locking.ALL_SET
303
    self.do_locking = self.use_locking
304

    
305
    if self.do_locking:
306
      raise errors.OpPrereqError("Can not use locking for cluster queries",
307
                                 errors.ECODE_INVAL)
308

    
309
  def DeclareLocks(self, lu, level):
310
    pass
311

    
312
  def _GetQueryData(self, lu):
313
    """Computes the list of nodes and their attributes.
314

315
    """
316
    # Locking is not used
317
    assert not (compat.any(lu.glm.is_owned(level)
318
                           for level in locking.LEVELS
319
                           if level != locking.LEVEL_CLUSTER) or
320
                self.do_locking or self.use_locking)
321

    
322
    if query.CQ_CONFIG in self.requested_data:
323
      cluster = lu.cfg.GetClusterInfo()
324
      nodes = lu.cfg.GetAllNodesInfo()
325
    else:
326
      cluster = NotImplemented
327
      nodes = NotImplemented
328

    
329
    if query.CQ_QUEUE_DRAINED in self.requested_data:
330
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
331
    else:
332
      drain_flag = NotImplemented
333

    
334
    if query.CQ_WATCHER_PAUSE in self.requested_data:
335
      master_node_uuid = lu.cfg.GetMasterNode()
336

    
337
      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
338
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
339
                   lu.cfg.GetMasterNodeName())
340

    
341
      watcher_pause = result.payload
342
    else:
343
      watcher_pause = NotImplemented
344

    
345
    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
346

    
347

    
348
class LUClusterQuery(NoHooksLU):
349
  """Query cluster configuration.
350

351
  """
352
  REQ_BGL = False
353

    
354
  def ExpandNames(self):
355
    self.needed_locks = {}
356

    
357
  def Exec(self, feedback_fn):
358
    """Return cluster config.
359

360
    """
361
    cluster = self.cfg.GetClusterInfo()
362
    os_hvp = {}
363

    
364
    # Filter just for enabled hypervisors
365
    for os_name, hv_dict in cluster.os_hvp.items():
366
      os_hvp[os_name] = {}
367
      for hv_name, hv_params in hv_dict.items():
368
        if hv_name in cluster.enabled_hypervisors:
369
          os_hvp[os_name][hv_name] = hv_params
370

    
371
    # Convert ip_family to ip_version
372
    primary_ip_version = constants.IP4_VERSION
373
    if cluster.primary_ip_family == netutils.IP6Address.family:
374
      primary_ip_version = constants.IP6_VERSION
375

    
376
    result = {
377
      "software_version": constants.RELEASE_VERSION,
378
      "protocol_version": constants.PROTOCOL_VERSION,
379
      "config_version": constants.CONFIG_VERSION,
380
      "os_api_version": max(constants.OS_API_VERSIONS),
381
      "export_version": constants.EXPORT_VERSION,
382
      "vcs_version": constants.VCS_VERSION,
383
      "architecture": runtime.GetArchInfo(),
384
      "name": cluster.cluster_name,
385
      "master": self.cfg.GetMasterNodeName(),
386
      "default_hypervisor": cluster.primary_hypervisor,
387
      "enabled_hypervisors": cluster.enabled_hypervisors,
388
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
389
                        for hypervisor_name in cluster.enabled_hypervisors]),
390
      "os_hvp": os_hvp,
391
      "beparams": cluster.beparams,
392
      "osparams": cluster.osparams,
393
      "ipolicy": cluster.ipolicy,
394
      "nicparams": cluster.nicparams,
395
      "ndparams": cluster.ndparams,
396
      "diskparams": cluster.diskparams,
397
      "candidate_pool_size": cluster.candidate_pool_size,
398
      "master_netdev": cluster.master_netdev,
399
      "master_netmask": cluster.master_netmask,
400
      "use_external_mip_script": cluster.use_external_mip_script,
401
      "volume_group_name": cluster.volume_group_name,
402
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
403
      "file_storage_dir": cluster.file_storage_dir,
404
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
405
      "maintain_node_health": cluster.maintain_node_health,
406
      "ctime": cluster.ctime,
407
      "mtime": cluster.mtime,
408
      "uuid": cluster.uuid,
409
      "tags": list(cluster.GetTags()),
410
      "uid_pool": cluster.uid_pool,
411
      "default_iallocator": cluster.default_iallocator,
412
      "default_iallocator_params": cluster.default_iallocator_params,
413
      "reserved_lvs": cluster.reserved_lvs,
414
      "primary_ip_version": primary_ip_version,
415
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
416
      "hidden_os": cluster.hidden_os,
417
      "blacklisted_os": cluster.blacklisted_os,
418
      "enabled_disk_templates": cluster.enabled_disk_templates,
419
      }
420

    
421
    return result
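
  # For illustration only: the dictionary built above is what a caller of this
  # LU's opcode receives back, so a (hypothetical) consumer would do something
  # like
  #
  #   info = <result of submitting the cluster query opcode>
  #   print info["software_version"], info["enabled_hypervisors"]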
422

    
423

    
424
class LUClusterRedistConf(NoHooksLU):
425
  """Force the redistribution of cluster configuration.
426

427
  This is a very simple LU.
428

429
  """
430
  REQ_BGL = False
431

    
432
  def ExpandNames(self):
433
    self.needed_locks = {
434
      locking.LEVEL_NODE: locking.ALL_SET,
435
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
436
    }
437
    self.share_locks = ShareAll()
438

    
439
  def Exec(self, feedback_fn):
440
    """Redistribute the configuration.
441

442
    """
443
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
444
    RedistributeAncillaryFiles(self)
445

    
446

    
447
class LUClusterRename(LogicalUnit):
448
  """Rename the cluster.
449

450
  """
451
  HPATH = "cluster-rename"
452
  HTYPE = constants.HTYPE_CLUSTER
453

    
454
  def BuildHooksEnv(self):
455
    """Build hooks env.
456

457
    """
458
    return {
459
      "OP_TARGET": self.cfg.GetClusterName(),
460
      "NEW_NAME": self.op.name,
461
      }
462

    
463
  def BuildHooksNodes(self):
464
    """Build hooks nodes.
465

466
    """
467
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
468

    
469
  def CheckPrereq(self):
470
    """Verify that the passed name is a valid one.
471

472
    """
473
    hostname = netutils.GetHostname(name=self.op.name,
474
                                    family=self.cfg.GetPrimaryIPFamily())
475

    
476
    new_name = hostname.name
477
    self.ip = new_ip = hostname.ip
478
    old_name = self.cfg.GetClusterName()
479
    old_ip = self.cfg.GetMasterIP()
480
    if new_name == old_name and new_ip == old_ip:
481
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
482
                                 " cluster has changed",
483
                                 errors.ECODE_INVAL)
484
    if new_ip != old_ip:
485
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
486
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
487
                                   " reachable on the network" %
488
                                   new_ip, errors.ECODE_NOTUNIQUE)
489

    
490
    self.op.name = new_name
491

    
492
  def Exec(self, feedback_fn):
493
    """Rename the cluster.
494

495
    """
496
    clustername = self.op.name
497
    new_ip = self.ip
498

    
499
    # shutdown the master IP
500
    master_params = self.cfg.GetMasterNetworkParameters()
501
    ems = self.cfg.GetUseExternalMipScript()
502
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
503
                                                     master_params, ems)
504
    result.Raise("Could not disable the master role")
505

    
506
    try:
507
      cluster = self.cfg.GetClusterInfo()
508
      cluster.cluster_name = clustername
509
      cluster.master_ip = new_ip
510
      self.cfg.Update(cluster, feedback_fn)
511

    
512
      # update the known hosts file
513
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
514
      node_list = self.cfg.GetOnlineNodeList()
515
      try:
516
        node_list.remove(master_params.uuid)
517
      except ValueError:
518
        pass
519
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
520
    finally:
521
      master_params.ip = new_ip
522
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
523
                                                     master_params, ems)
524
      result.Warn("Could not re-enable the master role on the master,"
525
                  " please restart manually", self.LogWarning)
526

    
527
    return clustername
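
  # Summary of the flow above, for clarity: the master IP is taken down first,
  # the new name/IP are written to the configuration, the regenerated
  # known_hosts file is pushed to the remaining online nodes, and the master
  # IP is re-activated in the finally block even if one of the steps failed.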
528

    
529

    
530
class LUClusterRepairDiskSizes(NoHooksLU):
531
  """Verifies the cluster disks sizes.
532

533
  """
534
  REQ_BGL = False
535

    
536
  def ExpandNames(self):
537
    if self.op.instances:
538
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
539
      # Not getting the node allocation lock as only a specific set of
540
      # instances (and their nodes) is going to be acquired
541
      self.needed_locks = {
542
        locking.LEVEL_NODE_RES: [],
543
        locking.LEVEL_INSTANCE: self.wanted_names,
544
        }
545
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
546
    else:
547
      self.wanted_names = None
548
      self.needed_locks = {
549
        locking.LEVEL_NODE_RES: locking.ALL_SET,
550
        locking.LEVEL_INSTANCE: locking.ALL_SET,
551

    
552
        # This opcode acquires the node locks for all instances
553
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
554
        }
555

    
556
    self.share_locks = {
557
      locking.LEVEL_NODE_RES: 1,
558
      locking.LEVEL_INSTANCE: 0,
559
      locking.LEVEL_NODE_ALLOC: 1,
560
      }
561

    
562
  def DeclareLocks(self, level):
563
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
564
      self._LockInstancesNodes(primary_only=True, level=level)
565

    
566
  def CheckPrereq(self):
567
    """Check prerequisites.
568

569
    This only checks the optional instance list against the existing names.
570

571
    """
572
    if self.wanted_names is None:
573
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
574

    
575
    self.wanted_instances = \
576
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
577

    
578
  def _EnsureChildSizes(self, disk):
579
    """Ensure children of the disk have the needed disk size.
580

581
    This is valid mainly for DRBD8 and fixes an issue where the
582
    children have a smaller disk size.
583

584
    @param disk: an L{ganeti.objects.Disk} object
585

586
    """
587
    if disk.dev_type == constants.DT_DRBD8:
588
      assert disk.children, "Empty children for DRBD8?"
589
      fchild = disk.children[0]
590
      mismatch = fchild.size < disk.size
591
      if mismatch:
592
        self.LogInfo("Child disk has size %d, parent %d, fixing",
593
                     fchild.size, disk.size)
594
        fchild.size = disk.size
595

    
596
      # and we recurse on this child only, not on the metadev
597
      return self._EnsureChildSizes(fchild) or mismatch
598
    else:
599
      return False
600

    
601
  def Exec(self, feedback_fn):
602
    """Verify the size of cluster disks.
603

604
    """
605
    # TODO: check child disks too
606
    # TODO: check differences in size between primary/secondary nodes
607
    per_node_disks = {}
608
    for instance in self.wanted_instances:
609
      pnode = instance.primary_node
610
      if pnode not in per_node_disks:
611
        per_node_disks[pnode] = []
612
      for idx, disk in enumerate(instance.disks):
613
        per_node_disks[pnode].append((instance, idx, disk))
614

    
615
    assert not (frozenset(per_node_disks.keys()) -
616
                self.owned_locks(locking.LEVEL_NODE_RES)), \
617
      "Not owning correct locks"
618
    assert not self.owned_locks(locking.LEVEL_NODE)
619

    
620
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
621
                                               per_node_disks.keys())
622

    
623
    changed = []
624
    for node_uuid, dskl in per_node_disks.items():
625
      if not dskl:
626
        # no disks on the node
627
        continue
628

    
629
      newl = [([v[2].Copy()], v[0]) for v in dskl]
630
      node_name = self.cfg.GetNodeName(node_uuid)
631
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
632
      if result.fail_msg:
633
        self.LogWarning("Failure in blockdev_getdimensions call to node"
634
                        " %s, ignoring", node_name)
635
        continue
636
      if len(result.payload) != len(dskl):
637
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
638
                        " result.payload=%s", node_name, len(dskl),
639
                        result.payload)
640
        self.LogWarning("Invalid result from node %s, ignoring node results",
641
                        node_name)
642
        continue
643
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
644
        if dimensions is None:
645
          self.LogWarning("Disk %d of instance %s did not return size"
646
                          " information, ignoring", idx, instance.name)
647
          continue
648
        if not isinstance(dimensions, (tuple, list)):
649
          self.LogWarning("Disk %d of instance %s did not return valid"
650
                          " dimension information, ignoring", idx,
651
                          instance.name)
652
          continue
653
        (size, spindles) = dimensions
654
        if not isinstance(size, (int, long)):
655
          self.LogWarning("Disk %d of instance %s did not return valid"
656
                          " size information, ignoring", idx, instance.name)
657
          continue
658
        size = size >> 20
659
        if size != disk.size:
660
          self.LogInfo("Disk %d of instance %s has mismatched size,"
661
                       " correcting: recorded %d, actual %d", idx,
662
                       instance.name, disk.size, size)
663
          disk.size = size
664
          self.cfg.Update(instance, feedback_fn)
665
          changed.append((instance.name, idx, "size", size))
666
        if es_flags[node_uuid]:
667
          if spindles is None:
668
            self.LogWarning("Disk %d of instance %s did not return valid"
669
                            " spindles information, ignoring", idx,
670
                            instance.name)
671
          elif disk.spindles is None or disk.spindles != spindles:
672
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
673
                         " correcting: recorded %s, actual %s",
674
                         idx, instance.name, disk.spindles, spindles)
675
            disk.spindles = spindles
676
            self.cfg.Update(instance, feedback_fn)
677
            changed.append((instance.name, idx, "spindles", disk.spindles))
678
        if self._EnsureChildSizes(disk):
679
          self.cfg.Update(instance, feedback_fn)
680
          changed.append((instance.name, idx, "size", disk.size))
681
    return changed
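
  # Shape of the return value, with made-up example data:
  #
  #   [("instance1.example.com", 0, "size", 10240),
  #    ("instance1.example.com", 1, "spindles", 2)]
  #
  # i.e. one (instance name, disk index, attribute, new value) tuple for every
  # correction recorded in the loop above.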
682

    
683

    
684
def _ValidateNetmask(cfg, netmask):
685
  """Checks if a netmask is valid.
686

687
  @type cfg: L{config.ConfigWriter}
688
  @param cfg: The cluster configuration
689
  @type netmask: int
690
  @param netmask: the netmask to be verified
691
  @raise errors.OpPrereqError: if the validation fails
692

693
  """
694
  ip_family = cfg.GetPrimaryIPFamily()
695
  try:
696
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
697
  except errors.ProgrammerError:
698
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
699
                               ip_family, errors.ECODE_INVAL)
700
  if not ipcls.ValidateNetmask(netmask):
701
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
702
                               (netmask), errors.ECODE_INVAL)
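
# Minimal usage sketch with hypothetical values: the netmask is a CIDR prefix
# length, so on an IPv4 cluster
#
#   _ValidateNetmask(cfg, 24)   # passes
#   _ValidateNetmask(cfg, 99)   # raises errors.OpPrereqError
#
# assuming 'cfg' is the cluster's config.ConfigWriter instance.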
703

    
704

    
705
def CheckFileBasedStoragePathVsEnabledDiskTemplates(
706
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
707
    file_disk_template):
708
  """Checks whether the given file-based storage directory is acceptable.
709

710
  Note: This function is public, because it is also used in bootstrap.py.
711

712
  @type logging_warn_fn: function
713
  @param logging_warn_fn: function which accepts a string and logs it
714
  @type file_storage_dir: string
715
  @param file_storage_dir: the directory to be used for file-based instances
716
  @type enabled_disk_templates: list of string
717
  @param enabled_disk_templates: the list of enabled disk templates
718
  @type file_disk_template: string
719
  @param file_disk_template: the file-based disk template for which the
720
      path should be checked
721

722
  """
723
  assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
724
            constants.ST_FILE, constants.ST_SHARED_FILE
725
         ))
726
  file_storage_enabled = file_disk_template in enabled_disk_templates
727
  if file_storage_dir is not None:
728
    if file_storage_dir == "":
729
      if file_storage_enabled:
730
        raise errors.OpPrereqError(
731
            "Unsetting the '%s' storage directory while having '%s' storage"
732
            " enabled is not permitted." %
733
            (file_disk_template, file_disk_template))
734
    else:
735
      if not file_storage_enabled:
736
        logging_warn_fn(
737
            "Specified a %s storage directory, although %s storage is not"
738
            " enabled." % (file_disk_template, file_disk_template))
739
  else:
740
    raise errors.ProgrammerError("Received %s storage dir with value"
741
                                 " 'None'." % file_disk_template)
742

    
743

    
744
def CheckFileStoragePathVsEnabledDiskTemplates(
745
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
746
  """Checks whether the given file storage directory is acceptable.
747

748
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
749

750
  """
751
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
752
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
753
      constants.DT_FILE)
754

    
755

    
756
def CheckSharedFileStoragePathVsEnabledDiskTemplates(
757
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
758
  """Checks whether the given shared file storage directory is acceptable.
759

760
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
761

762
  """
763
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
764
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
765
      constants.DT_SHARED_FILE)
766

    
767

    
768
class LUClusterSetParams(LogicalUnit):
769
  """Change the parameters of the cluster.
770

771
  """
772
  HPATH = "cluster-modify"
773
  HTYPE = constants.HTYPE_CLUSTER
774
  REQ_BGL = False
775

    
776
  def CheckArguments(self):
777
    """Check parameters
778

779
    """
780
    if self.op.uid_pool:
781
      uidpool.CheckUidPool(self.op.uid_pool)
782

    
783
    if self.op.add_uids:
784
      uidpool.CheckUidPool(self.op.add_uids)
785

    
786
    if self.op.remove_uids:
787
      uidpool.CheckUidPool(self.op.remove_uids)
788

    
789
    if self.op.master_netmask is not None:
790
      _ValidateNetmask(self.cfg, self.op.master_netmask)
791

    
792
    if self.op.diskparams:
793
      for dt_params in self.op.diskparams.values():
794
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
795
      try:
796
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
797
        CheckDiskAccessModeValidity(self.op.diskparams)
798
      except errors.OpPrereqError, err:
799
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
800
                                   err, errors.ECODE_INVAL)
801

    
802
  def ExpandNames(self):
803
    # FIXME: in the future maybe other cluster params won't require checking on
804
    # all nodes to be modified.
805
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
806
    # resource locks the right thing, shouldn't it be the BGL instead?
807
    self.needed_locks = {
808
      locking.LEVEL_NODE: locking.ALL_SET,
809
      locking.LEVEL_INSTANCE: locking.ALL_SET,
810
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
811
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
812
    }
813
    self.share_locks = ShareAll()
814

    
815
  def BuildHooksEnv(self):
816
    """Build hooks env.
817

818
    """
819
    return {
820
      "OP_TARGET": self.cfg.GetClusterName(),
821
      "NEW_VG_NAME": self.op.vg_name,
822
      }
823

    
824
  def BuildHooksNodes(self):
825
    """Build hooks nodes.
826

827
    """
828
    mn = self.cfg.GetMasterNode()
829
    return ([mn], [mn])
830

    
831
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
832
                   new_enabled_disk_templates):
833
    """Check the consistency of the vg name on all nodes and in case it gets
834
       unset, whether there are instances still using it.
835

836
    """
837
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
838
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
839
                                            new_enabled_disk_templates)
840
    current_vg_name = self.cfg.GetVGName()
841

    
842
    if self.op.vg_name == '':
843
      if lvm_is_enabled:
844
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
845
                                   " disk templates are or get enabled.")
846

    
847
    if self.op.vg_name is None:
848
      if current_vg_name is None and lvm_is_enabled:
849
        raise errors.OpPrereqError("Please specify a volume group when"
850
                                   " enabling lvm-based disk-templates.")
851

    
852
    if self.op.vg_name is not None and not self.op.vg_name:
853
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
854
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
855
                                   " instances exist", errors.ECODE_INVAL)
856

    
857
    if (self.op.vg_name is not None and lvm_is_enabled) or \
858
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
859
      self._CheckVgNameOnNodes(node_uuids)
860

    
861
  def _CheckVgNameOnNodes(self, node_uuids):
862
    """Check the status of the volume group on each node.
863

864
    """
865
    vglist = self.rpc.call_vg_list(node_uuids)
866
    for node_uuid in node_uuids:
867
      msg = vglist[node_uuid].fail_msg
868
      if msg:
869
        # ignoring down node
870
        self.LogWarning("Error while gathering data on node %s"
871
                        " (ignoring node): %s",
872
                        self.cfg.GetNodeName(node_uuid), msg)
873
        continue
874
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
875
                                            self.op.vg_name,
876
                                            constants.MIN_VG_SIZE)
877
      if vgstatus:
878
        raise errors.OpPrereqError("Error on node '%s': %s" %
879
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
880
                                   errors.ECODE_ENVIRON)
881

    
882
  @staticmethod
883
  def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
884
                                old_enabled_disk_templates):
885
    """Computes three sets of disk templates.
886

887
    @see: C{_GetDiskTemplateSets} for more details.
888

889
    """
890
    enabled_disk_templates = None
891
    new_enabled_disk_templates = []
892
    disabled_disk_templates = []
893
    if op_enabled_disk_templates:
894
      enabled_disk_templates = op_enabled_disk_templates
895
      new_enabled_disk_templates = \
896
        list(set(enabled_disk_templates)
897
             - set(old_enabled_disk_templates))
898
      disabled_disk_templates = \
899
        list(set(old_enabled_disk_templates)
900
             - set(enabled_disk_templates))
901
    else:
902
      enabled_disk_templates = old_enabled_disk_templates
903
    return (enabled_disk_templates, new_enabled_disk_templates,
904
            disabled_disk_templates)
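
  # Worked example with illustrative values: for
  #   op_enabled_disk_templates  = ["plain", "drbd"]
  #   old_enabled_disk_templates = ["plain", "file"]
  # the tuple returned is (up to ordering within the lists)
  #   (["plain", "drbd"], ["drbd"], ["file"])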
905

    
906
  def _GetDiskTemplateSets(self, cluster):
907
    """Computes three sets of disk templates.
908

909
    The three sets are:
910
      - disk templates that will be enabled after this operation (no matter if
911
        they were enabled before or not)
912
      - disk templates that get enabled by this operation (thus haven't been
913
        enabled before).
914
      - disk templates that get disabled by this operation
915

916
    """
917
    return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
918
                                          cluster.enabled_disk_templates)
919

    
920
  def _CheckIpolicy(self, cluster, enabled_disk_templates):
921
    """Checks the ipolicy.
922

923
    @type cluster: C{objects.Cluster}
924
    @param cluster: the cluster's configuration
925
    @type enabled_disk_templates: list of string
926
    @param enabled_disk_templates: list of (possibly newly) enabled disk
927
      templates
928

929
    """
930
    # FIXME: write unit tests for this
931
    if self.op.ipolicy:
932
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
933
                                           group_policy=False)
934

    
935
      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
936
                                  enabled_disk_templates)
937

    
938
      all_instances = self.cfg.GetAllInstancesInfo().values()
939
      violations = set()
940
      for group in self.cfg.GetAllNodeGroupsInfo().values():
941
        instances = frozenset([inst for inst in all_instances
942
                               if compat.any(nuuid in group.members
943
                                             for nuuid in inst.all_nodes)])
944
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
945
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
946
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
947
                                           self.cfg)
948
        if new:
949
          violations.update(new)
950

    
951
      if violations:
952
        self.LogWarning("After the ipolicy change the following instances"
953
                        " violate them: %s",
954
                        utils.CommaJoin(utils.NiceSort(violations)))
955
    else:
956
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
957
                                  enabled_disk_templates)
958

    
959
  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
960
    """Checks whether the set DRBD helper actually exists on the nodes.
961

962
    @type drbd_helper: string
963
    @param drbd_helper: path of the drbd usermode helper binary
964
    @type node_uuids: list of strings
965
    @param node_uuids: list of node UUIDs to check for the helper
966

967
    """
968
    # checks given drbd helper on all nodes
969
    helpers = self.rpc.call_drbd_helper(node_uuids)
970
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
971
      if ninfo.offline:
972
        self.LogInfo("Not checking drbd helper on offline node %s",
973
                     ninfo.name)
974
        continue
975
      msg = helpers[ninfo.uuid].fail_msg
976
      if msg:
977
        raise errors.OpPrereqError("Error checking drbd helper on node"
978
                                   " '%s': %s" % (ninfo.name, msg),
979
                                   errors.ECODE_ENVIRON)
980
      node_helper = helpers[ninfo.uuid].payload
981
      if node_helper != drbd_helper:
982
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
983
                                   (ninfo.name, node_helper),
984
                                   errors.ECODE_ENVIRON)
985

    
986
  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
987
    """Check the DRBD usermode helper.
988

989
    @type node_uuids: list of strings
990
    @param node_uuids: a list of nodes' UUIDs
991
    @type drbd_enabled: boolean
992
    @param drbd_enabled: whether DRBD will be enabled after this operation
993
      (no matter if it was disabled before or not)
994
    @type drbd_gets_enabled: boolean
995
    @param drbd_gets_enabled: true if DRBD was disabled before this
996
      operation, but will be enabled afterwards
997

998
    """
999
    if self.op.drbd_helper == '':
1000
      if drbd_enabled:
1001
        raise errors.OpPrereqError("Cannot disable drbd helper while"
1002
                                   " DRBD is enabled.")
1003
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
1004
        raise errors.OpPrereqError("Cannot disable drbd helper while"
1005
                                   " drbd-based instances exist",
1006
                                   errors.ECODE_INVAL)
1007

    
1008
    else:
1009
      if self.op.drbd_helper is not None and drbd_enabled:
1010
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
1011
      else:
1012
        if drbd_gets_enabled:
1013
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
1014
          if current_drbd_helper is not None:
1015
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
1016
          else:
1017
            raise errors.OpPrereqError("Cannot enable DRBD without a"
1018
                                       " DRBD usermode helper set.")
1019

    
1020
  def _CheckInstancesOfDisabledDiskTemplates(
1021
      self, disabled_disk_templates):
1022
    """Check whether we try to disable a disk template that is in use.
1023

1024
    @type disabled_disk_templates: list of string
1025
    @param disabled_disk_templates: list of disk templates that are going to
1026
      be disabled by this operation
1027

1028
    """
1029
    for disk_template in disabled_disk_templates:
1030
      if self.cfg.HasAnyDiskOfType(disk_template):
1031
        raise errors.OpPrereqError(
1032
            "Cannot disable disk template '%s', because there is at least one"
1033
            " instance using it." % disk_template)
1034

    
1035
  def CheckPrereq(self):
1036
    """Check prerequisites.
1037

1038
    This checks whether the given params don't conflict and
1039
    if the given volume group is valid.
1040

1041
    """
1042
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
1043
    self.cluster = cluster = self.cfg.GetClusterInfo()
1044

    
1045
    vm_capable_node_uuids = [node.uuid
1046
                             for node in self.cfg.GetAllNodesInfo().values()
1047
                             if node.uuid in node_uuids and node.vm_capable]
1048

    
1049
    (enabled_disk_templates, new_enabled_disk_templates,
1050
      disabled_disk_templates) = self._GetDiskTemplateSets(cluster)
1051
    self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates)
1052

    
1053
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
1054
                      new_enabled_disk_templates)
1055

    
1056
    if self.op.file_storage_dir is not None:
1057
      CheckFileStoragePathVsEnabledDiskTemplates(
1058
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
1059

    
1060
    if self.op.shared_file_storage_dir is not None:
1061
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
1062
          self.LogWarning, self.op.shared_file_storage_dir,
1063
          enabled_disk_templates)
1064

    
1065
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
1066
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
1067
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)
1068

    
1069
    # validate params changes
1070
    if self.op.beparams:
1071
      objects.UpgradeBeParams(self.op.beparams)
1072
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1073
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
1074

    
1075
    if self.op.ndparams:
1076
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
1077
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
1078

    
1079
      # TODO: we need a more general way to handle resetting
1080
      # cluster-level parameters to default values
1081
      if self.new_ndparams["oob_program"] == "":
1082
        self.new_ndparams["oob_program"] = \
1083
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
1084

    
1085
    if self.op.hv_state:
1086
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
1087
                                           self.cluster.hv_state_static)
1088
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
1089
                               for hv, values in new_hv_state.items())
1090

    
1091
    if self.op.disk_state:
1092
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
1093
                                               self.cluster.disk_state_static)
1094
      self.new_disk_state = \
1095
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
1096
                            for name, values in svalues.items()))
1097
             for storage, svalues in new_disk_state.items())
1098

    
1099
    self._CheckIpolicy(cluster, enabled_disk_templates)
1100

    
1101
    if self.op.nicparams:
1102
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1103
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
1104
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1105
      nic_errors = []
1106

    
1107
      # check all instances for consistency
1108
      for instance in self.cfg.GetAllInstancesInfo().values():
1109
        for nic_idx, nic in enumerate(instance.nics):
1110
          params_copy = copy.deepcopy(nic.nicparams)
1111
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
1112

    
1113
          # check parameter syntax
1114
          try:
1115
            objects.NIC.CheckParameterSyntax(params_filled)
1116
          except errors.ConfigurationError, err:
1117
            nic_errors.append("Instance %s, nic/%d: %s" %
1118
                              (instance.name, nic_idx, err))
1119

    
1120
          # if we're moving instances to routed, check that they have an ip
1121
          target_mode = params_filled[constants.NIC_MODE]
1122
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
1123
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
1124
                              " address" % (instance.name, nic_idx))
1125
      if nic_errors:
1126
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
1127
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
1128

    
1129
    # hypervisor list/parameters
1130
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
1131
    if self.op.hvparams:
1132
      for hv_name, hv_dict in self.op.hvparams.items():
1133
        if hv_name not in self.new_hvparams:
1134
          self.new_hvparams[hv_name] = hv_dict
1135
        else:
1136
          self.new_hvparams[hv_name].update(hv_dict)
1137

    
1138
    # disk template parameters
1139
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
1140
    if self.op.diskparams:
1141
      for dt_name, dt_params in self.op.diskparams.items():
1142
        if dt_name not in self.new_diskparams:
1143
          self.new_diskparams[dt_name] = dt_params
1144
        else:
1145
          self.new_diskparams[dt_name].update(dt_params)
1146
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)
1147

    
1148
    # os hypervisor parameters
1149
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
1150
    if self.op.os_hvp:
1151
      for os_name, hvs in self.op.os_hvp.items():
1152
        if os_name not in self.new_os_hvp:
1153
          self.new_os_hvp[os_name] = hvs
1154
        else:
1155
          for hv_name, hv_dict in hvs.items():
1156
            if hv_dict is None:
1157
              # Delete if it exists
1158
              self.new_os_hvp[os_name].pop(hv_name, None)
1159
            elif hv_name not in self.new_os_hvp[os_name]:
1160
              self.new_os_hvp[os_name][hv_name] = hv_dict
1161
            else:
1162
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
1163

    
1164
    # os parameters
1165
    self.new_osp = objects.FillDict(cluster.osparams, {})
1166
    if self.op.osparams:
1167
      for os_name, osp in self.op.osparams.items():
1168
        if os_name not in self.new_osp:
1169
          self.new_osp[os_name] = {}
1170

    
1171
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
1172
                                                 use_none=True)
1173

    
1174
        if not self.new_osp[os_name]:
1175
          # we removed all parameters
1176
          del self.new_osp[os_name]
1177
        else:
1178
          # check the parameter validity (remote check)
1179
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
1180
                        os_name, self.new_osp[os_name])
1181

    
1182
    # changes to the hypervisor list
1183
    if self.op.enabled_hypervisors is not None:
1184
      self.hv_list = self.op.enabled_hypervisors
1185
      for hv in self.hv_list:
1186
        # if the hypervisor doesn't already exist in the cluster
1187
        # hvparams, we initialize it to empty, and then (in both
1188
        # cases) we make sure to fill the defaults, as we might not
1189
        # have a complete defaults list if the hypervisor wasn't
1190
        # enabled before
1191
        if hv not in new_hvp:
1192
          new_hvp[hv] = {}
1193
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1194
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1195
    else:
1196
      self.hv_list = cluster.enabled_hypervisors
1197

    
1198
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1199
      # either the enabled list has changed, or the parameters have, validate
1200
      for hv_name, hv_params in self.new_hvparams.items():
1201
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1202
            (self.op.enabled_hypervisors and
1203
             hv_name in self.op.enabled_hypervisors)):
1204
          # either this is a new hypervisor, or its parameters have changed
1205
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1206
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1207
          hv_class.CheckParameterSyntax(hv_params)
1208
          CheckHVParams(self, node_uuids, hv_name, hv_params)
1209

    
1210
    self._CheckDiskTemplateConsistency()
1211

    
1212
    if self.op.os_hvp:
1213
      # no need to check any newly-enabled hypervisors, since the
1214
      # defaults have already been checked in the above code-block
1215
      for os_name, os_hvp in self.new_os_hvp.items():
1216
        for hv_name, hv_params in os_hvp.items():
1217
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1218
          # we need to fill in the new os_hvp on top of the actual hv_p
1219
          cluster_defaults = self.new_hvparams.get(hv_name, {})
1220
          new_osp = objects.FillDict(cluster_defaults, hv_params)
1221
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1222
          hv_class.CheckParameterSyntax(new_osp)
1223
          CheckHVParams(self, node_uuids, hv_name, new_osp)
1224

    
1225
    if self.op.default_iallocator:
1226
      alloc_script = utils.FindFile(self.op.default_iallocator,
1227
                                    constants.IALLOCATOR_SEARCH_PATH,
1228
                                    os.path.isfile)
1229
      if alloc_script is None:
1230
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1231
                                   " specified" % self.op.default_iallocator,
1232
                                   errors.ECODE_INVAL)
1233

    
1234
  def _CheckDiskTemplateConsistency(self):
1235
    """Check whether the disk templates that are going to be disabled
1236
       are still in use by some instances.
1237

1238
    """
1239
    if self.op.enabled_disk_templates:
1240
      cluster = self.cfg.GetClusterInfo()
1241
      instances = self.cfg.GetAllInstancesInfo()
1242

    
1243
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1244
        - set(self.op.enabled_disk_templates)
1245
      for instance in instances.itervalues():
1246
        if instance.disk_template in disk_templates_to_remove:
1247
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1248
                                     " because instance '%s' is using it." %
1249
                                     (instance.disk_template, instance.name))
1250

    
1251
  def _SetVgName(self, feedback_fn):
1252
    """Determines and sets the new volume group name.
1253

1254
    """
1255
    if self.op.vg_name is not None:
1256
      new_volume = self.op.vg_name
1257
      if not new_volume:
1258
        new_volume = None
1259
      if new_volume != self.cfg.GetVGName():
1260
        self.cfg.SetVGName(new_volume)
1261
      else:
1262
        feedback_fn("Cluster LVM configuration already in desired"
1263
                    " state, not changing")
1264

    
1265
  def _SetFileStorageDir(self, feedback_fn):
1266
    """Set the file storage directory.
1267

1268
    """
1269
    if self.op.file_storage_dir is not None:
1270
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1271
        feedback_fn("Global file storage dir already set to value '%s'"
1272
                    % self.cluster.file_storage_dir)
1273
      else:
1274
        self.cluster.file_storage_dir = self.op.file_storage_dir
1275

    
1276
  def _SetDrbdHelper(self, feedback_fn):
1277
    """Set the DRBD usermode helper.
1278

1279
    """
1280
    if self.op.drbd_helper is not None:
1281
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1282
        feedback_fn("Note that you specified a drbd user helper, but did not"
1283
                    " enable the drbd disk template.")
1284
      new_helper = self.op.drbd_helper
1285
      if not new_helper:
1286
        new_helper = None
1287
      if new_helper != self.cfg.GetDRBDHelper():
1288
        self.cfg.SetDRBDHelper(new_helper)
1289
      else:
1290
        feedback_fn("Cluster DRBD helper already in desired state,"
1291
                    " not changing")
1292

    
1293
  def Exec(self, feedback_fn):
1294
    """Change the parameters of the cluster.
1295

1296
    """
1297
    if self.op.enabled_disk_templates:
1298
      self.cluster.enabled_disk_templates = \
1299
        list(self.op.enabled_disk_templates)
1300

    
1301
    self._SetVgName(feedback_fn)
1302
    self._SetFileStorageDir(feedback_fn)
1303
    self._SetDrbdHelper(feedback_fn)
1304

    
1305
    if self.op.hvparams:
1306
      self.cluster.hvparams = self.new_hvparams
1307
    if self.op.os_hvp:
1308
      self.cluster.os_hvp = self.new_os_hvp
1309
    if self.op.enabled_hypervisors is not None:
1310
      self.cluster.hvparams = self.new_hvparams
1311
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1312
    if self.op.beparams:
1313
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1314
    if self.op.nicparams:
1315
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1316
    if self.op.ipolicy:
1317
      self.cluster.ipolicy = self.new_ipolicy
1318
    if self.op.osparams:
1319
      self.cluster.osparams = self.new_osp
1320
    if self.op.ndparams:
1321
      self.cluster.ndparams = self.new_ndparams
1322
    if self.op.diskparams:
1323
      self.cluster.diskparams = self.new_diskparams
1324
    if self.op.hv_state:
1325
      self.cluster.hv_state_static = self.new_hv_state
1326
    if self.op.disk_state:
1327
      self.cluster.disk_state_static = self.new_disk_state
1328

    
1329
    if self.op.candidate_pool_size is not None:
1330
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1331
      # we need to update the pool size here, otherwise the save will fail
1332
      AdjustCandidatePool(self, [], feedback_fn)
1333

    
1334
    if self.op.max_running_jobs is not None:
1335
      self.cluster.max_running_jobs = self.op.max_running_jobs
1336

    
1337
    if self.op.maintain_node_health is not None:
1338
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1339
        feedback_fn("Note: CONFD was disabled at build time, node health"
1340
                    " maintenance is not useful (still enabling it)")
1341
      self.cluster.maintain_node_health = self.op.maintain_node_health
1342

    
1343
    if self.op.modify_etc_hosts is not None:
1344
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1345

    
1346
    if self.op.prealloc_wipe_disks is not None:
1347
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1348

    
1349
    if self.op.add_uids is not None:
1350
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1351

    
1352
    if self.op.remove_uids is not None:
1353
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1354

    
1355
    if self.op.uid_pool is not None:
1356
      self.cluster.uid_pool = self.op.uid_pool
1357

    
1358
    if self.op.default_iallocator is not None:
1359
      self.cluster.default_iallocator = self.op.default_iallocator
1360

    
1361
    if self.op.default_iallocator_params is not None:
1362
      self.cluster.default_iallocator_params = self.op.default_iallocator_params
1363

    
1364
    if self.op.reserved_lvs is not None:
1365
      self.cluster.reserved_lvs = self.op.reserved_lvs
1366

    
1367
    if self.op.use_external_mip_script is not None:
1368
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
1369

    
1370
    def helper_os(aname, mods, desc):
1371
      desc += " OS list"
1372
      lst = getattr(self.cluster, aname)
1373
      for key, val in mods:
1374
        if key == constants.DDM_ADD:
1375
          if val in lst:
1376
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1377
          else:
1378
            lst.append(val)
1379
        elif key == constants.DDM_REMOVE:
1380
          if val in lst:
1381
            lst.remove(val)
1382
          else:
1383
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1384
        else:
1385
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
1386

    
1387
    if self.op.hidden_os:
1388
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1389

    
1390
    if self.op.blacklisted_os:
1391
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1392

    
1393
    if self.op.master_netdev:
1394
      master_params = self.cfg.GetMasterNetworkParameters()
1395
      ems = self.cfg.GetUseExternalMipScript()
1396
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1397
                  self.cluster.master_netdev)
1398
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1399
                                                       master_params, ems)
1400
      if not self.op.force:
1401
        result.Raise("Could not disable the master ip")
1402
      else:
1403
        if result.fail_msg:
1404
          msg = ("Could not disable the master ip (continuing anyway): %s" %
1405
                 result.fail_msg)
1406
          feedback_fn(msg)
1407
      feedback_fn("Changing master_netdev from %s to %s" %
1408
                  (master_params.netdev, self.op.master_netdev))
1409
      self.cluster.master_netdev = self.op.master_netdev
1410

    
1411
    if self.op.master_netmask:
1412
      master_params = self.cfg.GetMasterNetworkParameters()
1413
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1414
      result = self.rpc.call_node_change_master_netmask(
1415
                 master_params.uuid, master_params.netmask,
1416
                 self.op.master_netmask, master_params.ip,
1417
                 master_params.netdev)
1418
      result.Warn("Could not change the master IP netmask", feedback_fn)
1419
      self.cluster.master_netmask = self.op.master_netmask
1420

    
1421
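    # all the changes accumulated on self.cluster above are written back to
    # the cluster configuration in a single update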
    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]
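      # "-len(jobs)" is a dependency relative to this job submission: it points
      # back at the config-verification job appended above, so every per-group
      # verification job waits for the global config check to finish first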

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = constants.CV_ERROR
  ETYPE_WARNING = constants.CV_WARNING

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = utils.VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(inst.name for
                         inst in dangling_instances.get(node.uuid, [])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a"
                  " non-existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(inst.name for
                                         inst in no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1668
  """Verifies the status of a node group.
1669

1670
  """
1671
  HPATH = "cluster-verify"
1672
  HTYPE = constants.HTYPE_CLUSTER
1673
  REQ_BGL = False
1674

    
1675
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1676

    
1677
  class NodeImage(object):
1678
    """A class representing the logical and physical status of a node.
1679

1680
    @type uuid: string
1681
    @ivar uuid: the node UUID to which this object refers
1682
    @ivar volumes: a structure as returned from
1683
        L{ganeti.backend.GetVolumeList} (runtime)
1684
    @ivar instances: a list of running instances (runtime)
1685
    @ivar pinst: list of configured primary instances (config)
1686
    @ivar sinst: list of configured secondary instances (config)
1687
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1688
        instances for which this node is secondary (config)
1689
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1690
    @ivar dfree: free disk, as reported by the node (runtime)
1691
    @ivar offline: the offline status (config)
1692
    @type rpc_fail: boolean
1693
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
1695
    @type lvm_fail: boolean
1696
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1697
    @type hyp_fail: boolean
1698
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1699
    @type ghost: boolean
1700
    @ivar ghost: whether this is a known node or not (config)
1701
    @type os_fail: boolean
1702
    @ivar os_fail: whether the RPC call didn't return valid OS data
1703
    @type oslist: list
1704
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1705
    @type vm_capable: boolean
1706
    @ivar vm_capable: whether the node can host instances
1707
    @type pv_min: float
1708
    @ivar pv_min: size in MiB of the smallest PVs
1709
    @type pv_max: float
1710
    @ivar pv_max: size in MiB of the biggest PVs
1711

1712
    """
1713
    def __init__(self, offline=False, uuid=None, vm_capable=True):
1714
      self.uuid = uuid
1715
      self.volumes = {}
1716
      self.instances = []
1717
      self.pinst = []
1718
      self.sinst = []
1719
      self.sbp = {}
1720
      self.mfree = 0
1721
      self.dfree = 0
1722
      self.offline = offline
1723
      self.vm_capable = vm_capable
1724
      self.rpc_fail = False
1725
      self.lvm_fail = False
1726
      self.hyp_fail = False
1727
      self.ghost = False
1728
      self.os_fail = False
1729
      self.oslist = {}
1730
      self.pv_min = None
1731
      self.pv_max = None
1732

    
1733
  def ExpandNames(self):
1734
    # This raises errors.OpPrereqError on its own:
1735
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1736

    
1737
    # Get instances in node group; this is unsafe and needs verification later
1738
    inst_uuids = \
1739
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1740

    
1741
    self.needed_locks = {
1742
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1743
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1744
      locking.LEVEL_NODE: [],
1745

    
1746
      # This opcode is run by watcher every five minutes and acquires all nodes
1747
      # for a group. It doesn't run for a long time, so it's better to acquire
1748
      # the node allocation lock as well.
1749
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1750
      }
1751

    
1752
    self.share_locks = ShareAll()
1753

    
1754
  def DeclareLocks(self, level):
1755
    if level == locking.LEVEL_NODE:
1756
      # Get members of node group; this is unsafe and needs verification later
1757
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1758

    
1759
      # In Exec(), we warn about mirrored instances that have primary and
1760
      # secondary living in separate node groups. To fully verify that
1761
      # volumes for these instances are healthy, we will need to do an
1762
      # extra call to their secondaries. We ensure here those nodes will
1763
      # be locked.
1764
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1765
        # Important: access only the instances whose lock is owned
1766
        instance = self.cfg.GetInstanceInfoByName(inst_name)
1767
        if instance.disk_template in constants.DTS_INT_MIRROR:
1768
          nodes.update(instance.secondary_nodes)
1769

    
1770
      self.needed_locks[locking.LEVEL_NODE] = nodes
1771

    
1772
  def CheckPrereq(self):
1773
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1774
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1775

    
1776
    group_node_uuids = set(self.group_info.members)
1777
    group_inst_uuids = \
1778
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1779

    
1780
    unlocked_node_uuids = \
1781
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1782

    
1783
    unlocked_inst_uuids = \
1784
        group_inst_uuids.difference(
1785
          [self.cfg.GetInstanceInfoByName(name).uuid
1786
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1787

    
1788
    if unlocked_node_uuids:
1789
      raise errors.OpPrereqError(
1790
        "Missing lock for nodes: %s" %
1791
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1792
        errors.ECODE_STATE)
1793

    
1794
    if unlocked_inst_uuids:
1795
      raise errors.OpPrereqError(
1796
        "Missing lock for instances: %s" %
1797
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1798
        errors.ECODE_STATE)
1799

    
1800
    self.all_node_info = self.cfg.GetAllNodesInfo()
1801
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1802

    
1803
    self.my_node_uuids = group_node_uuids
1804
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1805
                             for node_uuid in group_node_uuids)
1806

    
1807
    self.my_inst_uuids = group_inst_uuids
1808
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1809
                             for inst_uuid in group_inst_uuids)
1810

    
1811
    # We detect here the nodes that will need the extra RPC calls for verifying
1812
    # split LV volumes; they should be locked.
1813
    extra_lv_nodes = set()
1814

    
1815
    for inst in self.my_inst_info.values():
1816
      if inst.disk_template in constants.DTS_INT_MIRROR:
1817
        for nuuid in inst.all_nodes:
1818
          if self.all_node_info[nuuid].group != self.group_uuid:
1819
            extra_lv_nodes.add(nuuid)
1820

    
1821
    unlocked_lv_nodes = \
1822
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1823

    
1824
    if unlocked_lv_nodes:
1825
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1826
                                 utils.CommaJoin(unlocked_lv_nodes),
1827
                                 errors.ECODE_STATE)
1828
    self.extra_lv_nodes = list(extra_lv_nodes)
1829

    
1830
  def _VerifyNode(self, ninfo, nresult):
1831
    """Perform some basic validation on data returned from a node.
1832

1833
      - check the result data structure is well formed and has all the
1834
        mandatory fields
1835
      - check ganeti version
1836

1837
    @type ninfo: L{objects.Node}
1838
    @param ninfo: the node to check
1839
    @param nresult: the results from the node
1840
    @rtype: boolean
1841
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)
1843

1844
    """
1845
    # main result, nresult should be a non-empty dict
1846
    test = not nresult or not isinstance(nresult, dict)
1847
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1848
                  "unable to verify node: no data returned")
1849
    if test:
1850
      return False
1851

    
1852
    # compares ganeti version
1853
    local_version = constants.PROTOCOL_VERSION
1854
    remote_version = nresult.get("version", None)
1855
    test = not (remote_version and
1856
                isinstance(remote_version, (list, tuple)) and
1857
                len(remote_version) == 2)
1858
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1859
                  "connection to node returned invalid data")
1860
    if test:
1861
      return False
1862

    
1863
    test = local_version != remote_version[0]
1864
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1865
                  "incompatible protocol versions: master %s,"
1866
                  " node %s", local_version, remote_version[0])
1867
    if test:
1868
      return False
1869

    
1870
    # node seems compatible, we can actually try to look into its results
1871

    
1872
    # full package version
1873
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1874
                  constants.CV_ENODEVERSION, ninfo.name,
1875
                  "software version mismatch: master %s, node %s",
1876
                  constants.RELEASE_VERSION, remote_version[1],
1877
                  code=self.ETYPE_WARNING)
1878

    
1879
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1880
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1881
      for hv_name, hv_result in hyp_result.iteritems():
1882
        test = hv_result is not None
1883
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1884
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1885

    
1886
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1887
    if ninfo.vm_capable and isinstance(hvp_result, list):
1888
      for item, hv_name, hv_result in hvp_result:
1889
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1890
                      "hypervisor %s parameter verify failure (source %s): %s",
1891
                      hv_name, item, hv_result)
1892

    
1893
    test = nresult.get(constants.NV_NODESETUP,
1894
                       ["Missing NODESETUP results"])
1895
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1896
                  "node setup error: %s", "; ".join(test))
1897

    
1898
    return True
1899

    
1900
  def _VerifyNodeTime(self, ninfo, nresult,
1901
                      nvinfo_starttime, nvinfo_endtime):
1902
    """Check the node time.
1903

1904
    @type ninfo: L{objects.Node}
1905
    @param ninfo: the node to check
1906
    @param nresult: the remote results for the node
1907
    @param nvinfo_starttime: the start time of the RPC call
1908
    @param nvinfo_endtime: the end time of the RPC call
1909

1910
    """
1911
    ntime = nresult.get(constants.NV_TIME, None)
1912
    try:
1913
      ntime_merged = utils.MergeTime(ntime)
1914
    except (ValueError, TypeError):
1915
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1916
                    "Node returned invalid time")
1917
      return
1918

    
1919
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1920
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1921
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1922
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1923
    else:
1924
      ntime_diff = None
1925

    
1926
    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1927
                  "Node time diverges by at least %s from master node time",
1928
                  ntime_diff)
1929

    
1930
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1931
    """Check the node LVM results and update info for cross-node checks.
1932

1933
    @type ninfo: L{objects.Node}
1934
    @param ninfo: the node to check
1935
    @param nresult: the remote results for the node
1936
    @param vg_name: the configured VG name
1937
    @type nimg: L{NodeImage}
1938
    @param nimg: node image
1939

1940
    """
1941
    if vg_name is None:
1942
      return
1943

    
1944
    # checks vg existence and size > 20G
1945
    vglist = nresult.get(constants.NV_VGLIST, None)
1946
    test = not vglist
1947
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1948
                  "unable to check volume groups")
1949
    if not test:
1950
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1951
                                            constants.MIN_VG_SIZE)
1952
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1953

    
1954
    # Check PVs
1955
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1956
    for em in errmsgs:
1957
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
1958
    if pvminmax is not None:
1959
      (nimg.pv_min, nimg.pv_max) = pvminmax
1960

    
1961
  def _VerifyGroupDRBDVersion(self, node_verify_infos):
1962
    """Check cross-node DRBD version consistency.
1963

1964
    @type node_verify_infos: dict
1965
    @param node_verify_infos: infos about nodes as returned from the
1966
      node_verify call.
1967

1968
    """
1969
    node_versions = {}
1970
    for node_uuid, ndata in node_verify_infos.items():
1971
      nresult = ndata.payload
1972
      if nresult:
1973
        version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1974
        node_versions[node_uuid] = version
1975

    
1976
    if len(set(node_versions.values())) > 1:
1977
      for node_uuid, version in sorted(node_versions.items()):
1978
        msg = "DRBD version mismatch: %s" % version
1979
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1980
                    code=self.ETYPE_WARNING)
1981

    
1982
  def _VerifyGroupLVM(self, node_image, vg_name):
1983
    """Check cross-node consistency in LVM.
1984

1985
    @type node_image: dict
1986
    @param node_image: info about nodes, mapping from node to names to
1987
      L{NodeImage} objects
1988
    @param vg_name: the configured VG name
1989

1990
    """
1991
    if vg_name is None:
1992
      return
1993

    
1994
    # Only exclusive storage needs this kind of checks
1995
    if not self._exclusive_storage:
1996
      return
1997

    
1998
    # exclusive_storage wants all PVs to have the same size (approximately),
1999
    # if the smallest and the biggest ones are okay, everything is fine.
2000
    # pv_min is None iff pv_max is None
2001
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
2002
    if not vals:
2003
      return
2004
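    # min()/max() over (size, uuid) pairs yield the smallest/biggest PV size
    # together with (one of) the node(s) reporting it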
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
2005
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
2006
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
2007
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
2008
                  "PV sizes differ too much in the group; smallest (%s MB) is"
2009
                  " on %s, biggest (%s MB) is on %s",
2010
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
2011
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))
2012

    
2013
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
2014
    """Check the node bridges.
2015

2016
    @type ninfo: L{objects.Node}
2017
    @param ninfo: the node to check
2018
    @param nresult: the remote results for the node
2019
    @param bridges: the expected list of bridges
2020

2021
    """
2022
    if not bridges:
2023
      return
2024

    
2025
    missing = nresult.get(constants.NV_BRIDGES, None)
2026
    test = not isinstance(missing, list)
2027
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2028
                  "did not return valid bridge information")
2029
    if not test:
2030
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
2031
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
2032

    
2033
  def _VerifyNodeUserScripts(self, ninfo, nresult):
2034
    """Check the results of user scripts presence and executability on the node
2035

2036
    @type ninfo: L{objects.Node}
2037
    @param ninfo: the node to check
2038
    @param nresult: the remote results for the node
2039

2040
    """
2041
    test = not constants.NV_USERSCRIPTS in nresult
2042
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2043
                  "did not return user scripts information")
2044

    
2045
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
2046
    if not test:
2047
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2048
                    "user scripts not present or not executable: %s" %
2049
                    utils.CommaJoin(sorted(broken_scripts)))
2050

    
2051
  def _VerifyNodeNetwork(self, ninfo, nresult):
2052
    """Check the node network connectivity results.
2053

2054
    @type ninfo: L{objects.Node}
2055
    @param ninfo: the node to check
2056
    @param nresult: the remote results for the node
2057

2058
    """
2059
    test = constants.NV_NODELIST not in nresult
2060
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
2061
                  "node hasn't returned node ssh connectivity data")
2062
    if not test:
2063
      if nresult[constants.NV_NODELIST]:
2064
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
2065
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
2066
                        "ssh communication with node '%s': %s", a_node, a_msg)
2067

    
2068
    test = constants.NV_NODENETTEST not in nresult
2069
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2070
                  "node hasn't returned node tcp connectivity data")
2071
    if not test:
2072
      if nresult[constants.NV_NODENETTEST]:
2073
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
2074
        for anode in nlist:
2075
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
2076
                        "tcp communication with node '%s': %s",
2077
                        anode, nresult[constants.NV_NODENETTEST][anode])
2078

    
2079
    test = constants.NV_MASTERIP not in nresult
2080
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2081
                  "node hasn't returned node master IP reachability data")
2082
    if not test:
2083
      if not nresult[constants.NV_MASTERIP]:
2084
        if ninfo.uuid == self.master_node:
2085
          msg = "the master node cannot reach the master IP (not configured?)"
2086
        else:
2087
          msg = "cannot reach the master IP"
2088
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
2089

    
2090
  def _VerifyInstance(self, instance, node_image, diskstatus):
2091
    """Verify an instance.
2092

2093
    This function checks to see if the required block devices are
2094
    available on the instance's node, and that the nodes are in the correct
2095
    state.
2096

2097
    """
2098
    pnode_uuid = instance.primary_node
2099
    pnode_img = node_image[pnode_uuid]
2100
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2101

    
2102
    node_vol_should = {}
2103
    instance.MapLVsByNode(node_vol_should)
2104

    
2105
    cluster = self.cfg.GetClusterInfo()
2106
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2107
                                                            self.group_info)
2108
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
2109
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
2110
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)
2111

    
2112
    for node_uuid in node_vol_should:
2113
      n_img = node_image[node_uuid]
2114
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2115
        # ignore missing volumes on offline or broken nodes
2116
        continue
2117
      for volume in node_vol_should[node_uuid]:
2118
        test = volume not in n_img.volumes
2119
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
2120
                      "volume %s missing on node %s", volume,
2121
                      self.cfg.GetNodeName(node_uuid))
2122

    
2123
    if instance.admin_state == constants.ADMINST_UP:
2124
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
2125
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
2126
                    "instance not running on its primary node %s",
2127
                     self.cfg.GetNodeName(pnode_uuid))
2128
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
2129
                    instance.name, "instance is marked as running and lives on"
2130
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))
2131

    
2132
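    # flatten the per-node disk status into (node, success, status, disk index)
    # tuples so each disk can be checked individually below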
    diskdata = [(nname, success, status, idx)
2133
                for (nname, disks) in diskstatus.items()
2134
                for idx, (success, status) in enumerate(disks)]
2135

    
2136
    for nname, success, bdev_status, idx in diskdata:
2137
      # the 'ghost node' construction in Exec() ensures that we have a
2138
      # node here
2139
      snode = node_image[nname]
2140
      bad_snode = snode.ghost or snode.offline
2141
      self._ErrorIf(instance.disks_active and
2142
                    not success and not bad_snode,
2143
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
2144
                    "couldn't retrieve status for disk/%s on %s: %s",
2145
                    idx, self.cfg.GetNodeName(nname), bdev_status)
2146

    
2147
      if instance.disks_active and success and \
2148
         (bdev_status.is_degraded or
2149
          bdev_status.ldisk_status != constants.LDS_OKAY):
2150
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2151
        if bdev_status.is_degraded:
2152
          msg += " is degraded"
2153
        if bdev_status.ldisk_status != constants.LDS_OKAY:
2154
          msg += "; state is '%s'" % \
2155
                 constants.LDS_NAMES[bdev_status.ldisk_status]
2156

    
2157
        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2158

    
2159
    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2160
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2161
                  "instance %s, connection to primary node failed",
2162
                  instance.name)
2163

    
2164
    self._ErrorIf(len(instance.secondary_nodes) > 1,
2165
                  constants.CV_EINSTANCELAYOUT, instance.name,
2166
                  "instance has multiple secondary nodes: %s",
2167
                  utils.CommaJoin(instance.secondary_nodes),
2168
                  code=self.ETYPE_WARNING)
2169

    
2170
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2171
    if any(es_flags.values()):
2172
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2173
        # Disk template not compatible with exclusive_storage: no instance
2174
        # node should have the flag set
2175
        es_nodes = [n
2176
                    for (n, es) in es_flags.items()
2177
                    if es]
2178
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2179
                    "instance has template %s, which is not supported on nodes"
2180
                    " that have exclusive storage set: %s",
2181
                    instance.disk_template,
2182
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2183
      for (idx, disk) in enumerate(instance.disks):
2184
        self._ErrorIf(disk.spindles is None,
2185
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2186
                      "number of spindles not configured for disk %s while"
2187
                      " exclusive storage is enabled, try running"
2188
                      " gnt-cluster repair-disk-sizes", idx)
2189

    
2190
    if instance.disk_template in constants.DTS_INT_MIRROR:
2191
      instance_nodes = utils.NiceSort(instance.all_nodes)
2192
      instance_groups = {}
2193

    
2194
      for node_uuid in instance_nodes:
2195
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
2196
                                   []).append(node_uuid)
2197

    
2198
      pretty_list = [
2199
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2200
                           groupinfo[group].name)
2201
        # Sort so that we always list the primary node first.
2202
        for group, nodes in sorted(instance_groups.items(),
2203
                                   key=lambda (_, nodes): pnode_uuid in nodes,
2204
                                   reverse=True)]
2205

    
2206
      self._ErrorIf(len(instance_groups) > 1,
2207
                    constants.CV_EINSTANCESPLITGROUPS,
2208
                    instance.name, "instance has primary and secondary nodes in"
2209
                    " different groups: %s", utils.CommaJoin(pretty_list),
2210
                    code=self.ETYPE_WARNING)
2211

    
2212
    inst_nodes_offline = []
2213
    for snode in instance.secondary_nodes:
2214
      s_img = node_image[snode]
2215
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2216
                    self.cfg.GetNodeName(snode),
2217
                    "instance %s, connection to secondary node failed",
2218
                    instance.name)
2219

    
2220
      if s_img.offline:
2221
        inst_nodes_offline.append(snode)
2222

    
2223
    # warn that the instance lives on offline nodes
2224
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2225
                  instance.name, "instance has offline secondary node(s) %s",
2226
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2227
    # ... or ghost/non-vm_capable nodes
2228
    for node_uuid in instance.all_nodes:
2229
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2230
                    instance.name, "instance lives on ghost node %s",
2231
                    self.cfg.GetNodeName(node_uuid))
2232
      self._ErrorIf(not node_image[node_uuid].vm_capable,
2233
                    constants.CV_EINSTANCEBADNODE, instance.name,
2234
                    "instance lives on non-vm_capable node %s",
2235
                    self.cfg.GetNodeName(node_uuid))
2236

    
2237
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2238
    """Verify if there are any unknown volumes in the cluster.
2239

2240
    The .os, .swap and backup volumes are ignored. All other volumes are
2241
    reported as unknown.
2242

2243
    @type reserved: L{ganeti.utils.FieldSet}
2244
    @param reserved: a FieldSet of reserved volume names
2245

2246
    """
2247
    for node_uuid, n_img in node_image.items():
2248
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2249
          self.all_node_info[node_uuid].group != self.group_uuid):
2250
        # skip non-healthy nodes
2251
        continue
2252
      for volume in n_img.volumes:
2253
        test = ((node_uuid not in node_vol_should or
2254
                volume not in node_vol_should[node_uuid]) and
2255
                not reserved.Matches(volume))
2256
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2257
                      self.cfg.GetNodeName(node_uuid),
2258
                      "volume %s is unknown", volume,
2259
                      code=_VerifyErrors.ETYPE_WARNING)
2260

    
2261
  def _VerifyNPlusOneMemory(self, node_image, all_insts):
2262
    """Verify N+1 Memory Resilience.
2263

2264
    Check that if one single node dies we can still start all the
2265
    instances it was primary for.
2266

2267
    """
2268
    cluster_info = self.cfg.GetClusterInfo()
2269
    for node_uuid, n_img in node_image.items():
2270
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all the instances it is
      # supposed to, should a single other node in the cluster fail.
2273
      # FIXME: not ready for failover to an arbitrary node
2274
      # FIXME: does not support file-backed instances
2275
      # WARNING: we currently take into account down instances as well
2276
      # as up ones, considering that even if they're down someone
2277
      # might want to start them even in the event of a node failure.
2278
      if n_img.offline or \
2279
         self.all_node_info[node_uuid].group != self.group_uuid:
2280
        # we're skipping nodes marked offline and nodes in other groups from
2281
        # the N+1 warning, since most likely we don't have good memory
2282
        # information from them; we already list instances living on such
2283
        # nodes, and that's enough warning
2284
        continue
2285
      #TODO(dynmem): also consider ballooning out other instances
2286
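      # n_img.sbp maps each primary node to the instances for which the node
      # being checked is the secondary, i.e. the instances that would fail
      # over onto it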
      for prinode, inst_uuids in n_img.sbp.items():
2287
        needed_mem = 0
2288
        for inst_uuid in inst_uuids:
2289
          bep = cluster_info.FillBE(all_insts[inst_uuid])
2290
          if bep[constants.BE_AUTO_BALANCE]:
2291
            needed_mem += bep[constants.BE_MINMEM]
2292
        test = n_img.mfree < needed_mem
2293
        self._ErrorIf(test, constants.CV_ENODEN1,
2294
                      self.cfg.GetNodeName(node_uuid),
2295
                      "not enough memory to accomodate instance failovers"
2296
                      " should node %s fail (%dMiB needed, %dMiB available)",
2297
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2298

    
2299
  def _VerifyClientCertificates(self, nodes, all_nvinfo):
2300
    """Verifies the consistency of the client certificates.
2301

2302
    This includes several aspects:
2303
      - the individual validation of all nodes' certificates
2304
      - the consistency of the master candidate certificate map
2305
      - the consistency of the master candidate certificate map with the
2306
        certificates that the master candidates are actually using.
2307

2308
    @param nodes: the list of nodes to consider in this verification
2309
    @param all_nvinfo: the map of results of the verify_node call to
2310
      all nodes
2311

2312
    """
2313
    candidate_certs = self.cfg.GetClusterInfo().candidate_certs
2314
    if candidate_certs is None or len(candidate_certs) == 0:
2315
      self._ErrorIf(
2316
        True, constants.CV_ECLUSTERCLIENTCERT, None,
2317
        "The cluster's list of master candidate certificates is empty."
2318
        "If you just updated the cluster, please run"
2319
        " 'gnt-cluster renew-crypto --new-node-certificates'.")
2320
      return
2321

    
2322
    self._ErrorIf(
2323
      len(candidate_certs) != len(set(candidate_certs.values())),
2324
      constants.CV_ECLUSTERCLIENTCERT, None,
2325
      "There are at least two master candidates configured to use the same"
2326
      " certificate.")
2327

    
2328
    # collect the client certificate
2329
    for node in nodes:
2330
      if node.offline:
2331
        continue
2332

    
2333
      nresult = all_nvinfo[node.uuid]
2334
      if nresult.fail_msg or not nresult.payload:
2335
        continue
2336

    
2337
      (errcode, msg) = nresult.payload.get(constants.NV_CLIENT_CERT, None)
2338

    
2339
      self._ErrorIf(
2340
        errcode is not None, constants.CV_ECLUSTERCLIENTCERT, None,
2341
        "Client certificate of node '%s' failed validation: %s (code '%s')",
2342
        node.uuid, msg, errcode)
2343

    
2344
      if not errcode:
2345
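        # on success the verification payload carries the certificate digest
        # in the message field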
        digest = msg
2346
        if node.master_candidate:
2347
          if node.uuid in candidate_certs:
2348
            self._ErrorIf(
2349
              digest != candidate_certs[node.uuid],
2350
              constants.CV_ECLUSTERCLIENTCERT, None,
2351
              "Client certificate digest of master candidate '%s' does not"
2352
              " match its entry in the cluster's map of master candidate"
2353
              " certificates. Expected: %s Got: %s", node.uuid,
2354
              digest, candidate_certs[node.uuid])
2355
          else:
2356
            self._ErrorIf(
2357
              True, constants.CV_ECLUSTERCLIENTCERT, None,
2358
              "The master candidate '%s' does not have an entry in the"
2359
              " map of candidate certificates.", node.uuid)
2360
            self._ErrorIf(
2361
              digest in candidate_certs.values(),
2362
              constants.CV_ECLUSTERCLIENTCERT, None,
2363
              "Master candidate '%s' is using a certificate of another node.",
2364
              node.uuid)
2365
        else:
2366
          self._ErrorIf(
2367
            node.uuid in candidate_certs,
2368
            constants.CV_ECLUSTERCLIENTCERT, None,
2369
            "Node '%s' is not a master candidate, but still listed in the"
2370
            " map of master candidate certificates.", node.uuid)
2371
          self._ErrorIf(
2372
            (node.uuid not in candidate_certs) and
2373
              (digest in candidate_certs.values()),
2374
            constants.CV_ECLUSTERCLIENTCERT, None,
2375
            "Node '%s' is not a master candidate and is incorrectly using a"
2376
            " certificate of another node which is master candidate.",
2377
            node.uuid)
2378

    
2379
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2380
                   (files_all, files_opt, files_mc, files_vm)):
2381
    """Verifies file checksums collected from all nodes.
2382

2383
    @param nodes: List of L{objects.Node} objects
2384
    @param master_node_uuid: UUID of master node
2385
    @param all_nvinfo: RPC results
2386

2387
    """
2388
    # Define functions determining which nodes to consider for a file
2389
    files2nodefn = [
2390
      (files_all, None),
2391
      (files_mc, lambda node: (node.master_candidate or
2392
                               node.uuid == master_node_uuid)),
2393
      (files_vm, lambda node: node.vm_capable),
2394
      ]
2395

    
2396
    # Build mapping from filename to list of nodes which should have the file
2397
    nodefiles = {}
2398
    for (files, fn) in files2nodefn:
2399
      if fn is None:
2400
        filenodes = nodes
2401
      else:
2402
        filenodes = filter(fn, nodes)
2403
      nodefiles.update((filename,
2404
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
2405
                       for filename in files)
2406

    
2407
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2408

    
2409
    fileinfo = dict((filename, {}) for filename in nodefiles)
2410
    ignore_nodes = set()
2411

    
2412
    for node in nodes:
2413
      if node.offline:
2414
        ignore_nodes.add(node.uuid)
2415
        continue
2416

    
2417
      nresult = all_nvinfo[node.uuid]
2418

    
2419
      if nresult.fail_msg or not nresult.payload:
2420
        node_files = None
2421
      else:
2422
        fingerprints = nresult.payload.get(constants.NV_FILELIST, {})
2423
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2424
                          for (key, value) in fingerprints.items())
2425
        del fingerprints
2426

    
2427
      test = not (node_files and isinstance(node_files, dict))
2428
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2429
                    "Node did not return file checksum data")
2430
      if test:
2431
        ignore_nodes.add(node.uuid)
2432
        continue
2433

    
2434
      # Build per-checksum mapping from filename to nodes having it
2435
      for (filename, checksum) in node_files.items():
2436
        assert filename in nodefiles
2437
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2438

    
2439
    for (filename, checksums) in fileinfo.items():
2440
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2441

    
2442
      # Nodes having the file
2443
      with_file = frozenset(node_uuid
2444
                            for node_uuids in fileinfo[filename].values()
2445
                            for node_uuid in node_uuids) - ignore_nodes
2446

    
2447
      expected_nodes = nodefiles[filename] - ignore_nodes
2448

    
2449
      # Nodes missing file
2450
      missing_file = expected_nodes - with_file
2451

    
2452
      if filename in files_opt:
2453
        # All or no nodes
2454
        self._ErrorIf(missing_file and missing_file != expected_nodes,
2455
                      constants.CV_ECLUSTERFILECHECK, None,
2456
                      "File %s is optional, but it must exist on all or no"
2457
                      " nodes (not found on %s)",
2458
                      filename,
2459
                      utils.CommaJoin(
2460
                        utils.NiceSort(
2461
                          map(self.cfg.GetNodeName, missing_file))))
2462
      else:
2463
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2464
                      "File %s is missing from node(s) %s", filename,
2465
                      utils.CommaJoin(
2466
                        utils.NiceSort(
2467
                          map(self.cfg.GetNodeName, missing_file))))
2468

    
2469
        # Warn if a node has a file it shouldn't
2470
        unexpected = with_file - expected_nodes
2471
        self._ErrorIf(unexpected,
2472
                      constants.CV_ECLUSTERFILECHECK, None,
2473
                      "File %s should not exist on node(s) %s",
2474
                      filename, utils.CommaJoin(
2475
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2476

    
2477
      # See if there are multiple versions of the file
2478
      test = len(checksums) > 1
2479
      if test:
2480
        variants = ["variant %s on %s" %
2481
                    (idx + 1,
2482
                     utils.CommaJoin(utils.NiceSort(
2483
                       map(self.cfg.GetNodeName, node_uuids))))
2484
                    for (idx, (checksum, node_uuids)) in
2485
                      enumerate(sorted(checksums.items()))]
2486
      else:
2487
        variants = []
2488

    
2489
      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2490
                    "File %s found with %s different checksums (%s)",
2491
                    filename, len(checksums), "; ".join(variants))
2492

    
2493
  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
2494
    """Verify the drbd helper.
2495

2496
    """
2497
    if drbd_helper:
2498
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2499
      test = (helper_result is None)
2500
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2501
                    "no drbd usermode helper returned")
2502
      if helper_result:
2503
        status, payload = helper_result
2504
        test = not status
2505
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2506
                      "drbd usermode helper check unsuccessful: %s", payload)
2507
        test = status and (payload != drbd_helper)
2508
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2509
                      "wrong drbd usermode helper: %s", payload)
2510

    
2511
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2512
                      drbd_map):
2513
    """Verifies and the node DRBD status.
2514

2515
    @type ninfo: L{objects.Node}
2516
    @param ninfo: the node to check
2517
    @param nresult: the remote results for the node
2518
    @param instanceinfo: the dict of instances
2519
    @param drbd_helper: the configured DRBD usermode helper
2520
    @param drbd_map: the DRBD map as returned by
2521
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2522

2523
    """
2524
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)
2525

    
2526
    # compute the DRBD minors
2527
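    # node_drbd maps each DRBD minor to (instance uuid, whether the minor is
    # expected to be in use, i.e. the instance's disks are active)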
    node_drbd = {}
2528
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2529
      test = inst_uuid not in instanceinfo
2530
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2531
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
2532
        # ghost instance should not be running, but otherwise we
2533
        # don't give double warnings (both ghost instance and
2534
        # unallocated minor in use)
2535
      if test:
2536
        node_drbd[minor] = (inst_uuid, False)
2537
      else:
2538
        instance = instanceinfo[inst_uuid]
2539
        node_drbd[minor] = (inst_uuid, instance.disks_active)
2540

    
2541
    # and now check them
2542
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2543
    test = not isinstance(used_minors, (tuple, list))
2544
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2545
                  "cannot parse drbd status file: %s", str(used_minors))
2546
    if test:
2547
      # we cannot check drbd status
2548
      return
2549

    
2550
    for minor, (inst_uuid, must_exist) in node_drbd.items():
2551
      test = minor not in used_minors and must_exist
2552
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2553
                    "drbd minor %d of instance %s is not active", minor,
2554
                    self.cfg.GetInstanceName(inst_uuid))
2555
    for minor in used_minors:
2556
      test = minor not in node_drbd
2557
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2558
                    "unallocated drbd minor %d is in use", minor)
2559

    
2560
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2561
    """Builds the node OS structures.
2562

2563
    @type ninfo: L{objects.Node}
2564
    @param ninfo: the node to check
2565
    @param nresult: the remote results for the node
2566
    @param nimg: the node image object
2567

2568
    """
2569
    remote_os = nresult.get(constants.NV_OSLIST, None)
2570
    test = (not isinstance(remote_os, list) or
2571
            not compat.all(isinstance(v, list) and len(v) == 7
2572
                           for v in remote_os))
2573

    
2574
    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2575
                  "node hasn't returned valid OS data")
2576

    
2577
    nimg.os_fail = test
2578

    
2579
    if test:
2580
      return
2581

    
2582
    os_dict = {}
2583

    
2584
    for (name, os_path, status, diagnose,
2585
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2586

    
2587
      if name not in os_dict:
2588
        os_dict[name] = []
2589

    
2590
      # parameters is a list of lists instead of list of tuples due to
2591
      # JSON lacking a real tuple type, fix it:
2592
      parameters = [tuple(v) for v in parameters]
2593
      os_dict[name].append((os_path, status, diagnose,
2594
                            set(variants), set(parameters), set(api_ver)))
2595

    
2596
    nimg.oslist = os_dict
2597

    
2598
  def _VerifyNodeOS(self, ninfo, nimg, base):
2599
    """Verifies the node OS list.
2600

2601
    @type ninfo: L{objects.Node}
2602
    @param ninfo: the node to check
2603
    @param nimg: the node image object
2604
    @param base: the 'template' node we match against (e.g. from the master)
2605

2606
    """
2607
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2608

    
2609
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
                    "Invalid OS %s (located at %s): %s",
                    os_name, f_path, f_diag)
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
                    "OS '%s' has multiple entries"
                    " (first one shadows the rest): %s",
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                    "Extra OS %s not present on reference node (%s)",
                    os_name, self.cfg.GetNodeName(base.uuid))
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " [%s] vs. [%s]", kind, os_name,
                      self.cfg.GetNodeName(base.uuid),
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
                  "OSes present on reference node %s"
                  " but missing on this node: %s",
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))

  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    cluster = self.cfg.GetClusterInfo()
    if (is_master and
        (cluster.IsFileStorageEnabled() or
         cluster.IsSharedFileStorageEnabled())):
      try:
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
                          verify_key, error_key):
    """Verifies (file) storage paths.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type file_disk_template: string
    @param file_disk_template: file-based disk template, whose directory
        is supposed to be verified
    @type verify_key: string
    @param verify_key: key for the verification map of this file
        verification step
    @param error_key: error key to be added to the verification results
        in case something goes wrong in this verification step

    """
    assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
              constants.ST_FILE, constants.ST_SHARED_FILE
           ))

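    # the node only reports verify_key when the configured path failed its
    # check, so its mere presence in the result signals an unusable path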
    cluster = self.cfg.GetClusterInfo()
    if cluster.IsDiskTemplateEnabled(file_disk_template):
      self._ErrorIf(
          verify_key in nresult,
          error_key, ninfo.name,
          "The configured %s storage path is unusable: %s" %
          (file_disk_template, nresult.get(verify_key)))

  def _VerifyFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_FILE,
        constants.NV_FILE_STORAGE_PATH,
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)

  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_SHARED_FILE,
        constants.NV_SHARED_FILE_STORAGE_PATH,
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
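    # lvdata is either a string (an LVM error reported by the node), the
    # dict of LVs on success, or some other type if the RPC itself failed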
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{NodeImage})
    @param node_image: Node image objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_dev_inst_only = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      dev_inst_only = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))

      node_disks_dev_inst_only[nuuid] = dev_inst_only

    assert len(node_disks) == len(node_disks_dev_inst_only)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
               node_disks.keys(), node_disks_dev_inst_only)

    assert len(result) == len(node_disks)

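    # instdisk: instance UUID -> node UUID -> list of (success, payload)
    # tuples, one entry per disk of that instance on that node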
    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

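    # group the remaining nodes by node group and return one endless
    # (cycled) iterator over the sorted node names of each foreign group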
    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

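    # return the online node names of this group plus, for each of them,
    # one node name drawn from every other group's cycled iterator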
    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; their failure is
    logged in the verify output and makes the verification fail.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

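    # node_verify_param is the input to the node_verify RPC: every key
    # names one check the nodes should run and its value carries that
    # check's parameters (file lists, hypervisors, peers to contact, ...)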
    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      constants.NV_CLIENT_CERT: None,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

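    # Fill the expected cluster state with per-instance data: count offline
    # instances, add placeholder images for nodes outside the group (marking
    # unknown ones as ghosts), and record the LVs each node should hold and
    # the primary/secondary instance lists of every node image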
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least one node, we act as if it were set for all of them
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    node_group_uuids = dict(map(lambda n: (n.name, n.group),
                                self.cfg.GetAllNodesInfo().values()))
    groups_config = self.cfg.GetAllNodeGroupsInfoDict()

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams,
                                           node_group_uuids,
                                           groups_config)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams,
                                    node_group_uuids,
                                    groups_config)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    self._VerifyClientCertificates(self.my_node_info.values(), all_nvinfo)
    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams,
         node_group_uuids,
         groups_config))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

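    # refos_img is the node image of the first node that returned a valid OS
    # listing; all other nodes are compared against it in _VerifyNodeOS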
    refos_img = None

    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.name not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyzes the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
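        # each payload entry is a (script, hook result code, output) tuple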
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])