#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
import ganeti.rpc.node as rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
  CheckDiskAccessModeConsistency, CreateNewClientCert

import ganeti.masterd.instance


def _UpdateMasterClientCert(
    lu, master_uuid, cluster, feedback_fn,
    client_cert=pathutils.NODED_CLIENT_CERT_FILE,
    client_cert_tmp=pathutils.NODED_CLIENT_CERT_FILE_TMP):
  """Renews the master's client certificate and propagates the config.

  @type lu: C{LogicalUnit}
  @param lu: the logical unit holding the config
  @type master_uuid: string
  @param master_uuid: the master node's UUID
  @type cluster: C{objects.Cluster}
  @param cluster: the cluster's configuration
  @type feedback_fn: function
  @param feedback_fn: feedback functions for config updates
  @type client_cert: string
  @param client_cert: the path of the client certificate
  @type client_cert_tmp: string
  @param client_cert_tmp: the temporary path of the client certificate
  @rtype: string
  @return: the digest of the newly created client certificate

  """
  client_digest = CreateNewClientCert(lu, master_uuid, filename=client_cert_tmp)
  utils.AddNodeToCandidateCerts(master_uuid, client_digest,
                                cluster.candidate_certs)
  # This triggers an update of the config and distribution of it with the old
  # SSL certificate
  lu.cfg.Update(cluster, feedback_fn)

  utils.RemoveFile(client_cert)
  utils.RenameFile(client_cert_tmp, client_cert)
  return client_digest
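
# Illustrative usage (editor's note, not part of the original source): inside
# an LU's Exec method, renewing only the master's client certificate and
# distributing the updated candidate-certificate map is a single call, e.g.
#
#   digest = _UpdateMasterClientCert(self, self.cfg.GetMasterNode(),
#                                    self.cfg.GetClusterInfo(), feedback_fn)
#
# which is essentially what LUClusterRenewCrypto.Exec below does.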


class LUClusterRenewCrypto(NoHooksLU):
  """Renew the cluster's crypto tokens.

  Note that most of this operation is done in gnt_cluster.py; this LU only
  takes care of the renewal of the client SSL certificates.

  """
  def Exec(self, feedback_fn):
    master_uuid = self.cfg.GetMasterNode()
    cluster = self.cfg.GetClusterInfo()

    server_digest = utils.GetCertificateDigest(
      cert_filename=pathutils.NODED_CERT_FILE)
    utils.AddNodeToCandidateCerts("%s-SERVER" % master_uuid,
                                  server_digest,
                                  cluster.candidate_certs)
    new_master_digest = _UpdateMasterClientCert(self, master_uuid, cluster,
                                                feedback_fn)

    cluster.candidate_certs = {master_uuid: new_master_digest}
    nodes = self.cfg.GetAllNodesInfo()
    for (node_uuid, node_info) in nodes.items():
      if node_uuid != master_uuid:
        new_digest = CreateNewClientCert(self, node_uuid)
        if node_info.master_candidate:
          cluster.candidate_certs[node_uuid] = new_digest
    # Trigger another update of the config now with the new master cert
    self.cfg.Update(cluster, feedback_fn)


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def CheckArguments(self):
    self.master_uuid = self.cfg.GetMasterNode()
    self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo())

    # TODO: When Issue 584 is solved, and None is properly parsed when used
    # as a default value, ndparams.get(.., None) can be changed to
    # ndparams[..] to access the values directly

    # OpenvSwitch: Warn user if link is missing
    if (self.master_ndparams[constants.ND_OVS] and not
        self.master_ndparams.get(constants.ND_OVS_LINK, None)):
      self.LogInfo("No physical interface for OpenvSwitch was given."
                   " OpenvSwitch will not have an outside connection. This"
                   " might not be what you want.")

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Create and configure Open vSwitch

    """
    if self.master_ndparams[constants.ND_OVS]:
      result = self.rpc.call_node_configure_ovs(
                 self.master_uuid,
                 self.master_ndparams[constants.ND_OVS_NAME],
                 self.master_ndparams.get(constants.ND_OVS_LINK, None))
      result.Raise("Could not successfully configure Open vSwitch")

    cluster = self.cfg.GetClusterInfo()
    _UpdateMasterClientCert(self, self.master_uuid, cluster, feedback_fn)

    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "vcs_version": constants.VCS_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "max_running_jobs": cluster.max_running_jobs,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "default_iallocator_params": cluster.default_iallocator_params,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      "instance_communication_network": cluster.instance_communication_network,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.DT_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False
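
  # Worked example (editor's note, not part of the original source): for a
  # hypothetical DRBD8 disk of 1024 MiB whose data child reports only
  # 1000 MiB, _EnsureChildSizes grows the child and reports the change:
  #
  #   data = objects.Disk(dev_type=constants.DT_PLAIN, size=1000)
  #   meta = objects.Disk(dev_type=constants.DT_PLAIN, size=128)
  #   drbd = objects.Disk(dev_type=constants.DT_DRBD8, size=1024,
  #                       children=[data, meta])
  #   self._EnsureChildSizes(drbd)  # -> True, and data.size is now 1024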

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      if not dskl:
        # no disks on the node
        continue

      newl = [([v[2].Copy()], v[0]) for v in dskl]
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)
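
# Illustrative example (editor's note, not part of the original source): on a
# cluster whose primary IP family is IPv4, _ValidateNetmask(cfg, 24) passes
# silently, while _ValidateNetmask(cfg, 64) raises errors.OpPrereqError
# because 64 is not a valid IPv4 CIDR prefix length.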


def CheckFileBasedStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
    file_disk_template):
  """Checks whether the given file-based storage directory is acceptable.

  Note: This function is public, because it is also used in bootstrap.py.

  @type logging_warn_fn: function
  @param logging_warn_fn: function which accepts a string and logs it
  @type file_storage_dir: string
  @param file_storage_dir: the directory to be used for file-based instances
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of enabled disk templates
  @type file_disk_template: string
  @param file_disk_template: the file-based disk template for which the
      path should be checked

  """
  assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
            constants.ST_FILE, constants.ST_SHARED_FILE
         ))
  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_dir is not None:
    if file_storage_dir == "":
      if file_storage_enabled:
        raise errors.OpPrereqError(
            "Unsetting the '%s' storage directory while having '%s' storage"
            " enabled is not permitted." %
            (file_disk_template, file_disk_template))
    else:
      if not file_storage_enabled:
        logging_warn_fn(
            "Specified a %s storage directory, although %s storage is not"
            " enabled." % (file_disk_template, file_disk_template))
  else:
    raise errors.ProgrammerError("Received %s storage dir with value"
                                 " 'None'." % file_disk_template)


def CheckFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_FILE)


def CheckSharedFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given shared file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_SHARED_FILE)
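
# Illustrative behaviour of the checks above (editor's note, not part of the
# original source), taking DT_FILE as the file-based template under test:
#
#   - file_storage_dir == "" while DT_FILE is enabled   -> OpPrereqError
#   - a non-empty directory while DT_FILE is disabled   -> warning only
#   - file_storage_dir is None                          -> ProgrammerError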


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
        CheckDiskAccessModeValidity(self.op.diskparams)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s"
                                   % err, errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_uuids, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the vg name on all nodes and, in case it gets
       unset, whether there are instances still using it.

    """
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
                                            new_enabled_disk_templates)
    current_vg_name = self.cfg.GetVGName()

    if self.op.vg_name == '':
      if lvm_is_enabled:
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                   " disk templates are or get enabled.")

    if self.op.vg_name is None:
      if current_vg_name is None and lvm_is_enabled:
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and lvm_is_enabled) or \
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
      self._CheckVgNameOnNodes(node_uuids)

  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)

  @staticmethod
  def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
                                old_enabled_disk_templates):
    """Computes three sets of disk templates.

    @see: C{_GetDiskTemplateSets} for more details.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    disabled_disk_templates = []
    if op_enabled_disk_templates:
      enabled_disk_templates = op_enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(old_enabled_disk_templates))
      disabled_disk_templates = \
        list(set(old_enabled_disk_templates)
             - set(enabled_disk_templates))
    else:
      enabled_disk_templates = old_enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates,
            disabled_disk_templates)
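
  # Worked example (editor's note, not part of the original source): with
  # old_enabled_disk_templates = ["drbd", "plain"] and
  # op_enabled_disk_templates = ["plain", "file"], the helper returns
  # (["plain", "file"], ["file"], ["drbd"]) (list order aside): everything
  # requested stays enabled, "file" is newly enabled and "drbd" is disabled.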

  def _GetDiskTemplateSets(self, cluster):
    """Computes three sets of disk templates.

    The three sets are:
      - disk templates that will be enabled after this operation (no matter if
        they were enabled before or not)
      - disk templates that get enabled by this operation (thus haven't been
        enabled before)
      - disk templates that get disabled by this operation

    """
    return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
                                          cluster.enabled_disk_templates)

  def _CheckIpolicy(self, cluster, enabled_disk_templates):
    """Checks the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration
    @type enabled_disk_templates: list of string
    @param enabled_disk_templates: list of (possibly newly) enabled disk
      templates

    """
    # FIXME: write unit tests for this
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
                                  enabled_disk_templates)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(nuuid in group.members
                                             for nuuid in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
                                           self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))
    else:
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
                                  enabled_disk_templates)

  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
    """Checks whether the set DRBD helper actually exists on the nodes.

    @type drbd_helper: string
    @param drbd_helper: path of the drbd usermode helper binary
    @type node_uuids: list of strings
    @param node_uuids: list of node UUIDs to check for the helper

    """
    # checks given drbd helper on all nodes
    helpers = self.rpc.call_drbd_helper(node_uuids)
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
      if ninfo.offline:
        self.LogInfo("Not checking drbd helper on offline node %s",
                     ninfo.name)
        continue
      msg = helpers[ninfo.uuid].fail_msg
      if msg:
        raise errors.OpPrereqError("Error checking drbd helper on node"
                                   " '%s': %s" % (ninfo.name, msg),
                                   errors.ECODE_ENVIRON)
      node_helper = helpers[ninfo.uuid].payload
      if node_helper != drbd_helper:
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                   (ninfo.name, node_helper),
                                   errors.ECODE_ENVIRON)

  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
    """Check the DRBD usermode helper.

    @type node_uuids: list of strings
    @param node_uuids: a list of nodes' UUIDs
    @type drbd_enabled: boolean
    @param drbd_enabled: whether DRBD will be enabled after this operation
      (no matter if it was disabled before or not)
    @type drbd_gets_enabled: boolean
    @param drbd_gets_enabled: true if DRBD was disabled before this
      operation, but will be enabled afterwards

    """
    if self.op.drbd_helper == '':
      if drbd_enabled:
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " DRBD is enabled.")
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    else:
      if self.op.drbd_helper is not None and drbd_enabled:
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
      else:
        if drbd_gets_enabled:
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
          if current_drbd_helper is not None:
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
          else:
            raise errors.OpPrereqError("Cannot enable DRBD without a"
                                       " DRBD usermode helper set.")

  def _CheckInstancesOfDisabledDiskTemplates(
      self, disabled_disk_templates):
    """Check whether we try to disable a disk template that is in use.

    @type disabled_disk_templates: list of string
    @param disabled_disk_templates: list of disk templates that are going to
      be disabled by this operation

    """
    for disk_template in disabled_disk_templates:
      if self.cfg.HasAnyDiskOfType(disk_template):
        raise errors.OpPrereqError(
            "Cannot disable disk template '%s', because there is at least one"
            " instance using it." % disk_template)

  @staticmethod
  def _CheckInstanceCommunicationNetwork(network, warning_fn):
    """Check whether an existing network is configured for instance
    communication.

    Checks whether an existing network is configured with the
    parameters that are advisable for instance communication, and
    otherwise issues security warnings.

    @type network: L{ganeti.objects.Network}
    @param network: L{ganeti.objects.Network} object whose
                    configuration is being checked
    @type warning_fn: function
    @param warning_fn: function used to print warnings
    @rtype: None
    @return: None

    """
    def _MaybeWarn(err, val, default):
      if val != default:
        warning_fn("Supplied instance communication network '%s' %s '%s',"
                   " this might pose a security risk (default is '%s').",
                   network.name, err, val, default)

    if network.network is None:
      raise errors.OpPrereqError("Supplied instance communication network '%s'"
                                 " must have an IPv4 network address." %
                                 network.name)

    _MaybeWarn("has an IPv4 gateway", network.gateway, None)
    _MaybeWarn("has a non-standard IPv4 network address", network.network,
               constants.INSTANCE_COMMUNICATION_NETWORK4)
    _MaybeWarn("has an IPv6 gateway", network.gateway6, None)
    _MaybeWarn("has a non-standard IPv6 network address", network.network6,
               constants.INSTANCE_COMMUNICATION_NETWORK6)
    _MaybeWarn("has a non-standard MAC prefix", network.mac_prefix,
               constants.INSTANCE_COMMUNICATION_MAC_PREFIX)
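
  # Illustrative warning (editor's note, not part of the original source): a
  # hypothetical network "net-ic" matching the defaults except for an IPv4
  # gateway of 192.0.2.1 would trigger exactly one message:
  #
  #   Supplied instance communication network 'net-ic' has an IPv4 gateway
  #   '192.0.2.1', this might pose a security risk (default is 'None').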
1074

    
1075
  def CheckPrereq(self):
1076
    """Check prerequisites.
1077

1078
    This checks whether the given params don't conflict and
1079
    if the given volume group is valid.
1080

1081
    """
1082
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
1083
    self.cluster = cluster = self.cfg.GetClusterInfo()
1084

    
1085
    vm_capable_node_uuids = [node.uuid
1086
                             for node in self.cfg.GetAllNodesInfo().values()
1087
                             if node.uuid in node_uuids and node.vm_capable]
1088

    
1089
    (enabled_disk_templates, new_enabled_disk_templates,
1090
      disabled_disk_templates) = self._GetDiskTemplateSets(cluster)
1091
    self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates)
1092

    
1093
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
1094
                      new_enabled_disk_templates)
1095

    
1096
    if self.op.file_storage_dir is not None:
1097
      CheckFileStoragePathVsEnabledDiskTemplates(
1098
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
1099

    
1100
    if self.op.shared_file_storage_dir is not None:
1101
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
1102
          self.LogWarning, self.op.shared_file_storage_dir,
1103
          enabled_disk_templates)
1104

    
1105
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
1106
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
1107
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)
1108

    
1109
    # validate params changes
1110
    if self.op.beparams:
1111
      objects.UpgradeBeParams(self.op.beparams)
1112
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1113
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
1114

    
1115
    if self.op.ndparams:
1116
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
1117
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
1118

    
1119
      # TODO: we need a more general way to handle resetting
1120
      # cluster-level parameters to default values
1121
      if self.new_ndparams["oob_program"] == "":
1122
        self.new_ndparams["oob_program"] = \
1123
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
1124

    
1125
    if self.op.hv_state:
1126
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
1127
                                           self.cluster.hv_state_static)
1128
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
1129
                               for hv, values in new_hv_state.items())
1130

    
1131
    if self.op.disk_state:
1132
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
1133
                                               self.cluster.disk_state_static)
1134
      self.new_disk_state = \
1135
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
1136
                            for name, values in svalues.items()))
1137
             for storage, svalues in new_disk_state.items())
1138

    
1139
    self._CheckIpolicy(cluster, enabled_disk_templates)
1140

    
1141
    if self.op.nicparams:
1142
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1143
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
1144
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1145
      nic_errors = []
1146

    
1147
      # check all instances for consistency
1148
      for instance in self.cfg.GetAllInstancesInfo().values():
1149
        for nic_idx, nic in enumerate(instance.nics):
1150
          params_copy = copy.deepcopy(nic.nicparams)
1151
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
1152

    
1153
          # check parameter syntax
1154
          try:
1155
            objects.NIC.CheckParameterSyntax(params_filled)
1156
          except errors.ConfigurationError, err:
1157
            nic_errors.append("Instance %s, nic/%d: %s" %
1158
                              (instance.name, nic_idx, err))
1159

    
1160
          # if we're moving instances to routed, check that they have an ip
1161
          target_mode = params_filled[constants.NIC_MODE]
1162
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
1163
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
1164
                              " address" % (instance.name, nic_idx))
1165
      if nic_errors:
1166
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
1167
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
1168

    
1169
    # hypervisor list/parameters
1170
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
1171
    if self.op.hvparams:
1172
      for hv_name, hv_dict in self.op.hvparams.items():
1173
        if hv_name not in self.new_hvparams:
1174
          self.new_hvparams[hv_name] = hv_dict
1175
        else:
1176
          self.new_hvparams[hv_name].update(hv_dict)
1177

    
1178
    # disk template parameters
1179
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
1180
    if self.op.diskparams:
1181
      for dt_name, dt_params in self.op.diskparams.items():
1182
        if dt_name not in self.new_diskparams:
1183
          self.new_diskparams[dt_name] = dt_params
1184
        else:
1185
          self.new_diskparams[dt_name].update(dt_params)
1186
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)
1187

    
1188
    # os hypervisor parameters
1189
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
1190
    if self.op.os_hvp:
1191
      for os_name, hvs in self.op.os_hvp.items():
1192
        if os_name not in self.new_os_hvp:
1193
          self.new_os_hvp[os_name] = hvs
1194
        else:
1195
          for hv_name, hv_dict in hvs.items():
1196
            if hv_dict is None:
1197
              # Delete if it exists
1198
              self.new_os_hvp[os_name].pop(hv_name, None)
1199
            elif hv_name not in self.new_os_hvp[os_name]:
1200
              self.new_os_hvp[os_name][hv_name] = hv_dict
1201
            else:
1202
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
1203

    
1204
    # os parameters
1205
    self._BuildOSParams(cluster)
1206

    
1207
    # changes to the hypervisor list
1208
    if self.op.enabled_hypervisors is not None:
1209
      self.hv_list = self.op.enabled_hypervisors
1210
      for hv in self.hv_list:
1211
        # if the hypervisor doesn't already exist in the cluster
1212
        # hvparams, we initialize it to empty, and then (in both
1213
        # cases) we make sure to fill the defaults, as we might not
1214
        # have a complete defaults list if the hypervisor wasn't
1215
        # enabled before
1216
        if hv not in new_hvp:
1217
          new_hvp[hv] = {}
1218
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1219
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1220
    else:
1221
      self.hv_list = cluster.enabled_hypervisors
1222

    
1223
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1224
      # either the enabled list has changed, or the parameters have, validate
1225
      for hv_name, hv_params in self.new_hvparams.items():
1226
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1227
            (self.op.enabled_hypervisors and
1228
             hv_name in self.op.enabled_hypervisors)):
1229
          # either this is a new hypervisor, or its parameters have changed
1230
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1231
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1232
          hv_class.CheckParameterSyntax(hv_params)
1233
          CheckHVParams(self, node_uuids, hv_name, hv_params)
1234

    
1235
    self._CheckDiskTemplateConsistency()
1236

    
1237
    if self.op.os_hvp:
1238
      # no need to check any newly-enabled hypervisors, since the
1239
      # defaults have already been checked in the above code-block
1240
      for os_name, os_hvp in self.new_os_hvp.items():
1241
        for hv_name, hv_params in os_hvp.items():
1242
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1243
          # we need to fill in the new os_hvp on top of the actual hv_p
1244
          cluster_defaults = self.new_hvparams.get(hv_name, {})
1245
          new_osp = objects.FillDict(cluster_defaults, hv_params)
1246
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1247
          hv_class.CheckParameterSyntax(new_osp)
1248
          CheckHVParams(self, node_uuids, hv_name, new_osp)
1249

    
1250
    if self.op.default_iallocator:
1251
      alloc_script = utils.FindFile(self.op.default_iallocator,
1252
                                    constants.IALLOCATOR_SEARCH_PATH,
1253
                                    os.path.isfile)
1254
      if alloc_script is None:
1255
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1256
                                   " specified" % self.op.default_iallocator,
1257
                                   errors.ECODE_INVAL)
1258

    
1259
    if self.op.instance_communication_network:
1260
      network_name = self.op.instance_communication_network
1261

    
1262
      try:
1263
        network_uuid = self.cfg.LookupNetwork(network_name)
1264
      except errors.OpPrereqError:
1265
        network_uuid = None
1266

    
1267
      if network_uuid is not None:
1268
        network = self.cfg.GetNetwork(network_uuid)
1269
        self._CheckInstanceCommunicationNetwork(network, self.LogWarning)
1270

    
1271
  def _BuildOSParams(self, cluster):
1272
    "Calculate the new OS parameters for this operation."
1273

    
1274
    def _GetNewParams(source, new_params):
1275
      "Wrapper around GetUpdatedParams."
1276
      if new_params is None:
1277
        return source
1278
      result = objects.FillDict(source, {}) # deep copy of source
1279
      for os_name in new_params:
1280
        result[os_name] = GetUpdatedParams(result.get(os_name, {}),
1281
                                           new_params[os_name],
1282
                                           use_none=True)
1283
        if not result[os_name]:
1284
          del result[os_name] # we removed all parameters
1285
      return result
1286

    
1287
    self.new_osp = _GetNewParams(cluster.osparams,
1288
                                 self.op.osparams)
1289
    self.new_osp_private = _GetNewParams(cluster.osparams_private_cluster,
1290
                                         self.op.osparams_private_cluster)
1291

    
1292
    # Remove os validity check
1293
    changed_oses = (set(self.new_osp.keys()) | set(self.new_osp_private.keys()))
1294
    for os_name in changed_oses:
1295
      os_params = cluster.SimpleFillOS(
1296
        os_name,
1297
        self.new_osp.get(os_name, {}),
1298
        os_params_private=self.new_osp_private.get(os_name, {})
1299
      )
1300
      # check the parameter validity (remote check)
1301
      CheckOSParams(self, False, [self.cfg.GetMasterNode()],
1302
                    os_name, os_params)
1303

    
1304
  def _CheckDiskTemplateConsistency(self):
1305
    """Check whether the disk templates that are going to be disabled
1306
       are still in use by some instances.
1307

1308
    """
1309
    if self.op.enabled_disk_templates:
1310
      cluster = self.cfg.GetClusterInfo()
1311
      instances = self.cfg.GetAllInstancesInfo()
1312

    
1313
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1314
        - set(self.op.enabled_disk_templates)
1315
      for instance in instances.itervalues():
1316
        if instance.disk_template in disk_templates_to_remove:
1317
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1318
                                     " because instance '%s' is using it." %
1319
                                     (instance.disk_template, instance.name))
1320

    
1321
  def _SetVgName(self, feedback_fn):
1322
    """Determines and sets the new volume group name.
1323

1324
    """
1325
    if self.op.vg_name is not None:
1326
      new_volume = self.op.vg_name
1327
      if not new_volume:
1328
        new_volume = None
1329
      if new_volume != self.cfg.GetVGName():
1330
        self.cfg.SetVGName(new_volume)
1331
      else:
1332
        feedback_fn("Cluster LVM configuration already in desired"
1333
                    " state, not changing")
1334

    
1335
  def _SetFileStorageDir(self, feedback_fn):
1336
    """Set the file storage directory.
1337

1338
    """
1339
    if self.op.file_storage_dir is not None:
1340
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1341
        feedback_fn("Global file storage dir already set to value '%s'"
1342
                    % self.cluster.file_storage_dir)
1343
      else:
1344
        self.cluster.file_storage_dir = self.op.file_storage_dir
1345

    
1346
  def _SetDrbdHelper(self, feedback_fn):
    """Set the DRBD usermode helper.

    """
    if self.op.drbd_helper is not None:
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did not"
                    " enable the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(self.op.enabled_disk_templates)

    self._SetVgName(feedback_fn)
    self._SetFileStorageDir(feedback_fn)
    self._SetDrbdHelper(feedback_fn)

    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.osparams_private_cluster:
      self.cluster.osparams_private_cluster = self.new_osp_private
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [], feedback_fn)

    if self.op.max_running_jobs is not None:
      self.cluster.max_running_jobs = self.op.max_running_jobs

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.default_iallocator_params is not None:
      self.cluster.default_iallocator_params = self.op.default_iallocator_params

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
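      # Apply DDM_ADD / DDM_REMOVE modifications from 'mods' to the cluster
      # attribute named by 'aname' (a list of OS names).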
      desc += " OS list"
1444
      lst = getattr(self.cluster, aname)
1445
      for key, val in mods:
1446
        if key == constants.DDM_ADD:
1447
          if val in lst:
1448
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1449
          else:
1450
            lst.append(val)
1451
        elif key == constants.DDM_REMOVE:
1452
          if val in lst:
1453
            lst.remove(val)
1454
          else:
1455
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1456
        else:
1457
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
1458

    
1459
    if self.op.hidden_os:
1460
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1461

    
1462
    if self.op.blacklisted_os:
1463
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1464

    
1465
    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]
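      # (-len(jobs) is a relative job dependency: it refers back to the
      # config-verification job appended above in this same submission)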

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = constants.CV_ERROR
  ETYPE_WARNING = constants.CV_WARNING

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = utils.VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(inst.name for
                         inst in dangling_instances.get(node.uuid, [])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(inst.name for
                                         inst in no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],
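      # (node locks are filled in later by DeclareLocks, once the group's
      # member nodes are known)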

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        instance = self.cfg.GetInstanceInfoByName(inst_name)
        if instance.disk_template in constants.DTS_INT_MIRROR:
          nodes.update(instance.secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_inst_uuids = \
        group_inst_uuids.difference(
          [self.cfg.GetInstanceInfoByName(name).uuid
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_inst_uuids:
      raise errors.OpPrereqError(
        "Missing lock for instances: %s" %
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
        errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_uuids = group_inst_uuids
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
                             for inst_uuid in group_inst_uuids)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nuuid in inst.all_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
                    "Node returned invalid time")
      return

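    # Flag the node if its reported time falls outside the RPC call's
    # [start, end] window by more than the allowed clock skew.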
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
                  "Node time diverges by at least %s from master node time",
                  ntime_diff)

  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                  "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node_verify call.

    """
    node_versions = {}
    for node_uuid, ndata in node_verify_infos.items():
      nresult = ndata.payload
      if nresult:
        version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
        node_versions[node_uuid] = version

    if len(set(node_versions.values())) > 1:
      for node_uuid, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
                    code=self.ETYPE_WARNING)

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping from node names to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of checks
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "did not return valid bridge information")
    if not test:
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = constants.NV_NODELIST not in nresult
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
                  "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
                        "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
                        "tcp communication with node '%s': %s",
                        anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if ninfo.uuid == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)

  def _VerifyInstance(self, instance, node_image, diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    pnode_uuid = instance.primary_node
    pnode_img = node_image[pnode_uuid]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    instance.MapLVsByNode(node_vol_should)

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)

    for node_uuid in node_vol_should:
      n_img = node_image[node_uuid]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node_uuid]:
        test = volume not in n_img.volumes
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
                      "volume %s missing on node %s", volume,
                      self.cfg.GetNodeName(node_uuid))

    if instance.admin_state == constants.ADMINST_UP:
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
                    "instance not running on its primary node %s",
                     self.cfg.GetNodeName(pnode_uuid))
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance is marked as running and lives on"
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      self._ErrorIf(instance.disks_active and
                    not success and not bad_snode,
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
                    "couldn't retrieve status for disk/%s on %s: %s",
                    idx, self.cfg.GetNodeName(nname), bdev_status)

      if instance.disks_active and success and \
         (bdev_status.is_degraded or
          bdev_status.ldisk_status != constants.LDS_OKAY):
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
        if bdev_status.is_degraded:
          msg += " is degraded"
        if bdev_status.ldisk_status != constants.LDS_OKAY:
          msg += "; state is '%s'" % \
                 constants.LDS_NAMES[bdev_status.ldisk_status]

        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)

    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
                  "instance %s, connection to primary node failed",
                  instance.name)

    self._ErrorIf(len(instance.secondary_nodes) > 1,
                  constants.CV_EINSTANCELAYOUT, instance.name,
                  "instance has multiple secondary nodes: %s",
                  utils.CommaJoin(instance.secondary_nodes),
                  code=self.ETYPE_WARNING)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
    if any(es_flags.values()):
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
        # Disk template not compatible with exclusive_storage: no instance
        # node should have the flag set
        es_nodes = [n
                    for (n, es) in es_flags.items()
                    if es]
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
                    "instance has template %s, which is not supported on nodes"
                    " that have exclusive storage set: %s",
                    instance.disk_template,
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
      for (idx, disk) in enumerate(instance.disks):
        self._ErrorIf(disk.spindles is None,
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
                      "number of spindles not configured for disk %s while"
                      " exclusive storage is enabled, try running"
                      " gnt-cluster repair-disk-sizes", idx)

    if instance.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(instance.all_nodes)
      instance_groups = {}

      for node_uuid in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
                                   []).append(node_uuid)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
                           groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode_uuid in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance.name, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in instance.secondary_nodes:
      s_img = node_image[snode]
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
                    self.cfg.GetNodeName(snode),
                    "instance %s, connection to secondary node failed",
                    instance.name)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
                  instance.name, "instance has offline secondary node(s) %s",
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
    # ... or ghost/non-vm_capable nodes
    for node_uuid in instance.all_nodes:
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance lives on ghost node %s",
                    self.cfg.GetNodeName(node_uuid))
      self._ErrorIf(not node_image[node_uuid].vm_capable,
                    constants.CV_EINSTANCEBADNODE, instance.name,
                    "instance lives on non-vm_capable node %s",
                    self.cfg.GetNodeName(node_uuid))

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node_uuid, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node_uuid].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node_uuid not in node_vol_should or
                volume not in node_vol_should[node_uuid]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
                      self.cfg.GetNodeName(node_uuid),
                      "volume %s is unknown", volume,
                      code=_VerifyErrors.ETYPE_WARNING)

  def _VerifyNPlusOneMemory(self, node_image, all_insts):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node_uuid, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or \
         self.all_node_info[node_uuid].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
      for prinode, inst_uuids in n_img.sbp.items():
        needed_mem = 0
        for inst_uuid in inst_uuids:
          bep = cluster_info.FillBE(all_insts[inst_uuid])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1,
                      self.cfg.GetNodeName(node_uuid),
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)

  def _VerifyClientCertificates(self, nodes, all_nvinfo):
    """Verifies the consistency of the client certificates.

    This includes several aspects:
      - the individual validation of all nodes' certificates
      - the consistency of the master candidate certificate map
      - the consistency of the master candidate certificate map with the
        certificates that the master candidates are actually using.

    @param nodes: the list of nodes to consider in this verification
    @param all_nvinfo: the map of results of the verify_node call to
      all nodes

    """
    candidate_certs = self.cfg.GetClusterInfo().candidate_certs
    if candidate_certs is None or len(candidate_certs) == 0:
      self._ErrorIf(
        True, constants.CV_ECLUSTERCLIENTCERT, None,
        "The cluster's list of master candidate certificates is empty."
        " If you just updated the cluster, please run"
        " 'gnt-cluster renew-crypto --new-node-certificates'.")
      return

    self._ErrorIf(
      len(candidate_certs) != len(set(candidate_certs.values())),
      constants.CV_ECLUSTERCLIENTCERT, None,
      "There are at least two master candidates configured to use the same"
      " certificate.")

    # collect the client certificate
    for node in nodes:
      if node.offline:
        continue

      nresult = all_nvinfo[node.uuid]
      if nresult.fail_msg or not nresult.payload:
        continue

      (errcode, msg) = nresult.payload.get(constants.NV_CLIENT_CERT, None)

      self._ErrorIf(
        errcode is not None, constants.CV_ECLUSTERCLIENTCERT, None,
        "Client certificate of node '%s' failed validation: %s (code '%s')",
        node.uuid, msg, errcode)

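      # When validation succeeded, the returned message carries the
      # certificate's digest, which is then checked against the candidate map.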
      if not errcode:
        digest = msg
        if node.master_candidate:
          if node.uuid in candidate_certs:
            self._ErrorIf(
              digest != candidate_certs[node.uuid],
              constants.CV_ECLUSTERCLIENTCERT, None,
              "Client certificate digest of master candidate '%s' does not"
              " match its entry in the cluster's map of master candidate"
              " certificates. Expected: %s Got: %s", node.uuid,
              digest, candidate_certs[node.uuid])
          else:
            self._ErrorIf(
              True, constants.CV_ECLUSTERCLIENTCERT, None,
              "The master candidate '%s' does not have an entry in the"
              " map of candidate certificates.", node.uuid)
            self._ErrorIf(
              digest in candidate_certs.values(),
              constants.CV_ECLUSTERCLIENTCERT, None,
              "Master candidate '%s' is using a certificate of another node.",
              node.uuid)
        else:
          self._ErrorIf(
            node.uuid in candidate_certs,
            constants.CV_ECLUSTERCLIENTCERT, None,
            "Node '%s' is not a master candidate, but still listed in the"
            " map of master candidate certificates.", node.uuid)
          self._ErrorIf(
            (node.uuid not in candidate_certs) and
              (digest in candidate_certs.values()),
            constants.CV_ECLUSTERCLIENTCERT, None,
            "Node '%s' is not a master candidate and is incorrectly using a"
            " certificate of another node which is master candidate.",
            node.uuid)

  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param nodes: List of L{objects.Node} objects
    @param master_node_uuid: UUID of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.uuid == master_node_uuid)),
      (files_vm, lambda node: node.vm_capable),
      ]
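    # (files_all entries are expected on every node; files_opt is only used
    # further down to relax the "missing file" check to all-or-nothing)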

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodes
      else:
        filenodes = filter(fn, nodes)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodes:
      if node.offline:
        ignore_nodes.add(node.uuid)
        continue

      nresult = all_nvinfo[node.uuid]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, {})
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
                    "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.uuid)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_uuid
                            for node_uuids in fileinfo[filename].values()
                            for node_uuid in node_uuids) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        self._ErrorIf(missing_file and missing_file != expected_nodes,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is optional, but it must exist on all or no"
                      " nodes (not found on %s)",
                      filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))
      else:
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is missing from node(s) %s", filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        self._ErrorIf(unexpected,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s should not exist on node(s) %s",
                      filename, utils.CommaJoin(
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1,
                     utils.CommaJoin(utils.NiceSort(
                       map(self.cfg.GetNodeName, node_uuids))))
                    for (idx, (checksum, node_uuids)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
                    "File %s found with %s different checksums (%s)",
                    filename, len(checksums), "; ".join(variants))

  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
    """Verify the drbd helper.

    """
    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                    "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "wrong drbd usermode helper: %s", payload)

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)

    # compute the DRBD minors
    node_drbd = {}
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
      test = inst_uuid not in instanceinfo
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
      if test:
        node_drbd[minor] = (inst_uuid, False)
      else:
        instance = instanceinfo[inst_uuid]
        node_drbd[minor] = (inst_uuid, instance.disks_active)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                  "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (inst_uuid, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "drbd minor %d of instance %s is not active", minor,
                    self.cfg.GetInstanceName(inst_uuid))
    for minor in used_minors:
      test = minor not in node_drbd
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                  "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
                    "Invalid OS %s (located at %s): %s",
                    os_name, f_path, f_diag)
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
                    "OS '%s' has multiple entries"
                    " (first one shadows the rest): %s",
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                    "Extra OS %s not present on reference node (%s)",
                    os_name, self.cfg.GetNodeName(base.uuid))
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " [%s] vs. [%s]", kind, os_name,
                      self.cfg.GetNodeName(base.uuid),
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
                  "OSes present on reference node %s"
                  " but missing on this node: %s",
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))

  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    cluster = self.cfg.GetClusterInfo()
    if (is_master and
        (cluster.IsFileStorageEnabled() or
         cluster.IsSharedFileStorageEnabled())):
      try:
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
                          verify_key, error_key):
    """Verifies (file) storage paths.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type file_disk_template: string
    @param file_disk_template: file-based disk template, whose directory
        is supposed to be verified
    @type verify_key: string
    @param verify_key: key for the verification map of this file
        verification step
    @param error_key: error key to be added to the verification results
        in case something goes wrong in this verification step

    """
    assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
              constants.ST_FILE, constants.ST_SHARED_FILE
           ))

    cluster = self.cfg.GetClusterInfo()
    if cluster.IsDiskTemplateEnabled(file_disk_template):
      self._ErrorIf(
          verify_key in nresult,
          error_key, ninfo.name,
          "The configured %s storage path is unusable: %s" %
          (file_disk_template, nresult.get(verify_key)))

  def _VerifyFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_FILE,
        constants.NV_FILE_STORAGE_PATH,
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)

  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_SHARED_FILE,
        constants.NV_SHARED_FILE_STORAGE_PATH,
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
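    # On success idata is a flat list of the instance names reported as
    # running by the node's hypervisor(s), e.g. ["instance1.example.com"]
    # (the name is only an example); it is mapped back to UUIDs below.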
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
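    # Illustrative inputs (numbers made up): nresult[NV_HVINFO] is expected
    # to look like {"memory_free": 2048, ...}, and nresult[NV_VGLIST] to map
    # volume group names to their free space, e.g. {"xenvg": 102400}.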
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_dev_inst_only = {}
    diskless_instances = set()
    nodisk_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        nodisk_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template != diskless)
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams makes already copies of the disks
      dev_inst_only = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))

      node_disks_dev_inst_only[nuuid] = dev_inst_only

    assert len(node_disks) == len(node_disks_dev_inst_only)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
               node_disks.keys(), node_disks_dev_inst_only)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}
    # ...and disk-full instances that happen to have no disks
    for inst_uuid in nodisk_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      constants.NV_CLIENT_CERT: None,
      }

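    # Each NV_* key above selects one check for the node_verify RPC and its
    # value is that check's input (NV_FILELIST, for instance, lists the files
    # whose checksums the nodes report back); more keys are added below
    # depending on the enabled features.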
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

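    # The loop below fills in the per-node expectations: each NodeImage
    # collects its primary instances (pinst), secondary instances (sinst)
    # and, in sbp, its secondary instances grouped by their primary node.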
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so
    # if it's True for at least one node, we act as if it were set for all
    # the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    node_group_uuids = dict(map(lambda n: (n.name, n.group),
                                self.cfg.GetAllNodesInfo().values()))
    groups_config = self.cfg.GetAllNodeGroupsInfoDict()

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams,
                                           node_group_uuids,
                                           groups_config)
    nvinfo_endtime = time.time()

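    # all_nvinfo maps node UUIDs to RPC results; for reachable nodes the
    # result payload is a dict keyed by the NV_* constants requested above.
    # Nodes in self.extra_lv_nodes are only asked for their LV list below.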
    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams,
                                    node_group_uuids,
                                    groups_config)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams,
         node_group_uuids,
         groups_config))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.name not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])