1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with the cluster."""
23

    
24
import copy
25
import itertools
26
import logging
27
import operator
28
import os
29
import re
30
import time
31

    
32
from ganeti import compat
33
from ganeti import constants
34
from ganeti import errors
35
from ganeti import hypervisor
36
from ganeti import locking
37
from ganeti import masterd
38
from ganeti import netutils
39
from ganeti import objects
40
from ganeti import opcodes
41
from ganeti import pathutils
42
from ganeti import query
43
import ganeti.rpc.node as rpc
44
from ganeti import runtime
45
from ganeti import ssh
46
from ganeti import uidpool
47
from ganeti import utils
48
from ganeti import vcluster
49

    
50
from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
51
  ResultWithJobs
52
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
53
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
54
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
55
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
56
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
57
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
58
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
59
  CheckDiskAccessModeConsistency, CreateNewClientCert, \
60
  AddInstanceCommunicationNetworkOp, ConnectInstanceCommunicationNetworkOp
61

    
62
import ganeti.masterd.instance
63

    
64

    
65
def _UpdateMasterClientCert(
66
    lu, master_uuid, cluster, feedback_fn,
67
    client_cert=pathutils.NODED_CLIENT_CERT_FILE,
68
    client_cert_tmp=pathutils.NODED_CLIENT_CERT_FILE_TMP):
69
  """Renews the master's client certificate and propagates the config.
70

71
  @type lu: C{LogicalUnit}
72
  @param lu: the logical unit holding the config
73
  @type master_uuid: string
74
  @param master_uuid: the master node's UUID
75
  @type cluster: C{objects.Cluster}
76
  @param cluster: the cluster's configuration
77
  @type feedback_fn: function
78
  @param feedback_fn: feedback function for config updates
79
  @type client_cert: string
80
  @param client_cert: the path of the client certificate
81
  @type client_cert_tmp: string
82
  @param client_cert_tmp: the temporary path of the client certificate
83
  @rtype: string
84
  @return: the digest of the newly created client certificate
85

86
  """
87
  client_digest = CreateNewClientCert(lu, master_uuid, filename=client_cert_tmp)
88
  utils.AddNodeToCandidateCerts(master_uuid, client_digest,
89
                                cluster.candidate_certs)
90
  # This triggers an update of the config and distribution of it with the old
91
  # SSL certificate
92
  lu.cfg.Update(cluster, feedback_fn)
93

    
94
  utils.RemoveFile(client_cert)
95
  utils.RenameFile(client_cert_tmp, client_cert)
96
  return client_digest
97

    
98

    
99
class LUClusterRenewCrypto(NoHooksLU):
100
  """Renew the cluster's crypto tokens.
101

102
  Note that most of this operation is done in gnt_cluster.py; this LU only
103
  takes care of the renewal of the client SSL certificates.
104

105
  """
106
  def Exec(self, feedback_fn):
107
    master_uuid = self.cfg.GetMasterNode()
108
    cluster = self.cfg.GetClusterInfo()
109

    
110
    server_digest = utils.GetCertificateDigest(
111
      cert_filename=pathutils.NODED_CERT_FILE)
112
    utils.AddNodeToCandidateCerts("%s-SERVER" % master_uuid,
113
                                  server_digest,
114
                                  cluster.candidate_certs)
115
    new_master_digest = _UpdateMasterClientCert(self, master_uuid, cluster,
116
                                                feedback_fn)
117

    
118
    cluster.candidate_certs = {master_uuid: new_master_digest}
119
    nodes = self.cfg.GetAllNodesInfo()
120
    for (node_uuid, node_info) in nodes.items():
121
      if node_uuid != master_uuid:
122
        new_digest = CreateNewClientCert(self, node_uuid)
123
        if node_info.master_candidate:
124
          cluster.candidate_certs[node_uuid] = new_digest
125
    # Trigger another update of the config now with the new master cert
126
    self.cfg.Update(cluster, feedback_fn)
127

    
128

    
129
class LUClusterActivateMasterIp(NoHooksLU):
130
  """Activate the master IP on the master node.
131

132
  """
133
  def Exec(self, feedback_fn):
134
    """Activate the master IP.
135

136
    """
137
    master_params = self.cfg.GetMasterNetworkParameters()
138
    ems = self.cfg.GetUseExternalMipScript()
139
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
140
                                                   master_params, ems)
141
    result.Raise("Could not activate the master IP")
142

    
143

    
144
class LUClusterDeactivateMasterIp(NoHooksLU):
145
  """Deactivate the master IP on the master node.
146

147
  """
148
  def Exec(self, feedback_fn):
149
    """Deactivate the master IP.
150

151
    """
152
    master_params = self.cfg.GetMasterNetworkParameters()
153
    ems = self.cfg.GetUseExternalMipScript()
154
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
155
                                                     master_params, ems)
156
    result.Raise("Could not deactivate the master IP")
157

    
158

    
159
class LUClusterConfigQuery(NoHooksLU):
160
  """Return configuration values.
161

162
  """
163
  REQ_BGL = False
164

    
165
  def CheckArguments(self):
166
    self.cq = ClusterQuery(None, self.op.output_fields, False)
167

    
168
  def ExpandNames(self):
169
    self.cq.ExpandNames(self)
170

    
171
  def DeclareLocks(self, level):
172
    self.cq.DeclareLocks(self, level)
173

    
174
  def Exec(self, feedback_fn):
175
    result = self.cq.OldStyleQuery(self)
176

    
177
    assert len(result) == 1
178

    
179
    return result[0]
180

    
181

    
182
class LUClusterDestroy(LogicalUnit):
183
  """Logical unit for destroying the cluster.
184

185
  """
186
  HPATH = "cluster-destroy"
187
  HTYPE = constants.HTYPE_CLUSTER
188

    
189
  def BuildHooksEnv(self):
190
    """Build hooks env.
191

192
    """
193
    return {
194
      "OP_TARGET": self.cfg.GetClusterName(),
195
      }
196

    
197
  def BuildHooksNodes(self):
198
    """Build hooks nodes.
199

200
    """
201
    return ([], [])
202

    
203
  def CheckPrereq(self):
204
    """Check prerequisites.
205

206
    This checks whether the cluster is empty.
207

208
    Any errors are signaled by raising errors.OpPrereqError.
209

210
    """
211
    master = self.cfg.GetMasterNode()
212

    
213
    nodelist = self.cfg.GetNodeList()
214
    if len(nodelist) != 1 or nodelist[0] != master:
215
      raise errors.OpPrereqError("There are still %d node(s) in"
216
                                 " this cluster." % (len(nodelist) - 1),
217
                                 errors.ECODE_INVAL)
218
    instancelist = self.cfg.GetInstanceList()
219
    if instancelist:
220
      raise errors.OpPrereqError("There are still %d instance(s) in"
221
                                 " this cluster." % len(instancelist),
222
                                 errors.ECODE_INVAL)
223

    
224
  def Exec(self, feedback_fn):
225
    """Destroys the cluster.
226

227
    """
228
    master_params = self.cfg.GetMasterNetworkParameters()
229

    
230
    # Run post hooks on master node before it's removed
231
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
232

    
233
    ems = self.cfg.GetUseExternalMipScript()
234
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
235
                                                     master_params, ems)
236
    result.Warn("Error disabling the master IP address", self.LogWarning)
237
    return master_params.uuid
238

    
239

    
240
class LUClusterPostInit(LogicalUnit):
241
  """Logical unit for running hooks after cluster initialization.
242

243
  """
244
  HPATH = "cluster-init"
245
  HTYPE = constants.HTYPE_CLUSTER
246

    
247
  def CheckArguments(self):
248
    self.master_uuid = self.cfg.GetMasterNode()
249
    self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo())
250

    
251
    # TODO: When Issue 584 is solved, and None is properly parsed when used
252
    # as a default value, ndparams.get(.., None) can be changed to
253
    # ndparams[..] to access the values directly
254

    
255
    # OpenvSwitch: Warn user if link is missing
256
    if (self.master_ndparams[constants.ND_OVS] and not
257
        self.master_ndparams.get(constants.ND_OVS_LINK, None)):
258
      self.LogInfo("No physical interface for OpenvSwitch was given."
259
                   " OpenvSwitch will not have an outside connection. This"
260
                   " might not be what you want.")
261

    
262
  def BuildHooksEnv(self):
263
    """Build hooks env.
264

265
    """
266
    return {
267
      "OP_TARGET": self.cfg.GetClusterName(),
268
      }
269

    
270
  def BuildHooksNodes(self):
271
    """Build hooks nodes.
272

273
    """
274
    return ([], [self.cfg.GetMasterNode()])
275

    
276
  def Exec(self, feedback_fn):
277
    """Create and configure Open vSwitch
278

279
    """
280
    if self.master_ndparams[constants.ND_OVS]:
281
      result = self.rpc.call_node_configure_ovs(
282
                 self.master_uuid,
283
                 self.master_ndparams[constants.ND_OVS_NAME],
284
                 self.master_ndparams.get(constants.ND_OVS_LINK, None))
285
      result.Raise("Could not successully configure Open vSwitch")
286

    
287
    cluster = self.cfg.GetClusterInfo()
288
    _UpdateMasterClientCert(self, self.master_uuid, cluster, feedback_fn)
289

    
290
    return True
291

    
292

    
293
class ClusterQuery(QueryBase):
294
  FIELDS = query.CLUSTER_FIELDS
295

    
296
  #: Do not sort (there is only one item)
297
  SORT_FIELD = None
298

    
299
  def ExpandNames(self, lu):
300
    lu.needed_locks = {}
301

    
302
    # The following variables interact with _QueryBase._GetNames
303
    self.wanted = locking.ALL_SET
304
    self.do_locking = self.use_locking
305

    
306
    if self.do_locking:
307
      raise errors.OpPrereqError("Can not use locking for cluster queries",
308
                                 errors.ECODE_INVAL)
309

    
310
  def DeclareLocks(self, lu, level):
311
    pass
312

    
313
  def _GetQueryData(self, lu):
314
    """Computes the list of nodes and their attributes.
315

316
    """
317
    # Locking is not used
318
    assert not (compat.any(lu.glm.is_owned(level)
319
                           for level in locking.LEVELS
320
                           if level != locking.LEVEL_CLUSTER) or
321
                self.do_locking or self.use_locking)
322

    
323
    if query.CQ_CONFIG in self.requested_data:
324
      cluster = lu.cfg.GetClusterInfo()
325
      nodes = lu.cfg.GetAllNodesInfo()
326
    else:
327
      cluster = NotImplemented
328
      nodes = NotImplemented
329

    
330
    if query.CQ_QUEUE_DRAINED in self.requested_data:
331
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
332
    else:
333
      drain_flag = NotImplemented
334

    
335
    if query.CQ_WATCHER_PAUSE in self.requested_data:
336
      master_node_uuid = lu.cfg.GetMasterNode()
337

    
338
      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
339
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
340
                   lu.cfg.GetMasterNodeName())
341

    
342
      watcher_pause = result.payload
343
    else:
344
      watcher_pause = NotImplemented
345

    
346
    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
347

    
348

    
349
class LUClusterQuery(NoHooksLU):
350
  """Query cluster configuration.
351

352
  """
353
  REQ_BGL = False
354

    
355
  def ExpandNames(self):
356
    self.needed_locks = {}
357

    
358
  def Exec(self, feedback_fn):
359
    """Return cluster config.
360

361
    """
362
    cluster = self.cfg.GetClusterInfo()
363
    os_hvp = {}
364

    
365
    # Filter just for enabled hypervisors
366
    for os_name, hv_dict in cluster.os_hvp.items():
367
      os_hvp[os_name] = {}
368
      for hv_name, hv_params in hv_dict.items():
369
        if hv_name in cluster.enabled_hypervisors:
370
          os_hvp[os_name][hv_name] = hv_params
371

    
372
    # Convert ip_family to ip_version
373
    primary_ip_version = constants.IP4_VERSION
374
    if cluster.primary_ip_family == netutils.IP6Address.family:
375
      primary_ip_version = constants.IP6_VERSION
376

    
377
    result = {
378
      "software_version": constants.RELEASE_VERSION,
379
      "protocol_version": constants.PROTOCOL_VERSION,
380
      "config_version": constants.CONFIG_VERSION,
381
      "os_api_version": max(constants.OS_API_VERSIONS),
382
      "export_version": constants.EXPORT_VERSION,
383
      "vcs_version": constants.VCS_VERSION,
384
      "architecture": runtime.GetArchInfo(),
385
      "name": cluster.cluster_name,
386
      "master": self.cfg.GetMasterNodeName(),
387
      "default_hypervisor": cluster.primary_hypervisor,
388
      "enabled_hypervisors": cluster.enabled_hypervisors,
389
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
390
                        for hypervisor_name in cluster.enabled_hypervisors]),
391
      "os_hvp": os_hvp,
392
      "beparams": cluster.beparams,
393
      "osparams": cluster.osparams,
394
      "ipolicy": cluster.ipolicy,
395
      "nicparams": cluster.nicparams,
396
      "ndparams": cluster.ndparams,
397
      "diskparams": cluster.diskparams,
398
      "candidate_pool_size": cluster.candidate_pool_size,
399
      "max_running_jobs": cluster.max_running_jobs,
400
      "master_netdev": cluster.master_netdev,
401
      "master_netmask": cluster.master_netmask,
402
      "use_external_mip_script": cluster.use_external_mip_script,
403
      "volume_group_name": cluster.volume_group_name,
404
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
405
      "file_storage_dir": cluster.file_storage_dir,
406
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
407
      "maintain_node_health": cluster.maintain_node_health,
408
      "ctime": cluster.ctime,
409
      "mtime": cluster.mtime,
410
      "uuid": cluster.uuid,
411
      "tags": list(cluster.GetTags()),
412
      "uid_pool": cluster.uid_pool,
413
      "default_iallocator": cluster.default_iallocator,
414
      "default_iallocator_params": cluster.default_iallocator_params,
415
      "reserved_lvs": cluster.reserved_lvs,
416
      "primary_ip_version": primary_ip_version,
417
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
418
      "hidden_os": cluster.hidden_os,
419
      "blacklisted_os": cluster.blacklisted_os,
420
      "enabled_disk_templates": cluster.enabled_disk_templates,
421
      "instance_communication_network": cluster.instance_communication_network,
422
      }
423

    
424
    return result
425

    
426

    
427
class LUClusterRedistConf(NoHooksLU):
428
  """Force the redistribution of cluster configuration.
429

430
  This is a very simple LU.
431

432
  """
433
  REQ_BGL = False
434

    
435
  def ExpandNames(self):
436
    self.needed_locks = {
437
      locking.LEVEL_NODE: locking.ALL_SET,
438
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
439
    }
440
    self.share_locks = ShareAll()
441

    
442
  def Exec(self, feedback_fn):
443
    """Redistribute the configuration.
444

445
    """
446
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
447
    RedistributeAncillaryFiles(self)
448

    
449

    
450
class LUClusterRename(LogicalUnit):
451
  """Rename the cluster.
452

453
  """
454
  HPATH = "cluster-rename"
455
  HTYPE = constants.HTYPE_CLUSTER
456

    
457
  def BuildHooksEnv(self):
458
    """Build hooks env.
459

460
    """
461
    return {
462
      "OP_TARGET": self.cfg.GetClusterName(),
463
      "NEW_NAME": self.op.name,
464
      }
465

    
466
  def BuildHooksNodes(self):
467
    """Build hooks nodes.
468

469
    """
470
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
471

    
472
  def CheckPrereq(self):
473
    """Verify that the passed name is a valid one.
474

475
    """
476
    hostname = netutils.GetHostname(name=self.op.name,
477
                                    family=self.cfg.GetPrimaryIPFamily())
478

    
479
    new_name = hostname.name
480
    self.ip = new_ip = hostname.ip
481
    old_name = self.cfg.GetClusterName()
482
    old_ip = self.cfg.GetMasterIP()
483
    if new_name == old_name and new_ip == old_ip:
484
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
485
                                 " cluster has changed",
486
                                 errors.ECODE_INVAL)
487
    if new_ip != old_ip:
488
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
489
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
490
                                   " reachable on the network" %
491
                                   new_ip, errors.ECODE_NOTUNIQUE)
492

    
493
    self.op.name = new_name
494

    
495
  def Exec(self, feedback_fn):
496
    """Rename the cluster.
497

498
    """
499
    clustername = self.op.name
500
    new_ip = self.ip
501

    
502
    # shutdown the master IP
503
    master_params = self.cfg.GetMasterNetworkParameters()
504
    ems = self.cfg.GetUseExternalMipScript()
505
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
506
                                                     master_params, ems)
507
    result.Raise("Could not disable the master role")
508

    
509
    try:
510
      cluster = self.cfg.GetClusterInfo()
511
      cluster.cluster_name = clustername
512
      cluster.master_ip = new_ip
513
      self.cfg.Update(cluster, feedback_fn)
514

    
515
      # update the known hosts file
516
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
517
      node_list = self.cfg.GetOnlineNodeList()
518
      try:
519
        node_list.remove(master_params.uuid)
520
      except ValueError:
521
        pass
522
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
523
    finally:
524
      master_params.ip = new_ip
525
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
526
                                                     master_params, ems)
527
      result.Warn("Could not re-enable the master role on the master,"
528
                  " please restart manually", self.LogWarning)
529

    
530
    return clustername
531

    
532

    
533
class LUClusterRepairDiskSizes(NoHooksLU):
534
  """Verifies the cluster disks sizes.
535

536
  """
537
  REQ_BGL = False
538

    
539
  def ExpandNames(self):
540
    if self.op.instances:
541
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
542
      # Not getting the node allocation lock as only a specific set of
543
      # instances (and their nodes) is going to be acquired
544
      self.needed_locks = {
545
        locking.LEVEL_NODE_RES: [],
546
        locking.LEVEL_INSTANCE: self.wanted_names,
547
        }
548
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
549
    else:
550
      self.wanted_names = None
551
      self.needed_locks = {
552
        locking.LEVEL_NODE_RES: locking.ALL_SET,
553
        locking.LEVEL_INSTANCE: locking.ALL_SET,
554

    
555
        # This opcode acquires the node locks for all instances
556
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
557
        }
558

    
559
    self.share_locks = {
560
      locking.LEVEL_NODE_RES: 1,
561
      locking.LEVEL_INSTANCE: 0,
562
      locking.LEVEL_NODE_ALLOC: 1,
563
      }
564

    
565
  def DeclareLocks(self, level):
566
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
567
      self._LockInstancesNodes(primary_only=True, level=level)
568

    
569
  def CheckPrereq(self):
570
    """Check prerequisites.
571

572
    This only checks the optional instance list against the existing names.
573

574
    """
575
    if self.wanted_names is None:
576
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
577

    
578
    self.wanted_instances = \
579
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
580

    
581
  def _EnsureChildSizes(self, disk):
582
    """Ensure children of the disk have the needed disk size.
583

584
    This is valid mainly for DRBD8 and fixes an issue where the
585
    children have a smaller disk size.
586

587
    @param disk: an L{ganeti.objects.Disk} object
588

589
    """
590
    if disk.dev_type == constants.DT_DRBD8:
591
      assert disk.children, "Empty children for DRBD8?"
592
      fchild = disk.children[0]
593
      mismatch = fchild.size < disk.size
594
      if mismatch:
595
        self.LogInfo("Child disk has size %d, parent %d, fixing",
596
                     fchild.size, disk.size)
597
        fchild.size = disk.size
598

    
599
      # and we recurse on this child only, not on the metadev
600
      return self._EnsureChildSizes(fchild) or mismatch
601
    else:
602
      return False
603

    
604
  def Exec(self, feedback_fn):
605
    """Verify the size of cluster disks.
606

607
    """
608
    # TODO: check child disks too
609
    # TODO: check differences in size between primary/secondary nodes
610
    per_node_disks = {}
611
    for instance in self.wanted_instances:
612
      pnode = instance.primary_node
613
      if pnode not in per_node_disks:
614
        per_node_disks[pnode] = []
615
      for idx, disk in enumerate(instance.disks):
616
        per_node_disks[pnode].append((instance, idx, disk))
617

    
618
    assert not (frozenset(per_node_disks.keys()) -
619
                self.owned_locks(locking.LEVEL_NODE_RES)), \
620
      "Not owning correct locks"
621
    assert not self.owned_locks(locking.LEVEL_NODE)
622

    
623
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
624
                                               per_node_disks.keys())
625

    
626
    changed = []
627
    for node_uuid, dskl in per_node_disks.items():
628
      if not dskl:
629
        # no disks on the node
630
        continue
631

    
632
      newl = [([v[2].Copy()], v[0]) for v in dskl]
633
      node_name = self.cfg.GetNodeName(node_uuid)
634
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
635
      if result.fail_msg:
636
        self.LogWarning("Failure in blockdev_getdimensions call to node"
637
                        " %s, ignoring", node_name)
638
        continue
639
      if len(result.payload) != len(dskl):
640
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
641
                        " result.payload=%s", node_name, len(dskl),
642
                        result.payload)
643
        self.LogWarning("Invalid result from node %s, ignoring node results",
644
                        node_name)
645
        continue
646
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
647
        if dimensions is None:
648
          self.LogWarning("Disk %d of instance %s did not return size"
649
                          " information, ignoring", idx, instance.name)
650
          continue
651
        if not isinstance(dimensions, (tuple, list)):
652
          self.LogWarning("Disk %d of instance %s did not return valid"
653
                          " dimension information, ignoring", idx,
654
                          instance.name)
655
          continue
656
        (size, spindles) = dimensions
657
        if not isinstance(size, (int, long)):
658
          self.LogWarning("Disk %d of instance %s did not return valid"
659
                          " size information, ignoring", idx, instance.name)
660
          continue
661
        size = size >> 20
662
        if size != disk.size:
663
          self.LogInfo("Disk %d of instance %s has mismatched size,"
664
                       " correcting: recorded %d, actual %d", idx,
665
                       instance.name, disk.size, size)
666
          disk.size = size
667
          self.cfg.Update(instance, feedback_fn)
668
          changed.append((instance.name, idx, "size", size))
669
        if es_flags[node_uuid]:
670
          if spindles is None:
671
            self.LogWarning("Disk %d of instance %s did not return valid"
672
                            " spindles information, ignoring", idx,
673
                            instance.name)
674
          elif disk.spindles is None or disk.spindles != spindles:
675
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
676
                         " correcting: recorded %s, actual %s",
677
                         idx, instance.name, disk.spindles, spindles)
678
            disk.spindles = spindles
679
            self.cfg.Update(instance, feedback_fn)
680
            changed.append((instance.name, idx, "spindles", disk.spindles))
681
        if self._EnsureChildSizes(disk):
682
          self.cfg.Update(instance, feedback_fn)
683
          changed.append((instance.name, idx, "size", disk.size))
684
    return changed
685

    
686

    
687
def _ValidateNetmask(cfg, netmask):
688
  """Checks if a netmask is valid.
689

690
  @type cfg: L{config.ConfigWriter}
691
  @param cfg: cluster configuration
692
  @type netmask: int
693
  @param netmask: netmask to be verified
694
  @raise errors.OpPrereqError: if the validation fails
695

696
  """
697
  ip_family = cfg.GetPrimaryIPFamily()
698
  try:
699
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
700
  except errors.ProgrammerError:
701
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
702
                               ip_family, errors.ECODE_INVAL)
703
  if not ipcls.ValidateNetmask(netmask):
704
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
705
                               (netmask), errors.ECODE_INVAL)
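
# Example (hypothetical values): on a cluster whose primary IP family is IPv4,
# _ValidateNetmask(cfg, 24) passes silently, while a value outside the valid
# CIDR range (e.g. 99) raises errors.OpPrereqError.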
706

    
707

    
708
def CheckFileBasedStoragePathVsEnabledDiskTemplates(
709
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
710
    file_disk_template):
711
  """Checks whether the given file-based storage directory is acceptable.
712

713
  Note: This function is public, because it is also used in bootstrap.py.
714

715
  @type logging_warn_fn: function
716
  @param logging_warn_fn: function which accepts a string and logs it
717
  @type file_storage_dir: string
718
  @param file_storage_dir: the directory to be used for file-based instances
719
  @type enabled_disk_templates: list of string
720
  @param enabled_disk_templates: the list of enabled disk templates
721
  @type file_disk_template: string
722
  @param file_disk_template: the file-based disk template for which the
723
      path should be checked
724

725
  """
726
  assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
727
            constants.ST_FILE, constants.ST_SHARED_FILE
728
         ))
729
  file_storage_enabled = file_disk_template in enabled_disk_templates
730
  if file_storage_dir is not None:
731
    if file_storage_dir == "":
732
      if file_storage_enabled:
733
        raise errors.OpPrereqError(
734
            "Unsetting the '%s' storage directory while having '%s' storage"
735
            " enabled is not permitted." %
736
            (file_disk_template, file_disk_template))
737
    else:
738
      if not file_storage_enabled:
739
        logging_warn_fn(
740
            "Specified a %s storage directory, although %s storage is not"
741
            " enabled." % (file_disk_template, file_disk_template))
742
  else:
743
    raise errors.ProgrammerError("Received %s storage dir with value"
744
                                 " 'None'." % file_disk_template)
745

    
746

    
747
def CheckFileStoragePathVsEnabledDiskTemplates(
748
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
749
  """Checks whether the given file storage directory is acceptable.
750

751
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
752

753
  """
754
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
755
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
756
      constants.DT_FILE)
757

    
758

    
759
def CheckSharedFileStoragePathVsEnabledDiskTemplates(
760
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
761
  """Checks whether the given shared file storage directory is acceptable.
762

763
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
764

765
  """
766
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
767
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
768
      constants.DT_SHARED_FILE)
769

    
770

    
771
class LUClusterSetParams(LogicalUnit):
772
  """Change the parameters of the cluster.
773

774
  """
775
  HPATH = "cluster-modify"
776
  HTYPE = constants.HTYPE_CLUSTER
777
  REQ_BGL = False
778

    
779
  def CheckArguments(self):
780
    """Check parameters
781

782
    """
783
    if self.op.uid_pool:
784
      uidpool.CheckUidPool(self.op.uid_pool)
785

    
786
    if self.op.add_uids:
787
      uidpool.CheckUidPool(self.op.add_uids)
788

    
789
    if self.op.remove_uids:
790
      uidpool.CheckUidPool(self.op.remove_uids)
791

    
792
    if self.op.master_netmask is not None:
793
      _ValidateNetmask(self.cfg, self.op.master_netmask)
794

    
795
    if self.op.diskparams:
796
      for dt_params in self.op.diskparams.values():
797
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
798
      try:
799
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
800
        CheckDiskAccessModeValidity(self.op.diskparams)
801
      except errors.OpPrereqError, err:
802
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
803
                                   errors.ECODE_INVAL)
804

    
805
  def ExpandNames(self):
806
    # FIXME: in the future maybe other cluster params won't require checking on
807
    # all nodes to be modified.
808
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
809
    # resource locks the right thing, shouldn't it be the BGL instead?
810
    self.needed_locks = {
811
      locking.LEVEL_NODE: locking.ALL_SET,
812
      locking.LEVEL_INSTANCE: locking.ALL_SET,
813
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
814
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
815
    }
816
    self.share_locks = ShareAll()
817

    
818
  def BuildHooksEnv(self):
819
    """Build hooks env.
820

821
    """
822
    return {
823
      "OP_TARGET": self.cfg.GetClusterName(),
824
      "NEW_VG_NAME": self.op.vg_name,
825
      }
826

    
827
  def BuildHooksNodes(self):
828
    """Build hooks nodes.
829

830
    """
831
    mn = self.cfg.GetMasterNode()
832
    return ([mn], [mn])
833

    
834
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
835
                   new_enabled_disk_templates):
836
    """Check the consistency of the vg name on all nodes and in case it gets
837
       unset, whether there are instances still using it.
838

839
    """
840
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
841
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
842
                                            new_enabled_disk_templates)
843
    current_vg_name = self.cfg.GetVGName()
844

    
845
    if self.op.vg_name == '':
846
      if lvm_is_enabled:
847
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
848
                                   " disk templates are or get enabled.")
849

    
850
    if self.op.vg_name is None:
851
      if current_vg_name is None and lvm_is_enabled:
852
        raise errors.OpPrereqError("Please specify a volume group when"
853
                                   " enabling lvm-based disk-templates.")
854

    
855
    if self.op.vg_name is not None and not self.op.vg_name:
856
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
857
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
858
                                   " instances exist", errors.ECODE_INVAL)
859

    
860
    if (self.op.vg_name is not None and lvm_is_enabled) or \
861
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
862
      self._CheckVgNameOnNodes(node_uuids)
863

    
864
  def _CheckVgNameOnNodes(self, node_uuids):
865
    """Check the status of the volume group on each node.
866

867
    """
868
    vglist = self.rpc.call_vg_list(node_uuids)
869
    for node_uuid in node_uuids:
870
      msg = vglist[node_uuid].fail_msg
871
      if msg:
872
        # ignoring down node
873
        self.LogWarning("Error while gathering data on node %s"
874
                        " (ignoring node): %s",
875
                        self.cfg.GetNodeName(node_uuid), msg)
876
        continue
877
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
878
                                            self.op.vg_name,
879
                                            constants.MIN_VG_SIZE)
880
      if vgstatus:
881
        raise errors.OpPrereqError("Error on node '%s': %s" %
882
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
883
                                   errors.ECODE_ENVIRON)
884

    
885
  @staticmethod
886
  def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
887
                                old_enabled_disk_templates):
888
    """Computes three sets of disk templates.
889

890
    @see: C{_GetDiskTemplateSets} for more details.
891

892
    """
893
    enabled_disk_templates = None
894
    new_enabled_disk_templates = []
895
    disabled_disk_templates = []
896
    if op_enabled_disk_templates:
897
      enabled_disk_templates = op_enabled_disk_templates
898
      new_enabled_disk_templates = \
899
        list(set(enabled_disk_templates)
900
             - set(old_enabled_disk_templates))
901
      disabled_disk_templates = \
902
        list(set(old_enabled_disk_templates)
903
             - set(enabled_disk_templates))
904
    else:
905
      enabled_disk_templates = old_enabled_disk_templates
906
    return (enabled_disk_templates, new_enabled_disk_templates,
907
            disabled_disk_templates)
908

    
909
  def _GetDiskTemplateSets(self, cluster):
910
    """Computes three sets of disk templates.
911

912
    The three sets are:
913
      - disk templates that will be enabled after this operation (no matter if
914
        they were enabled before or not)
915
      - disk templates that get enabled by this operation (thus haven't been
916
        enabled before.)
917
      - disk templates that get disabled by this operation
918

919
    """
920
    return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
921
                                          cluster.enabled_disk_templates)
922

    
923
  def _CheckIpolicy(self, cluster, enabled_disk_templates):
924
    """Checks the ipolicy.
925

926
    @type cluster: C{objects.Cluster}
927
    @param cluster: the cluster's configuration
928
    @type enabled_disk_templates: list of string
929
    @param enabled_disk_templates: list of (possibly newly) enabled disk
930
      templates
931

932
    """
933
    # FIXME: write unit tests for this
934
    if self.op.ipolicy:
935
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
936
                                           group_policy=False)
937

    
938
      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
939
                                  enabled_disk_templates)
940

    
941
      all_instances = self.cfg.GetAllInstancesInfo().values()
942
      violations = set()
943
      for group in self.cfg.GetAllNodeGroupsInfo().values():
944
        instances = frozenset(
945
          [inst for inst in all_instances
946
           if compat.any(nuuid in group.members
947
             for nuuid in self.cfg.GetInstanceNodes(inst))])
948
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
949
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
950
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
951
                                           self.cfg)
952
        if new:
953
          violations.update(new)
954

    
955
      if violations:
956
        self.LogWarning("After the ipolicy change the following instances"
957
                        " violate them: %s",
958
                        utils.CommaJoin(utils.NiceSort(violations)))
959
    else:
960
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
961
                                  enabled_disk_templates)
962

    
963
  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
964
    """Checks whether the set DRBD helper actually exists on the nodes.
965

966
    @type drbd_helper: string
967
    @param drbd_helper: path of the drbd usermode helper binary
968
    @type node_uuids: list of strings
969
    @param node_uuids: list of node UUIDs to check for the helper
970

971
    """
972
    # checks given drbd helper on all nodes
973
    helpers = self.rpc.call_drbd_helper(node_uuids)
974
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
975
      if ninfo.offline:
976
        self.LogInfo("Not checking drbd helper on offline node %s",
977
                     ninfo.name)
978
        continue
979
      msg = helpers[ninfo.uuid].fail_msg
980
      if msg:
981
        raise errors.OpPrereqError("Error checking drbd helper on node"
982
                                   " '%s': %s" % (ninfo.name, msg),
983
                                   errors.ECODE_ENVIRON)
984
      node_helper = helpers[ninfo.uuid].payload
985
      if node_helper != drbd_helper:
986
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
987
                                   (ninfo.name, node_helper),
988
                                   errors.ECODE_ENVIRON)
989

    
990
  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
991
    """Check the DRBD usermode helper.
992

993
    @type node_uuids: list of strings
994
    @param node_uuids: a list of nodes' UUIDs
995
    @type drbd_enabled: boolean
996
    @param drbd_enabled: whether DRBD will be enabled after this operation
997
      (no matter if it was disabled before or not)
998
    @type drbd_gets_enabled: boolean
999
    @param drbd_gets_enabled: true if DRBD was disabled before this
1000
      operation, but will be enabled afterwards
1001

1002
    """
1003
    if self.op.drbd_helper == '':
1004
      if drbd_enabled:
1005
        raise errors.OpPrereqError("Cannot disable drbd helper while"
1006
                                   " DRBD is enabled.")
1007
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
1008
        raise errors.OpPrereqError("Cannot disable drbd helper while"
1009
                                   " drbd-based instances exist",
1010
                                   errors.ECODE_INVAL)
1011

    
1012
    else:
1013
      if self.op.drbd_helper is not None and drbd_enabled:
1014
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
1015
      else:
1016
        if drbd_gets_enabled:
1017
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
1018
          if current_drbd_helper is not None:
1019
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
1020
          else:
1021
            raise errors.OpPrereqError("Cannot enable DRBD without a"
1022
                                       " DRBD usermode helper set.")
1023

    
1024
  def _CheckInstancesOfDisabledDiskTemplates(
1025
      self, disabled_disk_templates):
1026
    """Check whether we try to disable a disk template that is in use.
1027

1028
    @type disabled_disk_templates: list of string
1029
    @param disabled_disk_templates: list of disk templates that are going to
1030
      be disabled by this operation
1031

1032
    """
1033
    for disk_template in disabled_disk_templates:
1034
      if self.cfg.HasAnyDiskOfType(disk_template):
1035
        raise errors.OpPrereqError(
1036
            "Cannot disable disk template '%s', because there is at least one"
1037
            " instance using it." % disk_template)
1038

    
1039
  @staticmethod
1040
  def _CheckInstanceCommunicationNetwork(network, warning_fn):
1041
    """Check whether an existing network is configured for instance
1042
    communication.
1043

1044
    Checks whether an existing network is configured with the
1045
    parameters that are advisable for instance communication, and
1046
    otherwise issue security warnings.
1047

1048
    @type network: L{ganeti.objects.Network}
1049
    @param network: L{ganeti.objects.Network} object whose
1050
                    configuration is being checked
1051
    @type warning_fn: function
1052
    @param warning_fn: function used to print warnings
1053
    @rtype: None
1054
    @return: None
1055

1056
    """
1057
    def _MaybeWarn(err, val, default):
1058
      if val != default:
1059
        warning_fn("Supplied instance communication network '%s' %s '%s',"
1060
                   " this might pose a security risk (default is '%s').",
1061
                   network.name, err, val, default)
1062

    
1063
    if network.network is None:
1064
      raise errors.OpPrereqError("Supplied instance communication network '%s'"
1065
                                 " must have an IPv4 network address.",
1066
                                 network.name)
1067

    
1068
    _MaybeWarn("has an IPv4 gateway", network.gateway, None)
1069
    _MaybeWarn("has a non-standard IPv4 network address", network.network,
1070
               constants.INSTANCE_COMMUNICATION_NETWORK4)
1071
    _MaybeWarn("has an IPv6 gateway", network.gateway6, None)
1072
    _MaybeWarn("has a non-standard IPv6 network address", network.network6,
1073
               constants.INSTANCE_COMMUNICATION_NETWORK6)
1074
    _MaybeWarn("has a non-standard MAC prefix", network.mac_prefix,
1075
               constants.INSTANCE_COMMUNICATION_MAC_PREFIX)
1076

    
1077
  def CheckPrereq(self):
1078
    """Check prerequisites.
1079

1080
    This checks whether the given params don't conflict and
1081
    whether the given volume group is valid.
1082

1083
    """
1084
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
1085
    self.cluster = cluster = self.cfg.GetClusterInfo()
1086

    
1087
    vm_capable_node_uuids = [node.uuid
1088
                             for node in self.cfg.GetAllNodesInfo().values()
1089
                             if node.uuid in node_uuids and node.vm_capable]
1090

    
1091
    (enabled_disk_templates, new_enabled_disk_templates,
1092
      disabled_disk_templates) = self._GetDiskTemplateSets(cluster)
1093
    self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates)
1094

    
1095
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
1096
                      new_enabled_disk_templates)
1097

    
1098
    if self.op.file_storage_dir is not None:
1099
      CheckFileStoragePathVsEnabledDiskTemplates(
1100
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
1101

    
1102
    if self.op.shared_file_storage_dir is not None:
1103
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
1104
          self.LogWarning, self.op.shared_file_storage_dir,
1105
          enabled_disk_templates)
1106

    
1107
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
1108
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
1109
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)
1110

    
1111
    # validate params changes
1112
    if self.op.beparams:
1113
      objects.UpgradeBeParams(self.op.beparams)
1114
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1115
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
1116

    
1117
    if self.op.ndparams:
1118
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
1119
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
1120

    
1121
      # TODO: we need a more general way to handle resetting
1122
      # cluster-level parameters to default values
1123
      if self.new_ndparams["oob_program"] == "":
1124
        self.new_ndparams["oob_program"] = \
1125
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
1126

    
1127
    if self.op.hv_state:
1128
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
1129
                                           self.cluster.hv_state_static)
1130
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
1131
                               for hv, values in new_hv_state.items())
1132

    
1133
    if self.op.disk_state:
1134
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
1135
                                               self.cluster.disk_state_static)
1136
      self.new_disk_state = \
1137
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
1138
                            for name, values in svalues.items()))
1139
             for storage, svalues in new_disk_state.items())
1140

    
1141
    self._CheckIpolicy(cluster, enabled_disk_templates)
1142

    
1143
    if self.op.nicparams:
1144
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1145
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
1146
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1147
      nic_errors = []
1148

    
1149
      # check all instances for consistency
1150
      for instance in self.cfg.GetAllInstancesInfo().values():
1151
        for nic_idx, nic in enumerate(instance.nics):
1152
          params_copy = copy.deepcopy(nic.nicparams)
1153
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
1154

    
1155
          # check parameter syntax
1156
          try:
1157
            objects.NIC.CheckParameterSyntax(params_filled)
1158
          except errors.ConfigurationError, err:
1159
            nic_errors.append("Instance %s, nic/%d: %s" %
1160
                              (instance.name, nic_idx, err))
1161

    
1162
          # if we're moving instances to routed, check that they have an ip
1163
          target_mode = params_filled[constants.NIC_MODE]
1164
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
1165
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
1166
                              " address" % (instance.name, nic_idx))
1167
      if nic_errors:
1168
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
1169
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
1170

    
1171
    # hypervisor list/parameters
1172
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
1173
    if self.op.hvparams:
1174
      for hv_name, hv_dict in self.op.hvparams.items():
1175
        if hv_name not in self.new_hvparams:
1176
          self.new_hvparams[hv_name] = hv_dict
1177
        else:
1178
          self.new_hvparams[hv_name].update(hv_dict)
1179

    
1180
    # disk template parameters
1181
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
1182
    if self.op.diskparams:
1183
      for dt_name, dt_params in self.op.diskparams.items():
1184
        if dt_name not in self.new_diskparams:
1185
          self.new_diskparams[dt_name] = dt_params
1186
        else:
1187
          self.new_diskparams[dt_name].update(dt_params)
1188
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)
1189

    
1190
    # os hypervisor parameters
1191
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
1192
    if self.op.os_hvp:
1193
      for os_name, hvs in self.op.os_hvp.items():
1194
        if os_name not in self.new_os_hvp:
1195
          self.new_os_hvp[os_name] = hvs
1196
        else:
1197
          for hv_name, hv_dict in hvs.items():
1198
            if hv_dict is None:
1199
              # Delete if it exists
1200
              self.new_os_hvp[os_name].pop(hv_name, None)
1201
            elif hv_name not in self.new_os_hvp[os_name]:
1202
              self.new_os_hvp[os_name][hv_name] = hv_dict
1203
            else:
1204
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
1205

    
1206
    # os parameters
1207
    self._BuildOSParams(cluster)
1208

    
1209
    # changes to the hypervisor list
1210
    if self.op.enabled_hypervisors is not None:
1211
      self.hv_list = self.op.enabled_hypervisors
1212
      for hv in self.hv_list:
1213
        # if the hypervisor doesn't already exist in the cluster
1214
        # hvparams, we initialize it to empty, and then (in both
1215
        # cases) we make sure to fill the defaults, as we might not
1216
        # have a complete defaults list if the hypervisor wasn't
1217
        # enabled before
1218
        if hv not in new_hvp:
1219
          new_hvp[hv] = {}
1220
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1221
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1222
    else:
1223
      self.hv_list = cluster.enabled_hypervisors
1224

    
1225
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1226
      # either the enabled list has changed, or the parameters have, validate
1227
      for hv_name, hv_params in self.new_hvparams.items():
1228
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1229
            (self.op.enabled_hypervisors and
1230
             hv_name in self.op.enabled_hypervisors)):
1231
          # either this is a new hypervisor, or its parameters have changed
1232
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1233
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1234
          hv_class.CheckParameterSyntax(hv_params)
1235
          CheckHVParams(self, node_uuids, hv_name, hv_params)
1236

    
1237
    self._CheckDiskTemplateConsistency()
1238

    
1239
    if self.op.os_hvp:
1240
      # no need to check any newly-enabled hypervisors, since the
1241
      # defaults have already been checked in the above code-block
1242
      for os_name, os_hvp in self.new_os_hvp.items():
1243
        for hv_name, hv_params in os_hvp.items():
1244
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1245
          # we need to fill in the new os_hvp on top of the actual hv_p
1246
          cluster_defaults = self.new_hvparams.get(hv_name, {})
1247
          new_osp = objects.FillDict(cluster_defaults, hv_params)
1248
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1249
          hv_class.CheckParameterSyntax(new_osp)
1250
          CheckHVParams(self, node_uuids, hv_name, new_osp)
1251

    
1252
    if self.op.default_iallocator:
1253
      alloc_script = utils.FindFile(self.op.default_iallocator,
1254
                                    constants.IALLOCATOR_SEARCH_PATH,
1255
                                    os.path.isfile)
1256
      if alloc_script is None:
1257
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1258
                                   " specified" % self.op.default_iallocator,
1259
                                   errors.ECODE_INVAL)
1260

    
1261
    if self.op.instance_communication_network:
1262
      network_name = self.op.instance_communication_network
1263

    
1264
      try:
1265
        network_uuid = self.cfg.LookupNetwork(network_name)
1266
      except errors.OpPrereqError:
1267
        network_uuid = None
1268

    
1269
      if network_uuid is not None:
1270
        network = self.cfg.GetNetwork(network_uuid)
1271
        self._CheckInstanceCommunicationNetwork(network, self.LogWarning)
1272

    
1273
  def _BuildOSParams(self, cluster):
1274
    "Calculate the new OS parameters for this operation."
1275

    
1276
    def _GetNewParams(source, new_params):
1277
      "Wrapper around GetUpdatedParams."
1278
      if new_params is None:
1279
        return source
1280
      result = objects.FillDict(source, {}) # deep copy of source
1281
      for os_name in new_params:
1282
        result[os_name] = GetUpdatedParams(result.get(os_name, {}),
1283
                                           new_params[os_name],
1284
                                           use_none=True)
1285
        if not result[os_name]:
1286
          del result[os_name] # we removed all parameters
1287
      return result
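
    # Example (hypothetical parameters, assuming GetUpdatedParams drops keys
    # whose new value is None when use_none=True):
    #   _GetNewParams({"debian": {"dhcp": "no"}}, {"debian": {"dhcp": None}})
    # removes the "dhcp" key and, the per-OS dict now being empty, drops the
    # "debian" entry from the result altogether.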
1288

    
1289
    self.new_osp = _GetNewParams(cluster.osparams,
1290
                                 self.op.osparams)
1291
    self.new_osp_private = _GetNewParams(cluster.osparams_private_cluster,
1292
                                         self.op.osparams_private_cluster)
1293

    
1294
    # Remove os validity check
1295
    changed_oses = (set(self.new_osp.keys()) | set(self.new_osp_private.keys()))
1296
    for os_name in changed_oses:
1297
      os_params = cluster.SimpleFillOS(
1298
        os_name,
1299
        self.new_osp.get(os_name, {}),
1300
        os_params_private=self.new_osp_private.get(os_name, {})
1301
      )
1302
      # check the parameter validity (remote check)
1303
      CheckOSParams(self, False, [self.cfg.GetMasterNode()],
1304
                    os_name, os_params)

  def _CheckDiskTemplateConsistency(self):
    """Check whether the disk templates that are going to be disabled
       are still in use by some instances.

    """
    if self.op.enabled_disk_templates:
      cluster = self.cfg.GetClusterInfo()
      instances = self.cfg.GetAllInstancesInfo()

      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
        - set(self.op.enabled_disk_templates)
      for instance in instances.itervalues():
        if instance.disk_template in disk_templates_to_remove:
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
                                     " because instance '%s' is using it." %
                                     (instance.disk_template, instance.name))

  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")

  def _SetFileStorageDir(self, feedback_fn):
    """Set the file storage directory.

    """
    if self.op.file_storage_dir is not None:
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
        feedback_fn("Global file storage dir already set to value '%s'"
                    % self.cluster.file_storage_dir)
      else:
        self.cluster.file_storage_dir = self.op.file_storage_dir

  def _SetDrbdHelper(self, feedback_fn):
    """Set the DRBD usermode helper.

    """
    if self.op.drbd_helper is not None:
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did not"
                    " enable the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")

  @staticmethod
  def _EnsureInstanceCommunicationNetwork(cfg, network_name):
    """Ensure that the instance communication network exists and is
    connected to all groups.

    The instance communication network given by L{network_name} is
    created, if necessary, via the opcode 'OpNetworkAdd'.  Also, the
    instance communication network is connected to all existing node
    groups, if necessary, via the opcode 'OpNetworkConnect'.

    @type cfg: L{config.ConfigWriter}
    @param cfg: cluster configuration

    @type network_name: string
    @param network_name: instance communication network name

    @rtype: L{ganeti.cmdlib.ResultWithJobs} or L{None}
    @return: L{ganeti.cmdlib.ResultWithJobs} if the instance
             communication network needs to be created or connected
             to a group, otherwise L{None}

    """
    jobs = []

    try:
      network_uuid = cfg.LookupNetwork(network_name)
      network_exists = True
    except errors.OpPrereqError:
      network_exists = False

    if not network_exists:
      jobs.append(AddInstanceCommunicationNetworkOp(network_name))

    for group_uuid in cfg.GetNodeGroupList():
      group = cfg.GetNodeGroup(group_uuid)

      if network_exists:
        network_connected = network_uuid in group.networks
      else:
        # The network was created asynchronously by the previous
        # opcode and, therefore, we don't have access to its
        # network_uuid.  As a result, we assume that the network is
        # not connected to any group yet.
        network_connected = False

      if not network_connected:
        op = ConnectInstanceCommunicationNetworkOp(group_uuid, network_name)
        jobs.append(op)

    if jobs:
      return ResultWithJobs([jobs])
    else:
      return None
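
    # Illustrative note (not from the original source): the single element of
    # the list passed to ResultWithJobs is itself a list of opcodes, i.e. one
    # job whose opcodes run in order.  For a cluster with two groups and no
    # instance communication network yet, the result would roughly contain
    # (hypothetical group identifiers):
    #   [[OpNetworkAdd(network_name=network_name, ...),
    #     OpNetworkConnect(group_name=<group uuid 1>, network_name=..., ...),
    #     OpNetworkConnect(group_name=<group uuid 2>, network_name=..., ...)]]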

  @staticmethod
  def _ModifyInstanceCommunicationNetwork(cfg, cluster, network_name,
                                          feedback_fn):
    """Update the instance communication network stored in the cluster
    configuration.

    Compares the user-supplied instance communication network against
    the one stored in the Ganeti cluster configuration.  If there is a
    change, the instance communication network may need to be created
    and connected to all groups (see
    L{LUClusterSetParams._EnsureInstanceCommunicationNetwork}).

    @type cfg: L{config.ConfigWriter}
    @param cfg: cluster configuration

    @type cluster: L{ganeti.objects.Cluster}
    @param cluster: Ganeti cluster

    @type network_name: string
    @param network_name: instance communication network name

    @type feedback_fn: function
    @param feedback_fn: see L{ganeti.cmdlib.base.LogicalUnit}

    @rtype: L{LUClusterSetParams._EnsureInstanceCommunicationNetwork} or L{None}
    @return: see L{LUClusterSetParams._EnsureInstanceCommunicationNetwork}

    """
    config_network_name = cfg.GetInstanceCommunicationNetwork()

    if network_name == config_network_name:
      feedback_fn("Instance communication network already is '%s', nothing to"
                  " do." % network_name)
    else:
      try:
        cfg.LookupNetwork(config_network_name)
        feedback_fn("Previous instance communication network '%s'"
                    " should be removed manually." % config_network_name)
      except errors.OpPrereqError:
        pass

      if network_name:
        feedback_fn("Changing instance communication network to '%s', only new"
                    " instances will be affected."
                    % network_name)
      else:
        feedback_fn("Disabling instance communication network, only new"
                    " instances will be affected.")

      cluster.instance_communication_network = network_name

      if network_name:
        return LUClusterSetParams._EnsureInstanceCommunicationNetwork(
          cfg,
          network_name)
      else:
        return None

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(self.op.enabled_disk_templates)

    self._SetVgName(feedback_fn)
    self._SetFileStorageDir(feedback_fn)
    self._SetDrbdHelper(feedback_fn)

    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.osparams_private_cluster:
      self.cluster.osparams_private_cluster = self.new_osp_private
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [], feedback_fn)

    if self.op.max_running_jobs is not None:
      self.cluster.max_running_jobs = self.op.max_running_jobs

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.default_iallocator_params is not None:
      self.cluster.default_iallocator_params = self.op.default_iallocator_params

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
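
    # Illustrative note (not from the original source): 'mods' is a list of
    # (action, os_name) pairs, e.g. (hypothetical values):
    #   helper_os("hidden_os",
    #             [(constants.DDM_ADD, "debootstrap"),
    #              (constants.DDM_REMOVE, "old-image")],
    #             "hidden")
    # adds "debootstrap" to and removes "old-image" from cluster.hidden_os.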

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)

    network_name = self.op.instance_communication_network
    if network_name is not None:
      return self._ModifyInstanceCommunicationNetwork(self.cfg, self.cluster,
                                                      network_name, feedback_fn)
    else:
      return None


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)
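
    # Illustrative note (not from the original source): when no group is
    # given, the submitted jobs look roughly like this for a two-group
    # cluster (hypothetical group names):
    #   job 1: [OpClusterVerifyConfig(...)]
    #   job 2: [OpClusterVerifyGroup(group_name="group1", depends=[(-1, [])])]
    #   job 3: [OpClusterVerifyGroup(group_name="group2", depends=[(-2, [])])]
    # The negative job ids in 'depends' are relative references to previously
    # submitted jobs, so every group verification waits for the config check.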


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = constants.CV_ERROR
  ETYPE_WARNING = constants.CV_WARNING

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True
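
    # Illustrative note (not from the original source): for a call such as
    # self._Error(constants.CV_ENODESSH, "node2", "ssh problem: %s", "timeout")
    # the reported line is, roughly:
    #   with error_codes:    "  - ERROR:ENODESSH:node:node2:ssh problem: timeout"
    #   without error_codes: "  - ERROR: node node2: ssh problem: timeout"
    # (the exact itype/etxt strings come from the CV_* tuples in constants).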

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data
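
  # Illustrative note (not from the original source): the returned list has
  # one (origin, hypervisor, parameters) tuple per parameter source, e.g.
  # (hypothetical values):
  #   [("cluster", "kvm", {"kernel_path": "", ...}),
  #    ("os debian-image", "kvm", {...}),
  #    ("instance web1", "kvm", {...})]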


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = utils.VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(inst.name for
                         inst in dangling_instances.get(node.uuid, [])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(inst.name for
                                         inst in no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        instance = self.cfg.GetInstanceInfoByName(inst_name)
        if instance.disk_template in constants.DTS_INT_MIRROR:
          nodes.update(self.cfg.GetInstanceSecondaryNodes(instance))

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_inst_uuids = \
        group_inst_uuids.difference(
          [self.cfg.GetInstanceInfoByName(name).uuid
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_inst_uuids:
      raise errors.OpPrereqError(
        "Missing lock for instances: %s" %
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
        errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_uuids = group_inst_uuids
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
                             for inst_uuid in group_inst_uuids)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        inst_nodes = self.cfg.GetInstanceNodes(inst)
        for nuuid in inst_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
                    "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
                  "Node time diverges by at least %s from master node time",
                  ntime_diff)
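
    # Illustrative note (not from the original source): a node's clock is
    # only flagged when it falls outside the RPC call window widened by
    # NODE_MAX_CLOCK_SKEW on both sides.  E.g. with a call window of
    # [100.0, 102.0] and a skew allowance of 150 seconds (hypothetical
    # numbers), a node time of 250.0 is accepted, while 300.0 is reported
    # as diverging by at least 198.0s (300.0 - 102.0).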

  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                  "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node_verify call.

    """
    node_versions = {}
    for node_uuid, ndata in node_verify_infos.items():
      nresult = ndata.payload
      if nresult:
        version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
        node_versions[node_uuid] = version

    if len(set(node_versions.values())) > 1:
      for node_uuid, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
                    code=self.ETYPE_WARNING)

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping from node names to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of checks
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "did not return valid bridge information")
    if not test:
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = constants.NV_NODELIST not in nresult
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
                  "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
                        "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
                        "tcp communication with node '%s': %s",
                        anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if ninfo.uuid == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)

  def _VerifyInstance(self, instance, node_image, diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    pnode_uuid = instance.primary_node
    pnode_img = node_image[pnode_uuid]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    self.cfg.GetInstanceLVsByNode(instance, lvmap=node_vol_should)

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)

    for node_uuid in node_vol_should:
      n_img = node_image[node_uuid]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node_uuid]:
        test = volume not in n_img.volumes
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
                      "volume %s missing on node %s", volume,
                      self.cfg.GetNodeName(node_uuid))

    if instance.admin_state == constants.ADMINST_UP:
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
                    "instance not running on its primary node %s",
                     self.cfg.GetNodeName(pnode_uuid))
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance is marked as running and lives on"
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      self._ErrorIf(instance.disks_active and
                    not success and not bad_snode,
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
                    "couldn't retrieve status for disk/%s on %s: %s",
                    idx, self.cfg.GetNodeName(nname), bdev_status)

      if instance.disks_active and success and \
         (bdev_status.is_degraded or
          bdev_status.ldisk_status != constants.LDS_OKAY):
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
        if bdev_status.is_degraded:
          msg += " is degraded"
        if bdev_status.ldisk_status != constants.LDS_OKAY:
          msg += "; state is '%s'" % \
                 constants.LDS_NAMES[bdev_status.ldisk_status]

        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)

    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
                  "instance %s, connection to primary node failed",
                  instance.name)

    secondary_nodes = self.cfg.GetInstanceSecondaryNodes(instance)
    self._ErrorIf(len(secondary_nodes) > 1,
                  constants.CV_EINSTANCELAYOUT, instance.name,
                  "instance has multiple secondary nodes: %s",
                  utils.CommaJoin(secondary_nodes),
                  code=self.ETYPE_WARNING)

    inst_nodes = self.cfg.GetInstanceNodes(instance)
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, inst_nodes)
    if any(es_flags.values()):
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
        # Disk template not compatible with exclusive_storage: no instance
        # node should have the flag set
        es_nodes = [n
                    for (n, es) in es_flags.items()
                    if es]
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
                    "instance has template %s, which is not supported on nodes"
                    " that have exclusive storage set: %s",
                    instance.disk_template,
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
      for (idx, disk) in enumerate(instance.disks):
        self._ErrorIf(disk.spindles is None,
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
                      "number of spindles not configured for disk %s while"
                      " exclusive storage is enabled, try running"
                      " gnt-cluster repair-disk-sizes", idx)

    if instance.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(inst_nodes)
      instance_groups = {}

      for node_uuid in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
                                   []).append(node_uuid)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
                           groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode_uuid in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance.name, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in secondary_nodes:
      s_img = node_image[snode]
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
                    self.cfg.GetNodeName(snode),
                    "instance %s, connection to secondary node failed",
                    instance.name)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
                  instance.name, "instance has offline secondary node(s) %s",
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
    # ... or ghost/non-vm_capable nodes
    for node_uuid in inst_nodes:
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance lives on ghost node %s",
                    self.cfg.GetNodeName(node_uuid))
      self._ErrorIf(not node_image[node_uuid].vm_capable,
                    constants.CV_EINSTANCEBADNODE, instance.name,
                    "instance lives on non-vm_capable node %s",
                    self.cfg.GetNodeName(node_uuid))

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node_uuid, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node_uuid].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node_uuid not in node_vol_should or
                volume not in node_vol_should[node_uuid]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
                      self.cfg.GetNodeName(node_uuid),
                      "volume %s is unknown", volume,
                      code=_VerifyErrors.ETYPE_WARNING)

  def _VerifyNPlusOneMemory(self, node_image, all_insts):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node_uuid, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or \
         self.all_node_info[node_uuid].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
      for prinode, inst_uuids in n_img.sbp.items():
        needed_mem = 0
        for inst_uuid in inst_uuids:
          bep = cluster_info.FillBE(all_insts[inst_uuid])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1,
                      self.cfg.GetNodeName(node_uuid),
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
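
    # Illustrative note (not from the original source): with hypothetical
    # numbers, if node A is secondary for two auto-balanced instances whose
    # primary is node B, with BE_MINMEM of 2048 and 4096 MiB, then A needs
    # 6144 MiB free; if A only reports mfree=4096 MiB, the check above emits
    # a CV_ENODEN1 error for a potential failover from B.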

  def _VerifyClientCertificates(self, nodes, all_nvinfo):
    """Verifies the consistency of the client certificates.

    This includes several aspects:
      - the individual validation of all nodes' certificates
      - the consistency of the master candidate certificate map
      - the consistency of the master candidate certificate map with the
        certificates that the master candidates are actually using.

    @param nodes: the list of nodes to consider in this verification
    @param all_nvinfo: the map of results of the verify_node call to
      all nodes

    """
    candidate_certs = self.cfg.GetClusterInfo().candidate_certs
    if candidate_certs is None or len(candidate_certs) == 0:
      self._ErrorIf(
        True, constants.CV_ECLUSTERCLIENTCERT, None,
        "The cluster's list of master candidate certificates is empty."
        " If you just updated the cluster, please run"
        " 'gnt-cluster renew-crypto --new-node-certificates'.")
      return

    self._ErrorIf(
      len(candidate_certs) != len(set(candidate_certs.values())),
      constants.CV_ECLUSTERCLIENTCERT, None,
      "There are at least two master candidates configured to use the same"
      " certificate.")

    # collect the client certificate
    for node in nodes:
      if node.offline:
        continue

      nresult = all_nvinfo[node.uuid]
      if nresult.fail_msg or not nresult.payload:
        continue

      (errcode, msg) = nresult.payload.get(constants.NV_CLIENT_CERT, None)

      self._ErrorIf(
        errcode is not None, constants.CV_ECLUSTERCLIENTCERT, None,
        "Client certificate of node '%s' failed validation: %s (code '%s')",
        node.uuid, msg, errcode)

      if not errcode:
        digest = msg
        if node.master_candidate:
          if node.uuid in candidate_certs:
            self._ErrorIf(
              digest != candidate_certs[node.uuid],
              constants.CV_ECLUSTERCLIENTCERT, None,
              "Client certificate digest of master candidate '%s' does not"
              " match its entry in the cluster's map of master candidate"
              " certificates. Expected: %s Got: %s", node.uuid,
              digest, candidate_certs[node.uuid])
          else:
            self._ErrorIf(
              True, constants.CV_ECLUSTERCLIENTCERT, None,
              "The master candidate '%s' does not have an entry in the"
              " map of candidate certificates.", node.uuid)
            self._ErrorIf(
              digest in candidate_certs.values(),
              constants.CV_ECLUSTERCLIENTCERT, None,
              "Master candidate '%s' is using a certificate of another node.",
              node.uuid)
        else:
          self._ErrorIf(
            node.uuid in candidate_certs,
            constants.CV_ECLUSTERCLIENTCERT, None,
            "Node '%s' is not a master candidate, but still listed in the"
            " map of master candidate certificates.", node.uuid)
          self._ErrorIf(
            (node.uuid not in candidate_certs) and
              (digest in candidate_certs.values()),
            constants.CV_ECLUSTERCLIENTCERT, None,
            "Node '%s' is not a master candidate and is incorrectly using a"
            " certificate of another node which is master candidate.",
            node.uuid)
2574

    
2575
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param nodes: List of L{objects.Node} objects
    @param master_node_uuid: UUID of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.uuid == master_node_uuid)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodes
      else:
        filenodes = filter(fn, nodes)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodes:
      if node.offline:
        ignore_nodes.add(node.uuid)
        continue

      nresult = all_nvinfo[node.uuid]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, {})
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
                    "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.uuid)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
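
    # fileinfo now maps each filename to {checksum: set(node UUIDs reporting
    # that checksum)}; on a healthy cluster every file collapses to a single
    # checksum entry.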
    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_uuid
                            for node_uuids in fileinfo[filename].values()
                            for node_uuid in node_uuids) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        self._ErrorIf(missing_file and missing_file != expected_nodes,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is optional, but it must exist on all or no"
                      " nodes (not found on %s)",
                      filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))
      else:
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is missing from node(s) %s", filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        self._ErrorIf(unexpected,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s should not exist on node(s) %s",
                      filename, utils.CommaJoin(
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1,
                     utils.CommaJoin(utils.NiceSort(
                       map(self.cfg.GetNodeName, node_uuids))))
                    for (idx, (checksum, node_uuids)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
                    "File %s found with %s different checksums (%s)",
                    filename, len(checksums), "; ".join(variants))

  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
    """Verify the drbd helper.

    """
    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                    "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "wrong drbd usermode helper: %s", payload)

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)

    # compute the DRBD minors
    node_drbd = {}
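    # node_drbd: DRBD minor -> (instance UUID, whether the instance's disks
    # should be active)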
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
      test = inst_uuid not in instanceinfo
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
      if test:
        node_drbd[minor] = (inst_uuid, False)
      else:
        instance = instanceinfo[inst_uuid]
        node_drbd[minor] = (inst_uuid, instance.disks_active)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                  "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (inst_uuid, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "drbd minor %d of instance %s is not active", minor,
                    self.cfg.GetInstanceName(inst_uuid))
    for minor in used_minors:
      test = minor not in node_drbd
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                  "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}
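    # os_dict: OS name -> list of (path, status, diagnose message, variants
    # set, parameters set, API versions set), one entry per directory the OS
    # was found in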

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
                    "Invalid OS %s (located at %s): %s",
                    os_name, f_path, f_diag)
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
                    "OS '%s' has multiple entries"
                    " (first one shadows the rest): %s",
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                    "Extra OS %s not present on reference node (%s)",
                    os_name, self.cfg.GetNodeName(base.uuid))
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " [%s] vs. [%s]", kind, os_name,
                      self.cfg.GetNodeName(base.uuid),
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
                  "OSes present on reference node %s"
                  " but missing on this node: %s",
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))

  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
2855
    cluster = self.cfg.GetClusterInfo()
2856
    if (is_master and
2857
        (cluster.IsFileStorageEnabled() or
2858
         cluster.IsSharedFileStorageEnabled())):
2859
      try:
2860
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2861
      except KeyError:
2862
        # This should never happen
2863
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2864
                      "Node did not return forbidden file storage paths")
2865
      else:
2866
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2867
                      "Found forbidden file storage paths: %s",
2868
                      utils.CommaJoin(fspaths))
2869
    else:
2870
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2871
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2872
                    "Node should not have returned forbidden file storage"
2873
                    " paths")
2874

    
2875
  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
                          verify_key, error_key):
    """Verifies (file) storage paths.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type file_disk_template: string
    @param file_disk_template: file-based disk template, whose directory
        is supposed to be verified
    @type verify_key: string
    @param verify_key: key for the verification map of this file
        verification step
    @param error_key: error key to be added to the verification results
        in case something goes wrong in this verification step

    """
    assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
              constants.ST_FILE, constants.ST_SHARED_FILE
           ))
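
    # The check below treats the mere presence of verify_key in the node's
    # reply as a failed path check; its value is reported as the reason.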
    cluster = self.cfg.GetClusterInfo()
    if cluster.IsDiskTemplateEnabled(file_disk_template):
      self._ErrorIf(
          verify_key in nresult,
          error_key, ninfo.name,
          "The configured %s storage path is unusable: %s" %
          (file_disk_template, nresult.get(verify_key)))

  def _VerifyFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_FILE,
        constants.NV_FILE_STORAGE_PATH,
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)

  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_SHARED_FILE,
        constants.NV_SHARED_FILE_STORAGE_PATH,
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_dev_inst_only = {}
    diskless_instances = set()
    nodisk_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        nodisk_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template != diskless)
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # AnnotateDiskParams already makes copies of the disks
      dev_inst_only = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))

      node_disks_dev_inst_only[nuuid] = dev_inst_only

    assert len(node_disks) == len(node_disks_dev_inst_only)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
               node_disks.keys(), node_disks_dev_inst_only)

    assert len(result) == len(node_disks)

    instdisk = {}
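    # instdisk: instance UUID -> {node UUID: [(success, payload), ...]},
    # with one status tuple per disk queried on that node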

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}
    # ...and disk-full instances that happen to have no disks
    for inst_uuid in nodisk_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(
                        self.cfg.GetInstanceNodes(instanceinfo[inst])) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")
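
    # Group the foreign, online nodes by node group and wrap each group's
    # sorted name list in an endless cycle, so callers can repeatedly draw
    # one candidate per foreign group.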
    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)
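
    # For every online node in this group, draw one node name from each other
    # group's iterator; the result maps node name -> sorted list of remote
    # nodes it should try to reach over SSH.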
    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; their failure is
    logged in the verify output and makes the verification fail.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      constants.NV_CLIENT_CERT: None,
      }
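
    # node_verify_param is the work order passed to every node via
    # call_node_verify below; optional checks (LVM, DRBD, bridges, file
    # storage paths, ...) are added conditionally in the blocks that follow.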
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)
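    # node_image starts out with one empty NodeImage per node in this group;
    # the _Update* helpers fill these in from the RPC results, and ghost
    # entries are added below for nodes outside the group that host instances
    # we care about.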

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      inst_nodes = self.cfg.GetInstanceNodes(instance)
      for nuuid in inst_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      self.cfg.GetInstanceLVsByNode(instance, lvmap=node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in self.cfg.GetInstanceSecondaryNodes(instance):
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so
    # if it's True for at least one node, we act as if it were set for all
    # the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    node_group_uuids = dict(map(lambda n: (n.name, n.group),
                                self.cfg.GetAllNodesInfo().values()))
    groups_config = self.cfg.GetAllNodeGroupsInfoDict()

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams,
                                           node_group_uuids,
                                           groups_config)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams,
                                    node_group_uuids,
                                    groups_config)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    self._VerifyClientCertificates(self.my_node_info.values(), all_nvinfo)
    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams,
         node_group_uuids,
         groups_config))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None
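
    # Walk the per-node RPC results, dispatching each payload to the
    # _Verify*/_Update* helpers above; offline nodes are only counted, and
    # the first vm_capable node with valid OS data becomes the reference
    # image (refos_img) for the OS comparisons.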
    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in self.cfg.GetInstanceSecondaryNodes(instance):
        if (secondary in self.my_node_info
            and instance.uuid not in self.my_inst_info):
          self.cfg.GetInstanceLVsByNode(instance, lvmap=node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
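        # res.payload is a list of (script, status, output) triples for the
        # hooks that ran on this node; only failed scripts are reported below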
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])