
lib/cmdlib/cluster.py @ 2ff6426b


1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with the cluster."""
23

    
24
import copy
25
import itertools
26
import logging
27
import operator
28
import os
29
import re
30
import time
31

    
32
from ganeti import compat
33
from ganeti import constants
34
from ganeti import errors
35
from ganeti import hypervisor
36
from ganeti import locking
37
from ganeti import masterd
38
from ganeti import netutils
39
from ganeti import objects
40
from ganeti import opcodes
41
from ganeti import pathutils
42
from ganeti import query
43
import ganeti.rpc.node as rpc
44
from ganeti import runtime
45
from ganeti import ssh
46
from ganeti import uidpool
47
from ganeti import utils
48
from ganeti import vcluster
49

    
50
from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
51
  ResultWithJobs
52
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
53
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
54
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
55
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
56
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
57
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
58
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
59
  CheckDiskAccessModeConsistency, CreateNewClientCert, \
60
  OpAddInstanceCommunicationNetwork, OpConnectInstanceCommunicationNetwork
61

    
62
import ganeti.masterd.instance
63

    
64

    
65
def _UpdateMasterClientCert(
66
    lu, master_uuid, cluster, feedback_fn,
67
    client_cert=pathutils.NODED_CLIENT_CERT_FILE,
68
    client_cert_tmp=pathutils.NODED_CLIENT_CERT_FILE_TMP):
69
  """Renews the master's client certificate and propagates the config.
70

71
  @type lu: C{LogicalUnit}
72
  @param lu: the logical unit holding the config
73
  @type master_uuid: string
74
  @param master_uuid: the master node's UUID
75
  @type cluster: C{objects.Cluster}
76
  @param cluster: the cluster's configuration
77
  @type feedback_fn: function
78
  @param feedback_fn: feedback functions for config updates
79
  @type client_cert: string
80
  @param client_cert: the path of the client certificate
81
  @type client_cert_tmp: string
82
  @param client_cert_tmp: the temporary path of the client certificate
83
  @rtype: string
84
  @return: the digest of the newly created client certificate
85

86
  """
87
  client_digest = CreateNewClientCert(lu, master_uuid, filename=client_cert_tmp)
88
  utils.AddNodeToCandidateCerts(master_uuid, client_digest,
89
                                cluster.candidate_certs)
90
  # This triggers an update of the config and distribution of it with the old
91
  # SSL certificate
92
  lu.cfg.Update(cluster, feedback_fn)
93

    
94
  utils.RemoveFile(client_cert)
95
  utils.RenameFile(client_cert_tmp, client_cert)
96
  return client_digest
97

    
98

    
99
class LUClusterRenewCrypto(NoHooksLU):
100
  """Renew the cluster's crypto tokens.
101

102
  Note that most of this operation is done in gnt_cluster.py; this LU only
103
  takes care of the renewal of the client SSL certificates.
104

105
  """
106
  def Exec(self, feedback_fn):
107
    master_uuid = self.cfg.GetMasterNode()
108
    cluster = self.cfg.GetClusterInfo()
109

    
110
    server_digest = utils.GetCertificateDigest(
111
      cert_filename=pathutils.NODED_CERT_FILE)
112
    utils.AddNodeToCandidateCerts("%s-SERVER" % master_uuid,
113
                                  server_digest,
114
                                  cluster.candidate_certs)
115
    new_master_digest = _UpdateMasterClientCert(self, master_uuid, cluster,
116
                                                feedback_fn)
117

    
118
    cluster.candidate_certs = {master_uuid: new_master_digest}
119
    nodes = self.cfg.GetAllNodesInfo()
120
    for (node_uuid, node_info) in nodes.items():
121
      if node_uuid != master_uuid:
122
        new_digest = CreateNewClientCert(self, node_uuid)
123
        if node_info.master_candidate:
124
          cluster.candidate_certs[node_uuid] = new_digest
125
    # Trigger another update of the config now with the new master cert
126
    self.cfg.Update(cluster, feedback_fn)
127

    
128

    
129
class LUClusterActivateMasterIp(NoHooksLU):
130
  """Activate the master IP on the master node.
131

132
  """
133
  def Exec(self, feedback_fn):
134
    """Activate the master IP.
135

136
    """
137
    master_params = self.cfg.GetMasterNetworkParameters()
138
    ems = self.cfg.GetUseExternalMipScript()
139
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
140
                                                   master_params, ems)
141
    result.Raise("Could not activate the master IP")
142

    
143

    
144
class LUClusterDeactivateMasterIp(NoHooksLU):
145
  """Deactivate the master IP on the master node.
146

147
  """
148
  def Exec(self, feedback_fn):
149
    """Deactivate the master IP.
150

151
    """
152
    master_params = self.cfg.GetMasterNetworkParameters()
153
    ems = self.cfg.GetUseExternalMipScript()
154
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
155
                                                     master_params, ems)
156
    result.Raise("Could not deactivate the master IP")
157

    
158

    
159
class LUClusterConfigQuery(NoHooksLU):
160
  """Return configuration values.
161

162
  """
163
  REQ_BGL = False
164

    
165
  def CheckArguments(self):
166
    self.cq = ClusterQuery(None, self.op.output_fields, False)
167

    
168
  def ExpandNames(self):
169
    self.cq.ExpandNames(self)
170

    
171
  def DeclareLocks(self, level):
172
    self.cq.DeclareLocks(self, level)
173

    
174
  def Exec(self, feedback_fn):
175
    result = self.cq.OldStyleQuery(self)
176

    
177
    assert len(result) == 1
178

    
179
    return result[0]
180

    
181

    
182
class LUClusterDestroy(LogicalUnit):
183
  """Logical unit for destroying the cluster.
184

185
  """
186
  HPATH = "cluster-destroy"
187
  HTYPE = constants.HTYPE_CLUSTER
188

    
189
  def BuildHooksEnv(self):
190
    """Build hooks env.
191

192
    """
193
    return {
194
      "OP_TARGET": self.cfg.GetClusterName(),
195
      }
196

    
197
  def BuildHooksNodes(self):
198
    """Build hooks nodes.
199

200
    """
201
    return ([], [])
202

    
203
  def CheckPrereq(self):
204
    """Check prerequisites.
205

206
    This checks whether the cluster is empty.
207

208
    Any errors are signaled by raising errors.OpPrereqError.
209

210
    """
211
    master = self.cfg.GetMasterNode()
212

    
213
    nodelist = self.cfg.GetNodeList()
214
    if len(nodelist) != 1 or nodelist[0] != master:
215
      raise errors.OpPrereqError("There are still %d node(s) in"
216
                                 " this cluster." % (len(nodelist) - 1),
217
                                 errors.ECODE_INVAL)
218
    instancelist = self.cfg.GetInstanceList()
219
    if instancelist:
220
      raise errors.OpPrereqError("There are still %d instance(s) in"
221
                                 " this cluster." % len(instancelist),
222
                                 errors.ECODE_INVAL)
223

    
224
  def Exec(self, feedback_fn):
225
    """Destroys the cluster.
226

227
    """
228
    master_params = self.cfg.GetMasterNetworkParameters()
229

    
230
    # Run post hooks on master node before it's removed
231
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
232

    
233
    ems = self.cfg.GetUseExternalMipScript()
234
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
235
                                                     master_params, ems)
236
    result.Warn("Error disabling the master IP address", self.LogWarning)
237
    return master_params.uuid
238

    
239

    
240
class LUClusterPostInit(LogicalUnit):
241
  """Logical unit for running hooks after cluster initialization.
242

243
  """
244
  HPATH = "cluster-init"
245
  HTYPE = constants.HTYPE_CLUSTER
246

    
247
  def CheckArguments(self):
248
    self.master_uuid = self.cfg.GetMasterNode()
249
    self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo())
250

    
251
    # TODO: When Issue 584 is solved, and None is properly parsed when used
252
    # as a default value, ndparams.get(.., None) can be changed to
253
    # ndparams[..] to access the values directly
254

    
255
    # OpenvSwitch: Warn user if link is missing
256
    if (self.master_ndparams[constants.ND_OVS] and not
257
        self.master_ndparams.get(constants.ND_OVS_LINK, None)):
258
      self.LogInfo("No physical interface for OpenvSwitch was given."
259
                   " OpenvSwitch will not have an outside connection. This"
260
                   " might not be what you want.")
261

    
262
  def BuildHooksEnv(self):
263
    """Build hooks env.
264

265
    """
266
    return {
267
      "OP_TARGET": self.cfg.GetClusterName(),
268
      }
269

    
270
  def BuildHooksNodes(self):
271
    """Build hooks nodes.
272

273
    """
274
    return ([], [self.cfg.GetMasterNode()])
275

    
276
  def Exec(self, feedback_fn):
277
    """Create and configure Open vSwitch
278

279
    """
280
    if self.master_ndparams[constants.ND_OVS]:
281
      result = self.rpc.call_node_configure_ovs(
282
                 self.master_uuid,
283
                 self.master_ndparams[constants.ND_OVS_NAME],
284
                 self.master_ndparams.get(constants.ND_OVS_LINK, None))
285
      result.Raise("Could not successully configure Open vSwitch")
286

    
287
    cluster = self.cfg.GetClusterInfo()
288
    _UpdateMasterClientCert(self, self.master_uuid, cluster, feedback_fn)
289

    
290
    return True
291

    
292

    
293
class ClusterQuery(QueryBase):
294
  FIELDS = query.CLUSTER_FIELDS
295

    
296
  #: Do not sort (there is only one item)
297
  SORT_FIELD = None
298

    
299
  def ExpandNames(self, lu):
300
    lu.needed_locks = {}
301

    
302
    # The following variables interact with _QueryBase._GetNames
303
    self.wanted = locking.ALL_SET
304
    self.do_locking = self.use_locking
305

    
306
    if self.do_locking:
307
      raise errors.OpPrereqError("Can not use locking for cluster queries",
308
                                 errors.ECODE_INVAL)
309

    
310
  def DeclareLocks(self, lu, level):
311
    pass
312

    
313
  def _GetQueryData(self, lu):
314
    """Computes the list of nodes and their attributes.
315

316
    """
317
    # Locking is not used
318
    assert not (compat.any(lu.glm.is_owned(level)
319
                           for level in locking.LEVELS
320
                           if level != locking.LEVEL_CLUSTER) or
321
                self.do_locking or self.use_locking)
322

    
323
    if query.CQ_CONFIG in self.requested_data:
324
      cluster = lu.cfg.GetClusterInfo()
325
      nodes = lu.cfg.GetAllNodesInfo()
326
    else:
327
      cluster = NotImplemented
328
      nodes = NotImplemented
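      # NotImplemented acts as a placeholder for data that was not requested
      # and therefore is never read by the query layer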
329

    
330
    if query.CQ_QUEUE_DRAINED in self.requested_data:
331
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
332
    else:
333
      drain_flag = NotImplemented
334

    
335
    if query.CQ_WATCHER_PAUSE in self.requested_data:
336
      master_node_uuid = lu.cfg.GetMasterNode()
337

    
338
      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
339
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
340
                   lu.cfg.GetMasterNodeName())
341

    
342
      watcher_pause = result.payload
343
    else:
344
      watcher_pause = NotImplemented
345

    
346
    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
347

    
348

    
349
class LUClusterQuery(NoHooksLU):
350
  """Query cluster configuration.
351

352
  """
353
  REQ_BGL = False
354

    
355
  def ExpandNames(self):
356
    self.needed_locks = {}
357

    
358
  def Exec(self, feedback_fn):
359
    """Return cluster config.
360

361
    """
362
    cluster = self.cfg.GetClusterInfo()
363
    os_hvp = {}
364

    
365
    # Filter just for enabled hypervisors
366
    for os_name, hv_dict in cluster.os_hvp.items():
367
      os_hvp[os_name] = {}
368
      for hv_name, hv_params in hv_dict.items():
369
        if hv_name in cluster.enabled_hypervisors:
370
          os_hvp[os_name][hv_name] = hv_params
371

    
372
    # Convert ip_family to ip_version
373
    primary_ip_version = constants.IP4_VERSION
374
    if cluster.primary_ip_family == netutils.IP6Address.family:
375
      primary_ip_version = constants.IP6_VERSION
376

    
377
    result = {
378
      "software_version": constants.RELEASE_VERSION,
379
      "protocol_version": constants.PROTOCOL_VERSION,
380
      "config_version": constants.CONFIG_VERSION,
381
      "os_api_version": max(constants.OS_API_VERSIONS),
382
      "export_version": constants.EXPORT_VERSION,
383
      "vcs_version": constants.VCS_VERSION,
384
      "architecture": runtime.GetArchInfo(),
385
      "name": cluster.cluster_name,
386
      "master": self.cfg.GetMasterNodeName(),
387
      "default_hypervisor": cluster.primary_hypervisor,
388
      "enabled_hypervisors": cluster.enabled_hypervisors,
389
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
390
                        for hypervisor_name in cluster.enabled_hypervisors]),
391
      "os_hvp": os_hvp,
392
      "beparams": cluster.beparams,
393
      "osparams": cluster.osparams,
394
      "ipolicy": cluster.ipolicy,
395
      "nicparams": cluster.nicparams,
396
      "ndparams": cluster.ndparams,
397
      "diskparams": cluster.diskparams,
398
      "candidate_pool_size": cluster.candidate_pool_size,
399
      "max_running_jobs": cluster.max_running_jobs,
400
      "master_netdev": cluster.master_netdev,
401
      "master_netmask": cluster.master_netmask,
402
      "use_external_mip_script": cluster.use_external_mip_script,
403
      "volume_group_name": cluster.volume_group_name,
404
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
405
      "file_storage_dir": cluster.file_storage_dir,
406
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
407
      "maintain_node_health": cluster.maintain_node_health,
408
      "ctime": cluster.ctime,
409
      "mtime": cluster.mtime,
410
      "uuid": cluster.uuid,
411
      "tags": list(cluster.GetTags()),
412
      "uid_pool": cluster.uid_pool,
413
      "default_iallocator": cluster.default_iallocator,
414
      "default_iallocator_params": cluster.default_iallocator_params,
415
      "reserved_lvs": cluster.reserved_lvs,
416
      "primary_ip_version": primary_ip_version,
417
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
418
      "hidden_os": cluster.hidden_os,
419
      "blacklisted_os": cluster.blacklisted_os,
420
      "enabled_disk_templates": cluster.enabled_disk_templates,
421
      "instance_communication_network": cluster.instance_communication_network,
422
      }
423

    
424
    return result
425

    
426

    
427
class LUClusterRedistConf(NoHooksLU):
428
  """Force the redistribution of cluster configuration.
429

430
  This is a very simple LU.
431

432
  """
433
  REQ_BGL = False
434

    
435
  def ExpandNames(self):
436
    self.needed_locks = {
437
      locking.LEVEL_NODE: locking.ALL_SET,
438
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
439
    }
440
    self.share_locks = ShareAll()
441

    
442
  def Exec(self, feedback_fn):
443
    """Redistribute the configuration.
444

445
    """
446
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
447
    RedistributeAncillaryFiles(self)
448

    
449

    
450
class LUClusterRename(LogicalUnit):
451
  """Rename the cluster.
452

453
  """
454
  HPATH = "cluster-rename"
455
  HTYPE = constants.HTYPE_CLUSTER
456

    
457
  def BuildHooksEnv(self):
458
    """Build hooks env.
459

460
    """
461
    return {
462
      "OP_TARGET": self.cfg.GetClusterName(),
463
      "NEW_NAME": self.op.name,
464
      }
465

    
466
  def BuildHooksNodes(self):
467
    """Build hooks nodes.
468

469
    """
470
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
471

    
472
  def CheckPrereq(self):
473
    """Verify that the passed name is a valid one.
474

475
    """
476
    hostname = netutils.GetHostname(name=self.op.name,
477
                                    family=self.cfg.GetPrimaryIPFamily())
478

    
479
    new_name = hostname.name
480
    self.ip = new_ip = hostname.ip
481
    old_name = self.cfg.GetClusterName()
482
    old_ip = self.cfg.GetMasterIP()
483
    if new_name == old_name and new_ip == old_ip:
484
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
485
                                 " cluster has changed",
486
                                 errors.ECODE_INVAL)
487
    if new_ip != old_ip:
488
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
489
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
490
                                   " reachable on the network" %
491
                                   new_ip, errors.ECODE_NOTUNIQUE)
492

    
493
    self.op.name = new_name
494

    
495
  def Exec(self, feedback_fn):
496
    """Rename the cluster.
497

498
    """
499
    clustername = self.op.name
500
    new_ip = self.ip
501

    
502
    # shutdown the master IP
503
    master_params = self.cfg.GetMasterNetworkParameters()
504
    ems = self.cfg.GetUseExternalMipScript()
505
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
506
                                                     master_params, ems)
507
    result.Raise("Could not disable the master role")
508

    
509
    try:
510
      cluster = self.cfg.GetClusterInfo()
511
      cluster.cluster_name = clustername
512
      cluster.master_ip = new_ip
513
      self.cfg.Update(cluster, feedback_fn)
514

    
515
      # update the known hosts file
516
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
517
      node_list = self.cfg.GetOnlineNodeList()
518
      try:
519
        node_list.remove(master_params.uuid)
520
      except ValueError:
521
        pass
522
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
523
    finally:
524
      master_params.ip = new_ip
525
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
526
                                                     master_params, ems)
527
      result.Warn("Could not re-enable the master role on the master,"
528
                  " please restart manually", self.LogWarning)
529

    
530
    return clustername
531

    
532

    
533
class LUClusterRepairDiskSizes(NoHooksLU):
534
  """Verifies the cluster disks sizes.
535

536
  """
537
  REQ_BGL = False
538

    
539
  def ExpandNames(self):
540
    if self.op.instances:
541
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
542
      # Not getting the node allocation lock as only a specific set of
543
      # instances (and their nodes) is going to be acquired
544
      self.needed_locks = {
545
        locking.LEVEL_NODE_RES: [],
546
        locking.LEVEL_INSTANCE: self.wanted_names,
547
        }
548
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
549
    else:
550
      self.wanted_names = None
551
      self.needed_locks = {
552
        locking.LEVEL_NODE_RES: locking.ALL_SET,
553
        locking.LEVEL_INSTANCE: locking.ALL_SET,
554

    
555
        # This opcode acquires the node locks for all instances
556
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
557
        }
558

    
559
    self.share_locks = {
560
      locking.LEVEL_NODE_RES: 1,
561
      locking.LEVEL_INSTANCE: 0,
562
      locking.LEVEL_NODE_ALLOC: 1,
563
      }
564

    
565
  def DeclareLocks(self, level):
566
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
567
      self._LockInstancesNodes(primary_only=True, level=level)
568

    
569
  def CheckPrereq(self):
570
    """Check prerequisites.
571

572
    This only checks the optional instance list against the existing names.
573

574
    """
575
    if self.wanted_names is None:
576
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
577

    
578
    self.wanted_instances = \
579
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
580

    
581
  def _EnsureChildSizes(self, disk):
582
    """Ensure children of the disk have the needed disk size.
583

584
    This is valid mainly for DRBD8 and fixes an issue where the
585
    children have smaller disk size.
586

587
    @param disk: an L{ganeti.objects.Disk} object
588

589
    """
590
    if disk.dev_type == constants.DT_DRBD8:
591
      assert disk.children, "Empty children for DRBD8?"
592
      fchild = disk.children[0]
593
      mismatch = fchild.size < disk.size
594
      if mismatch:
595
        self.LogInfo("Child disk has size %d, parent %d, fixing",
596
                     fchild.size, disk.size)
597
        fchild.size = disk.size
598

    
599
      # and we recurse on this child only, not on the metadev
600
      return self._EnsureChildSizes(fchild) or mismatch
601
    else:
602
      return False
603

    
604
  def Exec(self, feedback_fn):
605
    """Verify the size of cluster disks.
606

607
    """
608
    # TODO: check child disks too
609
    # TODO: check differences in size between primary/secondary nodes
610
    per_node_disks = {}
611
    for instance in self.wanted_instances:
612
      pnode = instance.primary_node
613
      if pnode not in per_node_disks:
614
        per_node_disks[pnode] = []
615
      for idx, disk in enumerate(instance.disks):
616
        per_node_disks[pnode].append((instance, idx, disk))
617

    
618
    assert not (frozenset(per_node_disks.keys()) -
619
                self.owned_locks(locking.LEVEL_NODE_RES)), \
620
      "Not owning correct locks"
621
    assert not self.owned_locks(locking.LEVEL_NODE)
622

    
623
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
624
                                               per_node_disks.keys())
625

    
626
    changed = []
627
    for node_uuid, dskl in per_node_disks.items():
628
      if not dskl:
629
        # no disks on the node
630
        continue
631

    
632
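      # pair a copy of each disk with its owning instance for the
      # blockdev_getdimensions call below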
      newl = [([v[2].Copy()], v[0]) for v in dskl]
633
      node_name = self.cfg.GetNodeName(node_uuid)
634
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
635
      if result.fail_msg:
636
        self.LogWarning("Failure in blockdev_getdimensions call to node"
637
                        " %s, ignoring", node_name)
638
        continue
639
      if len(result.payload) != len(dskl):
640
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
641
                        " result.payload=%s", node_name, len(dskl),
642
                        result.payload)
643
        self.LogWarning("Invalid result from node %s, ignoring node results",
644
                        node_name)
645
        continue
646
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
647
        if dimensions is None:
648
          self.LogWarning("Disk %d of instance %s did not return size"
649
                          " information, ignoring", idx, instance.name)
650
          continue
651
        if not isinstance(dimensions, (tuple, list)):
652
          self.LogWarning("Disk %d of instance %s did not return valid"
653
                          " dimension information, ignoring", idx,
654
                          instance.name)
655
          continue
656
        (size, spindles) = dimensions
657
        if not isinstance(size, (int, long)):
658
          self.LogWarning("Disk %d of instance %s did not return valid"
659
                          " size information, ignoring", idx, instance.name)
660
          continue
661
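        # the node reports sizes in bytes, while Ganeti stores them in MiB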
        size = size >> 20
662
        if size != disk.size:
663
          self.LogInfo("Disk %d of instance %s has mismatched size,"
664
                       " correcting: recorded %d, actual %d", idx,
665
                       instance.name, disk.size, size)
666
          disk.size = size
667
          self.cfg.Update(instance, feedback_fn)
668
          changed.append((instance.name, idx, "size", size))
669
        if es_flags[node_uuid]:
670
          if spindles is None:
671
            self.LogWarning("Disk %d of instance %s did not return valid"
672
                            " spindles information, ignoring", idx,
673
                            instance.name)
674
          elif disk.spindles is None or disk.spindles != spindles:
675
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
676
                         " correcting: recorded %s, actual %s",
677
                         idx, instance.name, disk.spindles, spindles)
678
            disk.spindles = spindles
679
            self.cfg.Update(instance, feedback_fn)
680
            changed.append((instance.name, idx, "spindles", disk.spindles))
681
        if self._EnsureChildSizes(disk):
682
          self.cfg.Update(instance, feedback_fn)
683
          changed.append((instance.name, idx, "size", disk.size))
684
    return changed
685

    
686

    
687
def _ValidateNetmask(cfg, netmask):
688
  """Checks if a netmask is valid.
689

690
  @type cfg: L{config.ConfigWriter}
691
  @param cfg: The cluster configuration
692
  @type netmask: int
693
  @param netmask: the netmask to be verified
694
  @raise errors.OpPrereqError: if the validation fails
695

696
  """
697
  ip_family = cfg.GetPrimaryIPFamily()
698
  try:
699
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
700
  except errors.ProgrammerError:
701
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
702
                               ip_family, errors.ECODE_INVAL)
703
  if not ipcls.ValidateNetmask(netmask):
704
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
705
                               (netmask), errors.ECODE_INVAL)
706

    
707

    
708
def CheckFileBasedStoragePathVsEnabledDiskTemplates(
709
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
710
    file_disk_template):
711
  """Checks whether the given file-based storage directory is acceptable.
712

713
  Note: This function is public, because it is also used in bootstrap.py.
714

715
  @type logging_warn_fn: function
716
  @param logging_warn_fn: function which accepts a string and logs it
717
  @type file_storage_dir: string
718
  @param file_storage_dir: the directory to be used for file-based instances
719
  @type enabled_disk_templates: list of string
720
  @param enabled_disk_templates: the list of enabled disk templates
721
  @type file_disk_template: string
722
  @param file_disk_template: the file-based disk template for which the
723
      path should be checked
724

725
  """
726
  assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
727
            constants.ST_FILE, constants.ST_SHARED_FILE
728
         ))
729
  file_storage_enabled = file_disk_template in enabled_disk_templates
730
  if file_storage_dir is not None:
731
    if file_storage_dir == "":
732
      if file_storage_enabled:
733
        raise errors.OpPrereqError(
734
            "Unsetting the '%s' storage directory while having '%s' storage"
735
            " enabled is not permitted." %
736
            (file_disk_template, file_disk_template))
737
    else:
738
      if not file_storage_enabled:
739
        logging_warn_fn(
740
            "Specified a %s storage directory, although %s storage is not"
741
            " enabled." % (file_disk_template, file_disk_template))
742
  else:
743
    raise errors.ProgrammerError("Received %s storage dir with value"
744
                                 " 'None'." % file_disk_template)
745

    
746

    
747
def CheckFileStoragePathVsEnabledDiskTemplates(
748
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
749
  """Checks whether the given file storage directory is acceptable.
750

751
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
752

753
  """
754
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
755
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
756
      constants.DT_FILE)
757

    
758

    
759
def CheckSharedFileStoragePathVsEnabledDiskTemplates(
760
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
761
  """Checks whether the given shared file storage directory is acceptable.
762

763
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
764

765
  """
766
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
767
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
768
      constants.DT_SHARED_FILE)
769

    
770

    
771
class LUClusterSetParams(LogicalUnit):
772
  """Change the parameters of the cluster.
773

774
  """
775
  HPATH = "cluster-modify"
776
  HTYPE = constants.HTYPE_CLUSTER
777
  REQ_BGL = False
778

    
779
  def CheckArguments(self):
780
    """Check parameters
781

782
    """
783
    if self.op.uid_pool:
784
      uidpool.CheckUidPool(self.op.uid_pool)
785

    
786
    if self.op.add_uids:
787
      uidpool.CheckUidPool(self.op.add_uids)
788

    
789
    if self.op.remove_uids:
790
      uidpool.CheckUidPool(self.op.remove_uids)
791

    
792
    if self.op.master_netmask is not None:
793
      _ValidateNetmask(self.cfg, self.op.master_netmask)
794

    
795
    if self.op.diskparams:
796
      for dt_params in self.op.diskparams.values():
797
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
798
      try:
799
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
800
        CheckDiskAccessModeValidity(self.op.diskparams)
801
      except errors.OpPrereqError, err:
802
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
803
                                   errors.ECODE_INVAL)
804

    
805
  def ExpandNames(self):
806
    # FIXME: in the future maybe other cluster params won't require checking on
807
    # all nodes to be modified.
808
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
809
    # resource locks the right thing, shouldn't it be the BGL instead?
810
    self.needed_locks = {
811
      locking.LEVEL_NODE: locking.ALL_SET,
812
      locking.LEVEL_INSTANCE: locking.ALL_SET,
813
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
814
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
815
    }
816
    self.share_locks = ShareAll()
817

    
818
  def BuildHooksEnv(self):
819
    """Build hooks env.
820

821
    """
822
    return {
823
      "OP_TARGET": self.cfg.GetClusterName(),
824
      "NEW_VG_NAME": self.op.vg_name,
825
      }
826

    
827
  def BuildHooksNodes(self):
828
    """Build hooks nodes.
829

830
    """
831
    mn = self.cfg.GetMasterNode()
832
    return ([mn], [mn])
833

    
834
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
835
                   new_enabled_disk_templates):
836
    """Check the consistency of the vg name on all nodes and in case it gets
837
       unset whether there are instances still using it.
838

839
    """
840
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
841
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
842
                                            new_enabled_disk_templates)
843
    current_vg_name = self.cfg.GetVGName()
844

    
845
    if self.op.vg_name == '':
846
      if lvm_is_enabled:
847
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
848
                                   " disk templates are or get enabled.")
849

    
850
    if self.op.vg_name is None:
851
      if current_vg_name is None and lvm_is_enabled:
852
        raise errors.OpPrereqError("Please specify a volume group when"
853
                                   " enabling lvm-based disk-templates.")
854

    
855
    if self.op.vg_name is not None and not self.op.vg_name:
856
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
857
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
858
                                   " instances exist", errors.ECODE_INVAL)
859

    
860
    if (self.op.vg_name is not None and lvm_is_enabled) or \
861
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
862
      self._CheckVgNameOnNodes(node_uuids)
863

    
864
  def _CheckVgNameOnNodes(self, node_uuids):
865
    """Check the status of the volume group on each node.
866

867
    """
868
    vglist = self.rpc.call_vg_list(node_uuids)
869
    for node_uuid in node_uuids:
870
      msg = vglist[node_uuid].fail_msg
871
      if msg:
872
        # ignoring down node
873
        self.LogWarning("Error while gathering data on node %s"
874
                        " (ignoring node): %s",
875
                        self.cfg.GetNodeName(node_uuid), msg)
876
        continue
877
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
878
                                            self.op.vg_name,
879
                                            constants.MIN_VG_SIZE)
880
      if vgstatus:
881
        raise errors.OpPrereqError("Error on node '%s': %s" %
882
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
883
                                   errors.ECODE_ENVIRON)
884

    
885
  @staticmethod
886
  def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
887
                                old_enabled_disk_templates):
888
    """Computes three sets of disk templates.
889

890
    @see: C{_GetDiskTemplateSets} for more details.
891

892
    """
893
    enabled_disk_templates = None
894
    new_enabled_disk_templates = []
895
    disabled_disk_templates = []
896
    if op_enabled_disk_templates:
897
      enabled_disk_templates = op_enabled_disk_templates
898
      new_enabled_disk_templates = \
899
        list(set(enabled_disk_templates)
900
             - set(old_enabled_disk_templates))
901
      disabled_disk_templates = \
902
        list(set(old_enabled_disk_templates)
903
             - set(enabled_disk_templates))
904
    else:
905
      enabled_disk_templates = old_enabled_disk_templates
906
    return (enabled_disk_templates, new_enabled_disk_templates,
907
            disabled_disk_templates)
908

    
909
  def _GetDiskTemplateSets(self, cluster):
910
    """Computes three sets of disk templates.
911

912
    The three sets are:
913
      - disk templates that will be enabled after this operation (no matter if
914
        they were enabled before or not)
915
      - disk templates that get enabled by this operation (thus haven't been
916
        enabled before.)
917
      - disk templates that get disabled by this operation
918

919
    """
920
    return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
921
                                          cluster.enabled_disk_templates)
922

    
923
  def _CheckIpolicy(self, cluster, enabled_disk_templates):
924
    """Checks the ipolicy.
925

926
    @type cluster: C{objects.Cluster}
927
    @param cluster: the cluster's configuration
928
    @type enabled_disk_templates: list of string
929
    @param enabled_disk_templates: list of (possibly newly) enabled disk
930
      templates
931

932
    """
933
    # FIXME: write unit tests for this
934
    if self.op.ipolicy:
935
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
936
                                           group_policy=False)
937

    
938
      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
939
                                  enabled_disk_templates)
940

    
941
      all_instances = self.cfg.GetAllInstancesInfo().values()
942
      violations = set()
943
      for group in self.cfg.GetAllNodeGroupsInfo().values():
944
        instances = frozenset([inst for inst in all_instances
945
                               if compat.any(nuuid in group.members
946
                                             for nuuid in inst.all_nodes)])
947
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
948
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
949
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
950
                                           self.cfg)
951
        if new:
952
          violations.update(new)
953

    
954
      if violations:
955
        self.LogWarning("After the ipolicy change the following instances"
956
                        " violate them: %s",
957
                        utils.CommaJoin(utils.NiceSort(violations)))
958
    else:
959
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
960
                                  enabled_disk_templates)
961

    
962
  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
963
    """Checks whether the set DRBD helper actually exists on the nodes.
964

965
    @type drbd_helper: string
966
    @param drbd_helper: path of the drbd usermode helper binary
967
    @type node_uuids: list of strings
968
    @param node_uuids: list of node UUIDs to check for the helper
969

970
    """
971
    # checks given drbd helper on all nodes
972
    helpers = self.rpc.call_drbd_helper(node_uuids)
973
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
974
      if ninfo.offline:
975
        self.LogInfo("Not checking drbd helper on offline node %s",
976
                     ninfo.name)
977
        continue
978
      msg = helpers[ninfo.uuid].fail_msg
979
      if msg:
980
        raise errors.OpPrereqError("Error checking drbd helper on node"
981
                                   " '%s': %s" % (ninfo.name, msg),
982
                                   errors.ECODE_ENVIRON)
983
      node_helper = helpers[ninfo.uuid].payload
984
      if node_helper != drbd_helper:
985
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
986
                                   (ninfo.name, node_helper),
987
                                   errors.ECODE_ENVIRON)
988

    
989
  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
990
    """Check the DRBD usermode helper.
991

992
    @type node_uuids: list of strings
993
    @param node_uuids: a list of nodes' UUIDs
994
    @type drbd_enabled: boolean
995
    @param drbd_enabled: whether DRBD will be enabled after this operation
996
      (no matter if it was disabled before or not)
997
    @type drbd_gets_enabled: boolean
998
    @param drbd_gets_enabled: true if DRBD was disabled before this
999
      operation, but will be enabled afterwards
1000

1001
    """
1002
    if self.op.drbd_helper == '':
1003
      if drbd_enabled:
1004
        raise errors.OpPrereqError("Cannot disable drbd helper while"
1005
                                   " DRBD is enabled.")
1006
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
1007
        raise errors.OpPrereqError("Cannot disable drbd helper while"
1008
                                   " drbd-based instances exist",
1009
                                   errors.ECODE_INVAL)
1010

    
1011
    else:
1012
      if self.op.drbd_helper is not None and drbd_enabled:
1013
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
1014
      else:
1015
        if drbd_gets_enabled:
1016
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
1017
          if current_drbd_helper is not None:
1018
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
1019
          else:
1020
            raise errors.OpPrereqError("Cannot enable DRBD without a"
1021
                                       " DRBD usermode helper set.")
1022

    
1023
  def _CheckInstancesOfDisabledDiskTemplates(
1024
      self, disabled_disk_templates):
1025
    """Check whether we try to disable a disk template that is in use.
1026

1027
    @type disabled_disk_templates: list of string
1028
    @param disabled_disk_templates: list of disk templates that are going to
1029
      be disabled by this operation
1030

1031
    """
1032
    for disk_template in disabled_disk_templates:
1033
      if self.cfg.HasAnyDiskOfType(disk_template):
1034
        raise errors.OpPrereqError(
1035
            "Cannot disable disk template '%s', because there is at least one"
1036
            " instance using it." % disk_template)
1037

    
1038
  @staticmethod
1039
  def _CheckInstanceCommunicationNetwork(network, warning_fn):
1040
    """Check whether an existing network is configured for instance
1041
    communication.
1042

1043
    Checks whether an existing network is configured with the
1044
    parameters that are advisable for instance communication, and
1045
    otherwise issue security warnings.
1046

1047
    @type network: L{ganeti.objects.Network}
1048
    @param network: L{ganeti.objects.Network} object whose
1049
                    configuration is being checked
1050
    @type warning_fn: function
1051
    @param warning_fn: function used to print warnings
1052
    @rtype: None
1053
    @return: None
1054

1055
    """
1056
    def _MaybeWarn(err, val, default):
1057
      if val != default:
1058
        warning_fn("Supplied instance communication network '%s' %s '%s',"
1059
                   " this might pose a security risk (default is '%s').",
1060
                   network.name, err, val, default)
1061

    
1062
    if network.network is None:
1063
      raise errors.OpPrereqError("Supplied instance communication network '%s'"
1064
                                 " must have an IPv4 network address.",
1065
                                 network.name)
1066

    
1067
    _MaybeWarn("has an IPv4 gateway", network.gateway, None)
1068
    _MaybeWarn("has a non-standard IPv4 network address", network.network,
1069
               constants.INSTANCE_COMMUNICATION_NETWORK4)
1070
    _MaybeWarn("has an IPv6 gateway", network.gateway6, None)
1071
    _MaybeWarn("has a non-standard IPv6 network address", network.network6,
1072
               constants.INSTANCE_COMMUNICATION_NETWORK6)
1073
    _MaybeWarn("has a non-standard MAC prefix", network.mac_prefix,
1074
               constants.INSTANCE_COMMUNICATION_MAC_PREFIX)
1075

    
1076
  def CheckPrereq(self):
1077
    """Check prerequisites.
1078

1079
    This checks whether the given params don't conflict and
1080
    if the given volume group is valid.
1081

1082
    """
1083
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
1084
    self.cluster = cluster = self.cfg.GetClusterInfo()
1085

    
1086
    vm_capable_node_uuids = [node.uuid
1087
                             for node in self.cfg.GetAllNodesInfo().values()
1088
                             if node.uuid in node_uuids and node.vm_capable]
1089

    
1090
    (enabled_disk_templates, new_enabled_disk_templates,
1091
      disabled_disk_templates) = self._GetDiskTemplateSets(cluster)
1092
    self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates)
1093

    
1094
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
1095
                      new_enabled_disk_templates)
1096

    
1097
    if self.op.file_storage_dir is not None:
1098
      CheckFileStoragePathVsEnabledDiskTemplates(
1099
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
1100

    
1101
    if self.op.shared_file_storage_dir is not None:
1102
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
1103
          self.LogWarning, self.op.shared_file_storage_dir,
1104
          enabled_disk_templates)
1105

    
1106
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
1107
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
1108
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)
1109

    
1110
    # validate params changes
1111
    if self.op.beparams:
1112
      objects.UpgradeBeParams(self.op.beparams)
1113
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1114
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
1115

    
1116
    if self.op.ndparams:
1117
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
1118
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
1119

    
1120
      # TODO: we need a more general way to handle resetting
1121
      # cluster-level parameters to default values
1122
      if self.new_ndparams["oob_program"] == "":
1123
        self.new_ndparams["oob_program"] = \
1124
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
1125

    
1126
    if self.op.hv_state:
1127
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
1128
                                           self.cluster.hv_state_static)
1129
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
1130
                               for hv, values in new_hv_state.items())
1131

    
1132
    if self.op.disk_state:
1133
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
1134
                                               self.cluster.disk_state_static)
1135
      self.new_disk_state = \
1136
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
1137
                            for name, values in svalues.items()))
1138
             for storage, svalues in new_disk_state.items())
1139

    
1140
    self._CheckIpolicy(cluster, enabled_disk_templates)
1141

    
1142
    if self.op.nicparams:
1143
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1144
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
1145
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1146
      nic_errors = []
1147

    
1148
      # check all instances for consistency
1149
      for instance in self.cfg.GetAllInstancesInfo().values():
1150
        for nic_idx, nic in enumerate(instance.nics):
1151
          params_copy = copy.deepcopy(nic.nicparams)
1152
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
1153

    
1154
          # check parameter syntax
1155
          try:
1156
            objects.NIC.CheckParameterSyntax(params_filled)
1157
          except errors.ConfigurationError, err:
1158
            nic_errors.append("Instance %s, nic/%d: %s" %
1159
                              (instance.name, nic_idx, err))
1160

    
1161
          # if we're moving instances to routed, check that they have an ip
1162
          target_mode = params_filled[constants.NIC_MODE]
1163
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
1164
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
1165
                              " address" % (instance.name, nic_idx))
1166
      if nic_errors:
1167
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
1168
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
1169

    
1170
    # hypervisor list/parameters
1171
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
1172
    if self.op.hvparams:
1173
      for hv_name, hv_dict in self.op.hvparams.items():
1174
        if hv_name not in self.new_hvparams:
1175
          self.new_hvparams[hv_name] = hv_dict
1176
        else:
1177
          self.new_hvparams[hv_name].update(hv_dict)
1178

    
1179
    # disk template parameters
1180
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
1181
    if self.op.diskparams:
1182
      for dt_name, dt_params in self.op.diskparams.items():
1183
        if dt_name not in self.new_diskparams:
1184
          self.new_diskparams[dt_name] = dt_params
1185
        else:
1186
          self.new_diskparams[dt_name].update(dt_params)
1187
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)
1188

    
1189
    # os hypervisor parameters
1190
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
1191
    if self.op.os_hvp:
1192
      for os_name, hvs in self.op.os_hvp.items():
1193
        if os_name not in self.new_os_hvp:
1194
          self.new_os_hvp[os_name] = hvs
1195
        else:
1196
          for hv_name, hv_dict in hvs.items():
1197
            if hv_dict is None:
1198
              # Delete if it exists
1199
              self.new_os_hvp[os_name].pop(hv_name, None)
1200
            elif hv_name not in self.new_os_hvp[os_name]:
1201
              self.new_os_hvp[os_name][hv_name] = hv_dict
1202
            else:
1203
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
1204

    
1205
    # os parameters
1206
    self._BuildOSParams(cluster)
1207

    
1208
    # changes to the hypervisor list
1209
    if self.op.enabled_hypervisors is not None:
1210
      self.hv_list = self.op.enabled_hypervisors
1211
      for hv in self.hv_list:
1212
        # if the hypervisor doesn't already exist in the cluster
1213
        # hvparams, we initialize it to empty, and then (in both
1214
        # cases) we make sure to fill the defaults, as we might not
1215
        # have a complete defaults list if the hypervisor wasn't
1216
        # enabled before
1217
        if hv not in new_hvp:
1218
          new_hvp[hv] = {}
1219
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1220
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1221
    else:
1222
      self.hv_list = cluster.enabled_hypervisors
1223

    
1224
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1225
      # either the enabled list has changed, or the parameters have, validate
1226
      for hv_name, hv_params in self.new_hvparams.items():
1227
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1228
            (self.op.enabled_hypervisors and
1229
             hv_name in self.op.enabled_hypervisors)):
1230
          # either this is a new hypervisor, or its parameters have changed
1231
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1232
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1233
          hv_class.CheckParameterSyntax(hv_params)
1234
          CheckHVParams(self, node_uuids, hv_name, hv_params)
1235

    
1236
    self._CheckDiskTemplateConsistency()
1237

    
1238
    if self.op.os_hvp:
1239
      # no need to check any newly-enabled hypervisors, since the
1240
      # defaults have already been checked in the above code-block
1241
      for os_name, os_hvp in self.new_os_hvp.items():
1242
        for hv_name, hv_params in os_hvp.items():
1243
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1244
          # we need to fill in the new os_hvp on top of the actual hv_p
1245
          cluster_defaults = self.new_hvparams.get(hv_name, {})
1246
          new_osp = objects.FillDict(cluster_defaults, hv_params)
1247
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1248
          hv_class.CheckParameterSyntax(new_osp)
1249
          CheckHVParams(self, node_uuids, hv_name, new_osp)
1250

    
1251
    if self.op.default_iallocator:
1252
      alloc_script = utils.FindFile(self.op.default_iallocator,
1253
                                    constants.IALLOCATOR_SEARCH_PATH,
1254
                                    os.path.isfile)
1255
      if alloc_script is None:
1256
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1257
                                   " specified" % self.op.default_iallocator,
1258
                                   errors.ECODE_INVAL)
1259

    
1260
    if self.op.instance_communication_network:
1261
      network_name = self.op.instance_communication_network
1262

    
1263
      try:
1264
        network_uuid = self.cfg.LookupNetwork(network_name)
1265
      except errors.OpPrereqError:
1266
        network_uuid = None
1267

    
1268
      if network_uuid is not None:
1269
        network = self.cfg.GetNetwork(network_uuid)
1270
        self._CheckInstanceCommunicationNetwork(network, self.LogWarning)
1271

    
1272
  def _BuildOSParams(self, cluster):
1273
    "Calculate the new OS parameters for this operation."
1274

    
1275
    def _GetNewParams(source, new_params):
1276
      "Wrapper around GetUpdatedParams."
1277
      if new_params is None:
1278
        return source
1279
      result = objects.FillDict(source, {}) # deep copy of source
1280
      for os_name in new_params:
1281
        result[os_name] = GetUpdatedParams(result.get(os_name, {}),
1282
                                           new_params[os_name],
1283
                                           use_none=True)
1284
        if not result[os_name]:
1285
          del result[os_name] # we removed all parameters
1286
      return result
1287

    
1288
    self.new_osp = _GetNewParams(cluster.osparams,
1289
                                 self.op.osparams)
1290
    self.new_osp_private = _GetNewParams(cluster.osparams_private_cluster,
1291
                                         self.op.osparams_private_cluster)
1292

    
1293
    # Remove os validity check
1294
    changed_oses = (set(self.new_osp.keys()) | set(self.new_osp_private.keys()))
1295
    for os_name in changed_oses:
1296
      os_params = cluster.SimpleFillOS(
1297
        os_name,
1298
        self.new_osp.get(os_name, {}),
1299
        os_params_private=self.new_osp_private.get(os_name, {})
1300
      )
1301
      # check the parameter validity (remote check)
1302
      CheckOSParams(self, False, [self.cfg.GetMasterNode()],
1303
                    os_name, os_params)
1304

    
1305
  def _CheckDiskTemplateConsistency(self):
1306
    """Check whether the disk templates that are going to be disabled
1307
       are still in use by some instances.
1308

1309
    """
1310
    if self.op.enabled_disk_templates:
1311
      cluster = self.cfg.GetClusterInfo()
1312
      instances = self.cfg.GetAllInstancesInfo()
1313

    
1314
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1315
        - set(self.op.enabled_disk_templates)
1316
      for instance in instances.itervalues():
1317
        if instance.disk_template in disk_templates_to_remove:
1318
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1319
                                     " because instance '%s' is using it." %
1320
                                     (instance.disk_template, instance.name))
1321

    
1322
  def _SetVgName(self, feedback_fn):
1323
    """Determines and sets the new volume group name.
1324

1325
    """
1326
    if self.op.vg_name is not None:
1327
      new_volume = self.op.vg_name
1328
      if not new_volume:
1329
        new_volume = None
1330
      if new_volume != self.cfg.GetVGName():
1331
        self.cfg.SetVGName(new_volume)
1332
      else:
1333
        feedback_fn("Cluster LVM configuration already in desired"
1334
                    " state, not changing")
1335

    
1336
  def _SetFileStorageDir(self, feedback_fn):
1337
    """Set the file storage directory.
1338

1339
    """
1340
    if self.op.file_storage_dir is not None:
1341
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1342
        feedback_fn("Global file storage dir already set to value '%s'"
1343
                    % self.cluster.file_storage_dir)
1344
      else:
1345
        self.cluster.file_storage_dir = self.op.file_storage_dir
1346

    
1347
  def _SetDrbdHelper(self, feedback_fn):
1348
    """Set the DRBD usermode helper.
1349

1350
    """
1351
    if self.op.drbd_helper is not None:
1352
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1353
        feedback_fn("Note that you specified a drbd user helper, but did not"
1354
                    " enable the drbd disk template.")
1355
      new_helper = self.op.drbd_helper
1356
      if not new_helper:
1357
        new_helper = None
1358
      if new_helper != self.cfg.GetDRBDHelper():
1359
        self.cfg.SetDRBDHelper(new_helper)
1360
      else:
1361
        feedback_fn("Cluster DRBD helper already in desired state,"
1362
                    " not changing")
1363

    
1364
  @staticmethod
1365
  def _EnsureInstanceCommunicationNetwork(cfg, network_name):
1366
    """Ensure that the instance communication network exists and is
1367
    connected to all groups.
1368

1369
    The instance communication network given by L{network_name} is
1370
    created, if necessary, via the opcode 'OpNetworkAdd'.  Also, the
1371
    instance communication network is connected to all existing node
1372
    groups, if necessary, via the opcode 'OpNetworkConnect'.
1373

1374
    @type cfg: L{ganeti.config.ConfigWriter}
1375
    @param cfg: Ganeti configuration
1376

1377
    @type network_name: string
1378
    @param network_name: instance communication network name
1379

1380
    @rtype: L{ganeti.cmdlib.ResultWithJobs} or L{None}
1381
    @return: L{ganeti.cmdlib.ResultWithJobs} if the instance
1382
             communication network needs to be created or it needs to be
1383
             connected to a group, otherwise L{None}
1384

1385
    """
1386
    jobs = []
1387

    
1388
    try:
1389
      network_uuid = cfg.LookupNetwork(network_name)
1390
      network_exists = True
1391
    except errors.OpPrereqError:
1392
      network_exists = False
1393

    
1394
    if not network_exists:
1395
      jobs.append(OpAddInstanceCommunicationNetwork(network_name))
1396

    
1397
    for group_uuid in cfg.GetNodeGroupList():
1398
      group = cfg.GetNodeGroup(group_uuid)
1399

    
1400
      if network_exists:
1401
        network_connected = network_uuid in group.networks
1402
      else:
1403
        # The network was created asynchronously by the previous
1404
        # opcode and, therefore, we don't have access to its
1405
        # network_uuid.  As a result, we assume that the network is
1406
        # not connected to any group yet.
1407
        network_connected = False
1408

    
1409
      if not network_connected:
1410
        op = OpConnectInstanceCommunicationNetwork(group_uuid, network_name)
1411
        jobs.append(op)
1412

    
1413
    if jobs:
1414
      return ResultWithJobs([jobs])
1415
    else:
1416
      return None
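    # A rough sketch (not part of the original code) of what this method can
    # hand back, using a hypothetical network name "example-net" and a group
    # UUID "g1": one OpAddInstanceCommunicationNetwork opcode if the network
    # is missing, plus one OpConnectInstanceCommunicationNetwork opcode per
    # unconnected group, all wrapped in a single job:
    #   jobs = [OpAddInstanceCommunicationNetwork("example-net"),
    #           OpConnectInstanceCommunicationNetwork("g1", "example-net")]
    #   return ResultWithJobs([jobs])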
1417

    
1418
  @staticmethod
1419
  def _ModifyInstanceCommunicationNetwork(cfg, cluster, network_name,
1420
                                          feedback_fn):
1421
    """Update the instance communication network stored in the cluster
1422
    configuration.
1423

1424
    Compares the user-supplied instance communication network against
1425
    the one stored in the Ganeti cluster configuration.  If there is a
1426
    change, the instance communication network may need to be created
1427
    and connected to all groups (see
1428
    L{LUClusterSetParams._EnsureInstanceCommunicationNetwork}).
1429

1430
    @type cfg: L{ganeti.config.ConfigWriter}
1431
    @param cfg: Ganeti configuration
1432

1433
    @type cluster: L{ganeti.objects.Cluster}
1434
    @param cluster: Ganeti cluster
1435

1436
    @type network_name: string
1437
    @param network_name: instance communication network name
1438

1439
    @type feedback_fn: function
1440
    @param feedback_fn: see L{ganeti.cmdlib.base.LogicalUnit}
1441

1442
    @rtype: L{LUClusterSetParams._EnsureInstanceCommunicationNetwork} or L{None}
1443
    @return: see L{LUClusterSetParams._EnsureInstanceCommunicationNetwork}
1444

1445
    """
1446
    config_network_name = cfg.GetInstanceCommunicationNetwork()
1447

    
1448
    if network_name == config_network_name:
1449
      feedback_fn("Instance communication network already is '%s', nothing to"
1450
                  " do." % network_name)
1451
    else:
1452
      try:
1453
        cfg.LookupNetwork(config_network_name)
1454
        feedback_fn("Previous instance communication network '%s'"
1455
                    " should be removed manually." % config_network_name)
1456
      except errors.OpPrereqError:
1457
        pass
1458

    
1459
      if network_name:
1460
        feedback_fn("Changing instance communication network to '%s', only new"
1461
                    " instances will be affected."
1462
                    % network_name)
1463
      else:
1464
        feedback_fn("Disabling instance communication network, only new"
1465
                    " instances will be affected.")
1466

    
1467
      cluster.instance_communication_network = network_name
1468

    
1469
      if network_name:
1470
        return LUClusterSetParams._EnsureInstanceCommunicationNetwork(
1471
          cfg,
1472
          network_name)
1473
      else:
1474
        return None
1475

    
1476
  def Exec(self, feedback_fn):
1477
    """Change the parameters of the cluster.
1478

1479
    """
1480
    if self.op.enabled_disk_templates:
1481
      self.cluster.enabled_disk_templates = \
1482
        list(self.op.enabled_disk_templates)
1483

    
1484
    self._SetVgName(feedback_fn)
1485
    self._SetFileStorageDir(feedback_fn)
1486
    self._SetDrbdHelper(feedback_fn)
1487

    
1488
    if self.op.hvparams:
1489
      self.cluster.hvparams = self.new_hvparams
1490
    if self.op.os_hvp:
1491
      self.cluster.os_hvp = self.new_os_hvp
1492
    if self.op.enabled_hypervisors is not None:
1493
      self.cluster.hvparams = self.new_hvparams
1494
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1495
    if self.op.beparams:
1496
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1497
    if self.op.nicparams:
1498
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1499
    if self.op.ipolicy:
1500
      self.cluster.ipolicy = self.new_ipolicy
1501
    if self.op.osparams:
1502
      self.cluster.osparams = self.new_osp
1503
    if self.op.osparams_private_cluster:
1504
      self.cluster.osparams_private_cluster = self.new_osp_private
1505
    if self.op.ndparams:
1506
      self.cluster.ndparams = self.new_ndparams
1507
    if self.op.diskparams:
1508
      self.cluster.diskparams = self.new_diskparams
1509
    if self.op.hv_state:
1510
      self.cluster.hv_state_static = self.new_hv_state
1511
    if self.op.disk_state:
1512
      self.cluster.disk_state_static = self.new_disk_state
1513

    
1514
    if self.op.candidate_pool_size is not None:
1515
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1516
      # we need to update the pool size here, otherwise the save will fail
1517
      AdjustCandidatePool(self, [], feedback_fn)
1518

    
1519
    if self.op.max_running_jobs is not None:
1520
      self.cluster.max_running_jobs = self.op.max_running_jobs
1521

    
1522
    if self.op.maintain_node_health is not None:
1523
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1524
        feedback_fn("Note: CONFD was disabled at build time, node health"
1525
                    " maintenance is not useful (still enabling it)")
1526
      self.cluster.maintain_node_health = self.op.maintain_node_health
1527

    
1528
    if self.op.modify_etc_hosts is not None:
1529
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1530

    
1531
    if self.op.prealloc_wipe_disks is not None:
1532
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1533

    
1534
    if self.op.add_uids is not None:
1535
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1536

    
1537
    if self.op.remove_uids is not None:
1538
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1539

    
1540
    if self.op.uid_pool is not None:
1541
      self.cluster.uid_pool = self.op.uid_pool
1542

    
1543
    if self.op.default_iallocator is not None:
1544
      self.cluster.default_iallocator = self.op.default_iallocator
1545

    
1546
    if self.op.default_iallocator_params is not None:
1547
      self.cluster.default_iallocator_params = self.op.default_iallocator_params
1548

    
1549
    if self.op.reserved_lvs is not None:
1550
      self.cluster.reserved_lvs = self.op.reserved_lvs
1551

    
1552
    if self.op.use_external_mip_script is not None:
1553
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
1554

    
1555
    def helper_os(aname, mods, desc):
1556
      desc += " OS list"
1557
      lst = getattr(self.cluster, aname)
1558
      for key, val in mods:
1559
        if key == constants.DDM_ADD:
1560
          if val in lst:
1561
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1562
          else:
1563
            lst.append(val)
1564
        elif key == constants.DDM_REMOVE:
1565
          if val in lst:
1566
            lst.remove(val)
1567
          else:
1568
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1569
        else:
1570
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
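    # Illustrative example (not from the original code) of the "mods" list
    # that helper_os consumes, pairs of (action, OS name):
    #   [(constants.DDM_ADD, "debian-image"),
    #    (constants.DDM_REMOVE, "lenny-image")]
    # Unknown actions are programmer errors; duplicate additions or missing
    # removals are only reported via feedback_fn and otherwise ignored.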
1571

    
1572
    if self.op.hidden_os:
1573
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1574

    
1575
    if self.op.blacklisted_os:
1576
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1577

    
1578
    if self.op.master_netdev:
1579
      master_params = self.cfg.GetMasterNetworkParameters()
1580
      ems = self.cfg.GetUseExternalMipScript()
1581
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1582
                  self.cluster.master_netdev)
1583
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1584
                                                       master_params, ems)
1585
      if not self.op.force:
1586
        result.Raise("Could not disable the master ip")
1587
      else:
1588
        if result.fail_msg:
1589
          msg = ("Could not disable the master ip (continuing anyway): %s" %
1590
                 result.fail_msg)
1591
          feedback_fn(msg)
1592
      feedback_fn("Changing master_netdev from %s to %s" %
1593
                  (master_params.netdev, self.op.master_netdev))
1594
      self.cluster.master_netdev = self.op.master_netdev
1595

    
1596
    if self.op.master_netmask:
1597
      master_params = self.cfg.GetMasterNetworkParameters()
1598
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1599
      result = self.rpc.call_node_change_master_netmask(
1600
                 master_params.uuid, master_params.netmask,
1601
                 self.op.master_netmask, master_params.ip,
1602
                 master_params.netdev)
1603
      result.Warn("Could not change the master IP netmask", feedback_fn)
1604
      self.cluster.master_netmask = self.op.master_netmask
1605

    
1606
    self.cfg.Update(self.cluster, feedback_fn)
1607

    
1608
    if self.op.master_netdev:
1609
      master_params = self.cfg.GetMasterNetworkParameters()
1610
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
1611
                  self.op.master_netdev)
1612
      ems = self.cfg.GetUseExternalMipScript()
1613
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1614
                                                     master_params, ems)
1615
      result.Warn("Could not re-enable the master ip on the master,"
1616
                  " please restart manually", self.LogWarning)
1617

    
1618
    network_name = self.op.instance_communication_network
1619
    if network_name is not None:
1620
      return self._ModifyInstanceCommunicationNetwork(self.cfg, self.cluster,
1621
                                                      network_name, feedback_fn)
1622
    else:
1623
      return None
1624

    
1625

    
1626
class LUClusterVerify(NoHooksLU):
1627
  """Submits all jobs necessary to verify the cluster.
1628

1629
  """
1630
  REQ_BGL = False
1631

    
1632
  def ExpandNames(self):
1633
    self.needed_locks = {}
1634

    
1635
  def Exec(self, feedback_fn):
1636
    jobs = []
1637

    
1638
    if self.op.group_name:
1639
      groups = [self.op.group_name]
1640
      depends_fn = lambda: None
1641
    else:
1642
      groups = self.cfg.GetNodeGroupList()
1643

    
1644
      # Verify global configuration
1645
      jobs.append([
1646
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1647
        ])
1648

    
1649
      # Always depend on global verification
1650
      depends_fn = lambda: [(-len(jobs), [])]
1651

    
1652
    jobs.extend(
1653
      [opcodes.OpClusterVerifyGroup(group_name=group,
1654
                                    ignore_errors=self.op.ignore_errors,
1655
                                    depends=depends_fn())]
1656
      for group in groups)
1657

    
1658
    # Fix up all parameters
1659
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1660
      op.debug_simulate_errors = self.op.debug_simulate_errors
1661
      op.verbose = self.op.verbose
1662
      op.error_codes = self.op.error_codes
1663
      try:
1664
        op.skip_checks = self.op.skip_checks
1665
      except AttributeError:
1666
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1667

    
1668
    return ResultWithJobs(jobs)
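    # Sketch of the resulting job list (an illustration, not original code),
    # assuming no group_name restriction and two node groups "g1" and "g2":
    #   jobs = [
    #     [OpClusterVerifyConfig(...)],
    #     [OpClusterVerifyGroup(group_name="g1", depends=[(-1, [])], ...)],
    #     [OpClusterVerifyGroup(group_name="g2", depends=[(-2, [])], ...)],
    #     ]
    # The negative entries in "depends" are relative job IDs, each pointing
    # back at the config-verification job submitted in the same batch.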
1669

    
1670

    
1671
class _VerifyErrors(object):
1672
  """Mix-in for cluster/group verify LUs.
1673

1674
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1675
  self.op and self._feedback_fn to be available.)
1676

1677
  """
1678

    
1679
  ETYPE_FIELD = "code"
1680
  ETYPE_ERROR = constants.CV_ERROR
1681
  ETYPE_WARNING = constants.CV_WARNING
1682

    
1683
  def _Error(self, ecode, item, msg, *args, **kwargs):
1684
    """Format an error message.
1685

1686
    Based on the opcode's error_codes parameter, either format a
1687
    parseable error code, or a simpler error string.
1688

1689
    This must be called only from Exec and functions called from Exec.
1690

1691
    """
1692
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1693
    itype, etxt, _ = ecode
1694
    # If the error code is in the list of ignored errors, demote the error to a
1695
    # warning
1696
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1697
      ltype = self.ETYPE_WARNING
1698
    # first complete the msg
1699
    if args:
1700
      msg = msg % args
1701
    # then format the whole message
1702
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1703
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1704
    else:
1705
      if item:
1706
        item = " " + item
1707
      else:
1708
        item = ""
1709
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1710
    # and finally report it via the feedback_fn
1711
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1712
    # do not mark the operation as failed for WARN cases only
1713
    if ltype == self.ETYPE_ERROR:
1714
      self.bad = True
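    # Example output (illustrative only; exact constant values may differ):
    # with error_codes enabled a finding is rendered as
    #   - ERROR:ENODEVERSION:node:node1.example.com:software version mismatch
    # while the default format renders the same finding as
    #   - ERROR: node node1.example.com: software version mismatch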
1715

    
1716
  def _ErrorIf(self, cond, *args, **kwargs):
1717
    """Log an error message if the passed condition is True.
1718

1719
    """
1720
    if (bool(cond)
1721
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1722
      self._Error(*args, **kwargs)
1723

    
1724

    
1725
def _GetAllHypervisorParameters(cluster, instances):
1726
  """Compute the set of all hypervisor parameters.
1727

1728
  @type cluster: L{objects.Cluster}
1729
  @param cluster: the cluster object
1730
  @param instances: list of L{objects.Instance}
1731
  @param instances: additional instances from which to obtain parameters
1732
  @rtype: list of (origin, hypervisor, parameters)
1733
  @return: a list with all parameters found, indicating the hypervisor they
1734
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1735

1736
  """
1737
  hvp_data = []
1738

    
1739
  for hv_name in cluster.enabled_hypervisors:
1740
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1741

    
1742
  for os_name, os_hvp in cluster.os_hvp.items():
1743
    for hv_name, hv_params in os_hvp.items():
1744
      if hv_params:
1745
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1746
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1747

    
1748
  # TODO: collapse identical parameter values in a single one
1749
  for instance in instances:
1750
    if instance.hvparams:
1751
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1752
                       cluster.FillHV(instance)))
1753

    
1754
  return hvp_data
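  # Shape of the return value (an illustration, not part of the original
  # code), for a hypothetical cluster with KVM enabled and one instance:
  #   [("cluster", "kvm", {...cluster-wide defaults...}),
  #    ("os debian-image", "kvm", {...OS-specific overrides...}),
  #    ("instance web1", "kvm", {...fully filled instance hvparams...})]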
1755

    
1756

    
1757
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1758
  """Verifies the cluster config.
1759

1760
  """
1761
  REQ_BGL = False
1762

    
1763
  def _VerifyHVP(self, hvp_data):
1764
    """Verifies locally the syntax of the hypervisor parameters.
1765

1766
    """
1767
    for item, hv_name, hv_params in hvp_data:
1768
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1769
             (item, hv_name))
1770
      try:
1771
        hv_class = hypervisor.GetHypervisorClass(hv_name)
1772
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1773
        hv_class.CheckParameterSyntax(hv_params)
1774
      except errors.GenericError, err:
1775
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1776

    
1777
  def ExpandNames(self):
1778
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1779
    self.share_locks = ShareAll()
1780

    
1781
  def CheckPrereq(self):
1782
    """Check prerequisites.
1783

1784
    """
1785
    # Retrieve all information
1786
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1787
    self.all_node_info = self.cfg.GetAllNodesInfo()
1788
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1789

    
1790
  def Exec(self, feedback_fn):
1791
    """Verify integrity of cluster, performing various test on nodes.
1792

1793
    """
1794
    self.bad = False
1795
    self._feedback_fn = feedback_fn
1796

    
1797
    feedback_fn("* Verifying cluster config")
1798

    
1799
    for msg in self.cfg.VerifyConfig():
1800
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1801

    
1802
    feedback_fn("* Verifying cluster certificate files")
1803

    
1804
    for cert_filename in pathutils.ALL_CERT_FILES:
1805
      (errcode, msg) = utils.VerifyCertificate(cert_filename)
1806
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1807

    
1808
    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
1809
                                    pathutils.NODED_CERT_FILE),
1810
                  constants.CV_ECLUSTERCERT,
1811
                  None,
1812
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
1813
                    constants.LUXID_USER + " user")
1814

    
1815
    feedback_fn("* Verifying hypervisor parameters")
1816

    
1817
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1818
                                                self.all_inst_info.values()))
1819

    
1820
    feedback_fn("* Verifying all nodes belong to an existing group")
1821

    
1822
    # We do this verification here because, should this bogus circumstance
1823
    # occur, it would never be caught by VerifyGroup, which only acts on
1824
    # nodes/instances reachable from existing node groups.
1825

    
1826
    dangling_nodes = set(node for node in self.all_node_info.values()
1827
                         if node.group not in self.all_group_info)
1828

    
1829
    dangling_instances = {}
1830
    no_node_instances = []
1831

    
1832
    for inst in self.all_inst_info.values():
1833
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
1834
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
1835
      elif inst.primary_node not in self.all_node_info:
1836
        no_node_instances.append(inst)
1837

    
1838
    pretty_dangling = [
1839
        "%s (%s)" %
1840
        (node.name,
1841
         utils.CommaJoin(inst.name for
1842
                         inst in dangling_instances.get(node.uuid, [])))
1843
        for node in dangling_nodes]
1844

    
1845
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1846
                  None,
1847
                  "the following nodes (and their instances) belong to a non"
1848
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1849

    
1850
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1851
                  None,
1852
                  "the following instances have a non-existing primary-node:"
1853
                  " %s", utils.CommaJoin(inst.name for
1854
                                         inst in no_node_instances))
1855

    
1856
    return not self.bad
1857

    
1858

    
1859
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1860
  """Verifies the status of a node group.
1861

1862
  """
1863
  HPATH = "cluster-verify"
1864
  HTYPE = constants.HTYPE_CLUSTER
1865
  REQ_BGL = False
1866

    
1867
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1868

    
1869
  class NodeImage(object):
1870
    """A class representing the logical and physical status of a node.
1871

1872
    @type uuid: string
1873
    @ivar uuid: the node UUID to which this object refers
1874
    @ivar volumes: a structure as returned from
1875
        L{ganeti.backend.GetVolumeList} (runtime)
1876
    @ivar instances: a list of running instances (runtime)
1877
    @ivar pinst: list of configured primary instances (config)
1878
    @ivar sinst: list of configured secondary instances (config)
1879
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1880
        instances for which this node is secondary (config)
1881
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1882
    @ivar dfree: free disk, as reported by the node (runtime)
1883
    @ivar offline: the offline status (config)
1884
    @type rpc_fail: boolean
1885
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1886
        not whether the individual keys were correct) (runtime)
1887
    @type lvm_fail: boolean
1888
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1889
    @type hyp_fail: boolean
1890
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1891
    @type ghost: boolean
1892
    @ivar ghost: whether this is a known node or not (config)
1893
    @type os_fail: boolean
1894
    @ivar os_fail: whether the RPC call didn't return valid OS data
1895
    @type oslist: list
1896
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1897
    @type vm_capable: boolean
1898
    @ivar vm_capable: whether the node can host instances
1899
    @type pv_min: float
1900
    @ivar pv_min: size in MiB of the smallest PVs
1901
    @type pv_max: float
1902
    @ivar pv_max: size in MiB of the biggest PVs
1903

1904
    """
1905
    def __init__(self, offline=False, uuid=None, vm_capable=True):
1906
      self.uuid = uuid
1907
      self.volumes = {}
1908
      self.instances = []
1909
      self.pinst = []
1910
      self.sinst = []
1911
      self.sbp = {}
1912
      self.mfree = 0
1913
      self.dfree = 0
1914
      self.offline = offline
1915
      self.vm_capable = vm_capable
1916
      self.rpc_fail = False
1917
      self.lvm_fail = False
1918
      self.hyp_fail = False
1919
      self.ghost = False
1920
      self.os_fail = False
1921
      self.oslist = {}
1922
      self.pv_min = None
1923
      self.pv_max = None
1924

    
1925
  def ExpandNames(self):
1926
    # This raises errors.OpPrereqError on its own:
1927
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1928

    
1929
    # Get instances in node group; this is unsafe and needs verification later
1930
    inst_uuids = \
1931
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1932

    
1933
    self.needed_locks = {
1934
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1935
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1936
      locking.LEVEL_NODE: [],
1937

    
1938
      # This opcode is run by watcher every five minutes and acquires all nodes
1939
      # for a group. It doesn't run for a long time, so it's better to acquire
1940
      # the node allocation lock as well.
1941
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1942
      }
1943

    
1944
    self.share_locks = ShareAll()
1945

    
1946
  def DeclareLocks(self, level):
1947
    if level == locking.LEVEL_NODE:
1948
      # Get members of node group; this is unsafe and needs verification later
1949
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1950

    
1951
      # In Exec(), we warn about mirrored instances that have primary and
1952
      # secondary living in separate node groups. To fully verify that
1953
      # volumes for these instances are healthy, we will need to do an
1954
      # extra call to their secondaries. We ensure here those nodes will
1955
      # be locked.
1956
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1957
        # Important: access only the instances whose lock is owned
1958
        instance = self.cfg.GetInstanceInfoByName(inst_name)
1959
        if instance.disk_template in constants.DTS_INT_MIRROR:
1960
          nodes.update(instance.secondary_nodes)
1961

    
1962
      self.needed_locks[locking.LEVEL_NODE] = nodes
1963

    
1964
  def CheckPrereq(self):
1965
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1966
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1967

    
1968
    group_node_uuids = set(self.group_info.members)
1969
    group_inst_uuids = \
1970
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1971

    
1972
    unlocked_node_uuids = \
1973
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1974

    
1975
    unlocked_inst_uuids = \
1976
        group_inst_uuids.difference(
1977
          [self.cfg.GetInstanceInfoByName(name).uuid
1978
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1979

    
1980
    if unlocked_node_uuids:
1981
      raise errors.OpPrereqError(
1982
        "Missing lock for nodes: %s" %
1983
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1984
        errors.ECODE_STATE)
1985

    
1986
    if unlocked_inst_uuids:
1987
      raise errors.OpPrereqError(
1988
        "Missing lock for instances: %s" %
1989
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1990
        errors.ECODE_STATE)
1991

    
1992
    self.all_node_info = self.cfg.GetAllNodesInfo()
1993
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1994

    
1995
    self.my_node_uuids = group_node_uuids
1996
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1997
                             for node_uuid in group_node_uuids)
1998

    
1999
    self.my_inst_uuids = group_inst_uuids
2000
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
2001
                             for inst_uuid in group_inst_uuids)
2002

    
2003
    # We detect here the nodes that will need the extra RPC calls for verifying
2004
    # split LV volumes; they should be locked.
2005
    extra_lv_nodes = set()
2006

    
2007
    for inst in self.my_inst_info.values():
2008
      if inst.disk_template in constants.DTS_INT_MIRROR:
2009
        for nuuid in inst.all_nodes:
2010
          if self.all_node_info[nuuid].group != self.group_uuid:
2011
            extra_lv_nodes.add(nuuid)
2012

    
2013
    unlocked_lv_nodes = \
2014
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
2015

    
2016
    if unlocked_lv_nodes:
2017
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
2018
                                 utils.CommaJoin(unlocked_lv_nodes),
2019
                                 errors.ECODE_STATE)
2020
    self.extra_lv_nodes = list(extra_lv_nodes)
2021

    
2022
  def _VerifyNode(self, ninfo, nresult):
2023
    """Perform some basic validation on data returned from a node.
2024

2025
      - check the result data structure is well formed and has all the
2026
        mandatory fields
2027
      - check ganeti version
2028

2029
    @type ninfo: L{objects.Node}
2030
    @param ninfo: the node to check
2031
    @param nresult: the results from the node
2032
    @rtype: boolean
2033
    @return: whether overall this call was successful (and we can expect
2034
         reasonable values in the response)
2035

2036
    """
2037
    # main result, nresult should be a non-empty dict
2038
    test = not nresult or not isinstance(nresult, dict)
2039
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
2040
                  "unable to verify node: no data returned")
2041
    if test:
2042
      return False
2043

    
2044
    # compares ganeti version
2045
    local_version = constants.PROTOCOL_VERSION
2046
    remote_version = nresult.get("version", None)
2047
    test = not (remote_version and
2048
                isinstance(remote_version, (list, tuple)) and
2049
                len(remote_version) == 2)
2050
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
2051
                  "connection to node returned invalid data")
2052
    if test:
2053
      return False
2054

    
2055
    test = local_version != remote_version[0]
2056
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
2057
                  "incompatible protocol versions: master %s,"
2058
                  " node %s", local_version, remote_version[0])
2059
    if test:
2060
      return False
2061

    
2062
    # node seems compatible, we can actually try to look into its results
2063

    
2064
    # full package version
2065
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
2066
                  constants.CV_ENODEVERSION, ninfo.name,
2067
                  "software version mismatch: master %s, node %s",
2068
                  constants.RELEASE_VERSION, remote_version[1],
2069
                  code=self.ETYPE_WARNING)
2070

    
2071
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
2072
    if ninfo.vm_capable and isinstance(hyp_result, dict):
2073
      for hv_name, hv_result in hyp_result.iteritems():
2074
        test = hv_result is not None
2075
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2076
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)
2077

    
2078
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
2079
    if ninfo.vm_capable and isinstance(hvp_result, list):
2080
      for item, hv_name, hv_result in hvp_result:
2081
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
2082
                      "hypervisor %s parameter verify failure (source %s): %s",
2083
                      hv_name, item, hv_result)
2084

    
2085
    test = nresult.get(constants.NV_NODESETUP,
2086
                       ["Missing NODESETUP results"])
2087
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
2088
                  "node setup error: %s", "; ".join(test))
2089

    
2090
    return True
2091

    
2092
  def _VerifyNodeTime(self, ninfo, nresult,
2093
                      nvinfo_starttime, nvinfo_endtime):
2094
    """Check the node time.
2095

2096
    @type ninfo: L{objects.Node}
2097
    @param ninfo: the node to check
2098
    @param nresult: the remote results for the node
2099
    @param nvinfo_starttime: the start time of the RPC call
2100
    @param nvinfo_endtime: the end time of the RPC call
2101

2102
    """
2103
    ntime = nresult.get(constants.NV_TIME, None)
2104
    try:
2105
      ntime_merged = utils.MergeTime(ntime)
2106
    except (ValueError, TypeError):
2107
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
2108
                    "Node returned invalid time")
2109
      return
2110

    
2111
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
2112
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
2113
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
2114
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
2115
    else:
2116
      ntime_diff = None
2117

    
2118
    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
2119
                  "Node time diverges by at least %s from master node time",
2120
                  ntime_diff)
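    # Worked example (illustrative, not original code): with the verify RPC
    # started at t=1000.0 and finished at t=1002.5, and assuming a
    # NODE_MAX_CLOCK_SKEW of 150 seconds, node times inside [850.0, 1152.5]
    # pass; a node reporting t=840.0 is flagged as diverging by
    # abs(1000.0 - 840.0), i.e. "160.0s".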
2121

    
2122
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
2123
    """Check the node LVM results and update info for cross-node checks.
2124

2125
    @type ninfo: L{objects.Node}
2126
    @param ninfo: the node to check
2127
    @param nresult: the remote results for the node
2128
    @param vg_name: the configured VG name
2129
    @type nimg: L{NodeImage}
2130
    @param nimg: node image
2131

2132
    """
2133
    if vg_name is None:
2134
      return
2135

    
2136
    # checks vg existence and size > 20G
2137
    vglist = nresult.get(constants.NV_VGLIST, None)
2138
    test = not vglist
2139
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2140
                  "unable to check volume groups")
2141
    if not test:
2142
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
2143
                                            constants.MIN_VG_SIZE)
2144
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
2145

    
2146
    # Check PVs
2147
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
2148
    for em in errmsgs:
2149
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
2150
    if pvminmax is not None:
2151
      (nimg.pv_min, nimg.pv_max) = pvminmax
2152

    
2153
  def _VerifyGroupDRBDVersion(self, node_verify_infos):
2154
    """Check cross-node DRBD version consistency.
2155

2156
    @type node_verify_infos: dict
2157
    @param node_verify_infos: infos about nodes as returned from the
2158
      node_verify call.
2159

2160
    """
2161
    node_versions = {}
2162
    for node_uuid, ndata in node_verify_infos.items():
2163
      nresult = ndata.payload
2164
      if nresult:
2165
        version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
2166
        node_versions[node_uuid] = version
2167

    
2168
    if len(set(node_versions.values())) > 1:
2169
      for node_uuid, version in sorted(node_versions.items()):
2170
        msg = "DRBD version mismatch: %s" % version
2171
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
2172
                    code=self.ETYPE_WARNING)
2173

    
2174
  def _VerifyGroupLVM(self, node_image, vg_name):
2175
    """Check cross-node consistency in LVM.
2176

2177
    @type node_image: dict
2178
    @param node_image: info about nodes, mapping from node UUID to
2179
      L{NodeImage} objects
2180
    @param vg_name: the configured VG name
2181

2182
    """
2183
    if vg_name is None:
2184
      return
2185

    
2186
    # Only exclusive storage needs this kind of checks
2187
    if not self._exclusive_storage:
2188
      return
2189

    
2190
    # exclusive_storage wants all PVs to have the same size (approximately),
2191
    # if the smallest and the biggest ones are okay, everything is fine.
2192
    # pv_min is None iff pv_max is None
2193
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
2194
    if not vals:
2195
      return
2196
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
2197
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
2198
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
2199
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
2200
                  "PV sizes differ too much in the group; smallest (%s MB) is"
2201
                  " on %s, biggest (%s MB) is on %s",
2202
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
2203
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))
2204

    
2205
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
2206
    """Check the node bridges.
2207

2208
    @type ninfo: L{objects.Node}
2209
    @param ninfo: the node to check
2210
    @param nresult: the remote results for the node
2211
    @param bridges: the expected list of bridges
2212

2213
    """
2214
    if not bridges:
2215
      return
2216

    
2217
    missing = nresult.get(constants.NV_BRIDGES, None)
2218
    test = not isinstance(missing, list)
2219
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2220
                  "did not return valid bridge information")
2221
    if not test:
2222
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
2223
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
2224

    
2225
  def _VerifyNodeUserScripts(self, ninfo, nresult):
2226
    """Check the results of user scripts presence and executability on the node
2227

2228
    @type ninfo: L{objects.Node}
2229
    @param ninfo: the node to check
2230
    @param nresult: the remote results for the node
2231

2232
    """
2233
    test = not constants.NV_USERSCRIPTS in nresult
2234
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2235
                  "did not return user scripts information")
2236

    
2237
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
2238
    if not test:
2239
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2240
                    "user scripts not present or not executable: %s" %
2241
                    utils.CommaJoin(sorted(broken_scripts)))
2242

    
2243
  def _VerifyNodeNetwork(self, ninfo, nresult):
2244
    """Check the node network connectivity results.
2245

2246
    @type ninfo: L{objects.Node}
2247
    @param ninfo: the node to check
2248
    @param nresult: the remote results for the node
2249

2250
    """
2251
    test = constants.NV_NODELIST not in nresult
2252
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
2253
                  "node hasn't returned node ssh connectivity data")
2254
    if not test:
2255
      if nresult[constants.NV_NODELIST]:
2256
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
2257
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
2258
                        "ssh communication with node '%s': %s", a_node, a_msg)
2259

    
2260
    test = constants.NV_NODENETTEST not in nresult
2261
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2262
                  "node hasn't returned node tcp connectivity data")
2263
    if not test:
2264
      if nresult[constants.NV_NODENETTEST]:
2265
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
2266
        for anode in nlist:
2267
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
2268
                        "tcp communication with node '%s': %s",
2269
                        anode, nresult[constants.NV_NODENETTEST][anode])
2270

    
2271
    test = constants.NV_MASTERIP not in nresult
2272
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2273
                  "node hasn't returned node master IP reachability data")
2274
    if not test:
2275
      if not nresult[constants.NV_MASTERIP]:
2276
        if ninfo.uuid == self.master_node:
2277
          msg = "the master node cannot reach the master IP (not configured?)"
2278
        else:
2279
          msg = "cannot reach the master IP"
2280
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
2281

    
2282
  def _VerifyInstance(self, instance, node_image, diskstatus):
2283
    """Verify an instance.
2284

2285
    This function checks to see if the required block devices are
2286
    available on the instance's node, and that the nodes are in the correct
2287
    state.
2288

2289
    """
2290
    pnode_uuid = instance.primary_node
2291
    pnode_img = node_image[pnode_uuid]
2292
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2293

    
2294
    node_vol_should = {}
2295
    instance.MapLVsByNode(node_vol_should)
2296

    
2297
    cluster = self.cfg.GetClusterInfo()
2298
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2299
                                                            self.group_info)
2300
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
2301
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
2302
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)
2303

    
2304
    for node_uuid in node_vol_should:
2305
      n_img = node_image[node_uuid]
2306
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2307
        # ignore missing volumes on offline or broken nodes
2308
        continue
2309
      for volume in node_vol_should[node_uuid]:
2310
        test = volume not in n_img.volumes
2311
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
2312
                      "volume %s missing on node %s", volume,
2313
                      self.cfg.GetNodeName(node_uuid))
2314

    
2315
    if instance.admin_state == constants.ADMINST_UP:
2316
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
2317
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
2318
                    "instance not running on its primary node %s",
2319
                     self.cfg.GetNodeName(pnode_uuid))
2320
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
2321
                    instance.name, "instance is marked as running and lives on"
2322
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))
2323

    
2324
    diskdata = [(nname, success, status, idx)
2325
                for (nname, disks) in diskstatus.items()
2326
                for idx, (success, status) in enumerate(disks)]
2327

    
2328
    for nname, success, bdev_status, idx in diskdata:
2329
      # the 'ghost node' construction in Exec() ensures that we have a
2330
      # node here
2331
      snode = node_image[nname]
2332
      bad_snode = snode.ghost or snode.offline
2333
      self._ErrorIf(instance.disks_active and
2334
                    not success and not bad_snode,
2335
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
2336
                    "couldn't retrieve status for disk/%s on %s: %s",
2337
                    idx, self.cfg.GetNodeName(nname), bdev_status)
2338

    
2339
      if instance.disks_active and success and \
2340
         (bdev_status.is_degraded or
2341
          bdev_status.ldisk_status != constants.LDS_OKAY):
2342
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2343
        if bdev_status.is_degraded:
2344
          msg += " is degraded"
2345
        if bdev_status.ldisk_status != constants.LDS_OKAY:
2346
          msg += "; state is '%s'" % \
2347
                 constants.LDS_NAMES[bdev_status.ldisk_status]
2348

    
2349
        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2350

    
2351
    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2352
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2353
                  "instance %s, connection to primary node failed",
2354
                  instance.name)
2355

    
2356
    self._ErrorIf(len(instance.secondary_nodes) > 1,
2357
                  constants.CV_EINSTANCELAYOUT, instance.name,
2358
                  "instance has multiple secondary nodes: %s",
2359
                  utils.CommaJoin(instance.secondary_nodes),
2360
                  code=self.ETYPE_WARNING)
2361

    
2362
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2363
    if any(es_flags.values()):
2364
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2365
        # Disk template not compatible with exclusive_storage: no instance
2366
        # node should have the flag set
2367
        es_nodes = [n
2368
                    for (n, es) in es_flags.items()
2369
                    if es]
2370
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2371
                    "instance has template %s, which is not supported on nodes"
2372
                    " that have exclusive storage set: %s",
2373
                    instance.disk_template,
2374
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2375
      for (idx, disk) in enumerate(instance.disks):
2376
        self._ErrorIf(disk.spindles is None,
2377
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2378
                      "number of spindles not configured for disk %s while"
2379
                      " exclusive storage is enabled, try running"
2380
                      " gnt-cluster repair-disk-sizes", idx)
2381

    
2382
    if instance.disk_template in constants.DTS_INT_MIRROR:
2383
      instance_nodes = utils.NiceSort(instance.all_nodes)
2384
      instance_groups = {}
2385

    
2386
      for node_uuid in instance_nodes:
2387
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
2388
                                   []).append(node_uuid)
2389

    
2390
      pretty_list = [
2391
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2392
                           groupinfo[group].name)
2393
        # Sort so that we always list the primary node first.
2394
        for group, nodes in sorted(instance_groups.items(),
2395
                                   key=lambda (_, nodes): pnode_uuid in nodes,
2396
                                   reverse=True)]
2397

    
2398
      self._ErrorIf(len(instance_groups) > 1,
2399
                    constants.CV_EINSTANCESPLITGROUPS,
2400
                    instance.name, "instance has primary and secondary nodes in"
2401
                    " different groups: %s", utils.CommaJoin(pretty_list),
2402
                    code=self.ETYPE_WARNING)
2403

    
2404
    inst_nodes_offline = []
2405
    for snode in instance.secondary_nodes:
2406
      s_img = node_image[snode]
2407
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2408
                    self.cfg.GetNodeName(snode),
2409
                    "instance %s, connection to secondary node failed",
2410
                    instance.name)
2411

    
2412
      if s_img.offline:
2413
        inst_nodes_offline.append(snode)
2414

    
2415
    # warn that the instance lives on offline nodes
2416
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2417
                  instance.name, "instance has offline secondary node(s) %s",
2418
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2419
    # ... or ghost/non-vm_capable nodes
2420
    for node_uuid in instance.all_nodes:
2421
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2422
                    instance.name, "instance lives on ghost node %s",
2423
                    self.cfg.GetNodeName(node_uuid))
2424
      self._ErrorIf(not node_image[node_uuid].vm_capable,
2425
                    constants.CV_EINSTANCEBADNODE, instance.name,
2426
                    "instance lives on non-vm_capable node %s",
2427
                    self.cfg.GetNodeName(node_uuid))
2428

    
2429
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2430
    """Verify if there are any unknown volumes in the cluster.
2431

2432
    The .os, .swap and backup volumes are ignored. All other volumes are
2433
    reported as unknown.
2434

2435
    @type reserved: L{ganeti.utils.FieldSet}
2436
    @param reserved: a FieldSet of reserved volume names
2437

2438
    """
2439
    for node_uuid, n_img in node_image.items():
2440
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2441
          self.all_node_info[node_uuid].group != self.group_uuid):
2442
        # skip non-healthy nodes
2443
        continue
2444
      for volume in n_img.volumes:
2445
        test = ((node_uuid not in node_vol_should or
2446
                volume not in node_vol_should[node_uuid]) and
2447
                not reserved.Matches(volume))
2448
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2449
                      self.cfg.GetNodeName(node_uuid),
2450
                      "volume %s is unknown", volume,
2451
                      code=_VerifyErrors.ETYPE_WARNING)
2452

    
2453
  def _VerifyNPlusOneMemory(self, node_image, all_insts):
2454
    """Verify N+1 Memory Resilience.
2455

2456
    Check that if one single node dies we can still start all the
2457
    instances it was primary for.
2458

2459
    """
2460
    cluster_info = self.cfg.GetClusterInfo()
2461
    for node_uuid, n_img in node_image.items():
2462
      # This code checks that every node which is now listed as
2463
      # secondary has enough memory to host all instances it is
2464
      # supposed to host, should a single other node in the cluster fail.
2465
      # FIXME: not ready for failover to an arbitrary node
2466
      # FIXME: does not support file-backed instances
2467
      # WARNING: we currently take into account down instances as well
2468
      # as up ones, considering that even if they're down someone
2469
      # might want to start them even in the event of a node failure.
2470
      if n_img.offline or \
2471
         self.all_node_info[node_uuid].group != self.group_uuid:
2472
        # we're skipping nodes marked offline and nodes in other groups from
2473
        # the N+1 warning, since most likely we don't have good memory
2474
        # information from them; we already list instances living on such
2475
        # nodes, and that's enough warning
2476
        continue
2477
      #TODO(dynmem): also consider ballooning out other instances
2478
      for prinode, inst_uuids in n_img.sbp.items():
2479
        needed_mem = 0
2480
        for inst_uuid in inst_uuids:
2481
          bep = cluster_info.FillBE(all_insts[inst_uuid])
2482
          if bep[constants.BE_AUTO_BALANCE]:
2483
            needed_mem += bep[constants.BE_MINMEM]
2484
        test = n_img.mfree < needed_mem
2485
        self._ErrorIf(test, constants.CV_ENODEN1,
2486
                      self.cfg.GetNodeName(node_uuid),
2487
                      "not enough memory to accomodate instance failovers"
2488
                      " should node %s fail (%dMiB needed, %dMiB available)",
2489
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
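    # Worked example (illustrative, not original code): if this node is
    # secondary for instances i1 and i2 whose primary is node B, with
    # BE_MINMEM of 2048 and 4096 MiB and auto_balance enabled on both, then
    # needed_mem = 6144 MiB and an N+1 error is raised as soon as the node's
    # reported mfree drops below that value.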
2490

    
2491
  def _VerifyClientCertificates(self, nodes, all_nvinfo):
2492
    """Verifies the consistency of the client certificates.
2493

2494
    This includes several aspects:
2495
      - the individual validation of all nodes' certificates
2496
      - the consistency of the master candidate certificate map
2497
      - the consistency of the master candidate certificate map with the
2498
        certificates that the master candidates are actually using.
2499

2500
    @param nodes: the list of nodes to consider in this verification
2501
    @param all_nvinfo: the map of results of the verify_node call to
2502
      all nodes
2503

2504
    """
2505
    candidate_certs = self.cfg.GetClusterInfo().candidate_certs
2506
    if candidate_certs is None or len(candidate_certs) == 0:
2507
      self._ErrorIf(
2508
        True, constants.CV_ECLUSTERCLIENTCERT, None,
2509
        "The cluster's list of master candidate certificates is empty."
2510
        "If you just updated the cluster, please run"
2511
        " 'gnt-cluster renew-crypto --new-node-certificates'.")
2512
      return
2513

    
2514
    self._ErrorIf(
2515
      len(candidate_certs) != len(set(candidate_certs.values())),
2516
      constants.CV_ECLUSTERCLIENTCERT, None,
2517
      "There are at least two master candidates configured to use the same"
2518
      " certificate.")
2519

    
2520
    # collect the client certificate
2521
    for node in nodes:
2522
      if node.offline:
2523
        continue
2524

    
2525
      nresult = all_nvinfo[node.uuid]
2526
      if nresult.fail_msg or not nresult.payload:
2527
        continue
2528

    
2529
      (errcode, msg) = nresult.payload.get(constants.NV_CLIENT_CERT, None)
2530

    
2531
      self._ErrorIf(
2532
        errcode is not None, constants.CV_ECLUSTERCLIENTCERT, None,
2533
        "Client certificate of node '%s' failed validation: %s (code '%s')",
2534
        node.uuid, msg, errcode)
2535

    
2536
      if not errcode:
2537
        digest = msg
2538
        if node.master_candidate:
2539
          if node.uuid in candidate_certs:
2540
            self._ErrorIf(
2541
              digest != candidate_certs[node.uuid],
2542
              constants.CV_ECLUSTERCLIENTCERT, None,
2543
              "Client certificate digest of master candidate '%s' does not"
2544
              " match its entry in the cluster's map of master candidate"
2545
              " certificates. Expected: %s Got: %s", node.uuid,
2546
              digest, candidate_certs[node.uuid])
2547
          else:
2548
            self._ErrorIf(
2549
              True, constants.CV_ECLUSTERCLIENTCERT, None,
2550
              "The master candidate '%s' does not have an entry in the"
2551
              " map of candidate certificates.", node.uuid)
2552
            self._ErrorIf(
2553
              digest in candidate_certs.values(),
2554
              constants.CV_ECLUSTERCLIENTCERT, None,
2555
              "Master candidate '%s' is using a certificate of another node.",
2556
              node.uuid)
2557
        else:
2558
          self._ErrorIf(
2559
            node.uuid in candidate_certs,
2560
            constants.CV_ECLUSTERCLIENTCERT, None,
2561
            "Node '%s' is not a master candidate, but still listed in the"
2562
            " map of master candidate certificates.", node.uuid)
2563
          self._ErrorIf(
2564
            (node.uuid not in candidate_certs) and
2565
              (digest in candidate_certs.values()),
2566
            constants.CV_ECLUSTERCLIENTCERT, None,
2567
            "Node '%s' is not a master candidate and is incorrectly using a"
2568
            " certificate of another node which is master candidate.",
2569
            node.uuid)
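    # Shape of the map being verified (illustrative, not original code):
    #   candidate_certs = {"<node-uuid>": "<digest-of-its-client-cert>", ...}
    # i.e. one digest per master candidate; duplicate digests, or entries for
    # nodes that are not master candidates, are flagged above.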
2570

    
2571
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2572
                   (files_all, files_opt, files_mc, files_vm)):
2573
    """Verifies file checksums collected from all nodes.
2574

2575
    @param nodes: List of L{objects.Node} objects
2576
    @param master_node_uuid: UUID of master node
2577
    @param all_nvinfo: RPC results
2578

2579
    """
2580
    # Define functions determining which nodes to consider for a file
2581
    files2nodefn = [
2582
      (files_all, None),
2583
      (files_mc, lambda node: (node.master_candidate or
2584
                               node.uuid == master_node_uuid)),
2585
      (files_vm, lambda node: node.vm_capable),
2586
      ]
2587

    
2588
    # Build mapping from filename to list of nodes which should have the file
2589
    nodefiles = {}
2590
    for (files, fn) in files2nodefn:
2591
      if fn is None:
2592
        filenodes = nodes
2593
      else:
2594
        filenodes = filter(fn, nodes)
2595
      nodefiles.update((filename,
2596
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
2597
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodes:
      if node.offline:
        ignore_nodes.add(node.uuid)
        continue

      nresult = all_nvinfo[node.uuid]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, {})
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
                    "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.uuid)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
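
    # Illustrative shape (hypothetical values): fileinfo ends up as
    #   {filename: {checksum: set([node_uuid, ...])}}
    # so several checksums under one filename mean diverging file versions.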

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_uuid
                            for node_uuids in fileinfo[filename].values()
                            for node_uuid in node_uuids) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        self._ErrorIf(missing_file and missing_file != expected_nodes,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is optional, but it must exist on all or no"
                      " nodes (not found on %s)",
                      filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))
      else:
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is missing from node(s) %s", filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        self._ErrorIf(unexpected,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s should not exist on node(s) %s",
                      filename, utils.CommaJoin(
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1,
                     utils.CommaJoin(utils.NiceSort(
                       map(self.cfg.GetNodeName, node_uuids))))
                    for (idx, (checksum, node_uuids)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
                    "File %s found with %s different checksums (%s)",
                    filename, len(checksums), "; ".join(variants))

  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
    """Verify the drbd helper.

    """
    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                    "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "wrong drbd usermode helper: %s", payload)

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)

    # compute the DRBD minors
    node_drbd = {}
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
      test = inst_uuid not in instanceinfo
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
      if test:
        node_drbd[minor] = (inst_uuid, False)
      else:
        instance = instanceinfo[inst_uuid]
        node_drbd[minor] = (inst_uuid, instance.disks_active)
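
    # Illustrative (hypothetical UUID): node_drbd now maps each minor expected
    # on this node to (instance UUID, whether its disks should be active),
    # e.g. {0: ("some-instance-uuid", True)}.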

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                  "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (inst_uuid, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "drbd minor %d of instance %s is not active", minor,
                    self.cfg.GetInstanceName(inst_uuid))
    for minor in used_minors:
      test = minor not in node_drbd
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                  "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))
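
    # Illustrative (hypothetical OS name): each os_dict entry collects every
    # occurrence of an OS on the node, e.g. os_dict["some-os"] ->
    #   [(path, status, diagnose_msg, variants, parameters, api_versions), ...]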

    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
                    "Invalid OS %s (located at %s): %s",
                    os_name, f_path, f_diag)
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
                    "OS '%s' has multiple entries"
                    " (first one shadows the rest): %s",
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                    "Extra OS %s not present on reference node (%s)",
                    os_name, self.cfg.GetNodeName(base.uuid))
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " [%s] vs. [%s]", kind, os_name,
                      self.cfg.GetNodeName(base.uuid),
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
                  "OSes present on reference node %s"
                  " but missing on this node: %s",
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))

  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    cluster = self.cfg.GetClusterInfo()
    if (is_master and
        (cluster.IsFileStorageEnabled() or
         cluster.IsSharedFileStorageEnabled())):
      try:
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
                          verify_key, error_key):
    """Verifies (file) storage paths.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type file_disk_template: string
    @param file_disk_template: file-based disk template, whose directory
        is supposed to be verified
    @type verify_key: string
    @param verify_key: key for the verification map of this file
        verification step
    @param error_key: error key to be added to the verification results
        in case something goes wrong in this verification step

    """
    assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
              constants.ST_FILE, constants.ST_SHARED_FILE
           ))
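
    # Note: as used below, the node includes verify_key in its reply only when
    # the configured directory was found unusable, so the mere presence of the
    # key signals an error and its value carries the message.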

    cluster = self.cfg.GetClusterInfo()
    if cluster.IsDiskTemplateEnabled(file_disk_template):
      self._ErrorIf(
          verify_key in nresult,
          error_key, ninfo.name,
          "The configured %s storage path is unusable: %s" %
          (file_disk_template, nresult.get(verify_key)))

  def _VerifyFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_FILE,
        constants.NV_FILE_STORAGE_PATH,
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)

  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_SHARED_FILE,
        constants.NV_SHARED_FILE_STORAGE_PATH,
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_dev_inst_only = {}
    diskless_instances = set()
    nodisk_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        nodisk_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template != diskless)
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      dev_inst_only = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))

      node_disks_dev_inst_only[nuuid] = dev_inst_only

    assert len(node_disks) == len(node_disks_dev_inst_only)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
               node_disks.keys(), node_disks_dev_inst_only)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)
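
    # Illustrative shape (hypothetical UUIDs): instdisk ends up as
    #   {"inst-uuid": {"node-uuid": [(success, payload), ...]}}
    # with one (success, payload) tuple per disk of the instance on that node.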

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}
    # ...and disk-full instances that happen to have no disks
    for inst_uuid in nodisk_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)
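
    # Illustrative example (hypothetical node names): with groups A (node1,
    # node2) and B (node3), verifying group A returns roughly
    #   (["node1", "node2"], {"node1": ["node3"], "node2": ["node3"]}),
    # i.e. every online node gets one SSH check target per other group.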

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      constants.NV_CLIENT_CERT: None,
      }
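
    # Each NV_* key above names one check for the node to run; the value is
    # that check's input (None where no input is needed) and the node's reply
    # is expected back under the same key.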

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for inst_uuid in self.my_inst_info.values():
      for nic in inst_uuid.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)
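
    # node_image holds one NodeImage per node of this group; its pinst/sinst
    # lists and sbp mapping are filled in from the instance data below.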

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)
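
    # On a secondary node's image, sbp maps a primary node's UUID to the
    # instances for which this node holds the secondary copy (as built in the
    # loop above).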

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least one node, we act as if it were set for all nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    node_group_uuids = dict(map(lambda n: (n.name, n.group),
                                self.cfg.GetAllNodesInfo().values()))
    groups_config = self.cfg.GetAllNodeGroupsInfoDict()

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams,
                                           node_group_uuids,
                                           groups_config)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams,
                                    node_group_uuids,
                                    groups_config)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams,
         node_group_uuids,
         groups_config))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.name not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])