Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / cluster.py @ 45f75526

History | View | Annotate | Download (127.8 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with the cluster."""
23

    
24
import copy
25
import itertools
26
import logging
27
import operator
28
import os
29
import re
30
import time
31

    
32
from ganeti import compat
33
from ganeti import constants
34
from ganeti import errors
35
from ganeti import hypervisor
36
from ganeti import locking
37
from ganeti import masterd
38
from ganeti import netutils
39
from ganeti import objects
40
from ganeti import opcodes
41
from ganeti import pathutils
42
from ganeti import query
43
import ganeti.rpc.node as rpc
44
from ganeti import runtime
45
from ganeti import ssh
46
from ganeti import uidpool
47
from ganeti import utils
48
from ganeti import vcluster
49

    
50
from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
51
  ResultWithJobs
52
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
53
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
54
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
55
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
56
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
57
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
58
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
59
  CheckDiskAccessModeConsistency, CreateNewClientCert
60

    
61
import ganeti.masterd.instance
62

    
63

    
64
def _UpdateMasterClientCert(
65
    lu, master_uuid, cluster, feedback_fn,
66
    client_cert=pathutils.NODED_CLIENT_CERT_FILE,
67
    client_cert_tmp=pathutils.NODED_CLIENT_CERT_FILE_TMP):
68
  """Renews the master's client certificate and propagates the config.
69

70
  @type lu: C{LogicalUnit}
71
  @param lu: the logical unit holding the config
72
  @type master_uuid: string
73
  @param master_uuid: the master node's UUID
74
  @type cluster: C{objects.Cluster}
75
  @param cluster: the cluster's configuration
76
  @type feedback_fn: function
77
  @param feedback_fn: feedback functions for config updates
78
  @type client_cert: string
79
  @param client_cert: the path of the client certificate
80
  @type client_cert_tmp: string
81
  @param client_cert_tmp: the temporary path of the client certificate
82
  @rtype: string
83
  @return: the digest of the newly created client certificate
84

85
  """
86
  client_digest = CreateNewClientCert(lu, master_uuid, filename=client_cert_tmp)
87
  utils.AddNodeToCandidateCerts(master_uuid, client_digest,
88
                                cluster.candidate_certs)
89
  # This triggers an update of the config and distribution of it with the old
90
  # SSL certificate
91
  lu.cfg.Update(cluster, feedback_fn)
92

    
93
  utils.RemoveFile(client_cert)
94
  utils.RenameFile(client_cert_tmp, client_cert)
95
  return client_digest
96

    
97

    
98
class LUClusterRenewCrypto(NoHooksLU):
99
  """Renew the cluster's crypto tokens.
100

101
  Note that most of this operation is done in gnt_cluster.py, this LU only
102
  takes care of the renewal of the client SSL certificates.
103

104
  """
105
  def Exec(self, feedback_fn):
106
    master_uuid = self.cfg.GetMasterNode()
107
    cluster = self.cfg.GetClusterInfo()
108

    
109
    server_digest = utils.GetCertificateDigest(
110
      cert_filename=pathutils.NODED_CERT_FILE)
111
    utils.AddNodeToCandidateCerts("%s-SERVER" % master_uuid,
112
                                  server_digest,
113
                                  cluster.candidate_certs)
114
    new_master_digest = _UpdateMasterClientCert(self, master_uuid, cluster,
115
                                                feedback_fn)
116

    
117
    cluster.candidate_certs = {master_uuid: new_master_digest}
118
    nodes = self.cfg.GetAllNodesInfo()
119
    for (node_uuid, node_info) in nodes.items():
120
      if node_uuid != master_uuid:
121
        new_digest = CreateNewClientCert(self, node_uuid)
122
        if node_info.master_candidate:
123
          cluster.candidate_certs[node_uuid] = new_digest
124
    # Trigger another update of the config now with the new master cert
125
    self.cfg.Update(cluster, feedback_fn)
126

    
127

    
128
class LUClusterActivateMasterIp(NoHooksLU):
129
  """Activate the master IP on the master node.
130

131
  """
132
  def Exec(self, feedback_fn):
133
    """Activate the master IP.
134

135
    """
136
    master_params = self.cfg.GetMasterNetworkParameters()
137
    ems = self.cfg.GetUseExternalMipScript()
138
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
139
                                                   master_params, ems)
140
    result.Raise("Could not activate the master IP")
141

    
142

    
143
class LUClusterDeactivateMasterIp(NoHooksLU):
144
  """Deactivate the master IP on the master node.
145

146
  """
147
  def Exec(self, feedback_fn):
148
    """Deactivate the master IP.
149

150
    """
151
    master_params = self.cfg.GetMasterNetworkParameters()
152
    ems = self.cfg.GetUseExternalMipScript()
153
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
154
                                                     master_params, ems)
155
    result.Raise("Could not deactivate the master IP")
156

    
157

    
158
class LUClusterConfigQuery(NoHooksLU):
159
  """Return configuration values.
160

161
  """
162
  REQ_BGL = False
163

    
164
  def CheckArguments(self):
165
    self.cq = ClusterQuery(None, self.op.output_fields, False)
166

    
167
  def ExpandNames(self):
168
    self.cq.ExpandNames(self)
169

    
170
  def DeclareLocks(self, level):
171
    self.cq.DeclareLocks(self, level)
172

    
173
  def Exec(self, feedback_fn):
174
    result = self.cq.OldStyleQuery(self)
175

    
176
    assert len(result) == 1
177

    
178
    return result[0]
179

    
180

    
181
class LUClusterDestroy(LogicalUnit):
182
  """Logical unit for destroying the cluster.
183

184
  """
185
  HPATH = "cluster-destroy"
186
  HTYPE = constants.HTYPE_CLUSTER
187

    
188
  def BuildHooksEnv(self):
189
    """Build hooks env.
190

191
    """
192
    return {
193
      "OP_TARGET": self.cfg.GetClusterName(),
194
      }
195

    
196
  def BuildHooksNodes(self):
197
    """Build hooks nodes.
198

199
    """
200
    return ([], [])
201

    
202
  def CheckPrereq(self):
203
    """Check prerequisites.
204

205
    This checks whether the cluster is empty.
206

207
    Any errors are signaled by raising errors.OpPrereqError.
208

209
    """
210
    master = self.cfg.GetMasterNode()
211

    
212
    nodelist = self.cfg.GetNodeList()
213
    if len(nodelist) != 1 or nodelist[0] != master:
214
      raise errors.OpPrereqError("There are still %d node(s) in"
215
                                 " this cluster." % (len(nodelist) - 1),
216
                                 errors.ECODE_INVAL)
217
    instancelist = self.cfg.GetInstanceList()
218
    if instancelist:
219
      raise errors.OpPrereqError("There are still %d instance(s) in"
220
                                 " this cluster." % len(instancelist),
221
                                 errors.ECODE_INVAL)
222

    
223
  def Exec(self, feedback_fn):
224
    """Destroys the cluster.
225

226
    """
227
    master_params = self.cfg.GetMasterNetworkParameters()
228

    
229
    # Run post hooks on master node before it's removed
230
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
231

    
232
    ems = self.cfg.GetUseExternalMipScript()
233
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
234
                                                     master_params, ems)
235
    result.Warn("Error disabling the master IP address", self.LogWarning)
236
    return master_params.uuid
237

    
238

    
239
class LUClusterPostInit(LogicalUnit):
240
  """Logical unit for running hooks after cluster initialization.
241

242
  """
243
  HPATH = "cluster-init"
244
  HTYPE = constants.HTYPE_CLUSTER
245

    
246
  def CheckArguments(self):
247
    self.master_uuid = self.cfg.GetMasterNode()
248
    self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo())
249

    
250
    # TODO: When Issue 584 is solved, and None is properly parsed when used
251
    # as a default value, ndparams.get(.., None) can be changed to
252
    # ndparams[..] to access the values directly
253

    
254
    # OpenvSwitch: Warn user if link is missing
255
    if (self.master_ndparams[constants.ND_OVS] and not
256
        self.master_ndparams.get(constants.ND_OVS_LINK, None)):
257
      self.LogInfo("No physical interface for OpenvSwitch was given."
258
                   " OpenvSwitch will not have an outside connection. This"
259
                   " might not be what you want.")
260

    
261
  def BuildHooksEnv(self):
262
    """Build hooks env.
263

264
    """
265
    return {
266
      "OP_TARGET": self.cfg.GetClusterName(),
267
      }
268

    
269
  def BuildHooksNodes(self):
270
    """Build hooks nodes.
271

272
    """
273
    return ([], [self.cfg.GetMasterNode()])
274

    
275
  def Exec(self, feedback_fn):
276
    """Create and configure Open vSwitch
277

278
    """
279
    if self.master_ndparams[constants.ND_OVS]:
280
      result = self.rpc.call_node_configure_ovs(
281
                 self.master_uuid,
282
                 self.master_ndparams[constants.ND_OVS_NAME],
283
                 self.master_ndparams.get(constants.ND_OVS_LINK, None))
284
      result.Raise("Could not successully configure Open vSwitch")
285

    
286
    cluster = self.cfg.GetClusterInfo()
287
    _UpdateMasterClientCert(self, self.master_uuid, cluster, feedback_fn)
288

    
289
    return True
290

    
291

    
292
class ClusterQuery(QueryBase):
293
  FIELDS = query.CLUSTER_FIELDS
294

    
295
  #: Do not sort (there is only one item)
296
  SORT_FIELD = None
297

    
298
  def ExpandNames(self, lu):
299
    lu.needed_locks = {}
300

    
301
    # The following variables interact with _QueryBase._GetNames
302
    self.wanted = locking.ALL_SET
303
    self.do_locking = self.use_locking
304

    
305
    if self.do_locking:
306
      raise errors.OpPrereqError("Can not use locking for cluster queries",
307
                                 errors.ECODE_INVAL)
308

    
309
  def DeclareLocks(self, lu, level):
310
    pass
311

    
312
  def _GetQueryData(self, lu):
313
    """Computes the list of nodes and their attributes.
314

315
    """
316
    # Locking is not used
317
    assert not (compat.any(lu.glm.is_owned(level)
318
                           for level in locking.LEVELS
319
                           if level != locking.LEVEL_CLUSTER) or
320
                self.do_locking or self.use_locking)
321

    
322
    if query.CQ_CONFIG in self.requested_data:
323
      cluster = lu.cfg.GetClusterInfo()
324
      nodes = lu.cfg.GetAllNodesInfo()
325
    else:
326
      cluster = NotImplemented
327
      nodes = NotImplemented
328

    
329
    if query.CQ_QUEUE_DRAINED in self.requested_data:
330
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
331
    else:
332
      drain_flag = NotImplemented
333

    
334
    if query.CQ_WATCHER_PAUSE in self.requested_data:
335
      master_node_uuid = lu.cfg.GetMasterNode()
336

    
337
      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
338
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
339
                   lu.cfg.GetMasterNodeName())
340

    
341
      watcher_pause = result.payload
342
    else:
343
      watcher_pause = NotImplemented
344

    
345
    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
346

    
347

    
348
class LUClusterQuery(NoHooksLU):
349
  """Query cluster configuration.
350

351
  """
352
  REQ_BGL = False
353

    
354
  def ExpandNames(self):
355
    self.needed_locks = {}
356

    
357
  def Exec(self, feedback_fn):
358
    """Return cluster config.
359

360
    """
361
    cluster = self.cfg.GetClusterInfo()
362
    os_hvp = {}
363

    
364
    # Filter just for enabled hypervisors
365
    for os_name, hv_dict in cluster.os_hvp.items():
366
      os_hvp[os_name] = {}
367
      for hv_name, hv_params in hv_dict.items():
368
        if hv_name in cluster.enabled_hypervisors:
369
          os_hvp[os_name][hv_name] = hv_params
370

    
371
    # Convert ip_family to ip_version
372
    primary_ip_version = constants.IP4_VERSION
373
    if cluster.primary_ip_family == netutils.IP6Address.family:
374
      primary_ip_version = constants.IP6_VERSION
375

    
376
    result = {
377
      "software_version": constants.RELEASE_VERSION,
378
      "protocol_version": constants.PROTOCOL_VERSION,
379
      "config_version": constants.CONFIG_VERSION,
380
      "os_api_version": max(constants.OS_API_VERSIONS),
381
      "export_version": constants.EXPORT_VERSION,
382
      "vcs_version": constants.VCS_VERSION,
383
      "architecture": runtime.GetArchInfo(),
384
      "name": cluster.cluster_name,
385
      "master": self.cfg.GetMasterNodeName(),
386
      "default_hypervisor": cluster.primary_hypervisor,
387
      "enabled_hypervisors": cluster.enabled_hypervisors,
388
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
389
                        for hypervisor_name in cluster.enabled_hypervisors]),
390
      "os_hvp": os_hvp,
391
      "beparams": cluster.beparams,
392
      "osparams": cluster.osparams,
393
      "ipolicy": cluster.ipolicy,
394
      "nicparams": cluster.nicparams,
395
      "ndparams": cluster.ndparams,
396
      "diskparams": cluster.diskparams,
397
      "candidate_pool_size": cluster.candidate_pool_size,
398
      "max_running_jobs": cluster.max_running_jobs,
399
      "master_netdev": cluster.master_netdev,
400
      "master_netmask": cluster.master_netmask,
401
      "use_external_mip_script": cluster.use_external_mip_script,
402
      "volume_group_name": cluster.volume_group_name,
403
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
404
      "file_storage_dir": cluster.file_storage_dir,
405
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
406
      "maintain_node_health": cluster.maintain_node_health,
407
      "ctime": cluster.ctime,
408
      "mtime": cluster.mtime,
409
      "uuid": cluster.uuid,
410
      "tags": list(cluster.GetTags()),
411
      "uid_pool": cluster.uid_pool,
412
      "default_iallocator": cluster.default_iallocator,
413
      "default_iallocator_params": cluster.default_iallocator_params,
414
      "reserved_lvs": cluster.reserved_lvs,
415
      "primary_ip_version": primary_ip_version,
416
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
417
      "hidden_os": cluster.hidden_os,
418
      "blacklisted_os": cluster.blacklisted_os,
419
      "enabled_disk_templates": cluster.enabled_disk_templates,
420
      }
421

    
422
    return result
423

    
424

    
425
class LUClusterRedistConf(NoHooksLU):
426
  """Force the redistribution of cluster configuration.
427

428
  This is a very simple LU.
429

430
  """
431
  REQ_BGL = False
432

    
433
  def ExpandNames(self):
434
    self.needed_locks = {
435
      locking.LEVEL_NODE: locking.ALL_SET,
436
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
437
    }
438
    self.share_locks = ShareAll()
439

    
440
  def Exec(self, feedback_fn):
441
    """Redistribute the configuration.
442

443
    """
444
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
445
    RedistributeAncillaryFiles(self)
446

    
447

    
448
class LUClusterRename(LogicalUnit):
449
  """Rename the cluster.
450

451
  """
452
  HPATH = "cluster-rename"
453
  HTYPE = constants.HTYPE_CLUSTER
454

    
455
  def BuildHooksEnv(self):
456
    """Build hooks env.
457

458
    """
459
    return {
460
      "OP_TARGET": self.cfg.GetClusterName(),
461
      "NEW_NAME": self.op.name,
462
      }
463

    
464
  def BuildHooksNodes(self):
465
    """Build hooks nodes.
466

467
    """
468
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
469

    
470
  def CheckPrereq(self):
471
    """Verify that the passed name is a valid one.
472

473
    """
474
    hostname = netutils.GetHostname(name=self.op.name,
475
                                    family=self.cfg.GetPrimaryIPFamily())
476

    
477
    new_name = hostname.name
478
    self.ip = new_ip = hostname.ip
479
    old_name = self.cfg.GetClusterName()
480
    old_ip = self.cfg.GetMasterIP()
481
    if new_name == old_name and new_ip == old_ip:
482
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
483
                                 " cluster has changed",
484
                                 errors.ECODE_INVAL)
485
    if new_ip != old_ip:
486
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
487
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
488
                                   " reachable on the network" %
489
                                   new_ip, errors.ECODE_NOTUNIQUE)
490

    
491
    self.op.name = new_name
492

    
493
  def Exec(self, feedback_fn):
494
    """Rename the cluster.
495

496
    """
497
    clustername = self.op.name
498
    new_ip = self.ip
499

    
500
    # shutdown the master IP
501
    master_params = self.cfg.GetMasterNetworkParameters()
502
    ems = self.cfg.GetUseExternalMipScript()
503
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
504
                                                     master_params, ems)
505
    result.Raise("Could not disable the master role")
506

    
507
    try:
508
      cluster = self.cfg.GetClusterInfo()
509
      cluster.cluster_name = clustername
510
      cluster.master_ip = new_ip
511
      self.cfg.Update(cluster, feedback_fn)
512

    
513
      # update the known hosts file
514
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
515
      node_list = self.cfg.GetOnlineNodeList()
516
      try:
517
        node_list.remove(master_params.uuid)
518
      except ValueError:
519
        pass
520
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
521
    finally:
522
      master_params.ip = new_ip
523
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
524
                                                     master_params, ems)
525
      result.Warn("Could not re-enable the master role on the master,"
526
                  " please restart manually", self.LogWarning)
527

    
528
    return clustername
529

    
530

    
531
class LUClusterRepairDiskSizes(NoHooksLU):
532
  """Verifies the cluster disks sizes.
533

534
  """
535
  REQ_BGL = False
536

    
537
  def ExpandNames(self):
538
    if self.op.instances:
539
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
540
      # Not getting the node allocation lock as only a specific set of
541
      # instances (and their nodes) is going to be acquired
542
      self.needed_locks = {
543
        locking.LEVEL_NODE_RES: [],
544
        locking.LEVEL_INSTANCE: self.wanted_names,
545
        }
546
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
547
    else:
548
      self.wanted_names = None
549
      self.needed_locks = {
550
        locking.LEVEL_NODE_RES: locking.ALL_SET,
551
        locking.LEVEL_INSTANCE: locking.ALL_SET,
552

    
553
        # This opcode is acquires the node locks for all instances
554
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
555
        }
556

    
557
    self.share_locks = {
558
      locking.LEVEL_NODE_RES: 1,
559
      locking.LEVEL_INSTANCE: 0,
560
      locking.LEVEL_NODE_ALLOC: 1,
561
      }
562

    
563
  def DeclareLocks(self, level):
564
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
565
      self._LockInstancesNodes(primary_only=True, level=level)
566

    
567
  def CheckPrereq(self):
568
    """Check prerequisites.
569

570
    This only checks the optional instance list against the existing names.
571

572
    """
573
    if self.wanted_names is None:
574
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
575

    
576
    self.wanted_instances = \
577
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
578

    
579
  def _EnsureChildSizes(self, disk):
580
    """Ensure children of the disk have the needed disk size.
581

582
    This is valid mainly for DRBD8 and fixes an issue where the
583
    children have smaller disk size.
584

585
    @param disk: an L{ganeti.objects.Disk} object
586

587
    """
588
    if disk.dev_type == constants.DT_DRBD8:
589
      assert disk.children, "Empty children for DRBD8?"
590
      fchild = disk.children[0]
591
      mismatch = fchild.size < disk.size
592
      if mismatch:
593
        self.LogInfo("Child disk has size %d, parent %d, fixing",
594
                     fchild.size, disk.size)
595
        fchild.size = disk.size
596

    
597
      # and we recurse on this child only, not on the metadev
598
      return self._EnsureChildSizes(fchild) or mismatch
599
    else:
600
      return False
601

    
602
  def Exec(self, feedback_fn):
603
    """Verify the size of cluster disks.
604

605
    """
606
    # TODO: check child disks too
607
    # TODO: check differences in size between primary/secondary nodes
608
    per_node_disks = {}
609
    for instance in self.wanted_instances:
610
      pnode = instance.primary_node
611
      if pnode not in per_node_disks:
612
        per_node_disks[pnode] = []
613
      for idx, disk in enumerate(instance.disks):
614
        per_node_disks[pnode].append((instance, idx, disk))
615

    
616
    assert not (frozenset(per_node_disks.keys()) -
617
                self.owned_locks(locking.LEVEL_NODE_RES)), \
618
      "Not owning correct locks"
619
    assert not self.owned_locks(locking.LEVEL_NODE)
620

    
621
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
622
                                               per_node_disks.keys())
623

    
624
    changed = []
625
    for node_uuid, dskl in per_node_disks.items():
626
      if not dskl:
627
        # no disks on the node
628
        continue
629

    
630
      newl = [([v[2].Copy()], v[0]) for v in dskl]
631
      node_name = self.cfg.GetNodeName(node_uuid)
632
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
633
      if result.fail_msg:
634
        self.LogWarning("Failure in blockdev_getdimensions call to node"
635
                        " %s, ignoring", node_name)
636
        continue
637
      if len(result.payload) != len(dskl):
638
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
639
                        " result.payload=%s", node_name, len(dskl),
640
                        result.payload)
641
        self.LogWarning("Invalid result from node %s, ignoring node results",
642
                        node_name)
643
        continue
644
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
645
        if dimensions is None:
646
          self.LogWarning("Disk %d of instance %s did not return size"
647
                          " information, ignoring", idx, instance.name)
648
          continue
649
        if not isinstance(dimensions, (tuple, list)):
650
          self.LogWarning("Disk %d of instance %s did not return valid"
651
                          " dimension information, ignoring", idx,
652
                          instance.name)
653
          continue
654
        (size, spindles) = dimensions
655
        if not isinstance(size, (int, long)):
656
          self.LogWarning("Disk %d of instance %s did not return valid"
657
                          " size information, ignoring", idx, instance.name)
658
          continue
659
        size = size >> 20
660
        if size != disk.size:
661
          self.LogInfo("Disk %d of instance %s has mismatched size,"
662
                       " correcting: recorded %d, actual %d", idx,
663
                       instance.name, disk.size, size)
664
          disk.size = size
665
          self.cfg.Update(instance, feedback_fn)
666
          changed.append((instance.name, idx, "size", size))
667
        if es_flags[node_uuid]:
668
          if spindles is None:
669
            self.LogWarning("Disk %d of instance %s did not return valid"
670
                            " spindles information, ignoring", idx,
671
                            instance.name)
672
          elif disk.spindles is None or disk.spindles != spindles:
673
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
674
                         " correcting: recorded %s, actual %s",
675
                         idx, instance.name, disk.spindles, spindles)
676
            disk.spindles = spindles
677
            self.cfg.Update(instance, feedback_fn)
678
            changed.append((instance.name, idx, "spindles", disk.spindles))
679
        if self._EnsureChildSizes(disk):
680
          self.cfg.Update(instance, feedback_fn)
681
          changed.append((instance.name, idx, "size", disk.size))
682
    return changed
683

    
684

    
685
def _ValidateNetmask(cfg, netmask):
686
  """Checks if a netmask is valid.
687

688
  @type cfg: L{config.ConfigWriter}
689
  @param cfg: The cluster configuration
690
  @type netmask: int
691
  @param netmask: the netmask to be verified
692
  @raise errors.OpPrereqError: if the validation fails
693

694
  """
695
  ip_family = cfg.GetPrimaryIPFamily()
696
  try:
697
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
698
  except errors.ProgrammerError:
699
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
700
                               ip_family, errors.ECODE_INVAL)
701
  if not ipcls.ValidateNetmask(netmask):
702
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
703
                               (netmask), errors.ECODE_INVAL)
704

    
705

    
706
def CheckFileBasedStoragePathVsEnabledDiskTemplates(
707
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
708
    file_disk_template):
709
  """Checks whether the given file-based storage directory is acceptable.
710

711
  Note: This function is public, because it is also used in bootstrap.py.
712

713
  @type logging_warn_fn: function
714
  @param logging_warn_fn: function which accepts a string and logs it
715
  @type file_storage_dir: string
716
  @param file_storage_dir: the directory to be used for file-based instances
717
  @type enabled_disk_templates: list of string
718
  @param enabled_disk_templates: the list of enabled disk templates
719
  @type file_disk_template: string
720
  @param file_disk_template: the file-based disk template for which the
721
      path should be checked
722

723
  """
724
  assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
725
            constants.ST_FILE, constants.ST_SHARED_FILE
726
         ))
727
  file_storage_enabled = file_disk_template in enabled_disk_templates
728
  if file_storage_dir is not None:
729
    if file_storage_dir == "":
730
      if file_storage_enabled:
731
        raise errors.OpPrereqError(
732
            "Unsetting the '%s' storage directory while having '%s' storage"
733
            " enabled is not permitted." %
734
            (file_disk_template, file_disk_template))
735
    else:
736
      if not file_storage_enabled:
737
        logging_warn_fn(
738
            "Specified a %s storage directory, although %s storage is not"
739
            " enabled." % (file_disk_template, file_disk_template))
740
  else:
741
    raise errors.ProgrammerError("Received %s storage dir with value"
742
                                 " 'None'." % file_disk_template)
743

    
744

    
745
def CheckFileStoragePathVsEnabledDiskTemplates(
746
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
747
  """Checks whether the given file storage directory is acceptable.
748

749
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
750

751
  """
752
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
753
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
754
      constants.DT_FILE)
755

    
756

    
757
def CheckSharedFileStoragePathVsEnabledDiskTemplates(
758
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
759
  """Checks whether the given shared file storage directory is acceptable.
760

761
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
762

763
  """
764
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
765
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
766
      constants.DT_SHARED_FILE)
767

    
768

    
769
class LUClusterSetParams(LogicalUnit):
770
  """Change the parameters of the cluster.
771

772
  """
773
  HPATH = "cluster-modify"
774
  HTYPE = constants.HTYPE_CLUSTER
775
  REQ_BGL = False
776

    
777
  def CheckArguments(self):
778
    """Check parameters
779

780
    """
781
    if self.op.uid_pool:
782
      uidpool.CheckUidPool(self.op.uid_pool)
783

    
784
    if self.op.add_uids:
785
      uidpool.CheckUidPool(self.op.add_uids)
786

    
787
    if self.op.remove_uids:
788
      uidpool.CheckUidPool(self.op.remove_uids)
789

    
790
    if self.op.master_netmask is not None:
791
      _ValidateNetmask(self.cfg, self.op.master_netmask)
792

    
793
    if self.op.diskparams:
794
      for dt_params in self.op.diskparams.values():
795
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
796
      try:
797
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
798
        CheckDiskAccessModeValidity(self.op.diskparams)
799
      except errors.OpPrereqError, err:
800
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
801
                                   errors.ECODE_INVAL)
802

    
803
  def ExpandNames(self):
804
    # FIXME: in the future maybe other cluster params won't require checking on
805
    # all nodes to be modified.
806
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
807
    # resource locks the right thing, shouldn't it be the BGL instead?
808
    self.needed_locks = {
809
      locking.LEVEL_NODE: locking.ALL_SET,
810
      locking.LEVEL_INSTANCE: locking.ALL_SET,
811
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
812
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
813
    }
814
    self.share_locks = ShareAll()
815

    
816
  def BuildHooksEnv(self):
817
    """Build hooks env.
818

819
    """
820
    return {
821
      "OP_TARGET": self.cfg.GetClusterName(),
822
      "NEW_VG_NAME": self.op.vg_name,
823
      }
824

    
825
  def BuildHooksNodes(self):
826
    """Build hooks nodes.
827

828
    """
829
    mn = self.cfg.GetMasterNode()
830
    return ([mn], [mn])
831

    
832
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
833
                   new_enabled_disk_templates):
834
    """Check the consistency of the vg name on all nodes and in case it gets
835
       unset whether there are instances still using it.
836

837
    """
838
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
839
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
840
                                            new_enabled_disk_templates)
841
    current_vg_name = self.cfg.GetVGName()
842

    
843
    if self.op.vg_name == '':
844
      if lvm_is_enabled:
845
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
846
                                   " disk templates are or get enabled.")
847

    
848
    if self.op.vg_name is None:
849
      if current_vg_name is None and lvm_is_enabled:
850
        raise errors.OpPrereqError("Please specify a volume group when"
851
                                   " enabling lvm-based disk-templates.")
852

    
853
    if self.op.vg_name is not None and not self.op.vg_name:
854
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
855
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
856
                                   " instances exist", errors.ECODE_INVAL)
857

    
858
    if (self.op.vg_name is not None and lvm_is_enabled) or \
859
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
860
      self._CheckVgNameOnNodes(node_uuids)
861

    
862
  def _CheckVgNameOnNodes(self, node_uuids):
863
    """Check the status of the volume group on each node.
864

865
    """
866
    vglist = self.rpc.call_vg_list(node_uuids)
867
    for node_uuid in node_uuids:
868
      msg = vglist[node_uuid].fail_msg
869
      if msg:
870
        # ignoring down node
871
        self.LogWarning("Error while gathering data on node %s"
872
                        " (ignoring node): %s",
873
                        self.cfg.GetNodeName(node_uuid), msg)
874
        continue
875
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
876
                                            self.op.vg_name,
877
                                            constants.MIN_VG_SIZE)
878
      if vgstatus:
879
        raise errors.OpPrereqError("Error on node '%s': %s" %
880
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
881
                                   errors.ECODE_ENVIRON)
882

    
883
  @staticmethod
884
  def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
885
                                old_enabled_disk_templates):
886
    """Computes three sets of disk templates.
887

888
    @see: C{_GetDiskTemplateSets} for more details.
889

890
    """
891
    enabled_disk_templates = None
892
    new_enabled_disk_templates = []
893
    disabled_disk_templates = []
894
    if op_enabled_disk_templates:
895
      enabled_disk_templates = op_enabled_disk_templates
896
      new_enabled_disk_templates = \
897
        list(set(enabled_disk_templates)
898
             - set(old_enabled_disk_templates))
899
      disabled_disk_templates = \
900
        list(set(old_enabled_disk_templates)
901
             - set(enabled_disk_templates))
902
    else:
903
      enabled_disk_templates = old_enabled_disk_templates
904
    return (enabled_disk_templates, new_enabled_disk_templates,
905
            disabled_disk_templates)
906

    
907
  def _GetDiskTemplateSets(self, cluster):
908
    """Computes three sets of disk templates.
909

910
    The three sets are:
911
      - disk templates that will be enabled after this operation (no matter if
912
        they were enabled before or not)
913
      - disk templates that get enabled by this operation (thus haven't been
914
        enabled before.)
915
      - disk templates that get disabled by this operation
916

917
    """
918
    return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
919
                                          cluster.enabled_disk_templates)
920

    
921
  def _CheckIpolicy(self, cluster, enabled_disk_templates):
922
    """Checks the ipolicy.
923

924
    @type cluster: C{objects.Cluster}
925
    @param cluster: the cluster's configuration
926
    @type enabled_disk_templates: list of string
927
    @param enabled_disk_templates: list of (possibly newly) enabled disk
928
      templates
929

930
    """
931
    # FIXME: write unit tests for this
932
    if self.op.ipolicy:
933
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
934
                                           group_policy=False)
935

    
936
      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
937
                                  enabled_disk_templates)
938

    
939
      all_instances = self.cfg.GetAllInstancesInfo().values()
940
      violations = set()
941
      for group in self.cfg.GetAllNodeGroupsInfo().values():
942
        instances = frozenset([inst for inst in all_instances
943
                               if compat.any(nuuid in group.members
944
                                             for nuuid in inst.all_nodes)])
945
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
946
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
947
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
948
                                           self.cfg)
949
        if new:
950
          violations.update(new)
951

    
952
      if violations:
953
        self.LogWarning("After the ipolicy change the following instances"
954
                        " violate them: %s",
955
                        utils.CommaJoin(utils.NiceSort(violations)))
956
    else:
957
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
958
                                  enabled_disk_templates)
959

    
960
  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
961
    """Checks whether the set DRBD helper actually exists on the nodes.
962

963
    @type drbd_helper: string
964
    @param drbd_helper: path of the drbd usermode helper binary
965
    @type node_uuids: list of strings
966
    @param node_uuids: list of node UUIDs to check for the helper
967

968
    """
969
    # checks given drbd helper on all nodes
970
    helpers = self.rpc.call_drbd_helper(node_uuids)
971
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
972
      if ninfo.offline:
973
        self.LogInfo("Not checking drbd helper on offline node %s",
974
                     ninfo.name)
975
        continue
976
      msg = helpers[ninfo.uuid].fail_msg
977
      if msg:
978
        raise errors.OpPrereqError("Error checking drbd helper on node"
979
                                   " '%s': %s" % (ninfo.name, msg),
980
                                   errors.ECODE_ENVIRON)
981
      node_helper = helpers[ninfo.uuid].payload
982
      if node_helper != drbd_helper:
983
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
984
                                   (ninfo.name, node_helper),
985
                                   errors.ECODE_ENVIRON)
986

    
987
  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
988
    """Check the DRBD usermode helper.
989

990
    @type node_uuids: list of strings
991
    @param node_uuids: a list of nodes' UUIDs
992
    @type drbd_enabled: boolean
993
    @param drbd_enabled: whether DRBD will be enabled after this operation
994
      (no matter if it was disabled before or not)
995
    @type drbd_gets_enabled: boolen
996
    @param drbd_gets_enabled: true if DRBD was disabled before this
997
      operation, but will be enabled afterwards
998

999
    """
1000
    if self.op.drbd_helper == '':
1001
      if drbd_enabled:
1002
        raise errors.OpPrereqError("Cannot disable drbd helper while"
1003
                                   " DRBD is enabled.")
1004
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
1005
        raise errors.OpPrereqError("Cannot disable drbd helper while"
1006
                                   " drbd-based instances exist",
1007
                                   errors.ECODE_INVAL)
1008

    
1009
    else:
1010
      if self.op.drbd_helper is not None and drbd_enabled:
1011
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
1012
      else:
1013
        if drbd_gets_enabled:
1014
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
1015
          if current_drbd_helper is not None:
1016
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
1017
          else:
1018
            raise errors.OpPrereqError("Cannot enable DRBD without a"
1019
                                       " DRBD usermode helper set.")
1020

    
1021
  def _CheckInstancesOfDisabledDiskTemplates(
1022
      self, disabled_disk_templates):
1023
    """Check whether we try to disable a disk template that is in use.
1024

1025
    @type disabled_disk_templates: list of string
1026
    @param disabled_disk_templates: list of disk templates that are going to
1027
      be disabled by this operation
1028

1029
    """
1030
    for disk_template in disabled_disk_templates:
1031
      if self.cfg.HasAnyDiskOfType(disk_template):
1032
        raise errors.OpPrereqError(
1033
            "Cannot disable disk template '%s', because there is at least one"
1034
            " instance using it." % disk_template)
1035

    
1036
  def CheckPrereq(self):
1037
    """Check prerequisites.
1038

1039
    This checks whether the given params don't conflict and
1040
    if the given volume group is valid.
1041

1042
    """
1043
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
1044
    self.cluster = cluster = self.cfg.GetClusterInfo()
1045

    
1046
    vm_capable_node_uuids = [node.uuid
1047
                             for node in self.cfg.GetAllNodesInfo().values()
1048
                             if node.uuid in node_uuids and node.vm_capable]
1049

    
1050
    (enabled_disk_templates, new_enabled_disk_templates,
1051
      disabled_disk_templates) = self._GetDiskTemplateSets(cluster)
1052
    self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates)
1053

    
1054
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
1055
                      new_enabled_disk_templates)
1056

    
1057
    if self.op.file_storage_dir is not None:
1058
      CheckFileStoragePathVsEnabledDiskTemplates(
1059
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
1060

    
1061
    if self.op.shared_file_storage_dir is not None:
1062
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
1063
          self.LogWarning, self.op.shared_file_storage_dir,
1064
          enabled_disk_templates)
1065

    
1066
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
1067
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
1068
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)
1069

    
1070
    # validate params changes
1071
    if self.op.beparams:
1072
      objects.UpgradeBeParams(self.op.beparams)
1073
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1074
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
1075

    
1076
    if self.op.ndparams:
1077
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
1078
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
1079

    
1080
      # TODO: we need a more general way to handle resetting
1081
      # cluster-level parameters to default values
1082
      if self.new_ndparams["oob_program"] == "":
1083
        self.new_ndparams["oob_program"] = \
1084
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
1085

    
1086
    if self.op.hv_state:
1087
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
1088
                                           self.cluster.hv_state_static)
1089
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
1090
                               for hv, values in new_hv_state.items())
1091

    
1092
    if self.op.disk_state:
1093
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
1094
                                               self.cluster.disk_state_static)
1095
      self.new_disk_state = \
1096
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
1097
                            for name, values in svalues.items()))
1098
             for storage, svalues in new_disk_state.items())
1099

    
1100
    self._CheckIpolicy(cluster, enabled_disk_templates)
1101

    
1102
    if self.op.nicparams:
1103
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1104
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
1105
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1106
      nic_errors = []
1107

    
1108
      # check all instances for consistency
1109
      for instance in self.cfg.GetAllInstancesInfo().values():
1110
        for nic_idx, nic in enumerate(instance.nics):
1111
          params_copy = copy.deepcopy(nic.nicparams)
1112
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
1113

    
1114
          # check parameter syntax
1115
          try:
1116
            objects.NIC.CheckParameterSyntax(params_filled)
1117
          except errors.ConfigurationError, err:
1118
            nic_errors.append("Instance %s, nic/%d: %s" %
1119
                              (instance.name, nic_idx, err))
1120

    
1121
          # if we're moving instances to routed, check that they have an ip
1122
          target_mode = params_filled[constants.NIC_MODE]
1123
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
1124
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
1125
                              " address" % (instance.name, nic_idx))
1126
      if nic_errors:
1127
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
1128
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
1129

    
1130
    # hypervisor list/parameters
1131
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
1132
    if self.op.hvparams:
1133
      for hv_name, hv_dict in self.op.hvparams.items():
1134
        if hv_name not in self.new_hvparams:
1135
          self.new_hvparams[hv_name] = hv_dict
1136
        else:
1137
          self.new_hvparams[hv_name].update(hv_dict)
1138

    
1139
    # disk template parameters
1140
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
1141
    if self.op.diskparams:
1142
      for dt_name, dt_params in self.op.diskparams.items():
1143
        if dt_name not in self.new_diskparams:
1144
          self.new_diskparams[dt_name] = dt_params
1145
        else:
1146
          self.new_diskparams[dt_name].update(dt_params)
1147
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)
1148

    
1149
    # os hypervisor parameters
1150
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
1151
    if self.op.os_hvp:
1152
      for os_name, hvs in self.op.os_hvp.items():
1153
        if os_name not in self.new_os_hvp:
1154
          self.new_os_hvp[os_name] = hvs
1155
        else:
1156
          for hv_name, hv_dict in hvs.items():
1157
            if hv_dict is None:
1158
              # Delete if it exists
1159
              self.new_os_hvp[os_name].pop(hv_name, None)
1160
            elif hv_name not in self.new_os_hvp[os_name]:
1161
              self.new_os_hvp[os_name][hv_name] = hv_dict
1162
            else:
1163
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
1164

    
1165
    # os parameters
1166
    self.new_osp = objects.FillDict(cluster.osparams, {})
1167
    if self.op.osparams:
1168
      for os_name, osp in self.op.osparams.items():
1169
        if os_name not in self.new_osp:
1170
          self.new_osp[os_name] = {}
1171

    
1172
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
1173
                                                 use_none=True)
1174

    
1175
        if not self.new_osp[os_name]:
1176
          # we removed all parameters
1177
          del self.new_osp[os_name]
1178
        else:
1179
          # check the parameter validity (remote check)
1180
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
1181
                        os_name, self.new_osp[os_name])
1182

    
1183
    # changes to the hypervisor list
1184
    if self.op.enabled_hypervisors is not None:
1185
      self.hv_list = self.op.enabled_hypervisors
1186
      for hv in self.hv_list:
1187
        # if the hypervisor doesn't already exist in the cluster
1188
        # hvparams, we initialize it to empty, and then (in both
1189
        # cases) we make sure to fill the defaults, as we might not
1190
        # have a complete defaults list if the hypervisor wasn't
1191
        # enabled before
1192
        if hv not in new_hvp:
1193
          new_hvp[hv] = {}
1194
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1195
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1196
    else:
1197
      self.hv_list = cluster.enabled_hypervisors
1198

    
1199
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1200
      # either the enabled list has changed, or the parameters have, validate
1201
      for hv_name, hv_params in self.new_hvparams.items():
1202
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1203
            (self.op.enabled_hypervisors and
1204
             hv_name in self.op.enabled_hypervisors)):
1205
          # either this is a new hypervisor, or its parameters have changed
1206
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1207
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1208
          hv_class.CheckParameterSyntax(hv_params)
1209
          CheckHVParams(self, node_uuids, hv_name, hv_params)
1210

    
1211
    self._CheckDiskTemplateConsistency()
1212

    
1213
    if self.op.os_hvp:
1214
      # no need to check any newly-enabled hypervisors, since the
1215
      # defaults have already been checked in the above code-block
1216
      for os_name, os_hvp in self.new_os_hvp.items():
1217
        for hv_name, hv_params in os_hvp.items():
1218
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1219
          # we need to fill in the new os_hvp on top of the actual hv_p
1220
          cluster_defaults = self.new_hvparams.get(hv_name, {})
1221
          new_osp = objects.FillDict(cluster_defaults, hv_params)
1222
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1223
          hv_class.CheckParameterSyntax(new_osp)
1224
          CheckHVParams(self, node_uuids, hv_name, new_osp)
1225

    
1226
    if self.op.default_iallocator:
1227
      alloc_script = utils.FindFile(self.op.default_iallocator,
1228
                                    constants.IALLOCATOR_SEARCH_PATH,
1229
                                    os.path.isfile)
1230
      if alloc_script is None:
1231
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1232
                                   " specified" % self.op.default_iallocator,
1233
                                   errors.ECODE_INVAL)
1234

    
1235
  def _CheckDiskTemplateConsistency(self):
1236
    """Check whether the disk templates that are going to be disabled
1237
       are still in use by some instances.
1238

1239
    """
1240
    if self.op.enabled_disk_templates:
1241
      cluster = self.cfg.GetClusterInfo()
1242
      instances = self.cfg.GetAllInstancesInfo()
1243

    
1244
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1245
        - set(self.op.enabled_disk_templates)
1246
      for instance in instances.itervalues():
1247
        if instance.disk_template in disk_templates_to_remove:
1248
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1249
                                     " because instance '%s' is using it." %
1250
                                     (instance.disk_template, instance.name))
1251

    
1252
  def _SetVgName(self, feedback_fn):
1253
    """Determines and sets the new volume group name.
1254

1255
    """
1256
    if self.op.vg_name is not None:
1257
      new_volume = self.op.vg_name
1258
      if not new_volume:
1259
        new_volume = None
1260
      if new_volume != self.cfg.GetVGName():
1261
        self.cfg.SetVGName(new_volume)
1262
      else:
1263
        feedback_fn("Cluster LVM configuration already in desired"
1264
                    " state, not changing")
1265

    
1266
  def _SetFileStorageDir(self, feedback_fn):
1267
    """Set the file storage directory.
1268

1269
    """
1270
    if self.op.file_storage_dir is not None:
1271
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1272
        feedback_fn("Global file storage dir already set to value '%s'"
1273
                    % self.cluster.file_storage_dir)
1274
      else:
1275
        self.cluster.file_storage_dir = self.op.file_storage_dir
1276

    
1277
  def _SetDrbdHelper(self, feedback_fn):
1278
    """Set the DRBD usermode helper.
1279

1280
    """
1281
    if self.op.drbd_helper is not None:
1282
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1283
        feedback_fn("Note that you specified a drbd user helper, but did not"
1284
                    " enable the drbd disk template.")
1285
      new_helper = self.op.drbd_helper
1286
      if not new_helper:
1287
        new_helper = None
1288
      if new_helper != self.cfg.GetDRBDHelper():
1289
        self.cfg.SetDRBDHelper(new_helper)
1290
      else:
1291
        feedback_fn("Cluster DRBD helper already in desired state,"
1292
                    " not changing")
1293

    
1294
  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(self.op.enabled_disk_templates)

    self._SetVgName(feedback_fn)
    self._SetFileStorageDir(feedback_fn)
    self._SetDrbdHelper(feedback_fn)

    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [], feedback_fn)

    if self.op.max_running_jobs is not None:
      self.cluster.max_running_jobs = self.op.max_running_jobs

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.default_iallocator_params is not None:
      self.cluster.default_iallocator_params = self.op.default_iallocator_params

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


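# LUClusterVerify performs no checks itself; it only builds and submits the
# LUClusterVerifyConfig and per-group LUClusterVerifyGroup jobs defined below.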
class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


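# Mix-in shared by LUClusterVerifyConfig and LUClusterVerifyGroup; both are
# expected to set self.bad and self._feedback_fn before calling
# _Error/_ErrorIf.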
class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = constants.CV_ERROR
  ETYPE_WARNING = constants.CV_WARNING

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message; with error_codes set the output is
    # machine-parseable as "severity:code:item-type:item:message"
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # only ERROR entries mark the operation as failed; WARN entries do not
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values into a single entry
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


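# LUClusterVerifyConfig only checks cluster-wide state (configuration,
# certificate files, hypervisor parameter syntax, dangling nodes/instances);
# the per-node checks are done by LUClusterVerifyGroup.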
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (hv_name, item))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of the cluster-wide configuration.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = utils.VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(inst.name for
                         inst in dangling_instances.get(node.uuid, [])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a"
                  " non-existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(inst.name for
                                         inst in no_node_instances))

    return not self.bad


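# LUClusterVerifyGroup cross-checks the node_verify RPC results for all nodes
# of one group: node health, LVM/DRBD state, instance placement, file
# checksums, N+1 memory and client certificates.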
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1669
  """Verifies the status of a node group.
1670

1671
  """
1672
  HPATH = "cluster-verify"
1673
  HTYPE = constants.HTYPE_CLUSTER
1674
  REQ_BGL = False
1675

    
1676
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1677

    
1678
  class NodeImage(object):
1679
    """A class representing the logical and physical status of a node.
1680

1681
    @type uuid: string
1682
    @ivar uuid: the node UUID to which this object refers
1683
    @ivar volumes: a structure as returned from
1684
        L{ganeti.backend.GetVolumeList} (runtime)
1685
    @ivar instances: a list of running instances (runtime)
1686
    @ivar pinst: list of configured primary instances (config)
1687
    @ivar sinst: list of configured secondary instances (config)
1688
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1689
        instances for which this node is secondary (config)
1690
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1691
    @ivar dfree: free disk, as reported by the node (runtime)
1692
    @ivar offline: the offline status (config)
1693
    @type rpc_fail: boolean
1694
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1695
        not whether the individual keys were correct) (runtime)
1696
    @type lvm_fail: boolean
1697
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1698
    @type hyp_fail: boolean
1699
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1700
    @type ghost: boolean
1701
    @ivar ghost: whether this node is absent from the configuration (config)
1702
    @type os_fail: boolean
1703
    @ivar os_fail: whether the RPC call didn't return valid OS data
1704
    @type oslist: list
1705
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1706
    @type vm_capable: boolean
1707
    @ivar vm_capable: whether the node can host instances
1708
    @type pv_min: float
1709
    @ivar pv_min: size in MiB of the smallest PVs
1710
    @type pv_max: float
1711
    @ivar pv_max: size in MiB of the biggest PVs
1712

1713
    """
1714
    def __init__(self, offline=False, uuid=None, vm_capable=True):
1715
      self.uuid = uuid
1716
      self.volumes = {}
1717
      self.instances = []
1718
      self.pinst = []
1719
      self.sinst = []
1720
      self.sbp = {}
1721
      self.mfree = 0
1722
      self.dfree = 0
1723
      self.offline = offline
1724
      self.vm_capable = vm_capable
1725
      self.rpc_fail = False
1726
      self.lvm_fail = False
1727
      self.hyp_fail = False
1728
      self.ghost = False
1729
      self.os_fail = False
1730
      self.oslist = {}
1731
      self.pv_min = None
1732
      self.pv_max = None
1733

    
1734
  def ExpandNames(self):
1735
    # This raises errors.OpPrereqError on its own:
1736
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1737

    
1738
    # Get instances in node group; this is unsafe and needs verification later
1739
    inst_uuids = \
1740
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1741

    
1742
    self.needed_locks = {
1743
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1744
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1745
      locking.LEVEL_NODE: [],
1746

    
1747
      # This opcode is run by watcher every five minutes and acquires all nodes
1748
      # for a group. It doesn't run for a long time, so it's better to acquire
1749
      # the node allocation lock as well.
1750
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1751
      }
1752

    
1753
    self.share_locks = ShareAll()
1754

    
1755
  def DeclareLocks(self, level):
1756
    if level == locking.LEVEL_NODE:
1757
      # Get members of node group; this is unsafe and needs verification later
1758
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1759

    
1760
      # In Exec(), we warn about mirrored instances that have primary and
1761
      # secondary living in separate node groups. To fully verify that
1762
      # volumes for these instances are healthy, we will need to do an
1763
      # extra call to their secondaries. We ensure here those nodes will
1764
      # be locked.
1765
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1766
        # Important: access only the instances whose lock is owned
1767
        instance = self.cfg.GetInstanceInfoByName(inst_name)
1768
        if instance.disk_template in constants.DTS_INT_MIRROR:
1769
          nodes.update(instance.secondary_nodes)
1770

    
1771
      self.needed_locks[locking.LEVEL_NODE] = nodes
1772

    
1773
  def CheckPrereq(self):
1774
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1775
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1776

    
1777
    group_node_uuids = set(self.group_info.members)
1778
    group_inst_uuids = \
1779
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1780

    
1781
    unlocked_node_uuids = \
1782
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1783

    
1784
    unlocked_inst_uuids = \
1785
        group_inst_uuids.difference(
1786
          [self.cfg.GetInstanceInfoByName(name).uuid
1787
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1788

    
1789
    if unlocked_node_uuids:
1790
      raise errors.OpPrereqError(
1791
        "Missing lock for nodes: %s" %
1792
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1793
        errors.ECODE_STATE)
1794

    
1795
    if unlocked_inst_uuids:
1796
      raise errors.OpPrereqError(
1797
        "Missing lock for instances: %s" %
1798
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1799
        errors.ECODE_STATE)
1800

    
1801
    self.all_node_info = self.cfg.GetAllNodesInfo()
1802
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1803

    
1804
    self.my_node_uuids = group_node_uuids
1805
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1806
                             for node_uuid in group_node_uuids)
1807

    
1808
    self.my_inst_uuids = group_inst_uuids
1809
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1810
                             for inst_uuid in group_inst_uuids)
1811

    
1812
    # We detect here the nodes that will need the extra RPC calls for verifying
1813
    # split LV volumes; they should be locked.
1814
    extra_lv_nodes = set()
1815

    
1816
    for inst in self.my_inst_info.values():
1817
      if inst.disk_template in constants.DTS_INT_MIRROR:
1818
        for nuuid in inst.all_nodes:
1819
          if self.all_node_info[nuuid].group != self.group_uuid:
1820
            extra_lv_nodes.add(nuuid)
1821

    
1822
    unlocked_lv_nodes = \
1823
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1824

    
1825
    if unlocked_lv_nodes:
1826
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1827
                                 utils.CommaJoin(unlocked_lv_nodes),
1828
                                 errors.ECODE_STATE)
1829
    self.extra_lv_nodes = list(extra_lv_nodes)
1830

    
1831
  def _VerifyNode(self, ninfo, nresult):
1832
    """Perform some basic validation on data returned from a node.
1833

1834
      - check the result data structure is well formed and has all the
1835
        mandatory fields
1836
      - check ganeti version
1837

1838
    @type ninfo: L{objects.Node}
1839
    @param ninfo: the node to check
1840
    @param nresult: the results from the node
1841
    @rtype: boolean
1842
    @return: whether overall this call was successful (and we can expect
1843
         reasonable values in the response)
1844

1845
    """
1846
    # main result, nresult should be a non-empty dict
1847
    test = not nresult or not isinstance(nresult, dict)
1848
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1849
                  "unable to verify node: no data returned")
1850
    if test:
1851
      return False
1852

    
1853
    # compares ganeti version
1854
    local_version = constants.PROTOCOL_VERSION
1855
    remote_version = nresult.get("version", None)
1856
    test = not (remote_version and
1857
                isinstance(remote_version, (list, tuple)) and
1858
                len(remote_version) == 2)
1859
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1860
                  "connection to node returned invalid data")
1861
    if test:
1862
      return False
1863

    
1864
    test = local_version != remote_version[0]
1865
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1866
                  "incompatible protocol versions: master %s,"
1867
                  " node %s", local_version, remote_version[0])
1868
    if test:
1869
      return False
1870

    
1871
    # node seems compatible, we can actually try to look into its results
1872

    
1873
    # full package version
1874
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1875
                  constants.CV_ENODEVERSION, ninfo.name,
1876
                  "software version mismatch: master %s, node %s",
1877
                  constants.RELEASE_VERSION, remote_version[1],
1878
                  code=self.ETYPE_WARNING)
1879

    
1880
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1881
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1882
      for hv_name, hv_result in hyp_result.iteritems():
1883
        test = hv_result is not None
1884
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1885
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1886

    
1887
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1888
    if ninfo.vm_capable and isinstance(hvp_result, list):
1889
      for item, hv_name, hv_result in hvp_result:
1890
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1891
                      "hypervisor %s parameter verify failure (source %s): %s",
1892
                      hv_name, item, hv_result)
1893

    
1894
    test = nresult.get(constants.NV_NODESETUP,
1895
                       ["Missing NODESETUP results"])
1896
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1897
                  "node setup error: %s", "; ".join(test))
1898

    
1899
    return True
1900

    
1901
  def _VerifyNodeTime(self, ninfo, nresult,
1902
                      nvinfo_starttime, nvinfo_endtime):
1903
    """Check the node time.
1904

1905
    @type ninfo: L{objects.Node}
1906
    @param ninfo: the node to check
1907
    @param nresult: the remote results for the node
1908
    @param nvinfo_starttime: the start time of the RPC call
1909
    @param nvinfo_endtime: the end time of the RPC call
1910

1911
    """
1912
    ntime = nresult.get(constants.NV_TIME, None)
1913
    try:
1914
      ntime_merged = utils.MergeTime(ntime)
1915
    except (ValueError, TypeError):
1916
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1917
                    "Node returned invalid time")
1918
      return
1919

    
1920
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1921
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1922
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1923
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1924
    else:
1925
      ntime_diff = None
1926

    
1927
    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1928
                  "Node time diverges by at least %s from master node time",
1929
                  ntime_diff)
1930

    
1931
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1932
    """Check the node LVM results and update info for cross-node checks.
1933

1934
    @type ninfo: L{objects.Node}
1935
    @param ninfo: the node to check
1936
    @param nresult: the remote results for the node
1937
    @param vg_name: the configured VG name
1938
    @type nimg: L{NodeImage}
1939
    @param nimg: node image
1940

1941
    """
1942
    if vg_name is None:
1943
      return
1944

    
1945
    # checks vg existence and size > 20G
1946
    vglist = nresult.get(constants.NV_VGLIST, None)
1947
    test = not vglist
1948
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1949
                  "unable to check volume groups")
1950
    if not test:
1951
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1952
                                            constants.MIN_VG_SIZE)
1953
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1954

    
1955
    # Check PVs
1956
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1957
    for em in errmsgs:
1958
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
1959
    if pvminmax is not None:
1960
      (nimg.pv_min, nimg.pv_max) = pvminmax
1961

    
1962
  def _VerifyGroupDRBDVersion(self, node_verify_infos):
1963
    """Check cross-node DRBD version consistency.
1964

1965
    @type node_verify_infos: dict
1966
    @param node_verify_infos: infos about nodes as returned from the
1967
      node_verify call.
1968

1969
    """
1970
    node_versions = {}
1971
    for node_uuid, ndata in node_verify_infos.items():
1972
      nresult = ndata.payload
1973
      if nresult:
1974
        version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1975
        node_versions[node_uuid] = version
1976

    
1977
    if len(set(node_versions.values())) > 1:
1978
      for node_uuid, version in sorted(node_versions.items()):
1979
        msg = "DRBD version mismatch: %s" % version
1980
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1981
                    code=self.ETYPE_WARNING)
1982

    
1983
  def _VerifyGroupLVM(self, node_image, vg_name):
1984
    """Check cross-node consistency in LVM.
1985

1986
    @type node_image: dict
1987
    @param node_image: info about nodes, mapping from node to names to
1988
      L{NodeImage} objects
1989
    @param vg_name: the configured VG name
1990

1991
    """
1992
    if vg_name is None:
1993
      return
1994

    
1995
    # Only exclusive storage needs this kind of checks
1996
    if not self._exclusive_storage:
1997
      return
1998

    
1999
    # exclusive_storage wants all PVs to have the same size (approximately),
2000
    # if the smallest and the biggest ones are okay, everything is fine.
2001
    # pv_min is None iff pv_max is None
2002
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
2003
    if not vals:
2004
      return
2005
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
2006
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
2007
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
2008
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
2009
                  "PV sizes differ too much in the group; smallest (%s MB) is"
2010
                  " on %s, biggest (%s MB) is on %s",
2011
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
2012
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))
2013

    
2014
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
2015
    """Check the node bridges.
2016

2017
    @type ninfo: L{objects.Node}
2018
    @param ninfo: the node to check
2019
    @param nresult: the remote results for the node
2020
    @param bridges: the expected list of bridges
2021

2022
    """
2023
    if not bridges:
2024
      return
2025

    
2026
    missing = nresult.get(constants.NV_BRIDGES, None)
2027
    test = not isinstance(missing, list)
2028
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2029
                  "did not return valid bridge information")
2030
    if not test:
2031
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
2032
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
2033

    
2034
  def _VerifyNodeUserScripts(self, ninfo, nresult):
2035
    """Check the results of user scripts presence and executability on the node
2036

2037
    @type ninfo: L{objects.Node}
2038
    @param ninfo: the node to check
2039
    @param nresult: the remote results for the node
2040

2041
    """
2042
    test = constants.NV_USERSCRIPTS not in nresult
2043
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2044
                  "did not return user scripts information")
2045

    
2046
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
2047
    if not test:
2048
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2049
                    "user scripts not present or not executable: %s" %
2050
                    utils.CommaJoin(sorted(broken_scripts)))
2051

    
2052
  def _VerifyNodeNetwork(self, ninfo, nresult):
2053
    """Check the node network connectivity results.
2054

2055
    @type ninfo: L{objects.Node}
2056
    @param ninfo: the node to check
2057
    @param nresult: the remote results for the node
2058

2059
    """
2060
    test = constants.NV_NODELIST not in nresult
2061
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
2062
                  "node hasn't returned node ssh connectivity data")
2063
    if not test:
2064
      if nresult[constants.NV_NODELIST]:
2065
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
2066
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
2067
                        "ssh communication with node '%s': %s", a_node, a_msg)
2068

    
2069
    test = constants.NV_NODENETTEST not in nresult
2070
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2071
                  "node hasn't returned node tcp connectivity data")
2072
    if not test:
2073
      if nresult[constants.NV_NODENETTEST]:
2074
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
2075
        for anode in nlist:
2076
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
2077
                        "tcp communication with node '%s': %s",
2078
                        anode, nresult[constants.NV_NODENETTEST][anode])
2079

    
2080
    test = constants.NV_MASTERIP not in nresult
2081
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2082
                  "node hasn't returned node master IP reachability data")
2083
    if not test:
2084
      if not nresult[constants.NV_MASTERIP]:
2085
        if ninfo.uuid == self.master_node:
2086
          msg = "the master node cannot reach the master IP (not configured?)"
2087
        else:
2088
          msg = "cannot reach the master IP"
2089
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
2090

    
2091
  def _VerifyInstance(self, instance, node_image, diskstatus):
2092
    """Verify an instance.
2093

2094
    This function checks to see if the required block devices are
2095
    available on the instance's node, and that the nodes are in the correct
2096
    state.
2097

2098
    """
2099
    pnode_uuid = instance.primary_node
2100
    pnode_img = node_image[pnode_uuid]
2101
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2102

    
2103
    node_vol_should = {}
2104
    instance.MapLVsByNode(node_vol_should)
2105

    
2106
    cluster = self.cfg.GetClusterInfo()
2107
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2108
                                                            self.group_info)
2109
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
2110
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
2111
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)
2112

    
2113
    for node_uuid in node_vol_should:
2114
      n_img = node_image[node_uuid]
2115
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2116
        # ignore missing volumes on offline or broken nodes
2117
        continue
2118
      for volume in node_vol_should[node_uuid]:
2119
        test = volume not in n_img.volumes
2120
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
2121
                      "volume %s missing on node %s", volume,
2122
                      self.cfg.GetNodeName(node_uuid))
2123

    
2124
    if instance.admin_state == constants.ADMINST_UP:
2125
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
2126
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
2127
                    "instance not running on its primary node %s",
2128
                     self.cfg.GetNodeName(pnode_uuid))
2129
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
2130
                    instance.name, "instance is marked as running and lives on"
2131
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))
2132

    
2133
    diskdata = [(nname, success, status, idx)
2134
                for (nname, disks) in diskstatus.items()
2135
                for idx, (success, status) in enumerate(disks)]
2136

    
2137
    for nname, success, bdev_status, idx in diskdata:
2138
      # the 'ghost node' construction in Exec() ensures that we have a
2139
      # node here
2140
      snode = node_image[nname]
2141
      bad_snode = snode.ghost or snode.offline
2142
      self._ErrorIf(instance.disks_active and
2143
                    not success and not bad_snode,
2144
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
2145
                    "couldn't retrieve status for disk/%s on %s: %s",
2146
                    idx, self.cfg.GetNodeName(nname), bdev_status)
2147

    
2148
      if instance.disks_active and success and \
2149
         (bdev_status.is_degraded or
2150
          bdev_status.ldisk_status != constants.LDS_OKAY):
2151
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2152
        if bdev_status.is_degraded:
2153
          msg += " is degraded"
2154
        if bdev_status.ldisk_status != constants.LDS_OKAY:
2155
          msg += "; state is '%s'" % \
2156
                 constants.LDS_NAMES[bdev_status.ldisk_status]
2157

    
2158
        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2159

    
2160
    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2161
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2162
                  "instance %s, connection to primary node failed",
2163
                  instance.name)
2164

    
2165
    self._ErrorIf(len(instance.secondary_nodes) > 1,
2166
                  constants.CV_EINSTANCELAYOUT, instance.name,
2167
                  "instance has multiple secondary nodes: %s",
2168
                  utils.CommaJoin(instance.secondary_nodes),
2169
                  code=self.ETYPE_WARNING)
2170

    
2171
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2172
    if any(es_flags.values()):
2173
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2174
        # Disk template not compatible with exclusive_storage: no instance
2175
        # node should have the flag set
2176
        es_nodes = [n
2177
                    for (n, es) in es_flags.items()
2178
                    if es]
2179
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2180
                    "instance has template %s, which is not supported on nodes"
2181
                    " that have exclusive storage set: %s",
2182
                    instance.disk_template,
2183
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2184
      for (idx, disk) in enumerate(instance.disks):
2185
        self._ErrorIf(disk.spindles is None,
2186
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2187
                      "number of spindles not configured for disk %s while"
2188
                      " exclusive storage is enabled, try running"
2189
                      " gnt-cluster repair-disk-sizes", idx)
2190

    
2191
    if instance.disk_template in constants.DTS_INT_MIRROR:
2192
      instance_nodes = utils.NiceSort(instance.all_nodes)
2193
      instance_groups = {}
2194

    
2195
      for node_uuid in instance_nodes:
2196
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
2197
                                   []).append(node_uuid)
2198

    
2199
      pretty_list = [
2200
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2201
                           groupinfo[group].name)
2202
        # Sort so that we always list the primary node first.
2203
        for group, nodes in sorted(instance_groups.items(),
2204
                                   key=lambda (_, nodes): pnode_uuid in nodes,
2205
                                   reverse=True)]
2206

    
2207
      self._ErrorIf(len(instance_groups) > 1,
2208
                    constants.CV_EINSTANCESPLITGROUPS,
2209
                    instance.name, "instance has primary and secondary nodes in"
2210
                    " different groups: %s", utils.CommaJoin(pretty_list),
2211
                    code=self.ETYPE_WARNING)
2212

    
2213
    inst_nodes_offline = []
2214
    for snode in instance.secondary_nodes:
2215
      s_img = node_image[snode]
2216
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2217
                    self.cfg.GetNodeName(snode),
2218
                    "instance %s, connection to secondary node failed",
2219
                    instance.name)
2220

    
2221
      if s_img.offline:
2222
        inst_nodes_offline.append(snode)
2223

    
2224
    # warn that the instance lives on offline nodes
2225
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2226
                  instance.name, "instance has offline secondary node(s) %s",
2227
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2228
    # ... or ghost/non-vm_capable nodes
2229
    for node_uuid in instance.all_nodes:
2230
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2231
                    instance.name, "instance lives on ghost node %s",
2232
                    self.cfg.GetNodeName(node_uuid))
2233
      self._ErrorIf(not node_image[node_uuid].vm_capable,
2234
                    constants.CV_EINSTANCEBADNODE, instance.name,
2235
                    "instance lives on non-vm_capable node %s",
2236
                    self.cfg.GetNodeName(node_uuid))
2237

    
2238
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2239
    """Verify if there are any unknown volumes in the cluster.
2240

2241
    The .os, .swap and backup volumes are ignored. All other volumes are
2242
    reported as unknown.
2243

2244
    @type reserved: L{ganeti.utils.FieldSet}
2245
    @param reserved: a FieldSet of reserved volume names
2246

2247
    """
2248
    for node_uuid, n_img in node_image.items():
2249
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2250
          self.all_node_info[node_uuid].group != self.group_uuid):
2251
        # skip non-healthy nodes
2252
        continue
2253
      for volume in n_img.volumes:
2254
        test = ((node_uuid not in node_vol_should or
2255
                volume not in node_vol_should[node_uuid]) and
2256
                not reserved.Matches(volume))
2257
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2258
                      self.cfg.GetNodeName(node_uuid),
2259
                      "volume %s is unknown", volume,
2260
                      code=_VerifyErrors.ETYPE_WARNING)
2261

    
2262
  def _VerifyNPlusOneMemory(self, node_image, all_insts):
2263
    """Verify N+1 Memory Resilience.
2264

2265
    Check that if one single node dies we can still start all the
2266
    instances it was primary for.
2267

2268
    """
2269
    cluster_info = self.cfg.GetClusterInfo()
2270
    for node_uuid, n_img in node_image.items():
2271
      # This code checks that every node which is now listed as
2272
      # secondary has enough memory to host all instances it is
2273
      # supposed to should a single other node in the cluster fail.
2274
      # FIXME: not ready for failover to an arbitrary node
2275
      # FIXME: does not support file-backed instances
2276
      # WARNING: we currently take into account down instances as well
2277
      # as up ones, considering that even if they're down someone
2278
      # might want to start them even in the event of a node failure.
2279
      if n_img.offline or \
2280
         self.all_node_info[node_uuid].group != self.group_uuid:
2281
        # we're skipping nodes marked offline and nodes in other groups from
2282
        # the N+1 warning, since most likely we don't have good memory
2283
        # information from them; we already list instances living on such
2284
        # nodes, and that's enough warning
2285
        continue
2286
      #TODO(dynmem): also consider ballooning out other instances
2287
      for prinode, inst_uuids in n_img.sbp.items():
2288
        needed_mem = 0
2289
        for inst_uuid in inst_uuids:
2290
          bep = cluster_info.FillBE(all_insts[inst_uuid])
2291
          if bep[constants.BE_AUTO_BALANCE]:
2292
            needed_mem += bep[constants.BE_MINMEM]
2293
        test = n_img.mfree < needed_mem
2294
        self._ErrorIf(test, constants.CV_ENODEN1,
2295
                      self.cfg.GetNodeName(node_uuid),
2296
                      "not enough memory to accomodate instance failovers"
2297
                      " should node %s fail (%dMiB needed, %dMiB available)",
2298
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2299

    
2300
  def _VerifyClientCertificates(self, nodes, all_nvinfo):
2301
    """Verifies the consistency of the client certificates.
2302

2303
    This includes several aspects:
2304
      - the individual validation of all nodes' certificates
2305
      - the consistency of the master candidate certificate map
2306
      - the consistency of the master candidate certificate map with the
2307
        certificates that the master candidates are actually using.
2308

2309
    @param nodes: the list of nodes to consider in this verification
2310
    @param all_nvinfo: the map of results of the verify_node call to
2311
      all nodes
2312

2313
    """
2314
    candidate_certs = self.cfg.GetClusterInfo().candidate_certs
2315
    if candidate_certs is None or len(candidate_certs) == 0:
2316
      self._ErrorIf(
2317
        True, constants.CV_ECLUSTERCLIENTCERT, None,
2318
        "The cluster's list of master candidate certificates is empty."
2319
        "If you just updated the cluster, please run"
2320
        " 'gnt-cluster renew-crypto --new-node-certificates'.")
2321
      return
2322

    
2323
    self._ErrorIf(
2324
      len(candidate_certs) != len(set(candidate_certs.values())),
2325
      constants.CV_ECLUSTERCLIENTCERT, None,
2326
      "There are at least two master candidates configured to use the same"
2327
      " certificate.")
2328

    
2329
    # collect the client certificate
2330
    for node in nodes:
2331
      if node.offline:
2332
        continue
2333

    
2334
      nresult = all_nvinfo[node.uuid]
2335
      if nresult.fail_msg or not nresult.payload:
2336
        continue
2337

    
2338
      (errcode, msg) = nresult.payload.get(constants.NV_CLIENT_CERT, None)
2339

    
2340
      self._ErrorIf(
2341
        errcode is not None, constants.CV_ECLUSTERCLIENTCERT, None,
2342
        "Client certificate of node '%s' failed validation: %s (code '%s')",
2343
        node.uuid, msg, errcode)
2344

    
2345
      if not errcode:
2346
        digest = msg
2347
        if node.master_candidate:
2348
          if node.uuid in candidate_certs:
2349
            self._ErrorIf(
2350
              digest != candidate_certs[node.uuid],
2351
              constants.CV_ECLUSTERCLIENTCERT, None,
2352
              "Client certificate digest of master candidate '%s' does not"
2353
              " match its entry in the cluster's map of master candidate"
2354
              " certificates. Expected: %s Got: %s", node.uuid,
2355
              digest, candidate_certs[node.uuid])
2356
          else:
2357
            self._ErrorIf(
2358
              True, constants.CV_ECLUSTERCLIENTCERT, None,
2359
              "The master candidate '%s' does not have an entry in the"
2360
              " map of candidate certificates.", node.uuid)
2361
            self._ErrorIf(
2362
              digest in candidate_certs.values(),
2363
              constants.CV_ECLUSTERCLIENTCERT, None,
2364
              "Master candidate '%s' is using a certificate of another node.",
2365
              node.uuid)
2366
        else:
2367
          self._ErrorIf(
2368
            node.uuid in candidate_certs,
2369
            constants.CV_ECLUSTERCLIENTCERT, None,
2370
            "Node '%s' is not a master candidate, but still listed in the"
2371
            " map of master candidate certificates.", node.uuid)
2372
          self._ErrorIf(
2373
            (node.uuid not in candidate_certs) and
2374
              (digest in candidate_certs.values()),
2375
            constants.CV_ECLUSTERCLIENTCERT, None,
2376
            "Node '%s' is not a master candidate and is incorrectly using a"
2377
            " certificate of another node which is master candidate.",
2378
            node.uuid)
2379

    
2380
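  # files_all/files_opt/files_mc/files_vm describe which ancillary files must
  # exist on every node, optionally, on master candidates only, or on
  # vm_capable nodes only.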
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2381
                   (files_all, files_opt, files_mc, files_vm)):
2382
    """Verifies file checksums collected from all nodes.
2383

2384
    @param nodes: List of L{objects.Node} objects
2385
    @param master_node_uuid: UUID of master node
2386
    @param all_nvinfo: RPC results
2387

2388
    """
2389
    # Define functions determining which nodes to consider for a file
2390
    files2nodefn = [
2391
      (files_all, None),
2392
      (files_mc, lambda node: (node.master_candidate or
2393
                               node.uuid == master_node_uuid)),
2394
      (files_vm, lambda node: node.vm_capable),
2395
      ]
2396

    
2397
    # Build mapping from filename to list of nodes which should have the file
2398
    nodefiles = {}
2399
    for (files, fn) in files2nodefn:
2400
      if fn is None:
2401
        filenodes = nodes
2402
      else:
2403
        filenodes = filter(fn, nodes)
2404
      nodefiles.update((filename,
2405
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
2406
                       for filename in files)
2407

    
2408
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2409

    
2410
    fileinfo = dict((filename, {}) for filename in nodefiles)
2411
    ignore_nodes = set()
2412

    
2413
    for node in nodes:
2414
      if node.offline:
2415
        ignore_nodes.add(node.uuid)
2416
        continue
2417

    
2418
      nresult = all_nvinfo[node.uuid]
2419

    
2420
      if nresult.fail_msg or not nresult.payload:
2421
        node_files = None
2422
      else:
2423
        fingerprints = nresult.payload.get(constants.NV_FILELIST, {})
2424
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2425
                          for (key, value) in fingerprints.items())
2426
        del fingerprints
2427

    
2428
      test = not (node_files and isinstance(node_files, dict))
2429
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2430
                    "Node did not return file checksum data")
2431
      if test:
2432
        ignore_nodes.add(node.uuid)
2433
        continue
2434

    
2435
      # Build per-checksum mapping from filename to nodes having it
2436
      for (filename, checksum) in node_files.items():
2437
        assert filename in nodefiles
2438
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2439

    
2440
    for (filename, checksums) in fileinfo.items():
2441
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2442

    
2443
      # Nodes having the file
2444
      with_file = frozenset(node_uuid
2445
                            for node_uuids in fileinfo[filename].values()
2446
                            for node_uuid in node_uuids) - ignore_nodes
2447

    
2448
      expected_nodes = nodefiles[filename] - ignore_nodes
2449

    
2450
      # Nodes missing file
2451
      missing_file = expected_nodes - with_file
2452

    
2453
      if filename in files_opt:
2454
        # All or no nodes
2455
        self._ErrorIf(missing_file and missing_file != expected_nodes,
2456
                      constants.CV_ECLUSTERFILECHECK, None,
2457
                      "File %s is optional, but it must exist on all or no"
2458
                      " nodes (not found on %s)",
2459
                      filename,
2460
                      utils.CommaJoin(
2461
                        utils.NiceSort(
2462
                          map(self.cfg.GetNodeName, missing_file))))
2463
      else:
2464
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2465
                      "File %s is missing from node(s) %s", filename,
2466
                      utils.CommaJoin(
2467
                        utils.NiceSort(
2468
                          map(self.cfg.GetNodeName, missing_file))))
2469

    
2470
        # Warn if a node has a file it shouldn't
2471
        unexpected = with_file - expected_nodes
2472
        self._ErrorIf(unexpected,
2473
                      constants.CV_ECLUSTERFILECHECK, None,
2474
                      "File %s should not exist on node(s) %s",
2475
                      filename, utils.CommaJoin(
2476
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2477

    
2478
      # See if there are multiple versions of the file
2479
      test = len(checksums) > 1
2480
      if test:
2481
        variants = ["variant %s on %s" %
2482
                    (idx + 1,
2483
                     utils.CommaJoin(utils.NiceSort(
2484
                       map(self.cfg.GetNodeName, node_uuids))))
2485
                    for (idx, (checksum, node_uuids)) in
2486
                      enumerate(sorted(checksums.items()))]
2487
      else:
2488
        variants = []
2489

    
2490
      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2491
                    "File %s found with %s different checksums (%s)",
2492
                    filename, len(checksums), "; ".join(variants))
2493

    
2494
  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
2495
    """Verify the drbd helper.
2496

2497
    """
2498
    if drbd_helper:
2499
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2500
      test = (helper_result is None)
2501
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2502
                    "no drbd usermode helper returned")
2503
      if helper_result:
2504
        status, payload = helper_result
2505
        test = not status
2506
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2507
                      "drbd usermode helper check unsuccessful: %s", payload)
2508
        test = status and (payload != drbd_helper)
2509
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2510
                      "wrong drbd usermode helper: %s", payload)
2511

    
2512
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2513
                      drbd_map):
2514
    """Verifies and the node DRBD status.
2515

2516
    @type ninfo: L{objects.Node}
2517
    @param ninfo: the node to check
2518
    @param nresult: the remote results for the node
2519
    @param instanceinfo: the dict of instances
2520
    @param drbd_helper: the configured DRBD usermode helper
2521
    @param drbd_map: the DRBD map as returned by
2522
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2523

2524
    """
2525
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)
2526

    
2527
    # compute the DRBD minors
2528
    node_drbd = {}
2529
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2530
      test = inst_uuid not in instanceinfo
2531
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2532
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
2533
        # ghost instance should not be running, but otherwise we
2534
        # don't give double warnings (both ghost instance and
2535
        # unallocated minor in use)
2536
      if test:
2537
        node_drbd[minor] = (inst_uuid, False)
2538
      else:
2539
        instance = instanceinfo[inst_uuid]
2540
        node_drbd[minor] = (inst_uuid, instance.disks_active)
2541

    
2542
    # and now check them
2543
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2544
    test = not isinstance(used_minors, (tuple, list))
2545
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2546
                  "cannot parse drbd status file: %s", str(used_minors))
2547
    if test:
2548
      # we cannot check drbd status
2549
      return
2550

    
2551
    for minor, (inst_uuid, must_exist) in node_drbd.items():
2552
      test = minor not in used_minors and must_exist
2553
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2554
                    "drbd minor %d of instance %s is not active", minor,
2555
                    self.cfg.GetInstanceName(inst_uuid))
2556
    for minor in used_minors:
2557
      test = minor not in node_drbd
2558
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2559
                    "unallocated drbd minor %d is in use", minor)
2560

    
2561
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2562
    """Builds the node OS structures.
2563

2564
    @type ninfo: L{objects.Node}
2565
    @param ninfo: the node to check
2566
    @param nresult: the remote results for the node
2567
    @param nimg: the node image object
2568

2569
    """
2570
    remote_os = nresult.get(constants.NV_OSLIST, None)
2571
    test = (not isinstance(remote_os, list) or
2572
            not compat.all(isinstance(v, list) and len(v) == 7
2573
                           for v in remote_os))
2574

    
2575
    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2576
                  "node hasn't returned valid OS data")
2577

    
2578
    nimg.os_fail = test
2579

    
2580
    if test:
2581
      return
2582

    
2583
    os_dict = {}
2584

    
2585
    for (name, os_path, status, diagnose,
2586
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2587

    
2588
      if name not in os_dict:
2589
        os_dict[name] = []
2590

    
2591
      # parameters is a list of lists instead of list of tuples due to
2592
      # JSON lacking a real tuple type, fix it:
2593
      parameters = [tuple(v) for v in parameters]
2594
      os_dict[name].append((os_path, status, diagnose,
2595
                            set(variants), set(parameters), set(api_ver)))
2596

    
2597
    nimg.oslist = os_dict
2598

    
2599
  def _VerifyNodeOS(self, ninfo, nimg, base):
2600
    """Verifies the node OS list.
2601

2602
    @type ninfo: L{objects.Node}
2603
    @param ninfo: the node to check
2604
    @param nimg: the node image object
2605
    @param base: the 'template' node we match against (e.g. from the master)
2606

2607
    """
2608
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2609

    
2610
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
                    "Invalid OS %s (located at %s): %s",
                    os_name, f_path, f_diag)
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
                    "OS '%s' has multiple entries"
                    " (first one shadows the rest): %s",
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                    "Extra OS %s not present on reference node (%s)",
                    os_name, self.cfg.GetNodeName(base.uuid))
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " [%s] vs. [%s]", kind, os_name,
                      self.cfg.GetNodeName(base.uuid),
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
                  "OSes present on reference node %s"
                  " but missing on this node: %s",
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))

  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    cluster = self.cfg.GetClusterInfo()
    if (is_master and
        (cluster.IsFileStorageEnabled() or
         cluster.IsSharedFileStorageEnabled())):
      try:
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
                          verify_key, error_key):
    """Verifies (file) storage paths.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type file_disk_template: string
    @param file_disk_template: file-based disk template, whose directory
        is supposed to be verified
    @type verify_key: string
    @param verify_key: key for the verification map of this file
        verification step
    @param error_key: error key to be added to the verification results
        in case something goes wrong in this verification step

    """
    assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
              constants.ST_FILE, constants.ST_SHARED_FILE
           ))

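    # Presence of verify_key in the node's result is treated as an error;
    # its value (if any) is reported as the reason the path is unusable.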
    cluster = self.cfg.GetClusterInfo()
    if cluster.IsDiskTemplateEnabled(file_disk_template):
      self._ErrorIf(
          verify_key in nresult,
          error_key, ninfo.name,
          "The configured %s storage path is unusable: %s" %
          (file_disk_template, nresult.get(verify_key)))

  def _VerifyFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_FILE,
        constants.NV_FILE_STORAGE_PATH,
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)

  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_SHARED_FILE,
        constants.NV_SHARED_FILE_STORAGE_PATH,
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_dev_inst_only = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      dev_inst_only = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))

      node_disks_dev_inst_only[nuuid] = dev_inst_only

    assert len(node_disks) == len(node_disks_dev_inst_only)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
               node_disks.keys(), node_disks_dev_inst_only)

    assert len(result) == len(node_disks)

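    # instdisk maps instance UUID -> node UUID -> list of (success, payload)
    # tuples, one per disk; for example (illustrative values only):
    #   {"inst-uuid": {"node-uuid": [(True, {...}), (False, "node offline")]}}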
    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

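    # Group the remaining (foreign, online) nodes by node group, sort each
    # group's names and wrap them in itertools.cycle so callers can draw an
    # endless round-robin sequence of check targets per group.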
    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

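    # Each online node of this group is assigned one target name drawn from
    # every other group's round-robin iterator, so the SSH reachability
    # checks are spread over the whole cluster.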
    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; their failure causes
    their output to be logged in the verify output and the verification to
    fail.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

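    # node_verify_param maps NV_* check names to their per-check arguments;
    # it is sent to all nodes of the group via call_node_verify below, and
    # each node's reply is keyed by the same NV_* names.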
    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      constants.NV_CLIENT_CERT: None,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for inst_uuid in self.my_inst_info.values():
      for nic in inst_uuid.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

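    # Fill the node images with the instances they host: primary and
    # secondary instances are recorded per node, and placeholder images are
    # created for nodes outside this group (flagged as ghosts if they are
    # unknown to the cluster configuration).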
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least a node, we act as if it were set for all the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    node_group_uuids = dict(map(lambda n: (n.name, n.group),
                                self.cfg.GetAllNodesInfo().values()))
    groups_config = self.cfg.GetAllNodeGroupsInfoDict()

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams,
                                           node_group_uuids,
                                           groups_config)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams,
                                    node_group_uuids,
                                    groups_config)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams,
         node_group_uuids,
         groups_config))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

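    # Per-node verification: offline nodes are skipped and only counted,
    # every other node's payload is run through the individual _Verify*/
    # _Update* helpers, and the first node with valid OS data becomes the
    # reference image (refos_img) for the OS comparisons.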
    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.name not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

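    # The N+1 memory check is optional; it is skipped when the opcode lists
    # VERIFY_NPLUSONE_MEM in skip_checks.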
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyzes the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

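      # Each node's hook payload is a list of (script, status, output)
      # tuples; any script reporting HKR_FAIL gets its output echoed and
      # turns the overall LU result into a failure.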
      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])