# lib/cmdlib/cluster.py @ 363e2869
#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013, 2014 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
import ganeti.rpc.node as rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
  CheckDiskAccessModeConsistency, CreateNewClientCert, \
  AddInstanceCommunicationNetworkOp, ConnectInstanceCommunicationNetworkOp

import ganeti.masterd.instance


def _UpdateMasterClientCert(
    lu, master_uuid, cluster, feedback_fn,
    client_cert=pathutils.NODED_CLIENT_CERT_FILE,
    client_cert_tmp=pathutils.NODED_CLIENT_CERT_FILE_TMP):
  """Renews the master's client certificate and propagates the config.

  @type lu: C{LogicalUnit}
  @param lu: the logical unit holding the config
  @type master_uuid: string
  @param master_uuid: the master node's UUID
  @type cluster: C{objects.Cluster}
  @param cluster: the cluster's configuration
  @type feedback_fn: function
  @param feedback_fn: feedback functions for config updates
  @type client_cert: string
  @param client_cert: the path of the client certificate
  @type client_cert_tmp: string
  @param client_cert_tmp: the temporary path of the client certificate
  @rtype: string
  @return: the digest of the newly created client certificate

  """
  client_digest = CreateNewClientCert(lu, master_uuid, filename=client_cert_tmp)
  utils.AddNodeToCandidateCerts(master_uuid, client_digest,
                                cluster.candidate_certs)
  # This triggers an update of the config and distribution of it with the old
  # SSL certificate
  lu.cfg.Update(cluster, feedback_fn)

  utils.RemoveFile(client_cert)
  utils.RenameFile(client_cert_tmp, client_cert)
  return client_digest
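# Editorial sketch (not part of the original module): callers below, e.g.
# LUClusterPostInit.Exec and LUClusterRenewCrypto.Exec, invoke this helper
# from inside a logical unit and rely on the default certificate paths,
# roughly as:
#
#   digest = _UpdateMasterClientCert(self, self.cfg.GetMasterNode(),
#                                    self.cfg.GetClusterInfo(), feedback_fn)
#
# The returned digest is what ends up in cluster.candidate_certs.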


class LUClusterRenewCrypto(NoHooksLU):
  """Renew the cluster's crypto tokens.

  Note that most of this operation is done in gnt_cluster.py; this LU only
  takes care of the renewal of the client SSL certificates.

  """
  def Exec(self, feedback_fn):
    master_uuid = self.cfg.GetMasterNode()
    cluster = self.cfg.GetClusterInfo()

    server_digest = utils.GetCertificateDigest(
      cert_filename=pathutils.NODED_CERT_FILE)
    utils.AddNodeToCandidateCerts("%s-SERVER" % master_uuid,
                                  server_digest,
                                  cluster.candidate_certs)
    try:
      old_master_digest = utils.GetCertificateDigest(
        cert_filename=pathutils.NODED_CLIENT_CERT_FILE)
      utils.AddNodeToCandidateCerts("%s-OLDMASTER" % master_uuid,
                                    old_master_digest,
                                    cluster.candidate_certs)
    except IOError:
      logging.info("No old certificate available.")

    new_master_digest = _UpdateMasterClientCert(self, master_uuid, cluster,
                                                feedback_fn)

    utils.AddNodeToCandidateCerts(master_uuid,
                                  new_master_digest,
                                  cluster.candidate_certs)
    nodes = self.cfg.GetAllNodesInfo()
    for (node_uuid, node_info) in nodes.items():
      if node_uuid != master_uuid:
        new_digest = CreateNewClientCert(self, node_uuid)
        if node_info.master_candidate:
          utils.AddNodeToCandidateCerts(node_uuid,
                                        new_digest,
                                        cluster.candidate_certs)
    utils.RemoveNodeFromCandidateCerts("%s-SERVER" % master_uuid,
                                       cluster.candidate_certs)
    utils.RemoveNodeFromCandidateCerts("%s-OLDMASTER" % master_uuid,
                                       cluster.candidate_certs)
    # Trigger another update of the config now with the new master cert
    self.cfg.Update(cluster, feedback_fn)
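  # Editorial note (not part of the original source): the "<uuid>-SERVER" and
  # "<uuid>-OLDMASTER" entries added above are transient.  They are added
  # before the renewal so that the old certificates remain listed in
  # cluster.candidate_certs while the configuration is being redistributed,
  # and both are removed again just before the final config update.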


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def CheckArguments(self):
    self.master_uuid = self.cfg.GetMasterNode()
    self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo())

    # TODO: When Issue 584 is solved, and None is properly parsed when used
    # as a default value, ndparams.get(.., None) can be changed to
    # ndparams[..] to access the values directly

    # OpenvSwitch: Warn user if link is missing
    if (self.master_ndparams[constants.ND_OVS] and not
        self.master_ndparams.get(constants.ND_OVS_LINK, None)):
      self.LogInfo("No physical interface for OpenvSwitch was given."
                   " OpenvSwitch will not have an outside connection. This"
                   " might not be what you want.")

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Create and configure Open vSwitch

    """
    if self.master_ndparams[constants.ND_OVS]:
      result = self.rpc.call_node_configure_ovs(
                 self.master_uuid,
                 self.master_ndparams[constants.ND_OVS_NAME],
                 self.master_ndparams.get(constants.ND_OVS_LINK, None))
      result.Raise("Could not successfully configure Open vSwitch")

    cluster = self.cfg.GetClusterInfo()
    _UpdateMasterClientCert(self, self.master_uuid, cluster, feedback_fn)

    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Cannot use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the cluster data for the requested fields.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
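  # Editorial sketch (not part of the original source): LUClusterConfigQuery
  # above is the typical consumer of this class.  It wires the query up
  # roughly as follows, with locking explicitly disabled because there is
  # only one cluster object to read:
  #
  #   cq = ClusterQuery(None, op.output_fields, False)
  #   cq.ExpandNames(lu)
  #   data = cq.OldStyleQuery(lu)      # returns a single-row result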


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "vcs_version": constants.VCS_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "max_running_jobs": cluster.max_running_jobs,
      "mac_prefix": cluster.mac_prefix,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "default_iallocator_params": cluster.default_iallocator_params,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      "instance_communication_network": cluster.instance_communication_network,
      }

    return result


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disk sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have a smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.DT_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False
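  # Illustrative note (not in the original source): for a DRBD8 disk recorded
  # with size 10240 MiB whose first child (the data volume, per the comment
  # above) was recorded with 10239 MiB, the call bumps the child to 10240 and
  # returns True, so Exec() below writes the corrected configuration back.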

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      if not dskl:
        # no disks on the node
        continue

      newl = [([v[2].Copy()], v[0]) for v in dskl]
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed
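  # Editorial note (not part of the original source): the opcode result is
  # the list built above, e.g. [("inst1.example.com", 0, "size", 10240)]
  # (hypothetical instance name), one tuple per corrected value.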


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: cluster configuration
  @type netmask: int
  @param netmask: netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)
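# Editorial note (not part of the original source): the netmask here is a
# CIDR prefix length given as an integer (the master_netmask cluster
# parameter); e.g. 24 would be acceptable on an IPv4 cluster, while a value
# such as 200 is rejected for either address family.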


def CheckFileBasedStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
    file_disk_template):
  """Checks whether the given file-based storage directory is acceptable.

  Note: This function is public, because it is also used in bootstrap.py.

  @type logging_warn_fn: function
  @param logging_warn_fn: function which accepts a string and logs it
  @type file_storage_dir: string
  @param file_storage_dir: the directory to be used for file-based instances
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of enabled disk templates
  @type file_disk_template: string
  @param file_disk_template: the file-based disk template for which the
      path should be checked

  """
  assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
            constants.ST_FILE, constants.ST_SHARED_FILE
         ))
  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_dir is not None:
    if file_storage_dir == "":
      if file_storage_enabled:
        raise errors.OpPrereqError(
            "Unsetting the '%s' storage directory while having '%s' storage"
            " enabled is not permitted." %
            (file_disk_template, file_disk_template))
    else:
      if not file_storage_enabled:
        logging_warn_fn(
            "Specified a %s storage directory, although %s storage is not"
            " enabled." % (file_disk_template, file_disk_template))
  else:
    raise errors.ProgrammerError("Received %s storage dir with value"
                                 " 'None'." % file_disk_template)


def CheckFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_FILE)


def CheckSharedFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given shared file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_SHARED_FILE)
      constants.DT_SHARED_FILE)
787

    
788

    
789
class LUClusterSetParams(LogicalUnit):
790
  """Change the parameters of the cluster.
791

792
  """
793
  HPATH = "cluster-modify"
794
  HTYPE = constants.HTYPE_CLUSTER
795
  REQ_BGL = False
796

    
797
  def CheckArguments(self):
798
    """Check parameters
799

800
    """
801
    if self.op.uid_pool:
802
      uidpool.CheckUidPool(self.op.uid_pool)
803

    
804
    if self.op.add_uids:
805
      uidpool.CheckUidPool(self.op.add_uids)
806

    
807
    if self.op.remove_uids:
808
      uidpool.CheckUidPool(self.op.remove_uids)
809

    
810
    if self.op.mac_prefix:
811
      self.op.mac_prefix = \
812
          utils.NormalizeAndValidateThreeOctetMacPrefix(self.op.mac_prefix)
813

    
814
    if self.op.master_netmask is not None:
815
      _ValidateNetmask(self.cfg, self.op.master_netmask)
816

    
817
    if self.op.diskparams:
818
      for dt_params in self.op.diskparams.values():
819
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
820
      try:
821
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
822
        CheckDiskAccessModeValidity(self.op.diskparams)
823
      except errors.OpPrereqError, err:
824
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
825
                                   errors.ECODE_INVAL)
826

    
827
  def ExpandNames(self):
828
    # FIXME: in the future maybe other cluster params won't require checking on
829
    # all nodes to be modified.
830
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
831
    # resource locks the right thing, shouldn't it be the BGL instead?
832
    self.needed_locks = {
833
      locking.LEVEL_NODE: locking.ALL_SET,
834
      locking.LEVEL_INSTANCE: locking.ALL_SET,
835
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
836
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
837
    }
838
    self.share_locks = ShareAll()
839

    
840
  def BuildHooksEnv(self):
841
    """Build hooks env.
842

843
    """
844
    return {
845
      "OP_TARGET": self.cfg.GetClusterName(),
846
      "NEW_VG_NAME": self.op.vg_name,
847
      }
848

    
849
  def BuildHooksNodes(self):
850
    """Build hooks nodes.
851

852
    """
853
    mn = self.cfg.GetMasterNode()
854
    return ([mn], [mn])
855

    
856
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
857
                   new_enabled_disk_templates):
858
    """Check the consistency of the vg name on all nodes and in case it gets
859
       unset whether there are instances still using it.
860

861
    """
862
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
863
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
864
                                            new_enabled_disk_templates)
865
    current_vg_name = self.cfg.GetVGName()
866

    
867
    if self.op.vg_name == '':
868
      if lvm_is_enabled:
869
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
870
                                   " disk templates are or get enabled.")
871

    
872
    if self.op.vg_name is None:
873
      if current_vg_name is None and lvm_is_enabled:
874
        raise errors.OpPrereqError("Please specify a volume group when"
875
                                   " enabling lvm-based disk-templates.")
876

    
877
    if self.op.vg_name is not None and not self.op.vg_name:
878
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
879
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
880
                                   " instances exist", errors.ECODE_INVAL)
881

    
882
    if (self.op.vg_name is not None and lvm_is_enabled) or \
883
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
884
      self._CheckVgNameOnNodes(node_uuids)
885

    
886
  def _CheckVgNameOnNodes(self, node_uuids):
887
    """Check the status of the volume group on each node.
888

889
    """
890
    vglist = self.rpc.call_vg_list(node_uuids)
891
    for node_uuid in node_uuids:
892
      msg = vglist[node_uuid].fail_msg
893
      if msg:
894
        # ignoring down node
895
        self.LogWarning("Error while gathering data on node %s"
896
                        " (ignoring node): %s",
897
                        self.cfg.GetNodeName(node_uuid), msg)
898
        continue
899
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
900
                                            self.op.vg_name,
901
                                            constants.MIN_VG_SIZE)
902
      if vgstatus:
903
        raise errors.OpPrereqError("Error on node '%s': %s" %
904
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
905
                                   errors.ECODE_ENVIRON)
906

    
907
  @staticmethod
908
  def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
909
                                old_enabled_disk_templates):
910
    """Computes three sets of disk templates.
911

912
    @see: C{_GetDiskTemplateSets} for more details.
913

914
    """
915
    enabled_disk_templates = None
916
    new_enabled_disk_templates = []
917
    disabled_disk_templates = []
918
    if op_enabled_disk_templates:
919
      enabled_disk_templates = op_enabled_disk_templates
920
      new_enabled_disk_templates = \
921
        list(set(enabled_disk_templates)
922
             - set(old_enabled_disk_templates))
923
      disabled_disk_templates = \
924
        list(set(old_enabled_disk_templates)
925
             - set(enabled_disk_templates))
926
    else:
927
      enabled_disk_templates = old_enabled_disk_templates
928
    return (enabled_disk_templates, new_enabled_disk_templates,
929
            disabled_disk_templates)
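  # Editorial sketch (not in the original source): with
  #   op_enabled_disk_templates  = ["plain", "file"]
  #   old_enabled_disk_templates = ["plain", "drbd"]
  # the helper returns (["plain", "file"], ["file"], ["drbd"]), i.e. the
  # resulting set, the newly enabled templates and the newly disabled ones
  # (list order may vary, since sets are used internally).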

  def _GetDiskTemplateSets(self, cluster):
    """Computes three sets of disk templates.

    The three sets are:
      - disk templates that will be enabled after this operation (no matter if
        they were enabled before or not)
      - disk templates that get enabled by this operation (thus haven't been
        enabled before.)
      - disk templates that get disabled by this operation

    """
    return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
                                          cluster.enabled_disk_templates)

  def _CheckIpolicy(self, cluster, enabled_disk_templates):
    """Checks the ipolicy.

    @type cluster: C{objects.Cluster}
    @param cluster: the cluster's configuration
    @type enabled_disk_templates: list of string
    @param enabled_disk_templates: list of (possibly newly) enabled disk
      templates

    """
    # FIXME: write unit tests for this
    if self.op.ipolicy:
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
                                           group_policy=False)

      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
                                  enabled_disk_templates)

      all_instances = self.cfg.GetAllInstancesInfo().values()
      violations = set()
      for group in self.cfg.GetAllNodeGroupsInfo().values():
        instances = frozenset([inst for inst in all_instances
                               if compat.any(nuuid in group.members
                                             for nuuid in inst.all_nodes)])
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
                                           self.cfg)
        if new:
          violations.update(new)

      if violations:
        self.LogWarning("After the ipolicy change the following instances"
                        " violate them: %s",
                        utils.CommaJoin(utils.NiceSort(violations)))
    else:
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
                                  enabled_disk_templates)

  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
    """Checks whether the set DRBD helper actually exists on the nodes.

    @type drbd_helper: string
    @param drbd_helper: path of the drbd usermode helper binary
    @type node_uuids: list of strings
    @param node_uuids: list of node UUIDs to check for the helper

    """
    # checks given drbd helper on all nodes
    helpers = self.rpc.call_drbd_helper(node_uuids)
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
      if ninfo.offline:
        self.LogInfo("Not checking drbd helper on offline node %s",
                     ninfo.name)
        continue
      msg = helpers[ninfo.uuid].fail_msg
      if msg:
        raise errors.OpPrereqError("Error checking drbd helper on node"
                                   " '%s': %s" % (ninfo.name, msg),
                                   errors.ECODE_ENVIRON)
      node_helper = helpers[ninfo.uuid].payload
      if node_helper != drbd_helper:
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
                                   (ninfo.name, node_helper),
                                   errors.ECODE_ENVIRON)

  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
    """Check the DRBD usermode helper.

    @type node_uuids: list of strings
    @param node_uuids: a list of nodes' UUIDs
    @type drbd_enabled: boolean
    @param drbd_enabled: whether DRBD will be enabled after this operation
      (no matter if it was disabled before or not)
    @type drbd_gets_enabled: boolean
    @param drbd_gets_enabled: true if DRBD was disabled before this
      operation, but will be enabled afterwards

    """
    if self.op.drbd_helper == '':
      if drbd_enabled:
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " DRBD is enabled.")
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
        raise errors.OpPrereqError("Cannot disable drbd helper while"
                                   " drbd-based instances exist",
                                   errors.ECODE_INVAL)

    else:
      if self.op.drbd_helper is not None and drbd_enabled:
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
      else:
        if drbd_gets_enabled:
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
          if current_drbd_helper is not None:
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
          else:
            raise errors.OpPrereqError("Cannot enable DRBD without a"
                                       " DRBD usermode helper set.")

  def _CheckInstancesOfDisabledDiskTemplates(
      self, disabled_disk_templates):
    """Check whether we try to disable a disk template that is in use.

    @type disabled_disk_templates: list of string
    @param disabled_disk_templates: list of disk templates that are going to
      be disabled by this operation

    """
    for disk_template in disabled_disk_templates:
      if self.cfg.HasAnyDiskOfType(disk_template):
        raise errors.OpPrereqError(
            "Cannot disable disk template '%s', because there is at least one"
            " instance using it." % disk_template)

  @staticmethod
  def _CheckInstanceCommunicationNetwork(network, warning_fn):
    """Check whether an existing network is configured for instance
    communication.

    Checks whether an existing network is configured with the
    parameters that are advisable for instance communication, and
    otherwise issues security warnings.

    @type network: L{ganeti.objects.Network}
    @param network: L{ganeti.objects.Network} object whose
                    configuration is being checked
    @type warning_fn: function
    @param warning_fn: function used to print warnings
    @rtype: None
    @return: None

    """
    def _MaybeWarn(err, val, default):
      if val != default:
        warning_fn("Supplied instance communication network '%s' %s '%s',"
                   " this might pose a security risk (default is '%s').",
                   network.name, err, val, default)

    if network.network is None:
      raise errors.OpPrereqError("Supplied instance communication network '%s'"
                                 " must have an IPv4 network address.",
                                 network.name)

    _MaybeWarn("has an IPv4 gateway", network.gateway, None)
    _MaybeWarn("has a non-standard IPv4 network address", network.network,
               constants.INSTANCE_COMMUNICATION_NETWORK4)
    _MaybeWarn("has an IPv6 gateway", network.gateway6, None)
    _MaybeWarn("has a non-standard IPv6 network address", network.network6,
               constants.INSTANCE_COMMUNICATION_NETWORK6)
    _MaybeWarn("has a non-standard MAC prefix", network.mac_prefix,
               constants.INSTANCE_COMMUNICATION_MAC_PREFIX)
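  # Editorial note (not part of the original source): the "advisable"
  # configuration checked above is a network with no IPv4/IPv6 gateways and
  # with the INSTANCE_COMMUNICATION_NETWORK4/6 and
  # INSTANCE_COMMUNICATION_MAC_PREFIX defaults from constants.  A missing
  # IPv4 network address is a hard error, while other deviations only
  # produce warnings via warning_fn.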

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the given params don't conflict and
    if the given volume group is valid.

    """
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
    self.cluster = cluster = self.cfg.GetClusterInfo()

    vm_capable_node_uuids = [node.uuid
                             for node in self.cfg.GetAllNodesInfo().values()
                             if node.uuid in node_uuids and node.vm_capable]

    (enabled_disk_templates, new_enabled_disk_templates,
      disabled_disk_templates) = self._GetDiskTemplateSets(cluster)
    self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates)

    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
                      new_enabled_disk_templates)

    if self.op.file_storage_dir is not None:
      CheckFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)

    if self.op.shared_file_storage_dir is not None:
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
          self.LogWarning, self.op.shared_file_storage_dir,
          enabled_disk_templates)

    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)

    # validate params changes
    if self.op.beparams:
      objects.UpgradeBeParams(self.op.beparams)
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)

    if self.op.ndparams:
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)

      # TODO: we need a more general way to handle resetting
      # cluster-level parameters to default values
      if self.new_ndparams["oob_program"] == "":
        self.new_ndparams["oob_program"] = \
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]

    if self.op.hv_state:
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
                                           self.cluster.hv_state_static)
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
                               for hv, values in new_hv_state.items())

    if self.op.disk_state:
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
                                               self.cluster.disk_state_static)
      self.new_disk_state = \
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
                            for name, values in svalues.items()))
             for storage, svalues in new_disk_state.items())

    self._CheckIpolicy(cluster, enabled_disk_templates)

    if self.op.nicparams:
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
      nic_errors = []

      # check all instances for consistency
      for instance in self.cfg.GetAllInstancesInfo().values():
        for nic_idx, nic in enumerate(instance.nics):
          params_copy = copy.deepcopy(nic.nicparams)
          params_filled = objects.FillDict(self.new_nicparams, params_copy)

          # check parameter syntax
          try:
            objects.NIC.CheckParameterSyntax(params_filled)
          except errors.ConfigurationError, err:
            nic_errors.append("Instance %s, nic/%d: %s" %
                              (instance.name, nic_idx, err))

          # if we're moving instances to routed, check that they have an ip
          target_mode = params_filled[constants.NIC_MODE]
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
                              " address" % (instance.name, nic_idx))
      if nic_errors:
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
                                   "\n".join(nic_errors), errors.ECODE_INVAL)

    # hypervisor list/parameters
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
    if self.op.hvparams:
      for hv_name, hv_dict in self.op.hvparams.items():
        if hv_name not in self.new_hvparams:
          self.new_hvparams[hv_name] = hv_dict
        else:
          self.new_hvparams[hv_name].update(hv_dict)

    # disk template parameters
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
    if self.op.diskparams:
      for dt_name, dt_params in self.op.diskparams.items():
        if dt_name not in self.new_diskparams:
          self.new_diskparams[dt_name] = dt_params
        else:
          self.new_diskparams[dt_name].update(dt_params)
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)

    # os hypervisor parameters
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
    if self.op.os_hvp:
      for os_name, hvs in self.op.os_hvp.items():
        if os_name not in self.new_os_hvp:
          self.new_os_hvp[os_name] = hvs
        else:
          for hv_name, hv_dict in hvs.items():
            if hv_dict is None:
              # Delete if it exists
              self.new_os_hvp[os_name].pop(hv_name, None)
            elif hv_name not in self.new_os_hvp[os_name]:
              self.new_os_hvp[os_name][hv_name] = hv_dict
            else:
              self.new_os_hvp[os_name][hv_name].update(hv_dict)

    # os parameters
    self._BuildOSParams(cluster)

    # changes to the hypervisor list
    if self.op.enabled_hypervisors is not None:
      self.hv_list = self.op.enabled_hypervisors
      for hv in self.hv_list:
        # if the hypervisor doesn't already exist in the cluster
        # hvparams, we initialize it to empty, and then (in both
        # cases) we make sure to fill the defaults, as we might not
        # have a complete defaults list if the hypervisor wasn't
        # enabled before
        if hv not in new_hvp:
          new_hvp[hv] = {}
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
    else:
      self.hv_list = cluster.enabled_hypervisors

    if self.op.hvparams or self.op.enabled_hypervisors is not None:
      # either the enabled list has changed, or the parameters have, validate
      for hv_name, hv_params in self.new_hvparams.items():
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
            (self.op.enabled_hypervisors and
             hv_name in self.op.enabled_hypervisors)):
          # either this is a new hypervisor, or its parameters have changed
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          hv_class.CheckParameterSyntax(hv_params)
          CheckHVParams(self, node_uuids, hv_name, hv_params)

    self._CheckDiskTemplateConsistency()

    if self.op.os_hvp:
      # no need to check any newly-enabled hypervisors, since the
      # defaults have already been checked in the above code-block
      for os_name, os_hvp in self.new_os_hvp.items():
        for hv_name, hv_params in os_hvp.items():
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
          # we need to fill in the new os_hvp on top of the actual hv_p
          cluster_defaults = self.new_hvparams.get(hv_name, {})
          new_osp = objects.FillDict(cluster_defaults, hv_params)
          hv_class = hypervisor.GetHypervisorClass(hv_name)
          hv_class.CheckParameterSyntax(new_osp)
          CheckHVParams(self, node_uuids, hv_name, new_osp)

    if self.op.default_iallocator:
      alloc_script = utils.FindFile(self.op.default_iallocator,
                                    constants.IALLOCATOR_SEARCH_PATH,
                                    os.path.isfile)
      if alloc_script is None:
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
                                   " specified" % self.op.default_iallocator,
                                   errors.ECODE_INVAL)

    if self.op.instance_communication_network:
      network_name = self.op.instance_communication_network

      try:
        network_uuid = self.cfg.LookupNetwork(network_name)
      except errors.OpPrereqError:
        network_uuid = None

      if network_uuid is not None:
        network = self.cfg.GetNetwork(network_uuid)
        self._CheckInstanceCommunicationNetwork(network, self.LogWarning)

  def _BuildOSParams(self, cluster):
    "Calculate the new OS parameters for this operation."

    def _GetNewParams(source, new_params):
      "Wrapper around GetUpdatedParams."
      if new_params is None:
        return source
      result = objects.FillDict(source, {}) # deep copy of source
      for os_name in new_params:
        result[os_name] = GetUpdatedParams(result.get(os_name, {}),
                                           new_params[os_name],
                                           use_none=True)
        if not result[os_name]:
          del result[os_name] # we removed all parameters
      return result

    self.new_osp = _GetNewParams(cluster.osparams,
                                 self.op.osparams)
    self.new_osp_private = _GetNewParams(cluster.osparams_private_cluster,
                                         self.op.osparams_private_cluster)

    # Remove os validity check
    changed_oses = (set(self.new_osp.keys()) | set(self.new_osp_private.keys()))
    for os_name in changed_oses:
      os_params = cluster.SimpleFillOS(
        os_name,
        self.new_osp.get(os_name, {}),
        os_params_private=self.new_osp_private.get(os_name, {})
      )
      # check the parameter validity (remote check)
      CheckOSParams(self, False, [self.cfg.GetMasterNode()],
                    os_name, os_params)
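  # Illustrative sketch (not in the original source), showing the intended
  # merge semantics of _GetNewParams above, assuming GetUpdatedParams with
  # use_none=True drops parameters whose new value is None; the OS parameter
  # names here are hypothetical:
  #
  #   cluster.osparams       = {"debian": {"dhcp": "yes"}}
  #   self.op.osparams       = {"debian": {"dhcp": None, "arch": "amd64"}}
  #   resulting self.new_osp = {"debian": {"arch": "amd64"}}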
1326

    
1327
  def _CheckDiskTemplateConsistency(self):
1328
    """Check whether the disk templates that are going to be disabled
1329
       are still in use by some instances.
1330

1331
    """
1332
    if self.op.enabled_disk_templates:
1333
      cluster = self.cfg.GetClusterInfo()
1334
      instances = self.cfg.GetAllInstancesInfo()
1335

    
1336
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1337
        - set(self.op.enabled_disk_templates)
1338
      for instance in instances.itervalues():
1339
        if instance.disk_template in disk_templates_to_remove:
1340
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1341
                                     " because instance '%s' is using it." %
1342
                                     (instance.disk_template, instance.name))
1343

    
1344
  def _SetVgName(self, feedback_fn):
    """Determines and sets the new volume group name.

    """
    if self.op.vg_name is not None:
      new_volume = self.op.vg_name
      if not new_volume:
        new_volume = None
      if new_volume != self.cfg.GetVGName():
        self.cfg.SetVGName(new_volume)
      else:
        feedback_fn("Cluster LVM configuration already in desired"
                    " state, not changing")

  def _SetFileStorageDir(self, feedback_fn):
    """Set the file storage directory.

    """
    if self.op.file_storage_dir is not None:
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
        feedback_fn("Global file storage dir already set to value '%s'"
                    % self.cluster.file_storage_dir)
      else:
        self.cluster.file_storage_dir = self.op.file_storage_dir

  def _SetDrbdHelper(self, feedback_fn):
    """Set the DRBD usermode helper.

    """
    if self.op.drbd_helper is not None:
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
        feedback_fn("Note that you specified a drbd user helper, but did not"
                    " enable the drbd disk template.")
      new_helper = self.op.drbd_helper
      if not new_helper:
        new_helper = None
      if new_helper != self.cfg.GetDRBDHelper():
        self.cfg.SetDRBDHelper(new_helper)
      else:
        feedback_fn("Cluster DRBD helper already in desired state,"
                    " not changing")

  @staticmethod
  def _EnsureInstanceCommunicationNetwork(cfg, network_name):
    """Ensure that the instance communication network exists and is
    connected to all groups.

    The instance communication network given by L{network_name} is
    created, if necessary, via the opcode 'OpNetworkAdd'.  Also, the
    instance communication network is connected to all existing node
    groups, if necessary, via the opcode 'OpNetworkConnect'.

    @type cfg: L{config.ConfigWriter}
    @param cfg: cluster configuration

    @type network_name: string
    @param network_name: instance communication network name

    @rtype: L{ganeti.cmdlib.ResultWithJobs} or L{None}
    @return: L{ganeti.cmdlib.ResultWithJobs} if the instance communication
             network needs to be created or it needs to be
             connected to a group, otherwise L{None}

    """
    jobs = []

    try:
      network_uuid = cfg.LookupNetwork(network_name)
      network_exists = True
    except errors.OpPrereqError:
      network_exists = False

    if not network_exists:
      jobs.append(AddInstanceCommunicationNetworkOp(network_name))

    for group_uuid in cfg.GetNodeGroupList():
      group = cfg.GetNodeGroup(group_uuid)

      if network_exists:
        network_connected = network_uuid in group.networks
      else:
        # The network was created asynchronously by the previous
        # opcode and, therefore, we don't have access to its
        # network_uuid.  As a result, we assume that the network is
        # not connected to any group yet.
        network_connected = False

      if not network_connected:
        op = ConnectInstanceCommunicationNetworkOp(group_uuid, network_name)
        jobs.append(op)

    if jobs:
      return ResultWithJobs([jobs])
    else:
      return None

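  # Note (editorial sketch, not part of the original LU): for a cluster with
  # two node groups and no pre-existing network, the single job submitted by
  # the helper above would roughly contain
  #   [AddInstanceCommunicationNetworkOp(network_name),
  #    ConnectInstanceCommunicationNetworkOp(group1_uuid, network_name),
  #    ConnectInstanceCommunicationNetworkOp(group2_uuid, network_name)]
  # i.e. one network-creation opcode followed by one connect opcode per group,
  # all executed sequentially within the same job (group1_uuid and group2_uuid
  # are placeholder names).
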
  @staticmethod
  def _ModifyInstanceCommunicationNetwork(cfg, cluster, network_name,
                                          feedback_fn):
    """Update the instance communication network stored in the cluster
    configuration.

    Compares the user-supplied instance communication network against
    the one stored in the Ganeti cluster configuration.  If there is a
    change, the instance communication network may be created
    and connected to all groups (see
    L{LUClusterSetParams._EnsureInstanceCommunicationNetwork}).

    @type cfg: L{config.ConfigWriter}
    @param cfg: cluster configuration

    @type cluster: L{ganeti.objects.Cluster}
    @param cluster: Ganeti cluster

    @type network_name: string
    @param network_name: instance communication network name

    @type feedback_fn: function
    @param feedback_fn: see L{ganeti.cmdlib.base.LogicalUnit}

    @rtype: L{LUClusterSetParams._EnsureInstanceCommunicationNetwork} or L{None}
    @return: see L{LUClusterSetParams._EnsureInstanceCommunicationNetwork}

    """
    config_network_name = cfg.GetInstanceCommunicationNetwork()

    if network_name == config_network_name:
      feedback_fn("Instance communication network already is '%s', nothing to"
                  " do." % network_name)
    else:
      try:
        cfg.LookupNetwork(config_network_name)
        feedback_fn("Previous instance communication network '%s'"
                    " should be removed manually." % config_network_name)
      except errors.OpPrereqError:
        pass

      if network_name:
        feedback_fn("Changing instance communication network to '%s', only new"
                    " instances will be affected."
                    % network_name)
      else:
        feedback_fn("Disabling instance communication network, only new"
                    " instances will be affected.")

      cluster.instance_communication_network = network_name

      if network_name:
        return LUClusterSetParams._EnsureInstanceCommunicationNetwork(
          cfg,
          network_name)
      else:
        return None

  def Exec(self, feedback_fn):
1499
    """Change the parameters of the cluster.
1500

1501
    """
1502
    if self.op.enabled_disk_templates:
1503
      self.cluster.enabled_disk_templates = \
1504
        list(self.op.enabled_disk_templates)
1505

    
1506
    self._SetVgName(feedback_fn)
1507
    self._SetFileStorageDir(feedback_fn)
1508
    self._SetDrbdHelper(feedback_fn)
1509

    
1510
    if self.op.hvparams:
1511
      self.cluster.hvparams = self.new_hvparams
1512
    if self.op.os_hvp:
1513
      self.cluster.os_hvp = self.new_os_hvp
1514
    if self.op.enabled_hypervisors is not None:
1515
      self.cluster.hvparams = self.new_hvparams
1516
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1517
    if self.op.beparams:
1518
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1519
    if self.op.nicparams:
1520
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1521
    if self.op.ipolicy:
1522
      self.cluster.ipolicy = self.new_ipolicy
1523
    if self.op.osparams:
1524
      self.cluster.osparams = self.new_osp
1525
    if self.op.osparams_private_cluster:
1526
      self.cluster.osparams_private_cluster = self.new_osp_private
1527
    if self.op.ndparams:
1528
      self.cluster.ndparams = self.new_ndparams
1529
    if self.op.diskparams:
1530
      self.cluster.diskparams = self.new_diskparams
1531
    if self.op.hv_state:
1532
      self.cluster.hv_state_static = self.new_hv_state
1533
    if self.op.disk_state:
1534
      self.cluster.disk_state_static = self.new_disk_state
1535

    
1536
    if self.op.candidate_pool_size is not None:
1537
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
1538
      # we need to update the pool size here, otherwise the save will fail
1539
      AdjustCandidatePool(self, [], feedback_fn)
1540

    
1541
    if self.op.max_running_jobs is not None:
1542
      self.cluster.max_running_jobs = self.op.max_running_jobs
1543

    
1544
    if self.op.maintain_node_health is not None:
1545
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1546
        feedback_fn("Note: CONFD was disabled at build time, node health"
1547
                    " maintenance is not useful (still enabling it)")
1548
      self.cluster.maintain_node_health = self.op.maintain_node_health
1549

    
1550
    if self.op.modify_etc_hosts is not None:
1551
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1552

    
1553
    if self.op.prealloc_wipe_disks is not None:
1554
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1555

    
1556
    if self.op.add_uids is not None:
1557
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1558

    
1559
    if self.op.remove_uids is not None:
1560
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1561

    
1562
    if self.op.uid_pool is not None:
1563
      self.cluster.uid_pool = self.op.uid_pool
1564

    
1565
    if self.op.default_iallocator is not None:
1566
      self.cluster.default_iallocator = self.op.default_iallocator
1567

    
1568
    if self.op.default_iallocator_params is not None:
1569
      self.cluster.default_iallocator_params = self.op.default_iallocator_params
1570

    
1571
    if self.op.reserved_lvs is not None:
1572
      self.cluster.reserved_lvs = self.op.reserved_lvs
1573

    
1574
    if self.op.use_external_mip_script is not None:
1575
      self.cluster.use_external_mip_script = self.op.use_external_mip_script
1576

    
1577
    def helper_os(aname, mods, desc):
1578
      desc += " OS list"
1579
      lst = getattr(self.cluster, aname)
1580
      for key, val in mods:
1581
        if key == constants.DDM_ADD:
1582
          if val in lst:
1583
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1584
          else:
1585
            lst.append(val)
1586
        elif key == constants.DDM_REMOVE:
1587
          if val in lst:
1588
            lst.remove(val)
1589
          else:
1590
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1591
        else:
1592
          raise errors.ProgrammerError("Invalid modification '%s'" % key)
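    # Editorial note (not part of the original LU): the "mods" argument of
    # helper_os is a list of (action, os_name) pairs, e.g. the hypothetical
    #   [(constants.DDM_ADD, "debian-image"), (constants.DDM_REMOVE, "lenny")]
    # which would append "debian-image" to the list and drop "lenny" from it.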
1593

    
1594
    if self.op.hidden_os:
1595
      helper_os("hidden_os", self.op.hidden_os, "hidden")
1596

    
1597
    if self.op.blacklisted_os:
1598
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1599

    
1600
    if self.op.mac_prefix:
1601
      self.cluster.mac_prefix = self.op.mac_prefix
1602

    
1603
    if self.op.master_netdev:
1604
      master_params = self.cfg.GetMasterNetworkParameters()
1605
      ems = self.cfg.GetUseExternalMipScript()
1606
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
1607
                  self.cluster.master_netdev)
1608
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1609
                                                       master_params, ems)
1610
      if not self.op.force:
1611
        result.Raise("Could not disable the master ip")
1612
      else:
1613
        if result.fail_msg:
1614
          msg = ("Could not disable the master ip (continuing anyway): %s" %
1615
                 result.fail_msg)
1616
          feedback_fn(msg)
1617
      feedback_fn("Changing master_netdev from %s to %s" %
1618
                  (master_params.netdev, self.op.master_netdev))
1619
      self.cluster.master_netdev = self.op.master_netdev
1620

    
1621
    if self.op.master_netmask:
1622
      master_params = self.cfg.GetMasterNetworkParameters()
1623
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1624
      result = self.rpc.call_node_change_master_netmask(
1625
                 master_params.uuid, master_params.netmask,
1626
                 self.op.master_netmask, master_params.ip,
1627
                 master_params.netdev)
1628
      result.Warn("Could not change the master IP netmask", feedback_fn)
1629
      self.cluster.master_netmask = self.op.master_netmask
1630

    
1631
    self.cfg.Update(self.cluster, feedback_fn)
1632

    
1633
    if self.op.master_netdev:
1634
      master_params = self.cfg.GetMasterNetworkParameters()
1635
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
1636
                  self.op.master_netdev)
1637
      ems = self.cfg.GetUseExternalMipScript()
1638
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1639
                                                     master_params, ems)
1640
      result.Warn("Could not re-enable the master ip on the master,"
1641
                  " please restart manually", self.LogWarning)
1642

    
1643
    network_name = self.op.instance_communication_network
1644
    if network_name is not None:
1645
      return self._ModifyInstanceCommunicationNetwork(self.cfg, self.cluster,
1646
                                                      network_name, feedback_fn)
1647
    else:
1648
      return None
1649

    
1650

    
1651
class LUClusterVerify(NoHooksLU):
1652
  """Submits all jobs necessary to verify the cluster.
1653

1654
  """
1655
  REQ_BGL = False
1656

    
1657
  def ExpandNames(self):
1658
    self.needed_locks = {}
1659

    
1660
  def Exec(self, feedback_fn):
1661
    jobs = []
1662

    
1663
    if self.op.group_name:
1664
      groups = [self.op.group_name]
1665
      depends_fn = lambda: None
1666
    else:
1667
      groups = self.cfg.GetNodeGroupList()
1668

    
1669
      # Verify global configuration
1670
      jobs.append([
1671
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1672
        ])
1673

    
1674
      # Always depend on global verification
1675
      depends_fn = lambda: [(-len(jobs), [])]
1676

    
1677
    jobs.extend(
1678
      [opcodes.OpClusterVerifyGroup(group_name=group,
1679
                                    ignore_errors=self.op.ignore_errors,
1680
                                    depends=depends_fn())]
1681
      for group in groups)
1682

    
1683
    # Fix up all parameters
1684
    for op in itertools.chain(*jobs): # pylint: disable=W0142
1685
      op.debug_simulate_errors = self.op.debug_simulate_errors
1686
      op.verbose = self.op.verbose
1687
      op.error_codes = self.op.error_codes
1688
      try:
1689
        op.skip_checks = self.op.skip_checks
1690
      except AttributeError:
1691
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1692

    
1693
    return ResultWithJobs(jobs)
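    # Editorial note (not part of the original LU): for a two-group cluster
    # verified without group_name, the submitted jobs would look roughly like
    #   [[OpClusterVerifyConfig(...)],
    #    [OpClusterVerifyGroup(group_name="group1", depends=[(-1, [])], ...)],
    #    [OpClusterVerifyGroup(group_name="group2", depends=[(-2, [])], ...)]]
    # where the negative, relative job dependencies point back at the
    # config-verification job submitted first ("group1"/"group2" are
    # placeholder names).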
1694

    
1695

    
1696
class _VerifyErrors(object):
1697
  """Mix-in for cluster/group verify LUs.
1698

1699
  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1700
  self.op and self._feedback_fn to be available.)
1701

1702
  """
1703

    
1704
  ETYPE_FIELD = "code"
1705
  ETYPE_ERROR = constants.CV_ERROR
1706
  ETYPE_WARNING = constants.CV_WARNING
1707

    
1708
  def _Error(self, ecode, item, msg, *args, **kwargs):
1709
    """Format an error message.
1710

1711
    Based on the opcode's error_codes parameter, either format a
1712
    parseable error code, or a simpler error string.
1713

1714
    This must be called only from Exec and functions called from Exec.
1715

1716
    """
1717
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1718
    itype, etxt, _ = ecode
1719
    # If the error code is in the list of ignored errors, demote the error to a
1720
    # warning
1721
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1722
      ltype = self.ETYPE_WARNING
1723
    # first complete the msg
1724
    if args:
1725
      msg = msg % args
1726
    # then format the whole message
1727
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1728
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1729
    else:
1730
      if item:
1731
        item = " " + item
1732
      else:
1733
        item = ""
1734
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1735
    # and finally report it via the feedback_fn
1736
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1737
    # do not mark the operation as failed for WARN cases only
1738
    if ltype == self.ETYPE_ERROR:
1739
      self.bad = True
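    # Editorial example (hypothetical values, not part of the original LU):
    # with error_codes set, the line reported through feedback_fn looks like
    #   "  - ERROR:ECLUSTERCERT:cluster:mycluster.example.com:expired cert"
    # while the default human-readable form is
    #   "  - ERROR: cluster mycluster.example.com: expired cert"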
1740

    
1741
  def _ErrorIf(self, cond, *args, **kwargs):
1742
    """Log an error message if the passed condition is True.
1743

1744
    """
1745
    if (bool(cond)
1746
        or self.op.debug_simulate_errors): # pylint: disable=E1101
1747
      self._Error(*args, **kwargs)
1748

    
1749

    
1750
def _GetAllHypervisorParameters(cluster, instances):
1751
  """Compute the set of all hypervisor parameters.
1752

1753
  @type cluster: L{objects.Cluster}
1754
  @param cluster: the cluster object
1755
  @param instances: list of L{objects.Instance}
1756
  @param instances: additional instances from which to obtain parameters
1757
  @rtype: list of (origin, hypervisor, parameters)
1758
  @return: a list with all parameters found, indicating the hypervisor they
1759
       apply to, and the origin (can be "cluster", "os X", or "instance Y")
1760

1761
  """
1762
  hvp_data = []
1763

    
1764
  for hv_name in cluster.enabled_hypervisors:
1765
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1766

    
1767
  for os_name, os_hvp in cluster.os_hvp.items():
1768
    for hv_name, hv_params in os_hvp.items():
1769
      if hv_params:
1770
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1771
        hvp_data.append(("os %s" % os_name, hv_name, full_params))
1772

    
1773
  # TODO: collapse identical parameter values in a single one
1774
  for instance in instances:
1775
    if instance.hvparams:
1776
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1777
                       cluster.FillHV(instance)))
1778

    
1779
  return hvp_data
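  # Editorial note (illustrative, not part of the original function): the
  # returned list has the shape
  #   [("cluster", "kvm", {...}), ("os debian-image", "kvm", {...}),
  #    ("instance inst1.example.com", "xen-pvm", {...})]
  # where the hypervisor, OS and instance names above are merely placeholder
  # examples.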
1780

    
1781

    
1782
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1783
  """Verifies the cluster config.
1784

1785
  """
1786
  REQ_BGL = False
1787

    
1788
  def _VerifyHVP(self, hvp_data):
1789
    """Verifies locally the syntax of the hypervisor parameters.
1790

1791
    """
1792
    for item, hv_name, hv_params in hvp_data:
1793
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1794
             (item, hv_name))
1795
      try:
1796
        hv_class = hypervisor.GetHypervisorClass(hv_name)
1797
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1798
        hv_class.CheckParameterSyntax(hv_params)
1799
      except errors.GenericError, err:
1800
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1801

    
1802
  def ExpandNames(self):
1803
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1804
    self.share_locks = ShareAll()
1805

    
1806
  def CheckPrereq(self):
1807
    """Check prerequisites.
1808

1809
    """
1810
    # Retrieve all information
1811
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1812
    self.all_node_info = self.cfg.GetAllNodesInfo()
1813
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1814

    
1815
  def Exec(self, feedback_fn):
1816
    """Verify integrity of cluster, performing various test on nodes.
1817

1818
    """
1819
    self.bad = False
1820
    self._feedback_fn = feedback_fn
1821

    
1822
    feedback_fn("* Verifying cluster config")
1823

    
1824
    for msg in self.cfg.VerifyConfig():
1825
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1826

    
1827
    feedback_fn("* Verifying cluster certificate files")
1828

    
1829
    for cert_filename in pathutils.ALL_CERT_FILES:
1830
      (errcode, msg) = utils.VerifyCertificate(cert_filename)
1831
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1832

    
1833
    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
1834
                                    pathutils.NODED_CERT_FILE),
1835
                  constants.CV_ECLUSTERCERT,
1836
                  None,
1837
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
1838
                    constants.LUXID_USER + " user")
1839

    
1840
    feedback_fn("* Verifying hypervisor parameters")
1841

    
1842
    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1843
                                                self.all_inst_info.values()))
1844

    
1845
    feedback_fn("* Verifying all nodes belong to an existing group")
1846

    
1847
    # We do this verification here because, should this bogus circumstance
1848
    # occur, it would never be caught by VerifyGroup, which only acts on
1849
    # nodes/instances reachable from existing node groups.
1850

    
1851
    dangling_nodes = set(node for node in self.all_node_info.values()
1852
                         if node.group not in self.all_group_info)
1853

    
1854
    dangling_instances = {}
1855
    no_node_instances = []
1856

    
1857
    for inst in self.all_inst_info.values():
1858
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
1859
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
1860
      elif inst.primary_node not in self.all_node_info:
1861
        no_node_instances.append(inst)
1862

    
1863
    pretty_dangling = [
1864
        "%s (%s)" %
1865
        (node.name,
1866
         utils.CommaJoin(inst.name for
1867
                         inst in dangling_instances.get(node.uuid, [])))
1868
        for node in dangling_nodes]
1869

    
1870
    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1871
                  None,
1872
                  "the following nodes (and their instances) belong to a non"
1873
                  " existing group: %s", utils.CommaJoin(pretty_dangling))
1874

    
1875
    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1876
                  None,
1877
                  "the following instances have a non-existing primary-node:"
1878
                  " %s", utils.CommaJoin(inst.name for
1879
                                         inst in no_node_instances))
1880

    
1881
    return not self.bad
1882

    
1883

    
1884
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1885
  """Verifies the status of a node group.
1886

1887
  """
1888
  HPATH = "cluster-verify"
1889
  HTYPE = constants.HTYPE_CLUSTER
1890
  REQ_BGL = False
1891

    
1892
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1893

    
1894
  class NodeImage(object):
1895
    """A class representing the logical and physical status of a node.
1896

1897
    @type uuid: string
1898
    @ivar uuid: the node UUID to which this object refers
1899
    @ivar volumes: a structure as returned from
1900
        L{ganeti.backend.GetVolumeList} (runtime)
1901
    @ivar instances: a list of running instances (runtime)
1902
    @ivar pinst: list of configured primary instances (config)
1903
    @ivar sinst: list of configured secondary instances (config)
1904
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1905
        instances for which this node is secondary (config)
1906
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1907
    @ivar dfree: free disk, as reported by the node (runtime)
1908
    @ivar offline: the offline status (config)
1909
    @type rpc_fail: boolean
1910
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1911
        not whether the individual keys were correct) (runtime)
1912
    @type lvm_fail: boolean
1913
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1914
    @type hyp_fail: boolean
1915
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1916
    @type ghost: boolean
1917
    @ivar ghost: whether this is a known node or not (config)
1918
    @type os_fail: boolean
1919
    @ivar os_fail: whether the RPC call didn't return valid OS data
1920
    @type oslist: list
1921
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1922
    @type vm_capable: boolean
1923
    @ivar vm_capable: whether the node can host instances
1924
    @type pv_min: float
1925
    @ivar pv_min: size in MiB of the smallest PVs
1926
    @type pv_max: float
1927
    @ivar pv_max: size in MiB of the biggest PVs
1928

1929
    """
1930
    def __init__(self, offline=False, uuid=None, vm_capable=True):
1931
      self.uuid = uuid
1932
      self.volumes = {}
1933
      self.instances = []
1934
      self.pinst = []
1935
      self.sinst = []
1936
      self.sbp = {}
1937
      self.mfree = 0
1938
      self.dfree = 0
1939
      self.offline = offline
1940
      self.vm_capable = vm_capable
1941
      self.rpc_fail = False
1942
      self.lvm_fail = False
1943
      self.hyp_fail = False
1944
      self.ghost = False
1945
      self.os_fail = False
1946
      self.oslist = {}
1947
      self.pv_min = None
1948
      self.pv_max = None
1949

    
1950
  def ExpandNames(self):
1951
    # This raises errors.OpPrereqError on its own:
1952
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1953

    
1954
    # Get instances in node group; this is unsafe and needs verification later
1955
    inst_uuids = \
1956
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1957

    
1958
    self.needed_locks = {
1959
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1960
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1961
      locking.LEVEL_NODE: [],
1962

    
1963
      # This opcode is run by watcher every five minutes and acquires all nodes
1964
      # for a group. It doesn't run for a long time, so it's better to acquire
1965
      # the node allocation lock as well.
1966
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1967
      }
1968

    
1969
    self.share_locks = ShareAll()
1970

    
1971
  def DeclareLocks(self, level):
1972
    if level == locking.LEVEL_NODE:
1973
      # Get members of node group; this is unsafe and needs verification later
1974
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1975

    
1976
      # In Exec(), we warn about mirrored instances that have primary and
1977
      # secondary living in separate node groups. To fully verify that
1978
      # volumes for these instances are healthy, we will need to do an
1979
      # extra call to their secondaries. We ensure here those nodes will
1980
      # be locked.
1981
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1982
        # Important: access only the instances whose lock is owned
1983
        instance = self.cfg.GetInstanceInfoByName(inst_name)
1984
        if instance.disk_template in constants.DTS_INT_MIRROR:
1985
          nodes.update(instance.secondary_nodes)
1986

    
1987
      self.needed_locks[locking.LEVEL_NODE] = nodes
1988

    
1989
  def CheckPrereq(self):
1990
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1991
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1992

    
1993
    group_node_uuids = set(self.group_info.members)
1994
    group_inst_uuids = \
1995
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1996

    
1997
    unlocked_node_uuids = \
1998
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1999

    
2000
    unlocked_inst_uuids = \
2001
        group_inst_uuids.difference(
2002
          [self.cfg.GetInstanceInfoByName(name).uuid
2003
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])
2004

    
2005
    if unlocked_node_uuids:
2006
      raise errors.OpPrereqError(
2007
        "Missing lock for nodes: %s" %
2008
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
2009
        errors.ECODE_STATE)
2010

    
2011
    if unlocked_inst_uuids:
2012
      raise errors.OpPrereqError(
2013
        "Missing lock for instances: %s" %
2014
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
2015
        errors.ECODE_STATE)
2016

    
2017
    self.all_node_info = self.cfg.GetAllNodesInfo()
2018
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
2019

    
2020
    self.my_node_uuids = group_node_uuids
2021
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
2022
                             for node_uuid in group_node_uuids)
2023

    
2024
    self.my_inst_uuids = group_inst_uuids
2025
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
2026
                             for inst_uuid in group_inst_uuids)
2027

    
2028
    # We detect here the nodes that will need the extra RPC calls for verifying
2029
    # split LV volumes; they should be locked.
2030
    extra_lv_nodes = set()
2031

    
2032
    for inst in self.my_inst_info.values():
2033
      if inst.disk_template in constants.DTS_INT_MIRROR:
2034
        for nuuid in inst.all_nodes:
2035
          if self.all_node_info[nuuid].group != self.group_uuid:
2036
            extra_lv_nodes.add(nuuid)
2037

    
2038
    unlocked_lv_nodes = \
2039
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
2040

    
2041
    if unlocked_lv_nodes:
2042
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
2043
                                 utils.CommaJoin(unlocked_lv_nodes),
2044
                                 errors.ECODE_STATE)
2045
    self.extra_lv_nodes = list(extra_lv_nodes)
2046

    
2047
  def _VerifyNode(self, ninfo, nresult):
2048
    """Perform some basic validation on data returned from a node.
2049

2050
      - check the result data structure is well formed and has all the
2051
        mandatory fields
2052
      - check ganeti version
2053

2054
    @type ninfo: L{objects.Node}
2055
    @param ninfo: the node to check
2056
    @param nresult: the results from the node
2057
    @rtype: boolean
2058
    @return: whether overall this call was successful (and we can expect
2059
         reasonable values in the response)
2060

2061
    """
2062
    # main result, nresult should be a non-empty dict
2063
    test = not nresult or not isinstance(nresult, dict)
2064
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
2065
                  "unable to verify node: no data returned")
2066
    if test:
2067
      return False
2068

    
2069
    # compares ganeti version
2070
    local_version = constants.PROTOCOL_VERSION
2071
    remote_version = nresult.get("version", None)
2072
    test = not (remote_version and
2073
                isinstance(remote_version, (list, tuple)) and
2074
                len(remote_version) == 2)
2075
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
2076
                  "connection to node returned invalid data")
2077
    if test:
2078
      return False
2079

    
2080
    test = local_version != remote_version[0]
2081
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
2082
                  "incompatible protocol versions: master %s,"
2083
                  " node %s", local_version, remote_version[0])
2084
    if test:
2085
      return False
2086

    
2087
    # node seems compatible, we can actually try to look into its results
2088

    
2089
    # full package version
2090
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
2091
                  constants.CV_ENODEVERSION, ninfo.name,
2092
                  "software version mismatch: master %s, node %s",
2093
                  constants.RELEASE_VERSION, remote_version[1],
2094
                  code=self.ETYPE_WARNING)
2095

    
2096
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
2097
    if ninfo.vm_capable and isinstance(hyp_result, dict):
2098
      for hv_name, hv_result in hyp_result.iteritems():
2099
        test = hv_result is not None
2100
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2101
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)
2102

    
2103
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
2104
    if ninfo.vm_capable and isinstance(hvp_result, list):
2105
      for item, hv_name, hv_result in hvp_result:
2106
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
2107
                      "hypervisor %s parameter verify failure (source %s): %s",
2108
                      hv_name, item, hv_result)
2109

    
2110
    test = nresult.get(constants.NV_NODESETUP,
2111
                       ["Missing NODESETUP results"])
2112
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
2113
                  "node setup error: %s", "; ".join(test))
2114

    
2115
    return True
2116

    
2117
  def _VerifyNodeTime(self, ninfo, nresult,
2118
                      nvinfo_starttime, nvinfo_endtime):
2119
    """Check the node time.
2120

2121
    @type ninfo: L{objects.Node}
2122
    @param ninfo: the node to check
2123
    @param nresult: the remote results for the node
2124
    @param nvinfo_starttime: the start time of the RPC call
2125
    @param nvinfo_endtime: the end time of the RPC call
2126

2127
    """
2128
    ntime = nresult.get(constants.NV_TIME, None)
2129
    try:
2130
      ntime_merged = utils.MergeTime(ntime)
2131
    except (ValueError, TypeError):
2132
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
2133
                    "Node returned invalid time")
2134
      return
2135

    
2136
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
2137
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
2138
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
2139
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
2140
    else:
2141
      ntime_diff = None
2142

    
2143
    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
2144
                  "Node time diverges by at least %s from master node time",
2145
                  ntime_diff)
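    # Editorial example (assumes constants.NODE_MAX_CLOCK_SKEW is 150 seconds;
    # not part of the original LU): a node whose merged time is 200 seconds
    # past nvinfo_endtime exceeds the allowed skew and is reported as
    # diverging by at least "200.0s" from the master node's time.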
2146

    
2147
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
2148
    """Check the node LVM results and update info for cross-node checks.
2149

2150
    @type ninfo: L{objects.Node}
2151
    @param ninfo: the node to check
2152
    @param nresult: the remote results for the node
2153
    @param vg_name: the configured VG name
2154
    @type nimg: L{NodeImage}
2155
    @param nimg: node image
2156

2157
    """
2158
    if vg_name is None:
2159
      return
2160

    
2161
    # checks vg existence and size > 20G
2162
    vglist = nresult.get(constants.NV_VGLIST, None)
2163
    test = not vglist
2164
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2165
                  "unable to check volume groups")
2166
    if not test:
2167
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
2168
                                            constants.MIN_VG_SIZE)
2169
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
2170

    
2171
    # Check PVs
2172
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
2173
    for em in errmsgs:
2174
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
2175
    if pvminmax is not None:
2176
      (nimg.pv_min, nimg.pv_max) = pvminmax
2177

    
2178
  def _VerifyGroupDRBDVersion(self, node_verify_infos):
2179
    """Check cross-node DRBD version consistency.
2180

2181
    @type node_verify_infos: dict
2182
    @param node_verify_infos: infos about nodes as returned from the
2183
      node_verify call.
2184

2185
    """
2186
    node_versions = {}
2187
    for node_uuid, ndata in node_verify_infos.items():
2188
      nresult = ndata.payload
2189
      if nresult:
2190
        version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
2191
        node_versions[node_uuid] = version
2192

    
2193
    if len(set(node_versions.values())) > 1:
2194
      for node_uuid, version in sorted(node_versions.items()):
2195
        msg = "DRBD version mismatch: %s" % version
2196
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
2197
                    code=self.ETYPE_WARNING)
2198

    
2199
  def _VerifyGroupLVM(self, node_image, vg_name):
2200
    """Check cross-node consistency in LVM.
2201

2202
    @type node_image: dict
2203
    @param node_image: info about nodes, mapping from node to names to
2204
      L{NodeImage} objects
2205
    @param vg_name: the configured VG name
2206

2207
    """
2208
    if vg_name is None:
2209
      return
2210

    
2211
    # Only exclusive storage needs this kind of checks
2212
    if not self._exclusive_storage:
2213
      return
2214

    
2215
    # exclusive_storage wants all PVs to have the same size (approximately),
2216
    # if the smallest and the biggest ones are okay, everything is fine.
2217
    # pv_min is None iff pv_max is None
2218
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
2219
    if not vals:
2220
      return
2221
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
2222
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
2223
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
2224
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
2225
                  "PV sizes differ too much in the group; smallest (%s MB) is"
2226
                  " on %s, biggest (%s MB) is on %s",
2227
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
2228
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))
2229

    
2230
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
2231
    """Check the node bridges.
2232

2233
    @type ninfo: L{objects.Node}
2234
    @param ninfo: the node to check
2235
    @param nresult: the remote results for the node
2236
    @param bridges: the expected list of bridges
2237

2238
    """
2239
    if not bridges:
2240
      return
2241

    
2242
    missing = nresult.get(constants.NV_BRIDGES, None)
2243
    test = not isinstance(missing, list)
2244
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2245
                  "did not return valid bridge information")
2246
    if not test:
2247
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
2248
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
2249

    
2250
  def _VerifyNodeUserScripts(self, ninfo, nresult):
2251
    """Check the results of user scripts presence and executability on the node
2252

2253
    @type ninfo: L{objects.Node}
2254
    @param ninfo: the node to check
2255
    @param nresult: the remote results for the node
2256

2257
    """
2258
    test = not constants.NV_USERSCRIPTS in nresult
2259
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2260
                  "did not return user scripts information")
2261

    
2262
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
2263
    if not test:
2264
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2265
                    "user scripts not present or not executable: %s" %
2266
                    utils.CommaJoin(sorted(broken_scripts)))
2267

    
2268
  def _VerifyNodeNetwork(self, ninfo, nresult):
2269
    """Check the node network connectivity results.
2270

2271
    @type ninfo: L{objects.Node}
2272
    @param ninfo: the node to check
2273
    @param nresult: the remote results for the node
2274

2275
    """
2276
    test = constants.NV_NODELIST not in nresult
2277
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
2278
                  "node hasn't returned node ssh connectivity data")
2279
    if not test:
2280
      if nresult[constants.NV_NODELIST]:
2281
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
2282
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
2283
                        "ssh communication with node '%s': %s", a_node, a_msg)
2284

    
2285
    test = constants.NV_NODENETTEST not in nresult
2286
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2287
                  "node hasn't returned node tcp connectivity data")
2288
    if not test:
2289
      if nresult[constants.NV_NODENETTEST]:
2290
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
2291
        for anode in nlist:
2292
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
2293
                        "tcp communication with node '%s': %s",
2294
                        anode, nresult[constants.NV_NODENETTEST][anode])
2295

    
2296
    test = constants.NV_MASTERIP not in nresult
2297
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2298
                  "node hasn't returned node master IP reachability data")
2299
    if not test:
2300
      if not nresult[constants.NV_MASTERIP]:
2301
        if ninfo.uuid == self.master_node:
2302
          msg = "the master node cannot reach the master IP (not configured?)"
2303
        else:
2304
          msg = "cannot reach the master IP"
2305
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
2306

    
2307
  def _VerifyInstance(self, instance, node_image, diskstatus):
2308
    """Verify an instance.
2309

2310
    This function checks to see if the required block devices are
2311
    available on the instance's node, and that the nodes are in the correct
2312
    state.
2313

2314
    """
2315
    pnode_uuid = instance.primary_node
2316
    pnode_img = node_image[pnode_uuid]
2317
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2318

    
2319
    node_vol_should = {}
2320
    instance.MapLVsByNode(node_vol_should)
2321

    
2322
    cluster = self.cfg.GetClusterInfo()
2323
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2324
                                                            self.group_info)
2325
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
2326
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
2327
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)
2328

    
2329
    for node_uuid in node_vol_should:
2330
      n_img = node_image[node_uuid]
2331
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2332
        # ignore missing volumes on offline or broken nodes
2333
        continue
2334
      for volume in node_vol_should[node_uuid]:
2335
        test = volume not in n_img.volumes
2336
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
2337
                      "volume %s missing on node %s", volume,
2338
                      self.cfg.GetNodeName(node_uuid))
2339

    
2340
    if instance.admin_state == constants.ADMINST_UP:
2341
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
2342
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
2343
                    "instance not running on its primary node %s",
2344
                     self.cfg.GetNodeName(pnode_uuid))
2345
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
2346
                    instance.name, "instance is marked as running and lives on"
2347
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))
2348

    
2349
    diskdata = [(nname, success, status, idx)
2350
                for (nname, disks) in diskstatus.items()
2351
                for idx, (success, status) in enumerate(disks)]
2352

    
2353
    for nname, success, bdev_status, idx in diskdata:
2354
      # the 'ghost node' construction in Exec() ensures that we have a
2355
      # node here
2356
      snode = node_image[nname]
2357
      bad_snode = snode.ghost or snode.offline
2358
      self._ErrorIf(instance.disks_active and
2359
                    not success and not bad_snode,
2360
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
2361
                    "couldn't retrieve status for disk/%s on %s: %s",
2362
                    idx, self.cfg.GetNodeName(nname), bdev_status)
2363

    
2364
      if instance.disks_active and success and \
2365
         (bdev_status.is_degraded or
2366
          bdev_status.ldisk_status != constants.LDS_OKAY):
2367
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2368
        if bdev_status.is_degraded:
2369
          msg += " is degraded"
2370
        if bdev_status.ldisk_status != constants.LDS_OKAY:
2371
          msg += "; state is '%s'" % \
2372
                 constants.LDS_NAMES[bdev_status.ldisk_status]
2373

    
2374
        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2375

    
2376
    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2377
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2378
                  "instance %s, connection to primary node failed",
2379
                  instance.name)
2380

    
2381
    self._ErrorIf(len(instance.secondary_nodes) > 1,
2382
                  constants.CV_EINSTANCELAYOUT, instance.name,
2383
                  "instance has multiple secondary nodes: %s",
2384
                  utils.CommaJoin(instance.secondary_nodes),
2385
                  code=self.ETYPE_WARNING)
2386

    
2387
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2388
    if any(es_flags.values()):
2389
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2390
        # Disk template not compatible with exclusive_storage: no instance
2391
        # node should have the flag set
2392
        es_nodes = [n
2393
                    for (n, es) in es_flags.items()
2394
                    if es]
2395
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2396
                    "instance has template %s, which is not supported on nodes"
2397
                    " that have exclusive storage set: %s",
2398
                    instance.disk_template,
2399
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2400
      for (idx, disk) in enumerate(instance.disks):
2401
        self._ErrorIf(disk.spindles is None,
2402
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2403
                      "number of spindles not configured for disk %s while"
2404
                      " exclusive storage is enabled, try running"
2405
                      " gnt-cluster repair-disk-sizes", idx)
2406

    
2407
    if instance.disk_template in constants.DTS_INT_MIRROR:
2408
      instance_nodes = utils.NiceSort(instance.all_nodes)
2409
      instance_groups = {}
2410

    
2411
      for node_uuid in instance_nodes:
2412
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
2413
                                   []).append(node_uuid)
2414

    
2415
      pretty_list = [
2416
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2417
                           groupinfo[group].name)
2418
        # Sort so that we always list the primary node first.
2419
        for group, nodes in sorted(instance_groups.items(),
2420
                                   key=lambda (_, nodes): pnode_uuid in nodes,
2421
                                   reverse=True)]
2422

    
2423
      self._ErrorIf(len(instance_groups) > 1,
2424
                    constants.CV_EINSTANCESPLITGROUPS,
2425
                    instance.name, "instance has primary and secondary nodes in"
2426
                    " different groups: %s", utils.CommaJoin(pretty_list),
2427
                    code=self.ETYPE_WARNING)
2428

    
2429
    inst_nodes_offline = []
2430
    for snode in instance.secondary_nodes:
2431
      s_img = node_image[snode]
2432
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2433
                    self.cfg.GetNodeName(snode),
2434
                    "instance %s, connection to secondary node failed",
2435
                    instance.name)
2436

    
2437
      if s_img.offline:
2438
        inst_nodes_offline.append(snode)
2439

    
2440
    # warn that the instance lives on offline nodes
2441
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2442
                  instance.name, "instance has offline secondary node(s) %s",
2443
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2444
    # ... or ghost/non-vm_capable nodes
2445
    for node_uuid in instance.all_nodes:
2446
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2447
                    instance.name, "instance lives on ghost node %s",
2448
                    self.cfg.GetNodeName(node_uuid))
2449
      self._ErrorIf(not node_image[node_uuid].vm_capable,
2450
                    constants.CV_EINSTANCEBADNODE, instance.name,
2451
                    "instance lives on non-vm_capable node %s",
2452
                    self.cfg.GetNodeName(node_uuid))
2453

    
2454
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2455
    """Verify if there are any unknown volumes in the cluster.
2456

2457
    The .os, .swap and backup volumes are ignored. All other volumes are
2458
    reported as unknown.
2459

2460
    @type reserved: L{ganeti.utils.FieldSet}
2461
    @param reserved: a FieldSet of reserved volume names
2462

2463
    """
2464
    for node_uuid, n_img in node_image.items():
2465
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2466
          self.all_node_info[node_uuid].group != self.group_uuid):
2467
        # skip non-healthy nodes
2468
        continue
2469
      for volume in n_img.volumes:
2470
        test = ((node_uuid not in node_vol_should or
2471
                volume not in node_vol_should[node_uuid]) and
2472
                not reserved.Matches(volume))
2473
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2474
                      self.cfg.GetNodeName(node_uuid),
2475
                      "volume %s is unknown", volume,
2476
                      code=_VerifyErrors.ETYPE_WARNING)
2477

    
2478
  def _VerifyNPlusOneMemory(self, node_image, all_insts):
2479
    """Verify N+1 Memory Resilience.
2480

2481
    Check that if one single node dies we can still start all the
2482
    instances it was primary for.
2483

2484
    """
2485
    cluster_info = self.cfg.GetClusterInfo()
2486
    for node_uuid, n_img in node_image.items():
2487
      # This code checks that every node which is now listed as
2488
      # secondary has enough memory to host all instances it is
2489
      # supposed to should a single other node in the cluster fail.
2490
      # FIXME: not ready for failover to an arbitrary node
2491
      # FIXME: does not support file-backed instances
2492
      # WARNING: we currently take into account down instances as well
2493
      # as up ones, considering that even if they're down someone
2494
      # might want to start them even in the event of a node failure.
2495
      if n_img.offline or \
2496
         self.all_node_info[node_uuid].group != self.group_uuid:
2497
        # we're skipping nodes marked offline and nodes in other groups from
2498
        # the N+1 warning, since most likely we don't have good memory
2499
        # information from them; we already list instances living on such
2500
        # nodes, and that's enough warning
2501
        continue
2502
      #TODO(dynmem): also consider ballooning out other instances
2503
      for prinode, inst_uuids in n_img.sbp.items():
2504
        needed_mem = 0
2505
        for inst_uuid in inst_uuids:
2506
          bep = cluster_info.FillBE(all_insts[inst_uuid])
2507
          if bep[constants.BE_AUTO_BALANCE]:
2508
            needed_mem += bep[constants.BE_MINMEM]
2509
        test = n_img.mfree < needed_mem
2510
        self._ErrorIf(test, constants.CV_ENODEN1,
2511
                      self.cfg.GetNodeName(node_uuid),
2512
                      "not enough memory to accomodate instance failovers"
2513
                      " should node %s fail (%dMiB needed, %dMiB available)",
2514
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
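    # Editorial example (hypothetical numbers, not part of the original LU):
    # if a node is secondary for two auto-balanced instances with BE_MINMEM of
    # 1024 and 2048 MiB sharing the same primary node, needed_mem is 3072 MiB
    # and the check above fires as soon as the node reports mfree < 3072 MiB.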
2515

    
2516
  def _VerifyClientCertificates(self, nodes, all_nvinfo):
2517
    """Verifies the consistency of the client certificates.
2518

2519
    This includes several aspects:
2520
      - the individual validation of all nodes' certificates
2521
      - the consistency of the master candidate certificate map
2522
      - the consistency of the master candidate certificate map with the
2523
        certificates that the master candidates are actually using.
2524

2525
    @param nodes: the list of nodes to consider in this verification
2526
    @param all_nvinfo: the map of results of the verify_node call to
2527
      all nodes
2528

2529
    """
2530
    candidate_certs = self.cfg.GetClusterInfo().candidate_certs
2531
    if candidate_certs is None or len(candidate_certs) == 0:
2532
      self._ErrorIf(
2533
        True, constants.CV_ECLUSTERCLIENTCERT, None,
2534
        "The cluster's list of master candidate certificates is empty."
2535
        " If you just updated the cluster, please run"
2536
        " 'gnt-cluster renew-crypto --new-node-certificates'.")
2537
      return
2538

    
2539
    self._ErrorIf(
2540
      len(candidate_certs) != len(set(candidate_certs.values())),
2541
      constants.CV_ECLUSTERCLIENTCERT, None,
2542
      "There are at least two master candidates configured to use the same"
2543
      " certificate.")
2544

    
2545
    # collect the client certificate
2546
    for node in nodes:
2547
      if node.offline:
2548
        continue
2549

    
2550
      nresult = all_nvinfo[node.uuid]
2551
      if nresult.fail_msg or not nresult.payload:
2552
        continue
2553

    
2554
      (errcode, msg) = nresult.payload.get(constants.NV_CLIENT_CERT, None)
2555

    
2556
      self._ErrorIf(
2557
        errcode is not None, constants.CV_ECLUSTERCLIENTCERT, None,
2558
        "Client certificate of node '%s' failed validation: %s (code '%s')",
2559
        node.uuid, msg, errcode)
2560

    
2561
      if not errcode:
2562
        digest = msg
2563
        if node.master_candidate:
2564
          if node.uuid in candidate_certs:
2565
            self._ErrorIf(
2566
              digest != candidate_certs[node.uuid],
2567
              constants.CV_ECLUSTERCLIENTCERT, None,
2568
              "Client certificate digest of master candidate '%s' does not"
2569
              " match its entry in the cluster's map of master candidate"
2570
              " certificates. Expected: %s Got: %s", node.uuid,
2571
              digest, candidate_certs[node.uuid])
2572
          else:
2573
            self._ErrorIf(
2574
              True, constants.CV_ECLUSTERCLIENTCERT, None,
2575
              "The master candidate '%s' does not have an entry in the"
2576
              " map of candidate certificates.", node.uuid)
2577
            self._ErrorIf(
2578
              digest in candidate_certs.values(),
2579
              constants.CV_ECLUSTERCLIENTCERT, None,
2580
              "Master candidate '%s' is using a certificate of another node.",
2581
              node.uuid)
2582
        else:
2583
          self._ErrorIf(
2584
            node.uuid in candidate_certs,
2585
            constants.CV_ECLUSTERCLIENTCERT, None,
2586
            "Node '%s' is not a master candidate, but still listed in the"
2587
            " map of master candidate certificates.", node.uuid)
2588
          self._ErrorIf(
2589
            (node.uuid not in candidate_certs) and
2590
              (digest in candidate_certs.values()),
2591
            constants.CV_ECLUSTERCLIENTCERT, None,
2592
            "Node '%s' is not a master candidate and is incorrectly using a"
2593
            " certificate of another node which is master candidate.",
2594
            node.uuid)
2595

    
2596
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param nodes: List of L{objects.Node} objects
    @param master_node_uuid: UUID of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.uuid == master_node_uuid)),
      (files_vm, lambda node: node.vm_capable),
      ]

    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodes
      else:
        filenodes = filter(fn, nodes)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodes:
      if node.offline:
        ignore_nodes.add(node.uuid)
        continue

      nresult = all_nvinfo[node.uuid]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, {})
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
                    "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.uuid)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)

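    # fileinfo now maps each filename to a dict of checksum -> set of node
    # UUIDs reporting that checksum, e.g. (illustrative values only):
    #   {"/etc/ganeti/some-file": {"abcd1234...": set(["node-uuid-1"])}}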
    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_uuid
                            for node_uuids in fileinfo[filename].values()
                            for node_uuid in node_uuids) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        self._ErrorIf(missing_file and missing_file != expected_nodes,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is optional, but it must exist on all or no"
                      " nodes (not found on %s)",
                      filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))
      else:
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is missing from node(s) %s", filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        self._ErrorIf(unexpected,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s should not exist on node(s) %s",
                      filename, utils.CommaJoin(
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1,
                     utils.CommaJoin(utils.NiceSort(
                       map(self.cfg.GetNodeName, node_uuids))))
                    for (idx, (checksum, node_uuids)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
                    "File %s found with %s different checksums (%s)",
                    filename, len(checksums), "; ".join(variants))

  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
    """Verify the drbd helper.

    """
    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                    "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "wrong drbd usermode helper: %s", payload)

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)

    # compute the DRBD minors
    node_drbd = {}
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
      test = inst_uuid not in instanceinfo
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
      if test:
        node_drbd[minor] = (inst_uuid, False)
      else:
        instance = instanceinfo[inst_uuid]
        node_drbd[minor] = (inst_uuid, instance.disks_active)

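    # node_drbd now maps each configured minor to a tuple of
    # (instance UUID, whether the instance's disks should be active), e.g.
    # {0: ("inst-uuid", True), 1: ("ghost-uuid", False)} (illustrative values).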
    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                  "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (inst_uuid, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "drbd minor %d of instance %s is not active", minor,
                    self.cfg.GetInstanceName(inst_uuid))
    for minor in used_minors:
      test = minor not in node_drbd
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
    """Builds the node OS structures.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    remote_os = nresult.get(constants.NV_OSLIST, None)
    test = (not isinstance(remote_os, list) or
            not compat.all(isinstance(v, list) and len(v) == 7
                           for v in remote_os))

    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                  "node hasn't returned valid OS data")

    nimg.os_fail = test

    if test:
      return

    os_dict = {}

    for (name, os_path, status, diagnose,
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:

      if name not in os_dict:
        os_dict[name] = []

      # parameters is a list of lists instead of list of tuples due to
      # JSON lacking a real tuple type, fix it:
      parameters = [tuple(v) for v in parameters]
      os_dict[name].append((os_path, status, diagnose,
                            set(variants), set(parameters), set(api_ver)))

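    # os_dict now maps each OS name to a list of
    # (path, status, diagnose, variants, parameters, api_versions) tuples;
    # more than one entry for a name means the OS is shadowed, which
    # _VerifyNodeOS reports as an error.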
    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
                    "Invalid OS %s (located at %s): %s",
                    os_name, f_path, f_diag)
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
                    "OS '%s' has multiple entries"
                    " (first one shadows the rest): %s",
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                    "Extra OS %s not present on reference node (%s)",
                    os_name, self.cfg.GetNodeName(base.uuid))
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " [%s] vs. [%s]", kind, os_name,
                      self.cfg.GetNodeName(base.uuid),
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
                  "OSes present on reference node %s"
                  " but missing on this node: %s",
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))

  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    cluster = self.cfg.GetClusterInfo()
    if (is_master and
        (cluster.IsFileStorageEnabled() or
         cluster.IsSharedFileStorageEnabled())):
      try:
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
                          verify_key, error_key):
    """Verifies (file) storage paths.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type file_disk_template: string
    @param file_disk_template: file-based disk template, whose directory
        is supposed to be verified
    @type verify_key: string
    @param verify_key: key for the verification map of this file
        verification step
    @param error_key: error key to be added to the verification results
        in case something goes wrong in this verification step

    """
    assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
              constants.ST_FILE, constants.ST_SHARED_FILE
           ))

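    # The node is expected to include the verification key in its reply only
    # when the configured path is unusable, so its presence is treated as an
    # error here.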
    cluster = self.cfg.GetClusterInfo()
    if cluster.IsDiskTemplateEnabled(file_disk_template):
      self._ErrorIf(
          verify_key in nresult,
          error_key, ninfo.name,
          "The configured %s storage path is unusable: %s" %
          (file_disk_template, nresult.get(verify_key)))

  def _VerifyFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_FILE,
        constants.NV_FILE_STORAGE_PATH,
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)

  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_SHARED_FILE,
        constants.NV_SHARED_FILE_STORAGE_PATH,
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_dev_inst_only = {}
    diskless_instances = set()
    nodisk_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        nodisk_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template != diskless)
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      dev_inst_only = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))

      node_disks_dev_inst_only[nuuid] = dev_inst_only

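    # node_disks maps node UUIDs to (instance UUID, disk) pairs, while
    # node_disks_dev_inst_only holds the matching (annotated disk, instance)
    # pairs that are passed to the blockdev RPC below.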
    assert len(node_disks) == len(node_disks_dev_inst_only)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
               node_disks.keys(), node_disks_dev_inst_only)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}
    # ...and disk-full instances that happen to have no disks
    for inst_uuid in nodisk_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

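    # The result maps every online node name in this group to a sorted list
    # with one node name from each other group, e.g. (illustrative values):
    #   (["node1", "node2"], {"node1": ["nodeX"], "node2": ["nodeY"]})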
    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

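    # node_verify_param describes which checks each node should run for the
    # node_verify RPC below; keys are NV_* constants, values are the
    # check-specific arguments (None where no argument is needed).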
    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      constants.NV_CLIENT_CERT: None,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for inst_uuid in self.my_inst_info.values():
      for nic in inst_uuid.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least one node, we act as if it were set for all nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

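    # Collect the node-name-to-group mapping and the node group configuration;
    # both are passed along with the node_verify RPC calls below.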
    node_group_uuids = dict(map(lambda n: (n.name, n.group),
                                self.cfg.GetAllNodesInfo().values()))
    groups_config = self.cfg.GetAllNodeGroupsInfoDict()

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams,
                                           node_group_uuids,
                                           groups_config)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams,
                                    node_group_uuids,
                                    groups_config)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    self._VerifyClientCertificates(self.my_node_info.values(), all_nvinfo)
    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams,
         node_group_uuids,
         groups_config))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

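    # Walk over all nodes in the group and feed each node's verification
    # payload through the per-aspect helpers (_VerifyNode*, _UpdateNode*)
    # defined above; offline nodes are only counted, not verified.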
    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.name not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])