Statistics
| Branch: | Tag: | Revision:

root / lib / cmdlib / cluster.py @ da27bc7d

History | View | Annotate | Download (129 kB)

1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with the cluster."""
23

    
24
import copy
25
import itertools
26
import logging
27
import operator
28
import os
29
import re
30
import time
31

    
32
from ganeti import compat
33
from ganeti import constants
34
from ganeti import errors
35
from ganeti import hypervisor
36
from ganeti import locking
37
from ganeti import masterd
38
from ganeti import netutils
39
from ganeti import objects
40
from ganeti import opcodes
41
from ganeti import pathutils
42
from ganeti import query
43
import ganeti.rpc.node as rpc
44
from ganeti import runtime
45
from ganeti import ssh
46
from ganeti import uidpool
47
from ganeti import utils
48
from ganeti import vcluster
49

    
50
from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
51
  ResultWithJobs
52
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
53
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
54
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
55
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
56
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
57
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
58
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
59
  CheckDiskAccessModeConsistency, CreateNewClientCert
60

    
61
import ganeti.masterd.instance
62

    
63

    
64
def _UpdateMasterClientCert(
65
    lu, master_uuid, cluster, feedback_fn,
66
    client_cert=pathutils.NODED_CLIENT_CERT_FILE,
67
    client_cert_tmp=pathutils.NODED_CLIENT_CERT_FILE_TMP):
68
  """Renews the master's client certificate and propagates the config.
69

70
  @type lu: C{LogicalUnit}
71
  @param lu: the logical unit holding the config
72
  @type master_uuid: string
73
  @param master_uuid: the master node's UUID
74
  @type cluster: C{objects.Cluster}
75
  @param cluster: the cluster's configuration
76
  @type feedback_fn: function
77
  @param feedback_fn: feedback functions for config updates
78
  @type client_cert: string
79
  @param client_cert: the path of the client certificate
80
  @type client_cert_tmp: string
81
  @param client_cert_tmp: the temporary path of the client certificate
82
  @rtype: string
83
  @return: the digest of the newly created client certificate
84

85
  """
86
  client_digest = CreateNewClientCert(lu, master_uuid, filename=client_cert_tmp)
87
  utils.AddNodeToCandidateCerts(master_uuid, client_digest,
88
                                cluster.candidate_certs)
89
  # This triggers an update of the config and distribution of it with the old
90
  # SSL certificate
91
  lu.cfg.Update(cluster, feedback_fn)
92

    
93
  utils.RemoveFile(client_cert)
94
  utils.RenameFile(client_cert_tmp, client_cert)
95
  return client_digest
96

    
97

    
98
class LUClusterRenewCrypto(NoHooksLU):
99
  """Renew the cluster's crypto tokens.
100

101
  Note that most of this operation is done in gnt_cluster.py, this LU only
102
  takes care of the renewal of the client SSL certificates.
103

104
  """
105
  def Exec(self, feedback_fn):
106
    master_uuid = self.cfg.GetMasterNode()
107
    cluster = self.cfg.GetClusterInfo()
108

    
109
    server_digest = utils.GetCertificateDigest(
110
      cert_filename=pathutils.NODED_CERT_FILE)
111
    utils.AddNodeToCandidateCerts("%s-SERVER" % master_uuid,
112
                                  server_digest,
113
                                  cluster.candidate_certs)
114
    try:
115
      old_master_digest = utils.GetCertificateDigest(
116
        cert_filename=pathutils.NODED_CLIENT_CERT_FILE)
117
      utils.AddNodeToCandidateCerts("%s-OLDMASTER" % master_uuid,
118
                                    old_master_digest,
119
                                    cluster.candidate_certs)
120
    except IOError:
121
      logging.info("No old certificate available.")
122

    
123
    new_master_digest = _UpdateMasterClientCert(self, master_uuid, cluster,
124
                                                feedback_fn)
125

    
126
    utils.AddNodeToCandidateCerts(master_uuid,
127
                                  new_master_digest,
128
                                  cluster.candidate_certs)
129
    nodes = self.cfg.GetAllNodesInfo()
130
    for (node_uuid, node_info) in nodes.items():
131
      if node_uuid != master_uuid:
132
        new_digest = CreateNewClientCert(self, node_uuid)
133
        if node_info.master_candidate:
134
          utils.AddNodeToCandidateCerts(node_uuid,
135
                                        new_digest,
136
                                        cluster.candidate_certs)
137
    utils.RemoveNodeFromCandidateCerts("%s-SERVER" % master_uuid,
138
                                       cluster.candidate_certs)
139
    utils.RemoveNodeFromCandidateCerts("%s-OLDMASTER" % master_uuid,
140
                                       cluster.candidate_certs)
141
    # Trigger another update of the config now with the new master cert
142
    self.cfg.Update(cluster, feedback_fn)
143

    
144

    
145
class LUClusterActivateMasterIp(NoHooksLU):
146
  """Activate the master IP on the master node.
147

148
  """
149
  def Exec(self, feedback_fn):
150
    """Activate the master IP.
151

152
    """
153
    master_params = self.cfg.GetMasterNetworkParameters()
154
    ems = self.cfg.GetUseExternalMipScript()
155
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
156
                                                   master_params, ems)
157
    result.Raise("Could not activate the master IP")
158

    
159

    
160
class LUClusterDeactivateMasterIp(NoHooksLU):
161
  """Deactivate the master IP on the master node.
162

163
  """
164
  def Exec(self, feedback_fn):
165
    """Deactivate the master IP.
166

167
    """
168
    master_params = self.cfg.GetMasterNetworkParameters()
169
    ems = self.cfg.GetUseExternalMipScript()
170
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
171
                                                     master_params, ems)
172
    result.Raise("Could not deactivate the master IP")
173

    
174

    
175
class LUClusterConfigQuery(NoHooksLU):
176
  """Return configuration values.
177

178
  """
179
  REQ_BGL = False
180

    
181
  def CheckArguments(self):
182
    self.cq = ClusterQuery(None, self.op.output_fields, False)
183

    
184
  def ExpandNames(self):
185
    self.cq.ExpandNames(self)
186

    
187
  def DeclareLocks(self, level):
188
    self.cq.DeclareLocks(self, level)
189

    
190
  def Exec(self, feedback_fn):
191
    result = self.cq.OldStyleQuery(self)
192

    
193
    assert len(result) == 1
194

    
195
    return result[0]
196

    
197

    
198
class LUClusterDestroy(LogicalUnit):
199
  """Logical unit for destroying the cluster.
200

201
  """
202
  HPATH = "cluster-destroy"
203
  HTYPE = constants.HTYPE_CLUSTER
204

    
205
  def BuildHooksEnv(self):
206
    """Build hooks env.
207

208
    """
209
    return {
210
      "OP_TARGET": self.cfg.GetClusterName(),
211
      }
212

    
213
  def BuildHooksNodes(self):
214
    """Build hooks nodes.
215

216
    """
217
    return ([], [])
218

    
219
  def CheckPrereq(self):
220
    """Check prerequisites.
221

222
    This checks whether the cluster is empty.
223

224
    Any errors are signaled by raising errors.OpPrereqError.
225

226
    """
227
    master = self.cfg.GetMasterNode()
228

    
229
    nodelist = self.cfg.GetNodeList()
230
    if len(nodelist) != 1 or nodelist[0] != master:
231
      raise errors.OpPrereqError("There are still %d node(s) in"
232
                                 " this cluster." % (len(nodelist) - 1),
233
                                 errors.ECODE_INVAL)
234
    instancelist = self.cfg.GetInstanceList()
235
    if instancelist:
236
      raise errors.OpPrereqError("There are still %d instance(s) in"
237
                                 " this cluster." % len(instancelist),
238
                                 errors.ECODE_INVAL)
239

    
240
  def Exec(self, feedback_fn):
241
    """Destroys the cluster.
242

243
    """
244
    master_params = self.cfg.GetMasterNetworkParameters()
245

    
246
    # Run post hooks on master node before it's removed
247
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
248

    
249
    ems = self.cfg.GetUseExternalMipScript()
250
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
251
                                                     master_params, ems)
252
    result.Warn("Error disabling the master IP address", self.LogWarning)
253
    return master_params.uuid
254

    
255

    
256
class LUClusterPostInit(LogicalUnit):
257
  """Logical unit for running hooks after cluster initialization.
258

259
  """
260
  HPATH = "cluster-init"
261
  HTYPE = constants.HTYPE_CLUSTER
262

    
263
  def CheckArguments(self):
264
    self.master_uuid = self.cfg.GetMasterNode()
265
    self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo())
266

    
267
    # TODO: When Issue 584 is solved, and None is properly parsed when used
268
    # as a default value, ndparams.get(.., None) can be changed to
269
    # ndparams[..] to access the values directly
270

    
271
    # OpenvSwitch: Warn user if link is missing
272
    if (self.master_ndparams[constants.ND_OVS] and not
273
        self.master_ndparams.get(constants.ND_OVS_LINK, None)):
274
      self.LogInfo("No physical interface for OpenvSwitch was given."
275
                   " OpenvSwitch will not have an outside connection. This"
276
                   " might not be what you want.")
277

    
278
  def BuildHooksEnv(self):
279
    """Build hooks env.
280

281
    """
282
    return {
283
      "OP_TARGET": self.cfg.GetClusterName(),
284
      }
285

    
286
  def BuildHooksNodes(self):
287
    """Build hooks nodes.
288

289
    """
290
    return ([], [self.cfg.GetMasterNode()])
291

    
292
  def Exec(self, feedback_fn):
293
    """Create and configure Open vSwitch
294

295
    """
296
    if self.master_ndparams[constants.ND_OVS]:
297
      result = self.rpc.call_node_configure_ovs(
298
                 self.master_uuid,
299
                 self.master_ndparams[constants.ND_OVS_NAME],
300
                 self.master_ndparams.get(constants.ND_OVS_LINK, None))
301
      result.Raise("Could not successully configure Open vSwitch")
302

    
303
    cluster = self.cfg.GetClusterInfo()
304
    _UpdateMasterClientCert(self, self.master_uuid, cluster, feedback_fn)
305

    
306
    return True
307

    
308

    
309
class ClusterQuery(QueryBase):
310
  FIELDS = query.CLUSTER_FIELDS
311

    
312
  #: Do not sort (there is only one item)
313
  SORT_FIELD = None
314

    
315
  def ExpandNames(self, lu):
316
    lu.needed_locks = {}
317

    
318
    # The following variables interact with _QueryBase._GetNames
319
    self.wanted = locking.ALL_SET
320
    self.do_locking = self.use_locking
321

    
322
    if self.do_locking:
323
      raise errors.OpPrereqError("Can not use locking for cluster queries",
324
                                 errors.ECODE_INVAL)
325

    
326
  def DeclareLocks(self, lu, level):
327
    pass
328

    
329
  def _GetQueryData(self, lu):
330
    """Computes the list of nodes and their attributes.
331

332
    """
333
    # Locking is not used
334
    assert not (compat.any(lu.glm.is_owned(level)
335
                           for level in locking.LEVELS
336
                           if level != locking.LEVEL_CLUSTER) or
337
                self.do_locking or self.use_locking)
338

    
339
    if query.CQ_CONFIG in self.requested_data:
340
      cluster = lu.cfg.GetClusterInfo()
341
      nodes = lu.cfg.GetAllNodesInfo()
342
    else:
343
      cluster = NotImplemented
344
      nodes = NotImplemented
345

    
346
    if query.CQ_QUEUE_DRAINED in self.requested_data:
347
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
348
    else:
349
      drain_flag = NotImplemented
350

    
351
    if query.CQ_WATCHER_PAUSE in self.requested_data:
352
      master_node_uuid = lu.cfg.GetMasterNode()
353

    
354
      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
355
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
356
                   lu.cfg.GetMasterNodeName())
357

    
358
      watcher_pause = result.payload
359
    else:
360
      watcher_pause = NotImplemented
361

    
362
    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
363

    
364

    
365
class LUClusterQuery(NoHooksLU):
366
  """Query cluster configuration.
367

368
  """
369
  REQ_BGL = False
370

    
371
  def ExpandNames(self):
372
    self.needed_locks = {}
373

    
374
  def Exec(self, feedback_fn):
375
    """Return cluster config.
376

377
    """
378
    cluster = self.cfg.GetClusterInfo()
379
    os_hvp = {}
380

    
381
    # Filter just for enabled hypervisors
382
    for os_name, hv_dict in cluster.os_hvp.items():
383
      os_hvp[os_name] = {}
384
      for hv_name, hv_params in hv_dict.items():
385
        if hv_name in cluster.enabled_hypervisors:
386
          os_hvp[os_name][hv_name] = hv_params
387

    
388
    # Convert ip_family to ip_version
389
    primary_ip_version = constants.IP4_VERSION
390
    if cluster.primary_ip_family == netutils.IP6Address.family:
391
      primary_ip_version = constants.IP6_VERSION
392

    
393
    result = {
394
      "software_version": constants.RELEASE_VERSION,
395
      "protocol_version": constants.PROTOCOL_VERSION,
396
      "config_version": constants.CONFIG_VERSION,
397
      "os_api_version": max(constants.OS_API_VERSIONS),
398
      "export_version": constants.EXPORT_VERSION,
399
      "vcs_version": constants.VCS_VERSION,
400
      "architecture": runtime.GetArchInfo(),
401
      "name": cluster.cluster_name,
402
      "master": self.cfg.GetMasterNodeName(),
403
      "default_hypervisor": cluster.primary_hypervisor,
404
      "enabled_hypervisors": cluster.enabled_hypervisors,
405
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
406
                        for hypervisor_name in cluster.enabled_hypervisors]),
407
      "os_hvp": os_hvp,
408
      "beparams": cluster.beparams,
409
      "osparams": cluster.osparams,
410
      "ipolicy": cluster.ipolicy,
411
      "nicparams": cluster.nicparams,
412
      "ndparams": cluster.ndparams,
413
      "diskparams": cluster.diskparams,
414
      "candidate_pool_size": cluster.candidate_pool_size,
415
      "max_running_jobs": cluster.max_running_jobs,
416
      "master_netdev": cluster.master_netdev,
417
      "master_netmask": cluster.master_netmask,
418
      "use_external_mip_script": cluster.use_external_mip_script,
419
      "volume_group_name": cluster.volume_group_name,
420
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
421
      "file_storage_dir": cluster.file_storage_dir,
422
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
423
      "maintain_node_health": cluster.maintain_node_health,
424
      "ctime": cluster.ctime,
425
      "mtime": cluster.mtime,
426
      "uuid": cluster.uuid,
427
      "tags": list(cluster.GetTags()),
428
      "uid_pool": cluster.uid_pool,
429
      "default_iallocator": cluster.default_iallocator,
430
      "default_iallocator_params": cluster.default_iallocator_params,
431
      "reserved_lvs": cluster.reserved_lvs,
432
      "primary_ip_version": primary_ip_version,
433
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
434
      "hidden_os": cluster.hidden_os,
435
      "blacklisted_os": cluster.blacklisted_os,
436
      "enabled_disk_templates": cluster.enabled_disk_templates,
437
      }
438

    
439
    return result
440

    
441

    
442
class LUClusterRedistConf(NoHooksLU):
443
  """Force the redistribution of cluster configuration.
444

445
  This is a very simple LU.
446

447
  """
448
  REQ_BGL = False
449

    
450
  def ExpandNames(self):
451
    self.needed_locks = {
452
      locking.LEVEL_NODE: locking.ALL_SET,
453
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
454
    }
455
    self.share_locks = ShareAll()
456

    
457
  def Exec(self, feedback_fn):
458
    """Redistribute the configuration.
459

460
    """
461
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
462
    RedistributeAncillaryFiles(self)
463

    
464

    
465
class LUClusterRename(LogicalUnit):
466
  """Rename the cluster.
467

468
  """
469
  HPATH = "cluster-rename"
470
  HTYPE = constants.HTYPE_CLUSTER
471

    
472
  def BuildHooksEnv(self):
473
    """Build hooks env.
474

475
    """
476
    return {
477
      "OP_TARGET": self.cfg.GetClusterName(),
478
      "NEW_NAME": self.op.name,
479
      }
480

    
481
  def BuildHooksNodes(self):
482
    """Build hooks nodes.
483

484
    """
485
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
486

    
487
  def CheckPrereq(self):
488
    """Verify that the passed name is a valid one.
489

490
    """
491
    hostname = netutils.GetHostname(name=self.op.name,
492
                                    family=self.cfg.GetPrimaryIPFamily())
493

    
494
    new_name = hostname.name
495
    self.ip = new_ip = hostname.ip
496
    old_name = self.cfg.GetClusterName()
497
    old_ip = self.cfg.GetMasterIP()
498
    if new_name == old_name and new_ip == old_ip:
499
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
500
                                 " cluster has changed",
501
                                 errors.ECODE_INVAL)
502
    if new_ip != old_ip:
503
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
504
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
505
                                   " reachable on the network" %
506
                                   new_ip, errors.ECODE_NOTUNIQUE)
507

    
508
    self.op.name = new_name
509

    
510
  def Exec(self, feedback_fn):
511
    """Rename the cluster.
512

513
    """
514
    clustername = self.op.name
515
    new_ip = self.ip
516

    
517
    # shutdown the master IP
518
    master_params = self.cfg.GetMasterNetworkParameters()
519
    ems = self.cfg.GetUseExternalMipScript()
520
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
521
                                                     master_params, ems)
522
    result.Raise("Could not disable the master role")
523

    
524
    try:
525
      cluster = self.cfg.GetClusterInfo()
526
      cluster.cluster_name = clustername
527
      cluster.master_ip = new_ip
528
      self.cfg.Update(cluster, feedback_fn)
529

    
530
      # update the known hosts file
531
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
532
      node_list = self.cfg.GetOnlineNodeList()
533
      try:
534
        node_list.remove(master_params.uuid)
535
      except ValueError:
536
        pass
537
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
538
    finally:
539
      master_params.ip = new_ip
540
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
541
                                                     master_params, ems)
542
      result.Warn("Could not re-enable the master role on the master,"
543
                  " please restart manually", self.LogWarning)
544

    
545
    return clustername
546

    
547

    
548
class LUClusterRepairDiskSizes(NoHooksLU):
549
  """Verifies the cluster disks sizes.
550

551
  """
552
  REQ_BGL = False
553

    
554
  def ExpandNames(self):
555
    if self.op.instances:
556
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
557
      # Not getting the node allocation lock as only a specific set of
558
      # instances (and their nodes) is going to be acquired
559
      self.needed_locks = {
560
        locking.LEVEL_NODE_RES: [],
561
        locking.LEVEL_INSTANCE: self.wanted_names,
562
        }
563
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
564
    else:
565
      self.wanted_names = None
566
      self.needed_locks = {
567
        locking.LEVEL_NODE_RES: locking.ALL_SET,
568
        locking.LEVEL_INSTANCE: locking.ALL_SET,
569

    
570
        # This opcode is acquires the node locks for all instances
571
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
572
        }
573

    
574
    self.share_locks = {
575
      locking.LEVEL_NODE_RES: 1,
576
      locking.LEVEL_INSTANCE: 0,
577
      locking.LEVEL_NODE_ALLOC: 1,
578
      }
579

    
580
  def DeclareLocks(self, level):
581
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
582
      self._LockInstancesNodes(primary_only=True, level=level)
583

    
584
  def CheckPrereq(self):
585
    """Check prerequisites.
586

587
    This only checks the optional instance list against the existing names.
588

589
    """
590
    if self.wanted_names is None:
591
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
592

    
593
    self.wanted_instances = \
594
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
595

    
596
  def _EnsureChildSizes(self, disk):
597
    """Ensure children of the disk have the needed disk size.
598

599
    This is valid mainly for DRBD8 and fixes an issue where the
600
    children have smaller disk size.
601

602
    @param disk: an L{ganeti.objects.Disk} object
603

604
    """
605
    if disk.dev_type == constants.DT_DRBD8:
606
      assert disk.children, "Empty children for DRBD8?"
607
      fchild = disk.children[0]
608
      mismatch = fchild.size < disk.size
609
      if mismatch:
610
        self.LogInfo("Child disk has size %d, parent %d, fixing",
611
                     fchild.size, disk.size)
612
        fchild.size = disk.size
613

    
614
      # and we recurse on this child only, not on the metadev
615
      return self._EnsureChildSizes(fchild) or mismatch
616
    else:
617
      return False
618

    
619
  def Exec(self, feedback_fn):
620
    """Verify the size of cluster disks.
621

622
    """
623
    # TODO: check child disks too
624
    # TODO: check differences in size between primary/secondary nodes
625
    per_node_disks = {}
626
    for instance in self.wanted_instances:
627
      pnode = instance.primary_node
628
      if pnode not in per_node_disks:
629
        per_node_disks[pnode] = []
630
      for idx, disk in enumerate(instance.disks):
631
        per_node_disks[pnode].append((instance, idx, disk))
632

    
633
    assert not (frozenset(per_node_disks.keys()) -
634
                self.owned_locks(locking.LEVEL_NODE_RES)), \
635
      "Not owning correct locks"
636
    assert not self.owned_locks(locking.LEVEL_NODE)
637

    
638
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
639
                                               per_node_disks.keys())
640

    
641
    changed = []
642
    for node_uuid, dskl in per_node_disks.items():
643
      if not dskl:
644
        # no disks on the node
645
        continue
646

    
647
      newl = [([v[2].Copy()], v[0]) for v in dskl]
648
      node_name = self.cfg.GetNodeName(node_uuid)
649
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
650
      if result.fail_msg:
651
        self.LogWarning("Failure in blockdev_getdimensions call to node"
652
                        " %s, ignoring", node_name)
653
        continue
654
      if len(result.payload) != len(dskl):
655
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
656
                        " result.payload=%s", node_name, len(dskl),
657
                        result.payload)
658
        self.LogWarning("Invalid result from node %s, ignoring node results",
659
                        node_name)
660
        continue
661
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
662
        if dimensions is None:
663
          self.LogWarning("Disk %d of instance %s did not return size"
664
                          " information, ignoring", idx, instance.name)
665
          continue
666
        if not isinstance(dimensions, (tuple, list)):
667
          self.LogWarning("Disk %d of instance %s did not return valid"
668
                          " dimension information, ignoring", idx,
669
                          instance.name)
670
          continue
671
        (size, spindles) = dimensions
672
        if not isinstance(size, (int, long)):
673
          self.LogWarning("Disk %d of instance %s did not return valid"
674
                          " size information, ignoring", idx, instance.name)
675
          continue
676
        size = size >> 20
677
        if size != disk.size:
678
          self.LogInfo("Disk %d of instance %s has mismatched size,"
679
                       " correcting: recorded %d, actual %d", idx,
680
                       instance.name, disk.size, size)
681
          disk.size = size
682
          self.cfg.Update(instance, feedback_fn)
683
          changed.append((instance.name, idx, "size", size))
684
        if es_flags[node_uuid]:
685
          if spindles is None:
686
            self.LogWarning("Disk %d of instance %s did not return valid"
687
                            " spindles information, ignoring", idx,
688
                            instance.name)
689
          elif disk.spindles is None or disk.spindles != spindles:
690
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
691
                         " correcting: recorded %s, actual %s",
692
                         idx, instance.name, disk.spindles, spindles)
693
            disk.spindles = spindles
694
            self.cfg.Update(instance, feedback_fn)
695
            changed.append((instance.name, idx, "spindles", disk.spindles))
696
        if self._EnsureChildSizes(disk):
697
          self.cfg.Update(instance, feedback_fn)
698
          changed.append((instance.name, idx, "size", disk.size))
699
    return changed
700

    
701

    
702
def _ValidateNetmask(cfg, netmask):
703
  """Checks if a netmask is valid.
704

705
  @type cfg: L{config.ConfigWriter}
706
  @param cfg: The cluster configuration
707
  @type netmask: int
708
  @param netmask: the netmask to be verified
709
  @raise errors.OpPrereqError: if the validation fails
710

711
  """
712
  ip_family = cfg.GetPrimaryIPFamily()
713
  try:
714
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
715
  except errors.ProgrammerError:
716
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
717
                               ip_family, errors.ECODE_INVAL)
718
  if not ipcls.ValidateNetmask(netmask):
719
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
720
                               (netmask), errors.ECODE_INVAL)
721

    
722

    
723
def CheckFileBasedStoragePathVsEnabledDiskTemplates(
724
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
725
    file_disk_template):
726
  """Checks whether the given file-based storage directory is acceptable.
727

728
  Note: This function is public, because it is also used in bootstrap.py.
729

730
  @type logging_warn_fn: function
731
  @param logging_warn_fn: function which accepts a string and logs it
732
  @type file_storage_dir: string
733
  @param file_storage_dir: the directory to be used for file-based instances
734
  @type enabled_disk_templates: list of string
735
  @param enabled_disk_templates: the list of enabled disk templates
736
  @type file_disk_template: string
737
  @param file_disk_template: the file-based disk template for which the
738
      path should be checked
739

740
  """
741
  assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
742
            constants.ST_FILE, constants.ST_SHARED_FILE
743
         ))
744
  file_storage_enabled = file_disk_template in enabled_disk_templates
745
  if file_storage_dir is not None:
746
    if file_storage_dir == "":
747
      if file_storage_enabled:
748
        raise errors.OpPrereqError(
749
            "Unsetting the '%s' storage directory while having '%s' storage"
750
            " enabled is not permitted." %
751
            (file_disk_template, file_disk_template))
752
    else:
753
      if not file_storage_enabled:
754
        logging_warn_fn(
755
            "Specified a %s storage directory, although %s storage is not"
756
            " enabled." % (file_disk_template, file_disk_template))
757
  else:
758
    raise errors.ProgrammerError("Received %s storage dir with value"
759
                                 " 'None'." % file_disk_template)
760

    
761

    
762
def CheckFileStoragePathVsEnabledDiskTemplates(
763
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
764
  """Checks whether the given file storage directory is acceptable.
765

766
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
767

768
  """
769
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
770
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
771
      constants.DT_FILE)
772

    
773

    
774
def CheckSharedFileStoragePathVsEnabledDiskTemplates(
775
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
776
  """Checks whether the given shared file storage directory is acceptable.
777

778
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
779

780
  """
781
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
782
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
783
      constants.DT_SHARED_FILE)
784

    
785

    
786
class LUClusterSetParams(LogicalUnit):
787
  """Change the parameters of the cluster.
788

789
  """
790
  HPATH = "cluster-modify"
791
  HTYPE = constants.HTYPE_CLUSTER
792
  REQ_BGL = False
793

    
794
  def CheckArguments(self):
795
    """Check parameters
796

797
    """
798
    if self.op.uid_pool:
799
      uidpool.CheckUidPool(self.op.uid_pool)
800

    
801
    if self.op.add_uids:
802
      uidpool.CheckUidPool(self.op.add_uids)
803

    
804
    if self.op.remove_uids:
805
      uidpool.CheckUidPool(self.op.remove_uids)
806

    
807
    if self.op.master_netmask is not None:
808
      _ValidateNetmask(self.cfg, self.op.master_netmask)
809

    
810
    if self.op.diskparams:
811
      for dt_params in self.op.diskparams.values():
812
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
813
      try:
814
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
815
        CheckDiskAccessModeValidity(self.op.diskparams)
816
      except errors.OpPrereqError, err:
817
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
818
                                   errors.ECODE_INVAL)
819

    
820
  def ExpandNames(self):
821
    # FIXME: in the future maybe other cluster params won't require checking on
822
    # all nodes to be modified.
823
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
824
    # resource locks the right thing, shouldn't it be the BGL instead?
825
    self.needed_locks = {
826
      locking.LEVEL_NODE: locking.ALL_SET,
827
      locking.LEVEL_INSTANCE: locking.ALL_SET,
828
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
829
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
830
    }
831
    self.share_locks = ShareAll()
832

    
833
  def BuildHooksEnv(self):
834
    """Build hooks env.
835

836
    """
837
    return {
838
      "OP_TARGET": self.cfg.GetClusterName(),
839
      "NEW_VG_NAME": self.op.vg_name,
840
      }
841

    
842
  def BuildHooksNodes(self):
843
    """Build hooks nodes.
844

845
    """
846
    mn = self.cfg.GetMasterNode()
847
    return ([mn], [mn])
848

    
849
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
850
                   new_enabled_disk_templates):
851
    """Check the consistency of the vg name on all nodes and in case it gets
852
       unset whether there are instances still using it.
853

854
    """
855
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
856
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
857
                                            new_enabled_disk_templates)
858
    current_vg_name = self.cfg.GetVGName()
859

    
860
    if self.op.vg_name == '':
861
      if lvm_is_enabled:
862
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
863
                                   " disk templates are or get enabled.")
864

    
865
    if self.op.vg_name is None:
866
      if current_vg_name is None and lvm_is_enabled:
867
        raise errors.OpPrereqError("Please specify a volume group when"
868
                                   " enabling lvm-based disk-templates.")
869

    
870
    if self.op.vg_name is not None and not self.op.vg_name:
871
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
872
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
873
                                   " instances exist", errors.ECODE_INVAL)
874

    
875
    if (self.op.vg_name is not None and lvm_is_enabled) or \
876
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
877
      self._CheckVgNameOnNodes(node_uuids)
878

    
879
  def _CheckVgNameOnNodes(self, node_uuids):
880
    """Check the status of the volume group on each node.
881

882
    """
883
    vglist = self.rpc.call_vg_list(node_uuids)
884
    for node_uuid in node_uuids:
885
      msg = vglist[node_uuid].fail_msg
886
      if msg:
887
        # ignoring down node
888
        self.LogWarning("Error while gathering data on node %s"
889
                        " (ignoring node): %s",
890
                        self.cfg.GetNodeName(node_uuid), msg)
891
        continue
892
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
893
                                            self.op.vg_name,
894
                                            constants.MIN_VG_SIZE)
895
      if vgstatus:
896
        raise errors.OpPrereqError("Error on node '%s': %s" %
897
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
898
                                   errors.ECODE_ENVIRON)
899

    
900
  @staticmethod
901
  def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
902
                                old_enabled_disk_templates):
903
    """Computes three sets of disk templates.
904

905
    @see: C{_GetDiskTemplateSets} for more details.
906

907
    """
908
    enabled_disk_templates = None
909
    new_enabled_disk_templates = []
910
    disabled_disk_templates = []
911
    if op_enabled_disk_templates:
912
      enabled_disk_templates = op_enabled_disk_templates
913
      new_enabled_disk_templates = \
914
        list(set(enabled_disk_templates)
915
             - set(old_enabled_disk_templates))
916
      disabled_disk_templates = \
917
        list(set(old_enabled_disk_templates)
918
             - set(enabled_disk_templates))
919
    else:
920
      enabled_disk_templates = old_enabled_disk_templates
921
    return (enabled_disk_templates, new_enabled_disk_templates,
922
            disabled_disk_templates)
923

    
924
  def _GetDiskTemplateSets(self, cluster):
925
    """Computes three sets of disk templates.
926

927
    The three sets are:
928
      - disk templates that will be enabled after this operation (no matter if
929
        they were enabled before or not)
930
      - disk templates that get enabled by this operation (thus haven't been
931
        enabled before.)
932
      - disk templates that get disabled by this operation
933

934
    """
935
    return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
936
                                          cluster.enabled_disk_templates)
937

    
938
  def _CheckIpolicy(self, cluster, enabled_disk_templates):
939
    """Checks the ipolicy.
940

941
    @type cluster: C{objects.Cluster}
942
    @param cluster: the cluster's configuration
943
    @type enabled_disk_templates: list of string
944
    @param enabled_disk_templates: list of (possibly newly) enabled disk
945
      templates
946

947
    """
948
    # FIXME: write unit tests for this
949
    if self.op.ipolicy:
950
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
951
                                           group_policy=False)
952

    
953
      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
954
                                  enabled_disk_templates)
955

    
956
      all_instances = self.cfg.GetAllInstancesInfo().values()
957
      violations = set()
958
      for group in self.cfg.GetAllNodeGroupsInfo().values():
959
        instances = frozenset([inst for inst in all_instances
960
                               if compat.any(nuuid in group.members
961
                                             for nuuid in inst.all_nodes)])
962
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
963
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
964
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
965
                                           self.cfg)
966
        if new:
967
          violations.update(new)
968

    
969
      if violations:
970
        self.LogWarning("After the ipolicy change the following instances"
971
                        " violate them: %s",
972
                        utils.CommaJoin(utils.NiceSort(violations)))
973
    else:
974
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
975
                                  enabled_disk_templates)
976

    
977
  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
978
    """Checks whether the set DRBD helper actually exists on the nodes.
979

980
    @type drbd_helper: string
981
    @param drbd_helper: path of the drbd usermode helper binary
982
    @type node_uuids: list of strings
983
    @param node_uuids: list of node UUIDs to check for the helper
984

985
    """
986
    # checks given drbd helper on all nodes
987
    helpers = self.rpc.call_drbd_helper(node_uuids)
988
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
989
      if ninfo.offline:
990
        self.LogInfo("Not checking drbd helper on offline node %s",
991
                     ninfo.name)
992
        continue
993
      msg = helpers[ninfo.uuid].fail_msg
994
      if msg:
995
        raise errors.OpPrereqError("Error checking drbd helper on node"
996
                                   " '%s': %s" % (ninfo.name, msg),
997
                                   errors.ECODE_ENVIRON)
998
      node_helper = helpers[ninfo.uuid].payload
999
      if node_helper != drbd_helper:
1000
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
1001
                                   (ninfo.name, node_helper),
1002
                                   errors.ECODE_ENVIRON)
1003

    
1004
  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
1005
    """Check the DRBD usermode helper.
1006

1007
    @type node_uuids: list of strings
1008
    @param node_uuids: a list of nodes' UUIDs
1009
    @type drbd_enabled: boolean
1010
    @param drbd_enabled: whether DRBD will be enabled after this operation
1011
      (no matter if it was disabled before or not)
1012
    @type drbd_gets_enabled: boolen
1013
    @param drbd_gets_enabled: true if DRBD was disabled before this
1014
      operation, but will be enabled afterwards
1015

1016
    """
1017
    if self.op.drbd_helper == '':
1018
      if drbd_enabled:
1019
        raise errors.OpPrereqError("Cannot disable drbd helper while"
1020
                                   " DRBD is enabled.")
1021
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
1022
        raise errors.OpPrereqError("Cannot disable drbd helper while"
1023
                                   " drbd-based instances exist",
1024
                                   errors.ECODE_INVAL)
1025

    
1026
    else:
1027
      if self.op.drbd_helper is not None and drbd_enabled:
1028
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
1029
      else:
1030
        if drbd_gets_enabled:
1031
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
1032
          if current_drbd_helper is not None:
1033
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
1034
          else:
1035
            raise errors.OpPrereqError("Cannot enable DRBD without a"
1036
                                       " DRBD usermode helper set.")
1037

    
1038
  def _CheckInstancesOfDisabledDiskTemplates(
1039
      self, disabled_disk_templates):
1040
    """Check whether we try to disable a disk template that is in use.
1041

1042
    @type disabled_disk_templates: list of string
1043
    @param disabled_disk_templates: list of disk templates that are going to
1044
      be disabled by this operation
1045

1046
    """
1047
    for disk_template in disabled_disk_templates:
1048
      if self.cfg.HasAnyDiskOfType(disk_template):
1049
        raise errors.OpPrereqError(
1050
            "Cannot disable disk template '%s', because there is at least one"
1051
            " instance using it." % disk_template)
1052

    
1053
  def CheckPrereq(self):
1054
    """Check prerequisites.
1055

1056
    This checks whether the given params don't conflict and
1057
    if the given volume group is valid.
1058

1059
    """
1060
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
1061
    self.cluster = cluster = self.cfg.GetClusterInfo()
1062

    
1063
    vm_capable_node_uuids = [node.uuid
1064
                             for node in self.cfg.GetAllNodesInfo().values()
1065
                             if node.uuid in node_uuids and node.vm_capable]
1066

    
1067
    (enabled_disk_templates, new_enabled_disk_templates,
1068
      disabled_disk_templates) = self._GetDiskTemplateSets(cluster)
1069
    self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates)
1070

    
1071
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
1072
                      new_enabled_disk_templates)
1073

    
1074
    if self.op.file_storage_dir is not None:
1075
      CheckFileStoragePathVsEnabledDiskTemplates(
1076
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
1077

    
1078
    if self.op.shared_file_storage_dir is not None:
1079
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
1080
          self.LogWarning, self.op.shared_file_storage_dir,
1081
          enabled_disk_templates)
1082

    
1083
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
1084
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
1085
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)
1086

    
1087
    # validate params changes
1088
    if self.op.beparams:
1089
      objects.UpgradeBeParams(self.op.beparams)
1090
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1091
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
1092

    
1093
    if self.op.ndparams:
1094
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
1095
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
1096

    
1097
      # TODO: we need a more general way to handle resetting
1098
      # cluster-level parameters to default values
1099
      if self.new_ndparams["oob_program"] == "":
1100
        self.new_ndparams["oob_program"] = \
1101
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
1102

    
1103
    if self.op.hv_state:
1104
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
1105
                                           self.cluster.hv_state_static)
1106
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
1107
                               for hv, values in new_hv_state.items())
1108

    
1109
    if self.op.disk_state:
1110
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
1111
                                               self.cluster.disk_state_static)
1112
      self.new_disk_state = \
1113
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
1114
                            for name, values in svalues.items()))
1115
             for storage, svalues in new_disk_state.items())
1116

    
1117
    self._CheckIpolicy(cluster, enabled_disk_templates)
1118

    
1119
    if self.op.nicparams:
1120
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1121
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
1122
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1123
      nic_errors = []
1124

    
1125
      # check all instances for consistency
1126
      for instance in self.cfg.GetAllInstancesInfo().values():
1127
        for nic_idx, nic in enumerate(instance.nics):
1128
          params_copy = copy.deepcopy(nic.nicparams)
1129
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
1130

    
1131
          # check parameter syntax
1132
          try:
1133
            objects.NIC.CheckParameterSyntax(params_filled)
1134
          except errors.ConfigurationError, err:
1135
            nic_errors.append("Instance %s, nic/%d: %s" %
1136
                              (instance.name, nic_idx, err))
1137

    
1138
          # if we're moving instances to routed, check that they have an ip
1139
          target_mode = params_filled[constants.NIC_MODE]
1140
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
1141
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
1142
                              " address" % (instance.name, nic_idx))
1143
      if nic_errors:
1144
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
1145
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
1146

    
1147
    # hypervisor list/parameters
1148
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
1149
    if self.op.hvparams:
1150
      for hv_name, hv_dict in self.op.hvparams.items():
1151
        if hv_name not in self.new_hvparams:
1152
          self.new_hvparams[hv_name] = hv_dict
1153
        else:
1154
          self.new_hvparams[hv_name].update(hv_dict)
1155

    
1156
    # disk template parameters
1157
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
1158
    if self.op.diskparams:
1159
      for dt_name, dt_params in self.op.diskparams.items():
1160
        if dt_name not in self.new_diskparams:
1161
          self.new_diskparams[dt_name] = dt_params
1162
        else:
1163
          self.new_diskparams[dt_name].update(dt_params)
1164
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)
1165

    
1166
    # os hypervisor parameters
1167
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
1168
    if self.op.os_hvp:
1169
      for os_name, hvs in self.op.os_hvp.items():
1170
        if os_name not in self.new_os_hvp:
1171
          self.new_os_hvp[os_name] = hvs
1172
        else:
1173
          for hv_name, hv_dict in hvs.items():
1174
            if hv_dict is None:
1175
              # Delete if it exists
1176
              self.new_os_hvp[os_name].pop(hv_name, None)
1177
            elif hv_name not in self.new_os_hvp[os_name]:
1178
              self.new_os_hvp[os_name][hv_name] = hv_dict
1179
            else:
1180
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
1181

    
1182
    # os parameters
1183
    self.new_osp = objects.FillDict(cluster.osparams, {})
1184
    if self.op.osparams:
1185
      for os_name, osp in self.op.osparams.items():
1186
        if os_name not in self.new_osp:
1187
          self.new_osp[os_name] = {}
1188

    
1189
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
1190
                                                 use_none=True)
1191

    
1192
        if not self.new_osp[os_name]:
1193
          # we removed all parameters
1194
          del self.new_osp[os_name]
1195
        else:
1196
          # check the parameter validity (remote check)
1197
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
1198
                        os_name, self.new_osp[os_name])
1199

    
1200
    # changes to the hypervisor list
1201
    if self.op.enabled_hypervisors is not None:
1202
      self.hv_list = self.op.enabled_hypervisors
1203
      for hv in self.hv_list:
1204
        # if the hypervisor doesn't already exist in the cluster
1205
        # hvparams, we initialize it to empty, and then (in both
1206
        # cases) we make sure to fill the defaults, as we might not
1207
        # have a complete defaults list if the hypervisor wasn't
1208
        # enabled before
1209
        if hv not in new_hvp:
1210
          new_hvp[hv] = {}
1211
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1212
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1213
    else:
1214
      self.hv_list = cluster.enabled_hypervisors
1215

    
1216
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1217
      # either the enabled list has changed, or the parameters have, validate
1218
      for hv_name, hv_params in self.new_hvparams.items():
1219
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1220
            (self.op.enabled_hypervisors and
1221
             hv_name in self.op.enabled_hypervisors)):
1222
          # either this is a new hypervisor, or its parameters have changed
1223
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1224
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1225
          hv_class.CheckParameterSyntax(hv_params)
1226
          CheckHVParams(self, node_uuids, hv_name, hv_params)
1227

    
1228
    self._CheckDiskTemplateConsistency()
1229

    
1230
    if self.op.os_hvp:
1231
      # no need to check any newly-enabled hypervisors, since the
1232
      # defaults have already been checked in the above code-block
1233
      for os_name, os_hvp in self.new_os_hvp.items():
1234
        for hv_name, hv_params in os_hvp.items():
1235
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1236
          # we need to fill in the new os_hvp on top of the actual hv_p
1237
          cluster_defaults = self.new_hvparams.get(hv_name, {})
1238
          new_osp = objects.FillDict(cluster_defaults, hv_params)
1239
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1240
          hv_class.CheckParameterSyntax(new_osp)
1241
          CheckHVParams(self, node_uuids, hv_name, new_osp)
1242

    
1243
    if self.op.default_iallocator:
1244
      alloc_script = utils.FindFile(self.op.default_iallocator,
1245
                                    constants.IALLOCATOR_SEARCH_PATH,
1246
                                    os.path.isfile)
1247
      if alloc_script is None:
1248
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1249
                                   " specified" % self.op.default_iallocator,
1250
                                   errors.ECODE_INVAL)
1251

    
1252
  def _CheckDiskTemplateConsistency(self):
1253
    """Check whether the disk templates that are going to be disabled
1254
       are still in use by some instances.
1255

1256
    """
1257
    if self.op.enabled_disk_templates:
1258
      cluster = self.cfg.GetClusterInfo()
1259
      instances = self.cfg.GetAllInstancesInfo()
1260

    
1261
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1262
        - set(self.op.enabled_disk_templates)
1263
      for instance in instances.itervalues():
1264
        if instance.disk_template in disk_templates_to_remove:
1265
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1266
                                     " because instance '%s' is using it." %
1267
                                     (instance.disk_template, instance.name))
1268

    
1269
  def _SetVgName(self, feedback_fn):
1270
    """Determines and sets the new volume group name.
1271

1272
    """
1273
    if self.op.vg_name is not None:
1274
      new_volume = self.op.vg_name
1275
      if not new_volume:
1276
        new_volume = None
1277
      if new_volume != self.cfg.GetVGName():
1278
        self.cfg.SetVGName(new_volume)
1279
      else:
1280
        feedback_fn("Cluster LVM configuration already in desired"
1281
                    " state, not changing")
1282

    
1283
  def _SetFileStorageDir(self, feedback_fn):
1284
    """Set the file storage directory.
1285

1286
    """
1287
    if self.op.file_storage_dir is not None:
1288
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1289
        feedback_fn("Global file storage dir already set to value '%s'"
1290
                    % self.cluster.file_storage_dir)
1291
      else:
1292
        self.cluster.file_storage_dir = self.op.file_storage_dir
1293

    
1294
  def _SetDrbdHelper(self, feedback_fn):
1295
    """Set the DRBD usermode helper.
1296

1297
    """
1298
    if self.op.drbd_helper is not None:
1299
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1300
        feedback_fn("Note that you specified a drbd user helper, but did not"
1301
                    " enable the drbd disk template.")
1302
      new_helper = self.op.drbd_helper
1303
      if not new_helper:
1304
        new_helper = None
1305
      if new_helper != self.cfg.GetDRBDHelper():
1306
        self.cfg.SetDRBDHelper(new_helper)
1307
      else:
1308
        feedback_fn("Cluster DRBD helper already in desired state,"
1309
                    " not changing")
1310

    
1311
  def Exec(self, feedback_fn):
1312
    """Change the parameters of the cluster.
1313

1314
    """
1315
    if self.op.enabled_disk_templates:
1316
      self.cluster.enabled_disk_templates = \
1317
        list(self.op.enabled_disk_templates)
1318

    
1319
    self._SetVgName(feedback_fn)
1320
    self._SetFileStorageDir(feedback_fn)
1321
    self._SetDrbdHelper(feedback_fn)
1322

    
1323
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [], feedback_fn)

    if self.op.max_running_jobs is not None:
      self.cluster.max_running_jobs = self.op.max_running_jobs

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.default_iallocator_params is not None:
      self.cluster.default_iallocator_params = self.op.default_iallocator_params

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

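    # Changing the master netdev happens in two steps: the master IP is shut
    # down on the old device here, and re-activated on the new device after
    # the updated configuration has been written out below.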
    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

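    # Persist all the changes made above to the cluster configuration.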
    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]
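      # Dependencies expressed as negative numbers are relative to the jobs
      # submitted in this same result, so -len(jobs) refers to the
      # configuration verification job appended above.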

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
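      # OpClusterVerifyConfig does not have a "skip_checks" parameter, so the
      # AttributeError below is expected for it; the assert makes sure it is
      # never silently swallowed for OpClusterVerifyGroup.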
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = constants.CV_ERROR
  ETYPE_WARNING = constants.CV_WARNING

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
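      # Machine-parseable form, colon-separated:
      #   severity:error-code:item-type:item:message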
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = utils.VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(inst.name for
                         inst in dangling_instances.get(node.uuid, [])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(inst.name for
                                         inst in no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
  """Verifies the status of a node group.

  """
  HPATH = "cluster-verify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  _HOOKS_INDENT_RE = re.compile("^", re.M)

  class NodeImage(object):
    """A class representing the logical and physical status of a node.

    @type uuid: string
    @ivar uuid: the node UUID to which this object refers
    @ivar volumes: a structure as returned from
        L{ganeti.backend.GetVolumeList} (runtime)
    @ivar instances: a list of running instances (runtime)
    @ivar pinst: list of configured primary instances (config)
    @ivar sinst: list of configured secondary instances (config)
    @ivar sbp: dictionary of {primary-node: list of instances} for all
        instances for which this node is secondary (config)
    @ivar mfree: free memory, as reported by hypervisor (runtime)
    @ivar dfree: free disk, as reported by the node (runtime)
    @ivar offline: the offline status (config)
    @type rpc_fail: boolean
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
        not whether the individual keys were correct) (runtime)
    @type lvm_fail: boolean
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
    @type hyp_fail: boolean
    @ivar hyp_fail: whether the RPC call didn't return the instance list
    @type ghost: boolean
    @ivar ghost: whether this is a known node or not (config)
    @type os_fail: boolean
    @ivar os_fail: whether the RPC call didn't return valid OS data
    @type oslist: list
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
    @type vm_capable: boolean
    @ivar vm_capable: whether the node can host instances
    @type pv_min: float
    @ivar pv_min: size in MiB of the smallest PVs
    @type pv_max: float
    @ivar pv_max: size in MiB of the biggest PVs

    """
    def __init__(self, offline=False, uuid=None, vm_capable=True):
      self.uuid = uuid
      self.volumes = {}
      self.instances = []
      self.pinst = []
      self.sinst = []
      self.sbp = {}
      self.mfree = 0
      self.dfree = 0
      self.offline = offline
      self.vm_capable = vm_capable
      self.rpc_fail = False
      self.lvm_fail = False
      self.hyp_fail = False
      self.ghost = False
      self.os_fail = False
      self.oslist = {}
      self.pv_min = None
      self.pv_max = None

  def ExpandNames(self):
    # This raises errors.OpPrereqError on its own:
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)

    # Get instances in node group; this is unsafe and needs verification later
    inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    self.needed_locks = {
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
      locking.LEVEL_NODEGROUP: [self.group_uuid],
      locking.LEVEL_NODE: [],

      # This opcode is run by watcher every five minutes and acquires all nodes
      # for a group. It doesn't run for a long time, so it's better to acquire
      # the node allocation lock as well.
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
      }

    self.share_locks = ShareAll()

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE:
      # Get members of node group; this is unsafe and needs verification later
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)

      # In Exec(), we warn about mirrored instances that have primary and
      # secondary living in separate node groups. To fully verify that
      # volumes for these instances are healthy, we will need to do an
      # extra call to their secondaries. We ensure here those nodes will
      # be locked.
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
        # Important: access only the instances whose lock is owned
        instance = self.cfg.GetInstanceInfoByName(inst_name)
        if instance.disk_template in constants.DTS_INT_MIRROR:
          nodes.update(instance.secondary_nodes)

      self.needed_locks[locking.LEVEL_NODE] = nodes

  def CheckPrereq(self):
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)

    group_node_uuids = set(self.group_info.members)
    group_inst_uuids = \
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)

    unlocked_node_uuids = \
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))

    unlocked_inst_uuids = \
        group_inst_uuids.difference(
          [self.cfg.GetInstanceInfoByName(name).uuid
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])

    if unlocked_node_uuids:
      raise errors.OpPrereqError(
        "Missing lock for nodes: %s" %
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
        errors.ECODE_STATE)

    if unlocked_inst_uuids:
      raise errors.OpPrereqError(
        "Missing lock for instances: %s" %
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
        errors.ECODE_STATE)

    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

    self.my_node_uuids = group_node_uuids
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
                             for node_uuid in group_node_uuids)

    self.my_inst_uuids = group_inst_uuids
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
                             for inst_uuid in group_inst_uuids)

    # We detect here the nodes that will need the extra RPC calls for verifying
    # split LV volumes; they should be locked.
    extra_lv_nodes = set()

    for inst in self.my_inst_info.values():
      if inst.disk_template in constants.DTS_INT_MIRROR:
        for nuuid in inst.all_nodes:
          if self.all_node_info[nuuid].group != self.group_uuid:
            extra_lv_nodes.add(nuuid)

    unlocked_lv_nodes = \
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))

    if unlocked_lv_nodes:
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
                                 utils.CommaJoin(unlocked_lv_nodes),
                                 errors.ECODE_STATE)
    self.extra_lv_nodes = list(extra_lv_nodes)

  def _VerifyNode(self, ninfo, nresult):
    """Perform some basic validation on data returned from a node.

      - check the result data structure is well formed and has all the
        mandatory fields
      - check ganeti version

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the results from the node
    @rtype: boolean
    @return: whether overall this call was successful (and we can expect
         reasonable values in the response)

    """
    # main result, nresult should be a non-empty dict
    test = not nresult or not isinstance(nresult, dict)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "unable to verify node: no data returned")
    if test:
      return False

    # compares ganeti version
    local_version = constants.PROTOCOL_VERSION
    remote_version = nresult.get("version", None)
    test = not (remote_version and
                isinstance(remote_version, (list, tuple)) and
                len(remote_version) == 2)
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
                  "connection to node returned invalid data")
    if test:
      return False

    test = local_version != remote_version[0]
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
                  "incompatible protocol versions: master %s,"
                  " node %s", local_version, remote_version[0])
    if test:
      return False

    # node seems compatible, we can actually try to look into its results

    # full package version
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
                  constants.CV_ENODEVERSION, ninfo.name,
                  "software version mismatch: master %s, node %s",
                  constants.RELEASE_VERSION, remote_version[1],
                  code=self.ETYPE_WARNING)

    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
    if ninfo.vm_capable and isinstance(hyp_result, dict):
      for hv_name, hv_result in hyp_result.iteritems():
        test = hv_result is not None
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)

    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
    if ninfo.vm_capable and isinstance(hvp_result, list):
      for item, hv_name, hv_result in hvp_result:
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
                      "hypervisor %s parameter verify failure (source %s): %s",
                      hv_name, item, hv_result)

    test = nresult.get(constants.NV_NODESETUP,
                       ["Missing NODESETUP results"])
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
                  "node setup error: %s", "; ".join(test))

    return True

  def _VerifyNodeTime(self, ninfo, nresult,
                      nvinfo_starttime, nvinfo_endtime):
    """Check the node time.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nvinfo_starttime: the start time of the RPC call
    @param nvinfo_endtime: the end time of the RPC call

    """
    ntime = nresult.get(constants.NV_TIME, None)
    try:
      ntime_merged = utils.MergeTime(ntime)
    except (ValueError, TypeError):
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
                    "Node returned invalid time")
      return

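    # The node's clock is accepted if it lies within
    # [rpc_start - NODE_MAX_CLOCK_SKEW, rpc_end + NODE_MAX_CLOCK_SKEW];
    # otherwise the smallest observed divergence is reported below.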
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
    else:
      ntime_diff = None

    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
                  "Node time diverges by at least %s from master node time",
                  ntime_diff)

  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
    """Check the node LVM results and update info for cross-node checks.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param vg_name: the configured VG name
    @type nimg: L{NodeImage}
    @param nimg: node image

    """
    if vg_name is None:
      return

    # checks vg existence and size > 20G
    vglist = nresult.get(constants.NV_VGLIST, None)
    test = not vglist
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                  "unable to check volume groups")
    if not test:
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
                                            constants.MIN_VG_SIZE)
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)

    # Check PVs
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
    for em in errmsgs:
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
    if pvminmax is not None:
      (nimg.pv_min, nimg.pv_max) = pvminmax

  def _VerifyGroupDRBDVersion(self, node_verify_infos):
    """Check cross-node DRBD version consistency.

    @type node_verify_infos: dict
    @param node_verify_infos: infos about nodes as returned from the
      node_verify call.

    """
    node_versions = {}
    for node_uuid, ndata in node_verify_infos.items():
      nresult = ndata.payload
      if nresult:
        version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
        node_versions[node_uuid] = version

    if len(set(node_versions.values())) > 1:
      for node_uuid, version in sorted(node_versions.items()):
        msg = "DRBD version mismatch: %s" % version
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
                    code=self.ETYPE_WARNING)

  def _VerifyGroupLVM(self, node_image, vg_name):
    """Check cross-node consistency in LVM.

    @type node_image: dict
    @param node_image: info about nodes, mapping node UUIDs to
      L{NodeImage} objects
    @param vg_name: the configured VG name

    """
    if vg_name is None:
      return

    # Only exclusive storage needs this kind of check
    if not self._exclusive_storage:
      return

    # exclusive_storage wants all PVs to have the same size (approximately),
    # if the smallest and the biggest ones are okay, everything is fine.
    # pv_min is None iff pv_max is None
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
    if not vals:
      return
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
                  "PV sizes differ too much in the group; smallest (%s MB) is"
                  " on %s, biggest (%s MB) is on %s",
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))

  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
    """Check the node bridges.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param bridges: the expected list of bridges

    """
    if not bridges:
      return

    missing = nresult.get(constants.NV_BRIDGES, None)
    test = not isinstance(missing, list)
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "did not return valid bridge information")
    if not test:
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))

  def _VerifyNodeUserScripts(self, ninfo, nresult):
    """Check the results of user scripts presence and executability on the node

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = not constants.NV_USERSCRIPTS in nresult
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                  "did not return user scripts information")

    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
    if not test:
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
                    "user scripts not present or not executable: %s" %
                    utils.CommaJoin(sorted(broken_scripts)))

  def _VerifyNodeNetwork(self, ninfo, nresult):
    """Check the node network connectivity results.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    test = constants.NV_NODELIST not in nresult
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
                  "node hasn't returned node ssh connectivity data")
    if not test:
      if nresult[constants.NV_NODELIST]:
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
                        "ssh communication with node '%s': %s", a_node, a_msg)

    test = constants.NV_NODENETTEST not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node tcp connectivity data")
    if not test:
      if nresult[constants.NV_NODENETTEST]:
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
        for anode in nlist:
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
                        "tcp communication with node '%s': %s",
                        anode, nresult[constants.NV_NODENETTEST][anode])

    test = constants.NV_MASTERIP not in nresult
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
                  "node hasn't returned node master IP reachability data")
    if not test:
      if not nresult[constants.NV_MASTERIP]:
        if ninfo.uuid == self.master_node:
          msg = "the master node cannot reach the master IP (not configured?)"
        else:
          msg = "cannot reach the master IP"
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)

  def _VerifyInstance(self, instance, node_image, diskstatus):
    """Verify an instance.

    This function checks to see if the required block devices are
    available on the instance's node, and that the nodes are in the correct
    state.

    """
    pnode_uuid = instance.primary_node
    pnode_img = node_image[pnode_uuid]
    groupinfo = self.cfg.GetAllNodeGroupsInfo()

    node_vol_should = {}
    instance.MapLVsByNode(node_vol_should)

    cluster = self.cfg.GetClusterInfo()
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
                                                            self.group_info)
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)

    for node_uuid in node_vol_should:
      n_img = node_image[node_uuid]
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
        # ignore missing volumes on offline or broken nodes
        continue
      for volume in node_vol_should[node_uuid]:
        test = volume not in n_img.volumes
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
                      "volume %s missing on node %s", volume,
                      self.cfg.GetNodeName(node_uuid))

    if instance.admin_state == constants.ADMINST_UP:
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
                    "instance not running on its primary node %s",
                     self.cfg.GetNodeName(pnode_uuid))
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance is marked as running and lives on"
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))

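    # Flatten the per-node disk status into (node, success, status, index)
    # tuples so that each disk can be checked individually below.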
    diskdata = [(nname, success, status, idx)
                for (nname, disks) in diskstatus.items()
                for idx, (success, status) in enumerate(disks)]

    for nname, success, bdev_status, idx in diskdata:
      # the 'ghost node' construction in Exec() ensures that we have a
      # node here
      snode = node_image[nname]
      bad_snode = snode.ghost or snode.offline
      self._ErrorIf(instance.disks_active and
                    not success and not bad_snode,
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
                    "couldn't retrieve status for disk/%s on %s: %s",
                    idx, self.cfg.GetNodeName(nname), bdev_status)

      if instance.disks_active and success and \
         (bdev_status.is_degraded or
          bdev_status.ldisk_status != constants.LDS_OKAY):
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
        if bdev_status.is_degraded:
          msg += " is degraded"
        if bdev_status.ldisk_status != constants.LDS_OKAY:
          msg += "; state is '%s'" % \
                 constants.LDS_NAMES[bdev_status.ldisk_status]

        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)

    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
                  "instance %s, connection to primary node failed",
                  instance.name)

    self._ErrorIf(len(instance.secondary_nodes) > 1,
                  constants.CV_EINSTANCELAYOUT, instance.name,
                  "instance has multiple secondary nodes: %s",
                  utils.CommaJoin(instance.secondary_nodes),
                  code=self.ETYPE_WARNING)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
    if any(es_flags.values()):
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
        # Disk template not compatible with exclusive_storage: no instance
        # node should have the flag set
        es_nodes = [n
                    for (n, es) in es_flags.items()
                    if es]
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
                    "instance has template %s, which is not supported on nodes"
                    " that have exclusive storage set: %s",
                    instance.disk_template,
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
      for (idx, disk) in enumerate(instance.disks):
        self._ErrorIf(disk.spindles is None,
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
                      "number of spindles not configured for disk %s while"
                      " exclusive storage is enabled, try running"
                      " gnt-cluster repair-disk-sizes", idx)

    if instance.disk_template in constants.DTS_INT_MIRROR:
      instance_nodes = utils.NiceSort(instance.all_nodes)
      instance_groups = {}

      for node_uuid in instance_nodes:
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
                                   []).append(node_uuid)

      pretty_list = [
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
                           groupinfo[group].name)
        # Sort so that we always list the primary node first.
        for group, nodes in sorted(instance_groups.items(),
                                   key=lambda (_, nodes): pnode_uuid in nodes,
                                   reverse=True)]

      self._ErrorIf(len(instance_groups) > 1,
                    constants.CV_EINSTANCESPLITGROUPS,
                    instance.name, "instance has primary and secondary nodes in"
                    " different groups: %s", utils.CommaJoin(pretty_list),
                    code=self.ETYPE_WARNING)

    inst_nodes_offline = []
    for snode in instance.secondary_nodes:
      s_img = node_image[snode]
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
                    self.cfg.GetNodeName(snode),
                    "instance %s, connection to secondary node failed",
                    instance.name)

      if s_img.offline:
        inst_nodes_offline.append(snode)

    # warn that the instance lives on offline nodes
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
                  instance.name, "instance has offline secondary node(s) %s",
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
    # ... or ghost/non-vm_capable nodes
    for node_uuid in instance.all_nodes:
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
                    instance.name, "instance lives on ghost node %s",
                    self.cfg.GetNodeName(node_uuid))
      self._ErrorIf(not node_image[node_uuid].vm_capable,
                    constants.CV_EINSTANCEBADNODE, instance.name,
                    "instance lives on non-vm_capable node %s",
                    self.cfg.GetNodeName(node_uuid))

  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
    """Verify if there are any unknown volumes in the cluster.

    The .os, .swap and backup volumes are ignored. All other volumes are
    reported as unknown.

    @type reserved: L{ganeti.utils.FieldSet}
    @param reserved: a FieldSet of reserved volume names

    """
    for node_uuid, n_img in node_image.items():
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
          self.all_node_info[node_uuid].group != self.group_uuid):
        # skip non-healthy nodes
        continue
      for volume in n_img.volumes:
        test = ((node_uuid not in node_vol_should or
                volume not in node_vol_should[node_uuid]) and
                not reserved.Matches(volume))
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
                      self.cfg.GetNodeName(node_uuid),
                      "volume %s is unknown", volume,
                      code=_VerifyErrors.ETYPE_WARNING)

  def _VerifyNPlusOneMemory(self, node_image, all_insts):
    """Verify N+1 Memory Resilience.

    Check that if one single node dies we can still start all the
    instances it was primary for.

    """
    cluster_info = self.cfg.GetClusterInfo()
    for node_uuid, n_img in node_image.items():
      # This code checks that every node which is now listed as
      # secondary has enough memory to host all instances it is
      # supposed to should a single other node in the cluster fail.
      # FIXME: not ready for failover to an arbitrary node
      # FIXME: does not support file-backed instances
      # WARNING: we currently take into account down instances as well
      # as up ones, considering that even if they're down someone
      # might want to start them even in the event of a node failure.
      if n_img.offline or \
         self.all_node_info[node_uuid].group != self.group_uuid:
        # we're skipping nodes marked offline and nodes in other groups from
        # the N+1 warning, since most likely we don't have good memory
        # information from them; we already list instances living on such
        # nodes, and that's enough warning
        continue
      #TODO(dynmem): also consider ballooning out other instances
      for prinode, inst_uuids in n_img.sbp.items():
        needed_mem = 0
        for inst_uuid in inst_uuids:
          bep = cluster_info.FillBE(all_insts[inst_uuid])
          if bep[constants.BE_AUTO_BALANCE]:
            needed_mem += bep[constants.BE_MINMEM]
        test = n_img.mfree < needed_mem
        self._ErrorIf(test, constants.CV_ENODEN1,
                      self.cfg.GetNodeName(node_uuid),
                      "not enough memory to accommodate instance failovers"
                      " should node %s fail (%dMiB needed, %dMiB available)",
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)

  def _VerifyClientCertificates(self, nodes, all_nvinfo):
    """Verifies the consistency of the client certificates.

    This includes several aspects:
      - the individual validation of all nodes' certificates
      - the consistency of the master candidate certificate map
      - the consistency of the master candidate certificate map with the
        certificates that the master candidates are actually using.

    @param nodes: the list of nodes to consider in this verification
    @param all_nvinfo: the map of results of the verify_node call to
      all nodes

    """
    candidate_certs = self.cfg.GetClusterInfo().candidate_certs
    if candidate_certs is None or len(candidate_certs) == 0:
      self._ErrorIf(
        True, constants.CV_ECLUSTERCLIENTCERT, None,
        "The cluster's list of master candidate certificates is empty."
        " If you just updated the cluster, please run"
        " 'gnt-cluster renew-crypto --new-node-certificates'.")
      return

    self._ErrorIf(
      len(candidate_certs) != len(set(candidate_certs.values())),
      constants.CV_ECLUSTERCLIENTCERT, None,
      "There are at least two master candidates configured to use the same"
      " certificate.")

    # collect the client certificate
    for node in nodes:
      if node.offline:
        continue

      nresult = all_nvinfo[node.uuid]
      if nresult.fail_msg or not nresult.payload:
        continue

      (errcode, msg) = nresult.payload.get(constants.NV_CLIENT_CERT, None)

      self._ErrorIf(
        errcode is not None, constants.CV_ECLUSTERCLIENTCERT, None,
        "Client certificate of node '%s' failed validation: %s (code '%s')",
        node.uuid, msg, errcode)

      if not errcode:
        digest = msg
        if node.master_candidate:
          if node.uuid in candidate_certs:
            self._ErrorIf(
              digest != candidate_certs[node.uuid],
              constants.CV_ECLUSTERCLIENTCERT, None,
              "Client certificate digest of master candidate '%s' does not"
              " match its entry in the cluster's map of master candidate"
              " certificates. Expected: %s Got: %s", node.uuid,
              digest, candidate_certs[node.uuid])
          else:
            self._ErrorIf(
              True, constants.CV_ECLUSTERCLIENTCERT, None,
              "The master candidate '%s' does not have an entry in the"
              " map of candidate certificates.", node.uuid)
            self._ErrorIf(
              digest in candidate_certs.values(),
              constants.CV_ECLUSTERCLIENTCERT, None,
              "Master candidate '%s' is using a certificate of another node.",
              node.uuid)
        else:
          self._ErrorIf(
            node.uuid in candidate_certs,
            constants.CV_ECLUSTERCLIENTCERT, None,
            "Node '%s' is not a master candidate, but still listed in the"
            " map of master candidate certificates.", node.uuid)
          self._ErrorIf(
            (node.uuid not in candidate_certs) and
              (digest in candidate_certs.values()),
            constants.CV_ECLUSTERCLIENTCERT, None,
            "Node '%s' is not a master candidate and is incorrectly using a"
            " certificate of another node which is a master candidate.",
            node.uuid)

  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
                   (files_all, files_opt, files_mc, files_vm)):
    """Verifies file checksums collected from all nodes.

    @param nodes: List of L{objects.Node} objects
    @param master_node_uuid: UUID of master node
    @param all_nvinfo: RPC results

    """
    # Define functions determining which nodes to consider for a file
    files2nodefn = [
      (files_all, None),
      (files_mc, lambda node: (node.master_candidate or
                               node.uuid == master_node_uuid)),
      (files_vm, lambda node: node.vm_capable),
      ]

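    # files_opt only marks which of the files above are optional; the check
    # further down merely relaxes the "missing" error for them (they must be
    # present on all expected nodes or on none).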
    # Build mapping from filename to list of nodes which should have the file
    nodefiles = {}
    for (files, fn) in files2nodefn:
      if fn is None:
        filenodes = nodes
      else:
        filenodes = filter(fn, nodes)
      nodefiles.update((filename,
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
                       for filename in files)

    assert set(nodefiles) == (files_all | files_mc | files_vm)

    fileinfo = dict((filename, {}) for filename in nodefiles)
    ignore_nodes = set()

    for node in nodes:
      if node.offline:
        ignore_nodes.add(node.uuid)
        continue

      nresult = all_nvinfo[node.uuid]

      if nresult.fail_msg or not nresult.payload:
        node_files = None
      else:
        fingerprints = nresult.payload.get(constants.NV_FILELIST, {})
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
                          for (key, value) in fingerprints.items())
        del fingerprints

      test = not (node_files and isinstance(node_files, dict))
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
                    "Node did not return file checksum data")
      if test:
        ignore_nodes.add(node.uuid)
        continue

      # Build per-checksum mapping from filename to nodes having it
      for (filename, checksum) in node_files.items():
        assert filename in nodefiles
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)

    for (filename, checksums) in fileinfo.items():
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"

      # Nodes having the file
      with_file = frozenset(node_uuid
                            for node_uuids in fileinfo[filename].values()
                            for node_uuid in node_uuids) - ignore_nodes

      expected_nodes = nodefiles[filename] - ignore_nodes

      # Nodes missing file
      missing_file = expected_nodes - with_file

      if filename in files_opt:
        # All or no nodes
        self._ErrorIf(missing_file and missing_file != expected_nodes,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is optional, but it must exist on all or no"
                      " nodes (not found on %s)",
                      filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))
      else:
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
                      "File %s is missing from node(s) %s", filename,
                      utils.CommaJoin(
                        utils.NiceSort(
                          map(self.cfg.GetNodeName, missing_file))))

        # Warn if a node has a file it shouldn't
        unexpected = with_file - expected_nodes
        self._ErrorIf(unexpected,
                      constants.CV_ECLUSTERFILECHECK, None,
                      "File %s should not exist on node(s) %s",
                      filename, utils.CommaJoin(
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))

      # See if there are multiple versions of the file
      test = len(checksums) > 1
      if test:
        variants = ["variant %s on %s" %
                    (idx + 1,
                     utils.CommaJoin(utils.NiceSort(
                       map(self.cfg.GetNodeName, node_uuids))))
                    for (idx, (checksum, node_uuids)) in
                      enumerate(sorted(checksums.items()))]
      else:
        variants = []

      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
                    "File %s found with %s different checksums (%s)",
                    filename, len(checksums), "; ".join(variants))

  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
    """Verify the drbd helper.

    """
    if drbd_helper:
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
      test = (helper_result is None)
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                    "no drbd usermode helper returned")
      if helper_result:
        status, payload = helper_result
        test = not status
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "drbd usermode helper check unsuccessful: %s", payload)
        test = status and (payload != drbd_helper)
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
                      "wrong drbd usermode helper: %s", payload)

  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
                      drbd_map):
    """Verifies the node DRBD status.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param instanceinfo: the dict of instances
    @param drbd_helper: the configured DRBD usermode helper
    @param drbd_map: the DRBD map as returned by
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}

    """
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)

    # compute the DRBD minors
    node_drbd = {}
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
      test = inst_uuid not in instanceinfo
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
        # ghost instance should not be running, but otherwise we
        # don't give double warnings (both ghost instance and
        # unallocated minor in use)
      if test:
        node_drbd[minor] = (inst_uuid, False)
      else:
        instance = instanceinfo[inst_uuid]
        node_drbd[minor] = (inst_uuid, instance.disks_active)

    # and now check them
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
    test = not isinstance(used_minors, (tuple, list))
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                  "cannot parse drbd status file: %s", str(used_minors))
    if test:
      # we cannot check drbd status
      return

    for minor, (inst_uuid, must_exist) in node_drbd.items():
      test = minor not in used_minors and must_exist
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "drbd minor %d of instance %s is not active", minor,
                    self.cfg.GetInstanceName(inst_uuid))
    for minor in used_minors:
      test = minor not in node_drbd
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
                    "unallocated drbd minor %d is in use", minor)

  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2579
    """Builds the node OS structures.
2580

2581
    @type ninfo: L{objects.Node}
2582
    @param ninfo: the node to check
2583
    @param nresult: the remote results for the node
2584
    @param nimg: the node image object
2585

2586
    """
2587
    remote_os = nresult.get(constants.NV_OSLIST, None)
2588
    test = (not isinstance(remote_os, list) or
2589
            not compat.all(isinstance(v, list) and len(v) == 7
2590
                           for v in remote_os))
2591

    
2592
    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2593
                  "node hasn't returned valid OS data")
2594

    
2595
    nimg.os_fail = test
2596

    
2597
    if test:
2598
      return
2599

    
2600
    os_dict = {}
2601

    
2602
    for (name, os_path, status, diagnose,
2603
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2604

    
2605
      if name not in os_dict:
2606
        os_dict[name] = []
2607

    
2608
      # parameters is a list of lists instead of list of tuples due to
2609
      # JSON lacking a real tuple type, fix it:
2610
      parameters = [tuple(v) for v in parameters]
2611
      os_dict[name].append((os_path, status, diagnose,
2612
                            set(variants), set(parameters), set(api_ver)))
2613

    
2614
    nimg.oslist = os_dict

  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
                    "Invalid OS %s (located at %s): %s",
                    os_name, f_path, f_diag)
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
                    "OS '%s' has multiple entries"
                    " (first one shadows the rest): %s",
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                    "Extra OS %s not present on reference node (%s)",
                    os_name, self.cfg.GetNodeName(base.uuid))
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " [%s] vs. [%s]", kind, os_name,
                      self.cfg.GetNodeName(base.uuid),
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
                  "OSes present on reference node %s"
                  " but missing on this node: %s",
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))

  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    cluster = self.cfg.GetClusterInfo()
    if (is_master and
        (cluster.IsFileStorageEnabled() or
         cluster.IsSharedFileStorageEnabled())):
      try:
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
                          verify_key, error_key):
    """Verifies (file) storage paths.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type file_disk_template: string
    @param file_disk_template: file-based disk template, whose directory
        is supposed to be verified
    @type verify_key: string
    @param verify_key: key for the verification map of this file
        verification step
    @param error_key: error key to be added to the verification results
        in case something goes wrong in this verification step

    """
    assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
              constants.ST_FILE, constants.ST_SHARED_FILE
           ))

    cluster = self.cfg.GetClusterInfo()
    if cluster.IsDiskTemplateEnabled(file_disk_template):
      self._ErrorIf(
          verify_key in nresult,
          error_key, ninfo.name,
          "The configured %s storage path is unusable: %s" %
          (file_disk_template, nresult.get(verify_key)))

  def _VerifyFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_FILE,
        constants.NV_FILE_STORAGE_PATH,
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)

  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_SHARED_FILE,
        constants.NV_SHARED_FILE_STORAGE_PATH,
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out of band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{objects.Node})
    @param node_image: Node objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
    node_disks = {}
    node_disks_dev_inst_only = {}
    diskless_instances = set()
    nodisk_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        nodisk_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template != diskless)
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      dev_inst_only = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))

      node_disks_dev_inst_only[nuuid] = dev_inst_only

    assert len(node_disks) == len(node_disks_dev_inst_only)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
               node_disks.keys(), node_disks_dev_inst_only)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}
    # ...and disk-full instances that happen to have no disks
    for inst_uuid in nodisk_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

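    # instdisk is a nested mapping
    #   {instance UUID: {node UUID: [(success, payload), ...]}}
    # with one (success, payload) entry per disk of the instance on that node.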
    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

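    # sel holds one endless (cycling) iterator of node names per foreign
    # group; drawing one name from each gives every online node a peer in
    # every other group, e.g. {"node1": ["groupB-node2", "groupC-node1"]}
    # (the node names here are illustrative only).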
    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are run only in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

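    # env now holds CLUSTER_TAGS plus one NODE_TAGS_ entry per node of this
    # group (suffixed with the node name), each value being a space-separated
    # tag list.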
    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      constants.NV_CLIENT_CERT: None,
      }

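    # node_verify_param is the request map handed to each node's node_verify
    # RPC; every NV_* key enables one remote check, and the remaining keys
    # are filled in conditionally below.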
    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least one node, we act as if it were set for all nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    node_group_uuids = dict(map(lambda n: (n.name, n.group),
                                self.cfg.GetAllNodesInfo().values()))
    groups_config = self.cfg.GetAllNodeGroupsInfoDict()

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams,
                                           node_group_uuids,
                                           groups_config)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams,
                                    node_group_uuids,
                                    groups_config)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    self._VerifyClientCertificates(self.my_node_info.values(), all_nvinfo)
    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams,
         node_group_uuids,
         groups_config))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.name not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])