#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
import ganeti.rpc.node as rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
  CheckDiskAccessModeConsistency, CreateNewClientCert, \
  AddInstanceCommunicationNetworkOp, ConnectInstanceCommunicationNetworkOp

import ganeti.masterd.instance


def _UpdateMasterClientCert(
    lu, master_uuid, cluster, feedback_fn,
    client_cert=pathutils.NODED_CLIENT_CERT_FILE,
    client_cert_tmp=pathutils.NODED_CLIENT_CERT_FILE_TMP):
  """Renews the master's client certificate and propagates the config.

  @type lu: C{LogicalUnit}
  @param lu: the logical unit holding the config
  @type master_uuid: string
  @param master_uuid: the master node's UUID
  @type cluster: C{objects.Cluster}
  @param cluster: the cluster's configuration
  @type feedback_fn: function
  @param feedback_fn: feedback function for config updates
  @type client_cert: string
  @param client_cert: the path of the client certificate
  @type client_cert_tmp: string
  @param client_cert_tmp: the temporary path of the client certificate
  @rtype: string
  @return: the digest of the newly created client certificate

  """
  client_digest = CreateNewClientCert(lu, master_uuid, filename=client_cert_tmp)
  utils.AddNodeToCandidateCerts(master_uuid, client_digest,
                                cluster.candidate_certs)
  # This triggers an update of the config and distribution of it with the old
  # SSL certificate
  lu.cfg.Update(cluster, feedback_fn)

  utils.RemoveFile(client_cert)
  utils.RenameFile(client_cert_tmp, client_cert)
  return client_digest


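# Example (illustrative sketch, not part of this module): a logical unit that
# owns the config could renew the master's client certificate like this,
# assuming ``feedback_fn`` is the usual opcode feedback callback:
#
#   cluster = lu.cfg.GetClusterInfo()
#   digest = _UpdateMasterClientCert(lu, lu.cfg.GetMasterNode(), cluster,
#                                    feedback_fn)
#   # ``digest`` now identifies the fresh certificate in
#   # cluster.candidate_certs[master_uuid].
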
class LUClusterRenewCrypto(NoHooksLU):
  """Renew the cluster's crypto tokens.

  Note that most of this operation is done in gnt_cluster.py; this LU only
  takes care of the renewal of the client SSL certificates.

  """
  def Exec(self, feedback_fn):
    master_uuid = self.cfg.GetMasterNode()
    cluster = self.cfg.GetClusterInfo()

    server_digest = utils.GetCertificateDigest(
      cert_filename=pathutils.NODED_CERT_FILE)
    old_master_digest = utils.GetCertificateDigest(
      cert_filename=pathutils.NODED_CLIENT_CERT_FILE)
    utils.AddNodeToCandidateCerts("%s-SERVER" % master_uuid,
                                  server_digest,
                                  cluster.candidate_certs)
    utils.AddNodeToCandidateCerts("%s-OLDMASTER" % master_uuid,
                                  old_master_digest,
                                  cluster.candidate_certs)
    new_master_digest = _UpdateMasterClientCert(self, master_uuid, cluster,
                                                feedback_fn)

    utils.AddNodeToCandidateCerts(master_uuid,
                                  new_master_digest,
                                  cluster.candidate_certs)
    nodes = self.cfg.GetAllNodesInfo()
    for (node_uuid, node_info) in nodes.items():
      if node_uuid != master_uuid:
        new_digest = CreateNewClientCert(self, node_uuid)
        if node_info.master_candidate:
          utils.AddNodeToCandidateCerts(node_uuid,
                                        new_digest,
                                        cluster.candidate_certs)
    utils.RemoveNodeFromCandidateCerts("%s-SERVER" % master_uuid,
                                       cluster.candidate_certs)
    utils.RemoveNodeFromCandidateCerts("%s-OLDMASTER" % master_uuid,
                                       cluster.candidate_certs)
    # Trigger another update of the config now with the new master cert
    self.cfg.Update(cluster, feedback_fn)


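# Illustrative shape of cluster.candidate_certs while the renewal above is in
# flight (keys and digests here are hypothetical); the "-SERVER" and
# "-OLDMASTER" entries are removed again before the final config update:
#
#   {
#     "<master-uuid>": "<new-client-cert-digest>",
#     "<master-uuid>-SERVER": "<server-cert-digest>",
#     "<master-uuid>-OLDMASTER": "<old-client-cert-digest>",
#     "<candidate-node-uuid>": "<new-client-cert-digest>",
#   }
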
class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def CheckArguments(self):
    self.master_uuid = self.cfg.GetMasterNode()
    self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo())

    # TODO: When Issue 584 is solved, and None is properly parsed when used
    # as a default value, ndparams.get(.., None) can be changed to
    # ndparams[..] to access the values directly

    # OpenvSwitch: Warn user if link is missing
    if (self.master_ndparams[constants.ND_OVS] and not
        self.master_ndparams.get(constants.ND_OVS_LINK, None)):
      self.LogInfo("No physical interface for OpenvSwitch was given."
                   " OpenvSwitch will not have an outside connection. This"
                   " might not be what you want.")

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Create and configure Open vSwitch.

    """
    if self.master_ndparams[constants.ND_OVS]:
      result = self.rpc.call_node_configure_ovs(
                 self.master_uuid,
                 self.master_ndparams[constants.ND_OVS_NAME],
                 self.master_ndparams.get(constants.ND_OVS_LINK, None))
      result.Raise("Could not successfully configure Open vSwitch")

    cluster = self.cfg.GetClusterInfo()
    _UpdateMasterClientCert(self, self.master_uuid, cluster, feedback_fn)

    return True


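# Illustrative node parameters that would make the Exec method above set up
# Open vSwitch on the master (the values are hypothetical, the keys are the
# constants used in the code):
#
#   { constants.ND_OVS: True,
#     constants.ND_OVS_NAME: "switch1",
#     constants.ND_OVS_LINK: "eth1" }
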
class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the cluster query data.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "vcs_version": constants.VCS_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "max_running_jobs": cluster.max_running_jobs,
      "mac_prefix": cluster.mac_prefix,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "default_iallocator_params": cluster.default_iallocator_params,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      "instance_communication_network": cluster.instance_communication_network,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disk sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have a smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.DT_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      if not dskl:
        # no disks on the node
        continue

      newl = [([v[2].Copy()], v[0]) for v in dskl]
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed


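# Illustrative shape of the list returned by LUClusterRepairDiskSizes.Exec
# above (instance name, disk index, repaired field, new value; the concrete
# values here are hypothetical):
#
#   [("inst1.example.com", 0, "size", 10240),
#    ("inst1.example.com", 0, "spindles", 2)]
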
def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type netmask: int
  @param netmask: netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)


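# Example (illustrative): on an IPv4 cluster a prefix length that fits the
# address family passes silently, anything else raises OpPrereqError:
#
#   _ValidateNetmask(self.cfg, 24)   # OK
#   _ValidateNetmask(self.cfg, 99)   # raises errors.OpPrereqError
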
def CheckFileBasedStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
    file_disk_template):
  """Checks whether the given file-based storage directory is acceptable.

  Note: This function is public, because it is also used in bootstrap.py.

  @type logging_warn_fn: function
  @param logging_warn_fn: function which accepts a string and logs it
  @type file_storage_dir: string
  @param file_storage_dir: the directory to be used for file-based instances
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of enabled disk templates
  @type file_disk_template: string
  @param file_disk_template: the file-based disk template for which the
      path should be checked

  """
  assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
            constants.ST_FILE, constants.ST_SHARED_FILE
         ))
  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_dir is not None:
    if file_storage_dir == "":
      if file_storage_enabled:
        raise errors.OpPrereqError(
            "Unsetting the '%s' storage directory while having '%s' storage"
            " enabled is not permitted." %
            (file_disk_template, file_disk_template))
    else:
      if not file_storage_enabled:
        logging_warn_fn(
            "Specified a %s storage directory, although %s storage is not"
            " enabled." % (file_disk_template, file_disk_template))
  else:
    raise errors.ProgrammerError("Received %s storage dir with value"
                                 " 'None'." % file_disk_template)


def CheckFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_FILE)


def CheckSharedFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given shared file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_SHARED_FILE)


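# Example (illustrative; the path and the cluster state are hypothetical):
# warn via the standard logging module if a file storage dir is supplied
# although file storage is not among the enabled disk templates:
#
#   CheckFileStoragePathVsEnabledDiskTemplates(
#       logging.warning, "/srv/ganeti/file-storage",
#       cluster.enabled_disk_templates)
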
class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters.

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.mac_prefix:
      self.op.mac_prefix = \
          utils.NormalizeAndValidateThreeOctetMacPrefix(self.op.mac_prefix)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
        CheckDiskAccessModeValidity(self.op.diskparams)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s"
                                   % err, errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_uuids, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the vg name on all nodes and, in case it
       gets unset, whether there are instances still using it.

    """
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
                                            new_enabled_disk_templates)
    current_vg_name = self.cfg.GetVGName()

    if self.op.vg_name == '':
      if lvm_is_enabled:
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                   " disk templates are or get enabled.")

    if self.op.vg_name is None:
      if current_vg_name is None and lvm_is_enabled:
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and lvm_is_enabled) or \
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
      self._CheckVgNameOnNodes(node_uuids)

  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)

  @staticmethod
  def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
                                old_enabled_disk_templates):
    """Computes three sets of disk templates.

    @see: C{_GetDiskTemplateSets} for more details.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    disabled_disk_templates = []
    if op_enabled_disk_templates:
      enabled_disk_templates = op_enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(old_enabled_disk_templates))
      disabled_disk_templates = \
        list(set(old_enabled_disk_templates)
             - set(enabled_disk_templates))
    else:
      enabled_disk_templates = old_enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates,
            disabled_disk_templates)

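  # Example (illustrative) of the three sets computed above, using the real
  # disk template constants but a hypothetical cluster state:
  #
  #   old = [constants.DT_PLAIN, constants.DT_FILE]
  #   new = [constants.DT_PLAIN, constants.DT_DRBD8]
  #   LUClusterSetParams._GetDiskTemplateSetsInner(new, old)
  #   -> (new, [constants.DT_DRBD8], [constants.DT_FILE])
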
  def _GetDiskTemplateSets(self, cluster):
    """Computes three sets of disk templates.

    The three sets are:
      - disk templates that will be enabled after this operation (no matter if
        they were enabled before or not)
      - disk templates that get enabled by this operation (thus haven't been
        enabled before.)
      - disk templates that get disabled by this operation

    """
    return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
                                          cluster.enabled_disk_templates)

  def _CheckIpolicy(self, cluster, enabled_disk_templates):
    """Checks the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration
    @type enabled_disk_templates: list of string
    @param enabled_disk_templates: list of (possibly newly) enabled disk
      templates

    """
    # FIXME: write unit tests for this
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
                                  enabled_disk_templates)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(nuuid in group.members
                                             for nuuid in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
                                           self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate it: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))
    else:
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
                                  enabled_disk_templates)

  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
    """Checks whether the set DRBD helper actually exists on the nodes.

    @type drbd_helper: string
    @param drbd_helper: path of the drbd usermode helper binary
    @type node_uuids: list of strings
    @param node_uuids: list of node UUIDs to check for the helper

    """
    # checks given drbd helper on all nodes
    helpers = self.rpc.call_drbd_helper(node_uuids)
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
      if ninfo.offline:
        self.LogInfo("Not checking drbd helper on offline node %s",
                     ninfo.name)
        continue
      msg = helpers[ninfo.uuid].fail_msg
      if msg:
        raise errors.OpPrereqError("Error checking drbd helper on node"
                                   " '%s': %s" % (ninfo.name, msg),
                                   errors.ECODE_ENVIRON)
      node_helper = helpers[ninfo.uuid].payload
      if node_helper != drbd_helper:
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                   (ninfo.name, node_helper),
                                   errors.ECODE_ENVIRON)

  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
    """Check the DRBD usermode helper.

    @type node_uuids: list of strings
    @param node_uuids: a list of nodes' UUIDs
    @type drbd_enabled: boolean
    @param drbd_enabled: whether DRBD will be enabled after this operation
      (no matter if it was disabled before or not)
    @type drbd_gets_enabled: boolean
    @param drbd_gets_enabled: true if DRBD was disabled before this
      operation, but will be enabled afterwards

    """
    if self.op.drbd_helper == '':
      if drbd_enabled:
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " DRBD is enabled.")
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    else:
      if self.op.drbd_helper is not None and drbd_enabled:
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
      else:
        if drbd_gets_enabled:
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
          if current_drbd_helper is not None:
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
          else:
            raise errors.OpPrereqError("Cannot enable DRBD without a"
                                       " DRBD usermode helper set.")

  def _CheckInstancesOfDisabledDiskTemplates(
      self, disabled_disk_templates):
    """Check whether we try to disable a disk template that is in use.

    @type disabled_disk_templates: list of string
    @param disabled_disk_templates: list of disk templates that are going to
      be disabled by this operation

    """
    for disk_template in disabled_disk_templates:
      if self.cfg.HasAnyDiskOfType(disk_template):
        raise errors.OpPrereqError(
            "Cannot disable disk template '%s', because there is at least one"
            " instance using it." % disk_template)

  @staticmethod
  def _CheckInstanceCommunicationNetwork(network, warning_fn):
    """Check whether an existing network is configured for instance
    communication.

    Checks whether an existing network is configured with the
    parameters that are advisable for instance communication, and
    otherwise issues security warnings.

    @type network: L{ganeti.objects.Network}
    @param network: L{ganeti.objects.Network} object whose
                    configuration is being checked
    @type warning_fn: function
    @param warning_fn: function used to print warnings
    @rtype: None
    @return: None

    """
    def _MaybeWarn(err, val, default):
      if val != default:
        warning_fn("Supplied instance communication network '%s' %s '%s',"
                   " this might pose a security risk (default is '%s').",
                   network.name, err, val, default)

    if network.network is None:
      raise errors.OpPrereqError("Supplied instance communication network '%s'"
                                 " must have an IPv4 network address." %
                                 network.name)

    _MaybeWarn("has an IPv4 gateway", network.gateway, None)
    _MaybeWarn("has a non-standard IPv4 network address", network.network,
               constants.INSTANCE_COMMUNICATION_NETWORK4)
    _MaybeWarn("has an IPv6 gateway", network.gateway6, None)
    _MaybeWarn("has a non-standard IPv6 network address", network.network6,
               constants.INSTANCE_COMMUNICATION_NETWORK6)
    _MaybeWarn("has a non-standard MAC prefix", network.mac_prefix,
               constants.INSTANCE_COMMUNICATION_MAC_PREFIX)

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    whether the given volume group is valid.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    self.cluster = cluster = self.cfg.GetClusterInfo()

    vm_capable_node_uuids = [node.uuid
                             for node in self.cfg.GetAllNodesInfo().values()
                             if node.uuid in node_uuids and node.vm_capable]

    (enabled_disk_templates, new_enabled_disk_templates,
      disabled_disk_templates) = self._GetDiskTemplateSets(cluster)
    self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates)

    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
                      new_enabled_disk_templates)

    if self.op.file_storage_dir is not None:
      CheckFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)

    if self.op.shared_file_storage_dir is not None:
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.shared_file_storage_dir,
          enabled_disk_templates)

    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)

    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    self._CheckIpolicy(cluster, enabled_disk_templates)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self._BuildOSParams(cluster)

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_uuids, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_uuids, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

    if self.op.instance_communication_network:
      network_name = self.op.instance_communication_network

      try:
        network_uuid = self.cfg.LookupNetwork(network_name)
      except errors.OpPrereqError:
        network_uuid = None

      if network_uuid is not None:
        network = self.cfg.GetNetwork(network_uuid)
        self._CheckInstanceCommunicationNetwork(network, self.LogWarning)

  def _BuildOSParams(self, cluster):
    "Calculate the new OS parameters for this operation."

    def _GetNewParams(source, new_params):
      "Wrapper around GetUpdatedParams."
      if new_params is None:
        return source
      result = objects.FillDict(source, {}) # deep copy of source
      for os_name in new_params:
        result[os_name] = GetUpdatedParams(result.get(os_name, {}),
                                           new_params[os_name],
                                           use_none=True)
        if not result[os_name]:
          del result[os_name] # we removed all parameters
      return result

    self.new_osp = _GetNewParams(cluster.osparams,
                                 self.op.osparams)
    self.new_osp_private = _GetNewParams(cluster.osparams_private_cluster,
                                         self.op.osparams_private_cluster)

    # Remove os validity check
    changed_oses = (set(self.new_osp.keys()) | set(self.new_osp_private.keys()))
    for os_name in changed_oses:
      os_params = cluster.SimpleFillOS(
        os_name,
        self.new_osp.get(os_name, {}),
        os_params_private=self.new_osp_private.get(os_name, {})
      )
      # check the parameter validity (remote check)
      CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                    os_name, os_params)

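  # Illustrative merge behaviour of _GetNewParams above (all names and values
  # are hypothetical): with use_none=True, a parameter set to None in the
  # opcode is dropped from the stored per-OS dictionary:
  #
  #   cluster.osparams = {"debian-image": {"dhcp": "no", "mirror": "http://x"}}
  #   self.op.osparams = {"debian-image": {"mirror": None}}
  #   self.new_osp     = {"debian-image": {"dhcp": "no"}}
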
  def _CheckDiskTemplateConsistency(self):
1324
    """Check whether the disk templates that are going to be disabled
1325
       are still in use by some instances.
1326

1327
    """
1328
    if self.op.enabled_disk_templates:
1329
      cluster = self.cfg.GetClusterInfo()
1330
      instances = self.cfg.GetAllInstancesInfo()
1331

    
1332
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1333
        - set(self.op.enabled_disk_templates)
1334
      for instance in instances.itervalues():
1335
        if instance.disk_template in disk_templates_to_remove:
1336
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1337
                                     " because instance '%s' is using it." %
1338
                                     (instance.disk_template, instance.name))
1339

    
1340
  def _SetVgName(self, feedback_fn):
1341
    """Determines and sets the new volume group name.
1342

1343
    """
1344
    if self.op.vg_name is not None:
1345
      new_volume = self.op.vg_name
1346
      if not new_volume:
1347
        new_volume = None
1348
      if new_volume != self.cfg.GetVGName():
1349
        self.cfg.SetVGName(new_volume)
1350
      else:
1351
        feedback_fn("Cluster LVM configuration already in desired"
1352
                    " state, not changing")
1353

    
1354
  def _SetFileStorageDir(self, feedback_fn):
1355
    """Set the file storage directory.
1356

1357
    """
1358
    if self.op.file_storage_dir is not None:
1359
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1360
        feedback_fn("Global file storage dir already set to value '%s'"
1361
                    % self.cluster.file_storage_dir)
1362
      else:
1363
        self.cluster.file_storage_dir = self.op.file_storage_dir
1364

    
1365
  def _SetDrbdHelper(self, feedback_fn):
1366
    """Set the DRBD usermode helper.
1367

1368
    """
1369
    if self.op.drbd_helper is not None:
1370
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1371
        feedback_fn("Note that you specified a drbd user helper, but did not"
1372
                    " enable the drbd disk template.")
1373
      new_helper = self.op.drbd_helper
1374
      if not new_helper:
1375
        new_helper = None
1376
      if new_helper != self.cfg.GetDRBDHelper():
1377
        self.cfg.SetDRBDHelper(new_helper)
1378
      else:
1379
        feedback_fn("Cluster DRBD helper already in desired state,"
1380
                    " not changing")
1381

    
1382
  @staticmethod
1383
  def _EnsureInstanceCommunicationNetwork(cfg, network_name):
1384
    """Ensure that the instance communication network exists and is
1385
    connected to all groups.
1386

1387
    The instance communication network given by L{network_name} it is
1388
    created, if necessary, via the opcode 'OpNetworkAdd'.  Also, the
1389
    instance communication network is connected to all existing node
1390
    groups, if necessary, via the opcode 'OpNetworkConnect'.
1391

1392
    @type cfg: L{config.ConfigWriter}
1393
    @param cfg: cluster configuration
1394

1395
    @type network_name: string
1396
    @param network_name: instance communication network name
1397

1398
    @rtype: L{ganeti.cmdlib.ResultWithJobs} or L{None}
1399
    @return: L{ganeti.cmdlib.ResultWithJobs} if the instance
1400
             communication needs to be created or it needs to be
1401
             connected to a group, otherwise L{None}
1402

1403
    """
1404
    jobs = []
1405

    
1406
    try:
1407
      network_uuid = cfg.LookupNetwork(network_name)
1408
      network_exists = True
1409
    except errors.OpPrereqError:
1410
      network_exists = False
1411

    
1412
    if not network_exists:
1413
      jobs.append(AddInstanceCommunicationNetworkOp(network_name))
1414

    
1415
    for group_uuid in cfg.GetNodeGroupList():
1416
      group = cfg.GetNodeGroup(group_uuid)
1417

    
1418
      if network_exists:
1419
        network_connected = network_uuid in group.networks
1420
      else:
1421
        # The network was created asynchronously by the previous
1422
        # opcode and, therefore, we don't have access to its
1423
        # network_uuid.  As a result, we assume that the network is
1424
        # not connected to any group yet.
1425
        network_connected = False
1426

    
1427
      if not network_connected:
1428
        op = ConnectInstanceCommunicationNetworkOp(group_uuid, network_name)
1429
        jobs.append(op)
1430

    
1431
    if jobs:
1432
      return ResultWithJobs([jobs])
1433
    else:
1434
      return None
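    # Example of the result shape (group names hypothetical): for a cluster
    # with two node groups and no pre-existing network, this roughly returns
    # ResultWithJobs([[OpNetworkAdd(...), OpNetworkConnect(group1, ...),
    # OpNetworkConnect(group2, ...)]]), i.e. a single job whose opcodes run
    # in order, so the network exists before the connect opcodes execute.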

  @staticmethod
  def _ModifyInstanceCommunicationNetwork(cfg, cluster, network_name,
                                          feedback_fn):
    """Update the instance communication network stored in the cluster
    configuration.

    Compares the user-supplied instance communication network against
    the one stored in the Ganeti cluster configuration.  If there is a
    change, the instance communication network may need to be created
    and connected to all groups (see
    L{LUClusterSetParams._EnsureInstanceCommunicationNetwork}).

    @type cfg: L{config.ConfigWriter}
    @param cfg: cluster configuration

    @type cluster: L{ganeti.objects.Cluster}
    @param cluster: Ganeti cluster

    @type network_name: string
    @param network_name: instance communication network name

    @type feedback_fn: function
    @param feedback_fn: see L{ganeti.cmdlib.base.LogicalUnit}

    @rtype: L{LUClusterSetParams._EnsureInstanceCommunicationNetwork} or L{None}
    @return: see L{LUClusterSetParams._EnsureInstanceCommunicationNetwork}

    """
    config_network_name = cfg.GetInstanceCommunicationNetwork()

    if network_name == config_network_name:
      feedback_fn("Instance communication network already is '%s', nothing to"
                  " do." % network_name)
    else:
      try:
        cfg.LookupNetwork(config_network_name)
        feedback_fn("Previous instance communication network '%s'"
                    " should be removed manually." % config_network_name)
      except errors.OpPrereqError:
        pass

      if network_name:
        feedback_fn("Changing instance communication network to '%s', only new"
                    " instances will be affected."
                    % network_name)
      else:
        feedback_fn("Disabling instance communication network, only new"
                    " instances will be affected.")

      cluster.instance_communication_network = network_name

      if network_name:
        return LUClusterSetParams._EnsureInstanceCommunicationNetwork(
          cfg,
          network_name)
      else:
        return None

  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(self.op.enabled_disk_templates)

    self._SetVgName(feedback_fn)
    self._SetFileStorageDir(feedback_fn)
    self._SetDrbdHelper(feedback_fn)

    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.osparams_private_cluster:
      self.cluster.osparams_private_cluster = self.new_osp_private
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [], feedback_fn)

    if self.op.max_running_jobs is not None:
      self.cluster.max_running_jobs = self.op.max_running_jobs

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.default_iallocator_params is not None:
      self.cluster.default_iallocator_params = self.op.default_iallocator_params

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.mac_prefix:
      self.cluster.mac_prefix = self.op.mac_prefix

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)

    network_name = self.op.instance_communication_network
    if network_name is not None:
      return self._ModifyInstanceCommunicationNetwork(self.cfg, self.cluster,
                                                      network_name, feedback_fn)
    else:
      return None


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)
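    # Sketch of the submitted batch (group names illustrative): without
    # group_name it is [[OpClusterVerifyConfig], [OpClusterVerifyGroup(A)],
    # [OpClusterVerifyGroup(B)], ...]; each depends_fn() call returns
    # [(-len(jobs), [])], a relative job dependency that resolves to the
    # config-verification job submitted first in this batch.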


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = constants.CV_ERROR
  ETYPE_WARNING = constants.CV_WARNING

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data
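  # Example of the returned structure (names and values illustrative):
  #   [("cluster", "kvm", {...}),
  #    ("os debian-image", "kvm", {...}),
  #    ("instance web1", "kvm", {...})]
  # i.e. one (origin, hypervisor, effective-parameters) tuple per source of
  # hypervisor parameters, as consumed by LUClusterVerifyConfig._VerifyHVP.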


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = utils.VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(inst.name for
                         inst in dangling_instances.get(node.uuid, [])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(inst.name for
                                         inst in no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        instance = self.cfg.GetInstanceInfoByName(inst_name)
        if instance.disk_template in constants.DTS_INT_MIRROR:
          nodes.update(instance.secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_inst_uuids = \
        group_inst_uuids.difference(
          [self.cfg.GetInstanceInfoByName(name).uuid
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_inst_uuids:
      raise errors.OpPrereqError(
        "Missing lock for instances: %s" %
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
        errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_uuids = group_inst_uuids
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
                             for inst_uuid in group_inst_uuids)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nuuid in inst.all_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
                    "Node returned invalid time")
      return

    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
                  "Node time diverges by at least %s from master node time",
                  ntime_diff)

  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                  "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax
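    # Note (illustrative, not from the original source): the per-node
    # pv_min/pv_max values recorded here are only consumed later by
    # _VerifyGroupLVM, which compares the smallest and largest PV sizes
    # across the whole group when exclusive storage is enabled.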

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node_verify call.

    """
    node_versions = {}
    for node_uuid, ndata in node_verify_infos.items():
      nresult = ndata.payload
      if nresult:
        version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
        node_versions[node_uuid] = version

    if len(set(node_versions.values())) > 1:
      for node_uuid, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
                    code=self.ETYPE_WARNING)

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping from node names to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of check
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "did not return valid bridge information")
    if not test:
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = constants.NV_NODELIST not in nresult
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
                  "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
                        "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
                        "tcp communication with node '%s': %s",
                        anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if ninfo.uuid == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)

  def _VerifyInstance(self, instance, node_image, diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    pnode_uuid = instance.primary_node
    pnode_img = node_image[pnode_uuid]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    instance.MapLVsByNode(node_vol_should)

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)

    for node_uuid in node_vol_should:
      n_img = node_image[node_uuid]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node_uuid]:
        test = volume not in n_img.volumes
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
                      "volume %s missing on node %s", volume,
                      self.cfg.GetNodeName(node_uuid))

    if instance.admin_state == constants.ADMINST_UP:
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
                    "instance not running on its primary node %s",
                     self.cfg.GetNodeName(pnode_uuid))
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance is marked as running and lives on"
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))

    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      self._ErrorIf(instance.disks_active and
                    not success and not bad_snode,
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
                    "couldn't retrieve status for disk/%s on %s: %s",
                    idx, self.cfg.GetNodeName(nname), bdev_status)

      if instance.disks_active and success and \
         (bdev_status.is_degraded or
          bdev_status.ldisk_status != constants.LDS_OKAY):
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
        if bdev_status.is_degraded:
          msg += " is degraded"
        if bdev_status.ldisk_status != constants.LDS_OKAY:
          msg += "; state is '%s'" % \
                 constants.LDS_NAMES[bdev_status.ldisk_status]

        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)

    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
                  "instance %s, connection to primary node failed",
                  instance.name)

    self._ErrorIf(len(instance.secondary_nodes) > 1,
                  constants.CV_EINSTANCELAYOUT, instance.name,
                  "instance has multiple secondary nodes: %s",
                  utils.CommaJoin(instance.secondary_nodes),
                  code=self.ETYPE_WARNING)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
    if any(es_flags.values()):
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
        # Disk template not compatible with exclusive_storage: no instance
        # node should have the flag set
        es_nodes = [n
                    for (n, es) in es_flags.items()
                    if es]
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
                    "instance has template %s, which is not supported on nodes"
                    " that have exclusive storage set: %s",
                    instance.disk_template,
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
      for (idx, disk) in enumerate(instance.disks):
        self._ErrorIf(disk.spindles is None,
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
                      "number of spindles not configured for disk %s while"
                      " exclusive storage is enabled, try running"
                      " gnt-cluster repair-disk-sizes", idx)

    if instance.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(instance.all_nodes)
      instance_groups = {}

      for node_uuid in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
                                   []).append(node_uuid)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
                           groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode_uuid in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance.name, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in instance.secondary_nodes:
      s_img = node_image[snode]
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
                    self.cfg.GetNodeName(snode),
                    "instance %s, connection to secondary node failed",
                    instance.name)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
                  instance.name, "instance has offline secondary node(s) %s",
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
    # ... or ghost/non-vm_capable nodes
    for node_uuid in instance.all_nodes:
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance lives on ghost node %s",
                    self.cfg.GetNodeName(node_uuid))
      self._ErrorIf(not node_image[node_uuid].vm_capable,
                    constants.CV_EINSTANCEBADNODE, instance.name,
                    "instance lives on non-vm_capable node %s",
                    self.cfg.GetNodeName(node_uuid))

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node_uuid, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node_uuid].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node_uuid not in node_vol_should or
                volume not in node_vol_should[node_uuid]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
                      self.cfg.GetNodeName(node_uuid),
                      "volume %s is unknown", volume,
                      code=_VerifyErrors.ETYPE_WARNING)

  def _VerifyNPlusOneMemory(self, node_image, all_insts):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node_uuid, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or \
         self.all_node_info[node_uuid].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
      for prinode, inst_uuids in n_img.sbp.items():
        needed_mem = 0
        for inst_uuid in inst_uuids:
          bep = cluster_info.FillBE(all_insts[inst_uuid])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1,
                      self.cfg.GetNodeName(node_uuid),
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
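    # Worked example (numbers hypothetical): if node B is secondary for two
    # auto-balanced instances whose primary is node A, with BE_MINMEM of 2048
    # and 4096 MiB, then needed_mem is 6144 MiB; an N+1 error is reported for
    # B if its reported mfree is below 6144 MiB, since it could not start both
    # instances should node A fail.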
2511

    
2512
  def _VerifyClientCertificates(self, nodes, all_nvinfo):
2513
    """Verifies the consistency of the client certificates.
2514

2515
    This includes several aspects:
2516
      - the individual validation of all nodes' certificates
2517
      - the consistency of the master candidate certificate map
2518
      - the consistency of the master candidate certificate map with the
2519
        certificates that the master candidates are actually using.
2520

2521
    @param nodes: the list of nodes to consider in this verification
2522
    @param all_nvinfo: the map of results of the verify_node call to
2523
      all nodes
2524

2525
    """
2526
    candidate_certs = self.cfg.GetClusterInfo().candidate_certs
2527
    if candidate_certs is None or len(candidate_certs) == 0:
2528
      self._ErrorIf(
2529
        True, constants.CV_ECLUSTERCLIENTCERT, None,
2530
        "The cluster's list of master candidate certificates is empty."
2531
        "If you just updated the cluster, please run"
2532
        " 'gnt-cluster renew-crypto --new-node-certificates'.")
2533
      return
2534

    
2535
    self._ErrorIf(
2536
      len(candidate_certs) != len(set(candidate_certs.values())),
2537
      constants.CV_ECLUSTERCLIENTCERT, None,
2538
      "There are at least two master candidates configured to use the same"
2539
      " certificate.")
2540

    
2541
    # collect the client certificate
2542
    for node in nodes:
2543
      if node.offline:
2544
        continue
2545

    
2546
      nresult = all_nvinfo[node.uuid]
2547
      if nresult.fail_msg or not nresult.payload:
2548
        continue
2549

    
2550
      (errcode, msg) = nresult.payload.get(constants.NV_CLIENT_CERT, None)
2551

    
2552
      self._ErrorIf(
2553
        errcode is not None, constants.CV_ECLUSTERCLIENTCERT, None,
2554
        "Client certificate of node '%s' failed validation: %s (code '%s')",
2555
        node.uuid, msg, errcode)
2556

    
2557
      if not errcode:
2558
        digest = msg
2559
        if node.master_candidate:
2560
          if node.uuid in candidate_certs:
2561
            self._ErrorIf(
2562
              digest != candidate_certs[node.uuid],
2563
              constants.CV_ECLUSTERCLIENTCERT, None,
2564
              "Client certificate digest of master candidate '%s' does not"
2565
              " match its entry in the cluster's map of master candidate"
2566
              " certificates. Expected: %s Got: %s", node.uuid,
2567
              digest, candidate_certs[node.uuid])
2568
          else:
2569
            self._ErrorIf(
2570
              True, constants.CV_ECLUSTERCLIENTCERT, None,
2571
              "The master candidate '%s' does not have an entry in the"
2572
              " map of candidate certificates.", node.uuid)
2573
            self._ErrorIf(
2574
              digest in candidate_certs.values(),
2575
              constants.CV_ECLUSTERCLIENTCERT, None,
2576
              "Master candidate '%s' is using a certificate of another node.",
2577
              node.uuid)
2578
        else:
2579
          self._ErrorIf(
2580
            node.uuid in candidate_certs,
2581
            constants.CV_ECLUSTERCLIENTCERT, None,
2582
            "Node '%s' is not a master candidate, but still listed in the"
2583
            " map of master candidate certificates.", node.uuid)
2584
          self._ErrorIf(
2585
            (node.uuid not in candidate_certs) and
2586
              (digest in candidate_certs.values()),
2587
            constants.CV_ECLUSTERCLIENTCERT, None,
2588
            "Node '%s' is not a master candidate and is incorrectly using a"
2589
            " certificate of another node which is master candidate.",
2590
            node.uuid)
2591

    
2592
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param nodes: List of L{objects.Node} objects
    @param master_node_uuid: UUID of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.uuid == master_node_uuid)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodes
      else:
        filenodes = filter(fn, nodes)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodes:
      if node.offline:
        ignore_nodes.add(node.uuid)
        continue

      nresult = all_nvinfo[node.uuid]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, {})
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
                    "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.uuid)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_uuid
                            for node_uuids in fileinfo[filename].values()
                            for node_uuid in node_uuids) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        self._ErrorIf(missing_file and missing_file != expected_nodes,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is optional, but it must exist on all or no"
                      " nodes (not found on %s)",
                      filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))
      else:
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is missing from node(s) %s", filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        self._ErrorIf(unexpected,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s should not exist on node(s) %s",
                      filename, utils.CommaJoin(
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1,
                     utils.CommaJoin(utils.NiceSort(
                       map(self.cfg.GetNodeName, node_uuids))))
                    for (idx, (checksum, node_uuids)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
                    "File %s found with %s different checksums (%s)",
                    filename, len(checksums), "; ".join(variants))

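  # Illustrative sketch only (assumed values, not executed): after the loops
  # above, the helper data structures look roughly like
  #   nodefiles = {"/path/to/file": frozenset(["node-uuid-1", "node-uuid-2"])}
  #   fileinfo = {"/path/to/file": {"checksum-a": set(["node-uuid-1"]),
  #                                 "checksum-b": set(["node-uuid-2"])}}
  # i.e. a file is flagged when a node listed in nodefiles[f] appears in no
  # checksum set, or when more than one checksum key exists for it.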
  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
    """Verify the drbd helper.

    """
    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                    "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "wrong drbd usermode helper: %s", payload)

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)

    # compute the DRBD minors
    node_drbd = {}
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
      test = inst_uuid not in instanceinfo
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
      # ghost instance should not be running, but otherwise we
      # don't give double warnings (both ghost instance and
      # unallocated minor in use)
      if test:
        node_drbd[minor] = (inst_uuid, False)
      else:
        instance = instanceinfo[inst_uuid]
        node_drbd[minor] = (inst_uuid, instance.disks_active)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                  "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (inst_uuid, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "drbd minor %d of instance %s is not active", minor,
                    self.cfg.GetInstanceName(inst_uuid))
    for minor in used_minors:
      test = minor not in node_drbd
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "unallocated drbd minor %d is in use", minor)

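  # Illustrative sketch only (assumed values, not executed): node_drbd maps a
  # configured DRBD minor to its owning instance and whether it must be live,
  # while used_minors is what the node actually reports, e.g.
  #   node_drbd = {0: ("inst-uuid-a", True), 1: ("inst-uuid-b", False)}
  #   used_minors = [0, 2]
  # Minor 1 is fine (not required to be active); minor 2 would be reported
  # above as an unallocated minor in use.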
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                  "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

    nimg.oslist = os_dict

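  # Illustrative sketch only (assumed values, not executed): each NV_OSLIST
  # entry is a 7-element list which gets folded into nimg.oslist keyed by OS
  # name, e.g.
  #   ["debootstrap", "/srv/ganeti/os/debootstrap", True, "",
  #    ["default", "minimal"], [["dhcp", "yes"]], [20]]
  # becomes
  #   nimg.oslist["debootstrap"] = [("/srv/ganeti/os/debootstrap", True, "",
  #                                  set(["default", "minimal"]),
  #                                  set([("dhcp", "yes")]), set([20]))]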
  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
                    "Invalid OS %s (located at %s): %s",
                    os_name, f_path, f_diag)
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
                    "OS '%s' has multiple entries"
                    " (first one shadows the rest): %s",
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                    "Extra OS %s not present on reference node (%s)",
                    os_name, self.cfg.GetNodeName(base.uuid))
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " [%s] vs. [%s]", kind, os_name,
                      self.cfg.GetNodeName(base.uuid),
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
                  "OSes present on reference node %s"
                  " but missing on this node: %s",
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))

  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    cluster = self.cfg.GetClusterInfo()
    if (is_master and
        (cluster.IsFileStorageEnabled() or
         cluster.IsSharedFileStorageEnabled())):
      try:
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                    "Node should not have returned forbidden file storage"
                    " paths")

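  # Illustrative note (sketch only): NV_ACCEPTED_STORAGE_PATHS is requested
  # only from the master node (see the node_verify_param setup in Exec below),
  # so any other node returning that key at all is itself an error, which is
  # what the else branch above reports.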
  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
                          verify_key, error_key):
    """Verifies (file) storage paths.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type file_disk_template: string
    @param file_disk_template: file-based disk template, whose directory
        is supposed to be verified
    @type verify_key: string
    @param verify_key: key for the verification map of this file
        verification step
    @param error_key: error key to be added to the verification results
        in case something goes wrong in this verification step

    """
    assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
              constants.ST_FILE, constants.ST_SHARED_FILE
           ))

    cluster = self.cfg.GetClusterInfo()
    if cluster.IsDiskTemplateEnabled(file_disk_template):
      self._ErrorIf(
          verify_key in nresult,
          error_key, ninfo.name,
          "The configured %s storage path is unusable: %s" %
          (file_disk_template, nresult.get(verify_key)))

  def _VerifyFileStoragePaths(self, ninfo, nresult):
    """Verifies file storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_FILE,
        constants.NV_FILE_STORAGE_PATH,
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)

  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
    """Verifies shared file storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_SHARED_FILE,
        constants.NV_SHARED_FILE_STORAGE_PATH,
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

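  # Illustrative note (sketch only, assumed payload shape): NV_LVLIST is
  # expected to be a dict of logical volumes, while a plain string is treated
  # as an LVM error message from the node; that is exactly the three-way
  # string/dict/other check _UpdateNodeVolumes performs above before storing
  # the result in nimg.volumes.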
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_dev_inst_only = {}
    diskless_instances = set()
    nodisk_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        nodisk_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template != diskless)
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      dev_inst_only = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))

      node_disks_dev_inst_only[nuuid] = dev_inst_only

    assert len(node_disks) == len(node_disks_dev_inst_only)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
               node_disks.keys(), node_disks_dev_inst_only)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}
    # ...and disk-full instances that happen to have no disks
    for inst_uuid in nodisk_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

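  # Illustrative sketch only (assumed values, not executed): the returned
  # instdisk map is keyed by instance UUID, then node UUID, with one
  # (success, payload) tuple per disk of that instance on that node, e.g.
  #   instdisk = {"inst-uuid-a": {"node-uuid-1": [(True, <mirror status>),
  #                                               (False, "node offline")]},
  #               "diskless-inst-uuid": {}}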
  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

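  # Illustrative sketch only (assumed node names, not executed): with the
  # current group ["n1", "n2"] and one other group ["m1", "m2", "m3"],
  # _SshNodeSelector yields a single cycled iterator over ["m1", "m2", "m3"],
  # so _SelectSshCheckNodes above returns roughly
  #   (["n1", "n2"], {"n1": ["m1"], "n2": ["m2"]})
  # i.e. the full in-group list plus, per node, one round-robin pick from
  # each foreign group, as described in its docstring.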
  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; on failure, their
    output is logged in the verify output and the verification fails.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      constants.NV_CLIENT_CERT: None,
      }

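    # Illustrative note (sketch only, not executed): node_verify_param is the
    # request map sent to each node by node_verify; a key with value None
    # (e.g. constants.NV_VERSION) simply asks the node to report that item,
    # while keys carrying a payload, such as
    #   node_verify_param[constants.NV_LVLIST] = vg_name
    # (set just below when LVM is configured), tell the node what to inspect.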
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least a node, we act as if it were set for all the nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    node_group_uuids = dict(map(lambda n: (n.name, n.group),
                                self.cfg.GetAllNodesInfo().values()))
    groups_config = self.cfg.GetAllNodeGroupsInfoDict()

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams,
                                           node_group_uuids,
                                           groups_config)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams,
                                    node_group_uuids,
                                    groups_config)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    self._VerifyClientCertificates(self.my_node_info.values(), all_nvinfo)
    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams,
         node_group_uuids,
         groups_config))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.uuid not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

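      # Illustrative sketch only (assumed values, not executed): each entry of
      # hooks_results is a per-node RPC result whose payload is a list of
      #   (script_name, hook_result, output)
      # triples, e.g. a hypothetical ("50-check-foo", constants.HKR_FAIL,
      # "disk full"); only HKR_FAIL entries below force the overall result
      # to False.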
      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])