lib/cmdlib/cluster.py @ 31d3b918
1
#
2
#
3

    
4
# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5
#
6
# This program is free software; you can redistribute it and/or modify
7
# it under the terms of the GNU General Public License as published by
8
# the Free Software Foundation; either version 2 of the License, or
9
# (at your option) any later version.
10
#
11
# This program is distributed in the hope that it will be useful, but
12
# WITHOUT ANY WARRANTY; without even the implied warranty of
13
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14
# General Public License for more details.
15
#
16
# You should have received a copy of the GNU General Public License
17
# along with this program; if not, write to the Free Software
18
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19
# 02110-1301, USA.
20

    
21

    
22
"""Logical units dealing with the cluster."""
23

    
24
import copy
25
import itertools
26
import logging
27
import operator
28
import os
29
import re
30
import time
31

    
32
from ganeti import compat
33
from ganeti import constants
34
from ganeti import errors
35
from ganeti import hypervisor
36
from ganeti import locking
37
from ganeti import masterd
38
from ganeti import netutils
39
from ganeti import objects
40
from ganeti import opcodes
41
from ganeti import pathutils
42
from ganeti import query
43
import ganeti.rpc.node as rpc
44
from ganeti import runtime
45
from ganeti import ssh
46
from ganeti import uidpool
47
from ganeti import utils
48
from ganeti import vcluster
49

    
50
from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
51
  ResultWithJobs
52
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
53
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
54
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
55
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
56
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
57
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
58
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
59
  CheckDiskAccessModeConsistency, CreateNewClientCert
60

    
61
import ganeti.masterd.instance
62

    
63

    
64
def _UpdateMasterClientCert(
65
    lu, master_uuid, cluster, feedback_fn,
66
    client_cert=pathutils.NODED_CLIENT_CERT_FILE,
67
    client_cert_tmp=pathutils.NODED_CLIENT_CERT_FILE_TMP):
68
  """Renews the master's client certificate and propagates the config.
69

70
  @type lu: C{LogicalUnit}
71
  @param lu: the logical unit holding the config
72
  @type master_uuid: string
73
  @param master_uuid: the master node's UUID
74
  @type cluster: C{objects.Cluster}
75
  @param cluster: the cluster's configuration
76
  @type feedback_fn: function
77
  @param feedback_fn: feedback function for config updates
78
  @type client_cert: string
79
  @param client_cert: the path of the client certificate
80
  @type client_cert_tmp: string
81
  @param client_cert_tmp: the temporary path of the client certificate
82
  @rtype: string
83
  @return: the digest of the newly created client certificate
84

85
  """
86
  client_digest = CreateNewClientCert(lu, master_uuid, filename=client_cert_tmp)
87
  utils.AddNodeToCandidateCerts(master_uuid, client_digest,
88
                                cluster.candidate_certs)
89
  # This triggers an update of the config and distribution of it with the old
90
  # SSL certificate
91
  lu.cfg.Update(cluster, feedback_fn)
92

    
93
  utils.RemoveFile(client_cert)
94
  utils.RenameFile(client_cert_tmp, client_cert)
95
  return client_digest
96

    
97

    
98
class LUClusterRenewCrypto(NoHooksLU):
99
  """Renew the cluster's crypto tokens.
100

101
  Note that most of this operation is done in gnt_cluster.py; this LU only
102
  takes care of renewing the client SSL certificates.
103

104
  """
105
  def Exec(self, feedback_fn):
106
    master_uuid = self.cfg.GetMasterNode()
107
    cluster = self.cfg.GetClusterInfo()
108

    
109
    server_digest = utils.GetCertificateDigest(
110
      cert_filename=pathutils.NODED_CERT_FILE)
111
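    # The node daemon's server certificate digest is tracked in
    # cluster.candidate_certs as well, under a "<uuid>-SERVER" key, alongside
    # the per-node client certificate digests.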
    utils.AddNodeToCandidateCerts("%s-SERVER" % master_uuid,
112
                                  server_digest,
113
                                  cluster.candidate_certs)
114
    new_master_digest = _UpdateMasterClientCert(self, master_uuid, cluster,
115
                                                feedback_fn)
116

    
117
    cluster.candidate_certs = {master_uuid: new_master_digest}
118
    nodes = self.cfg.GetAllNodesInfo()
119
    for (node_uuid, node_info) in nodes.items():
120
      if node_uuid != master_uuid:
121
        new_digest = CreateNewClientCert(self, node_uuid)
122
        if node_info.master_candidate:
123
          cluster.candidate_certs[node_uuid] = new_digest
124
    # Trigger another update of the config now with the new master cert
125
    self.cfg.Update(cluster, feedback_fn)
126

    
127

    
128
class LUClusterActivateMasterIp(NoHooksLU):
129
  """Activate the master IP on the master node.
130

131
  """
132
  def Exec(self, feedback_fn):
133
    """Activate the master IP.
134

135
    """
136
    master_params = self.cfg.GetMasterNetworkParameters()
137
    ems = self.cfg.GetUseExternalMipScript()
138
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
139
                                                   master_params, ems)
140
    result.Raise("Could not activate the master IP")
141

    
142

    
143
class LUClusterDeactivateMasterIp(NoHooksLU):
144
  """Deactivate the master IP on the master node.
145

146
  """
147
  def Exec(self, feedback_fn):
148
    """Deactivate the master IP.
149

150
    """
151
    master_params = self.cfg.GetMasterNetworkParameters()
152
    ems = self.cfg.GetUseExternalMipScript()
153
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
154
                                                     master_params, ems)
155
    result.Raise("Could not deactivate the master IP")
156

    
157

    
158
class LUClusterConfigQuery(NoHooksLU):
159
  """Return configuration values.
160

161
  """
162
  REQ_BGL = False
163

    
164
  def CheckArguments(self):
165
    self.cq = ClusterQuery(None, self.op.output_fields, False)
166

    
167
  def ExpandNames(self):
168
    self.cq.ExpandNames(self)
169

    
170
  def DeclareLocks(self, level):
171
    self.cq.DeclareLocks(self, level)
172

    
173
  def Exec(self, feedback_fn):
174
    result = self.cq.OldStyleQuery(self)
175

    
176
    assert len(result) == 1
177

    
178
    return result[0]
179

    
180

    
181
class LUClusterDestroy(LogicalUnit):
182
  """Logical unit for destroying the cluster.
183

184
  """
185
  HPATH = "cluster-destroy"
186
  HTYPE = constants.HTYPE_CLUSTER
187

    
188
  def BuildHooksEnv(self):
189
    """Build hooks env.
190

191
    """
192
    return {
193
      "OP_TARGET": self.cfg.GetClusterName(),
194
      }
195

    
196
  def BuildHooksNodes(self):
197
    """Build hooks nodes.
198

199
    """
200
    return ([], [])
201

    
202
  def CheckPrereq(self):
203
    """Check prerequisites.
204

205
    This checks whether the cluster is empty.
206

207
    Any errors are signaled by raising errors.OpPrereqError.
208

209
    """
210
    master = self.cfg.GetMasterNode()
211

    
212
    nodelist = self.cfg.GetNodeList()
213
    if len(nodelist) != 1 or nodelist[0] != master:
214
      raise errors.OpPrereqError("There are still %d node(s) in"
215
                                 " this cluster." % (len(nodelist) - 1),
216
                                 errors.ECODE_INVAL)
217
    instancelist = self.cfg.GetInstanceList()
218
    if instancelist:
219
      raise errors.OpPrereqError("There are still %d instance(s) in"
220
                                 " this cluster." % len(instancelist),
221
                                 errors.ECODE_INVAL)
222

    
223
  def Exec(self, feedback_fn):
224
    """Destroys the cluster.
225

226
    """
227
    master_params = self.cfg.GetMasterNetworkParameters()
228

    
229
    # Run post hooks on master node before it's removed
230
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
231

    
232
    ems = self.cfg.GetUseExternalMipScript()
233
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
234
                                                     master_params, ems)
235
    result.Warn("Error disabling the master IP address", self.LogWarning)
236
    return master_params.uuid
237

    
238

    
239
class LUClusterPostInit(LogicalUnit):
240
  """Logical unit for running hooks after cluster initialization.
241

242
  """
243
  HPATH = "cluster-init"
244
  HTYPE = constants.HTYPE_CLUSTER
245

    
246
  def CheckArguments(self):
247
    self.master_uuid = self.cfg.GetMasterNode()
248
    self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo())
249

    
250
    # TODO: When Issue 584 is solved, and None is properly parsed when used
251
    # as a default value, ndparams.get(.., None) can be changed to
252
    # ndparams[..] to access the values directly
253

    
254
    # OpenvSwitch: Warn user if link is missing
255
    if (self.master_ndparams[constants.ND_OVS] and not
256
        self.master_ndparams.get(constants.ND_OVS_LINK, None)):
257
      self.LogInfo("No physical interface for OpenvSwitch was given."
258
                   " OpenvSwitch will not have an outside connection. This"
259
                   " might not be what you want.")
260

    
261
  def BuildHooksEnv(self):
262
    """Build hooks env.
263

264
    """
265
    return {
266
      "OP_TARGET": self.cfg.GetClusterName(),
267
      }
268

    
269
  def BuildHooksNodes(self):
270
    """Build hooks nodes.
271

272
    """
273
    return ([], [self.cfg.GetMasterNode()])
274

    
275
  def Exec(self, feedback_fn):
276
    """Create and configure Open vSwitch
277

278
    """
279
    if self.master_ndparams[constants.ND_OVS]:
280
      result = self.rpc.call_node_configure_ovs(
281
                 self.master_uuid,
282
                 self.master_ndparams[constants.ND_OVS_NAME],
283
                 self.master_ndparams.get(constants.ND_OVS_LINK, None))
284
      result.Raise("Could not successully configure Open vSwitch")
285

    
286
    cluster = self.cfg.GetClusterInfo()
287
    _UpdateMasterClientCert(self, self.master_uuid, cluster, feedback_fn)
288

    
289
    return True
290

    
291

    
292
class ClusterQuery(QueryBase):
293
  FIELDS = query.CLUSTER_FIELDS
294

    
295
  #: Do not sort (there is only one item)
296
  SORT_FIELD = None
297

    
298
  def ExpandNames(self, lu):
299
    lu.needed_locks = {}
300

    
301
    # The following variables interact with _QueryBase._GetNames
302
    self.wanted = locking.ALL_SET
303
    self.do_locking = self.use_locking
304

    
305
    if self.do_locking:
306
      raise errors.OpPrereqError("Can not use locking for cluster queries",
307
                                 errors.ECODE_INVAL)
308

    
309
  def DeclareLocks(self, lu, level):
310
    pass
311

    
312
  def _GetQueryData(self, lu):
313
    """Computes the list of nodes and their attributes.
314

315
    """
316
    # Locking is not used
317
    assert not (compat.any(lu.glm.is_owned(level)
318
                           for level in locking.LEVELS
319
                           if level != locking.LEVEL_CLUSTER) or
320
                self.do_locking or self.use_locking)
321

    
322
    if query.CQ_CONFIG in self.requested_data:
323
      cluster = lu.cfg.GetClusterInfo()
324
      nodes = lu.cfg.GetAllNodesInfo()
325
    else:
326
      cluster = NotImplemented
327
      nodes = NotImplemented
328

    
329
    if query.CQ_QUEUE_DRAINED in self.requested_data:
330
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
331
    else:
332
      drain_flag = NotImplemented
333

    
334
    if query.CQ_WATCHER_PAUSE in self.requested_data:
335
      master_node_uuid = lu.cfg.GetMasterNode()
336

    
337
      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
338
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
339
                   lu.cfg.GetMasterNodeName())
340

    
341
      watcher_pause = result.payload
342
    else:
343
      watcher_pause = NotImplemented
344

    
345
    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
346

    
347

    
348
class LUClusterQuery(NoHooksLU):
349
  """Query cluster configuration.
350

351
  """
352
  REQ_BGL = False
353

    
354
  def ExpandNames(self):
355
    self.needed_locks = {}
356

    
357
  def Exec(self, feedback_fn):
358
    """Return cluster config.
359

360
    """
361
    cluster = self.cfg.GetClusterInfo()
362
    os_hvp = {}
363

    
364
    # Filter just for enabled hypervisors
365
    for os_name, hv_dict in cluster.os_hvp.items():
366
      os_hvp[os_name] = {}
367
      for hv_name, hv_params in hv_dict.items():
368
        if hv_name in cluster.enabled_hypervisors:
369
          os_hvp[os_name][hv_name] = hv_params
370

    
371
    # Convert ip_family to ip_version
372
    primary_ip_version = constants.IP4_VERSION
373
    if cluster.primary_ip_family == netutils.IP6Address.family:
374
      primary_ip_version = constants.IP6_VERSION
375

    
376
    result = {
377
      "software_version": constants.RELEASE_VERSION,
378
      "protocol_version": constants.PROTOCOL_VERSION,
379
      "config_version": constants.CONFIG_VERSION,
380
      "os_api_version": max(constants.OS_API_VERSIONS),
381
      "export_version": constants.EXPORT_VERSION,
382
      "vcs_version": constants.VCS_VERSION,
383
      "architecture": runtime.GetArchInfo(),
384
      "name": cluster.cluster_name,
385
      "master": self.cfg.GetMasterNodeName(),
386
      "default_hypervisor": cluster.primary_hypervisor,
387
      "enabled_hypervisors": cluster.enabled_hypervisors,
388
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
389
                        for hypervisor_name in cluster.enabled_hypervisors]),
390
      "os_hvp": os_hvp,
391
      "beparams": cluster.beparams,
392
      "osparams": cluster.osparams,
393
      "ipolicy": cluster.ipolicy,
394
      "nicparams": cluster.nicparams,
395
      "ndparams": cluster.ndparams,
396
      "diskparams": cluster.diskparams,
397
      "candidate_pool_size": cluster.candidate_pool_size,
398
      "max_running_jobs": cluster.max_running_jobs,
399
      "master_netdev": cluster.master_netdev,
400
      "master_netmask": cluster.master_netmask,
401
      "use_external_mip_script": cluster.use_external_mip_script,
402
      "volume_group_name": cluster.volume_group_name,
403
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
404
      "file_storage_dir": cluster.file_storage_dir,
405
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
406
      "maintain_node_health": cluster.maintain_node_health,
407
      "ctime": cluster.ctime,
408
      "mtime": cluster.mtime,
409
      "uuid": cluster.uuid,
410
      "tags": list(cluster.GetTags()),
411
      "uid_pool": cluster.uid_pool,
412
      "default_iallocator": cluster.default_iallocator,
413
      "default_iallocator_params": cluster.default_iallocator_params,
414
      "reserved_lvs": cluster.reserved_lvs,
415
      "primary_ip_version": primary_ip_version,
416
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
417
      "hidden_os": cluster.hidden_os,
418
      "blacklisted_os": cluster.blacklisted_os,
419
      "enabled_disk_templates": cluster.enabled_disk_templates,
420
      }
421

    
422
    return result
423

    
424

    
425
class LUClusterRedistConf(NoHooksLU):
426
  """Force the redistribution of cluster configuration.
427

428
  This is a very simple LU.
429

430
  """
431
  REQ_BGL = False
432

    
433
  def ExpandNames(self):
434
    self.needed_locks = {
435
      locking.LEVEL_NODE: locking.ALL_SET,
436
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
437
    }
438
    self.share_locks = ShareAll()
439

    
440
  def Exec(self, feedback_fn):
441
    """Redistribute the configuration.
442

443
    """
444
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
445
    RedistributeAncillaryFiles(self)
446

    
447

    
448
class LUClusterRename(LogicalUnit):
449
  """Rename the cluster.
450

451
  """
452
  HPATH = "cluster-rename"
453
  HTYPE = constants.HTYPE_CLUSTER
454

    
455
  def BuildHooksEnv(self):
456
    """Build hooks env.
457

458
    """
459
    return {
460
      "OP_TARGET": self.cfg.GetClusterName(),
461
      "NEW_NAME": self.op.name,
462
      }
463

    
464
  def BuildHooksNodes(self):
465
    """Build hooks nodes.
466

467
    """
468
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
469

    
470
  def CheckPrereq(self):
471
    """Verify that the passed name is a valid one.
472

473
    """
474
    hostname = netutils.GetHostname(name=self.op.name,
475
                                    family=self.cfg.GetPrimaryIPFamily())
476

    
477
    new_name = hostname.name
478
    self.ip = new_ip = hostname.ip
479
    old_name = self.cfg.GetClusterName()
480
    old_ip = self.cfg.GetMasterIP()
481
    if new_name == old_name and new_ip == old_ip:
482
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
483
                                 " cluster has changed",
484
                                 errors.ECODE_INVAL)
485
    if new_ip != old_ip:
486
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
487
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
488
                                   " reachable on the network" %
489
                                   new_ip, errors.ECODE_NOTUNIQUE)
490

    
491
    self.op.name = new_name
492

    
493
  def Exec(self, feedback_fn):
494
    """Rename the cluster.
495

496
    """
497
    clustername = self.op.name
498
    new_ip = self.ip
499

    
500
    # shutdown the master IP
501
    master_params = self.cfg.GetMasterNetworkParameters()
502
    ems = self.cfg.GetUseExternalMipScript()
503
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
504
                                                     master_params, ems)
505
    result.Raise("Could not disable the master role")
506

    
507
    try:
508
      cluster = self.cfg.GetClusterInfo()
509
      cluster.cluster_name = clustername
510
      cluster.master_ip = new_ip
511
      self.cfg.Update(cluster, feedback_fn)
512

    
513
      # update the known hosts file
514
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
515
      node_list = self.cfg.GetOnlineNodeList()
516
      try:
517
        node_list.remove(master_params.uuid)
518
      except ValueError:
519
        pass
520
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
521
    finally:
522
      master_params.ip = new_ip
523
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
524
                                                     master_params, ems)
525
      result.Warn("Could not re-enable the master role on the master,"
526
                  " please restart manually", self.LogWarning)
527

    
528
    return clustername
529

    
530

    
531
class LUClusterRepairDiskSizes(NoHooksLU):
532
  """Verifies the cluster disks sizes.
533

534
  """
535
  REQ_BGL = False
536

    
537
  def ExpandNames(self):
538
    if self.op.instances:
539
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
540
      # Not getting the node allocation lock as only a specific set of
541
      # instances (and their nodes) is going to be acquired
542
      self.needed_locks = {
543
        locking.LEVEL_NODE_RES: [],
544
        locking.LEVEL_INSTANCE: self.wanted_names,
545
        }
546
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
547
    else:
548
      self.wanted_names = None
549
      self.needed_locks = {
550
        locking.LEVEL_NODE_RES: locking.ALL_SET,
551
        locking.LEVEL_INSTANCE: locking.ALL_SET,
552

    
553
        # This opcode acquires the node locks for all instances
554
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
555
        }
556

    
557
    self.share_locks = {
558
      locking.LEVEL_NODE_RES: 1,
559
      locking.LEVEL_INSTANCE: 0,
560
      locking.LEVEL_NODE_ALLOC: 1,
561
      }
562

    
563
  def DeclareLocks(self, level):
564
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
565
      self._LockInstancesNodes(primary_only=True, level=level)
566

    
567
  def CheckPrereq(self):
568
    """Check prerequisites.
569

570
    This only checks the optional instance list against the existing names.
571

572
    """
573
    if self.wanted_names is None:
574
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
575

    
576
    self.wanted_instances = \
577
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
578

    
579
  def _EnsureChildSizes(self, disk):
580
    """Ensure children of the disk have the needed disk size.
581

582
    This is valid mainly for DRBD8 and fixes an issue where the
583
    children have smaller disk size.
584

585
    @param disk: an L{ganeti.objects.Disk} object
586

587
    """
588
    if disk.dev_type == constants.DT_DRBD8:
589
      assert disk.children, "Empty children for DRBD8?"
590
      fchild = disk.children[0]
591
      mismatch = fchild.size < disk.size
592
      if mismatch:
593
        self.LogInfo("Child disk has size %d, parent %d, fixing",
594
                     fchild.size, disk.size)
595
        fchild.size = disk.size
596

    
597
      # and we recurse on this child only, not on the metadev
598
      return self._EnsureChildSizes(fchild) or mismatch
599
    else:
600
      return False
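  # Example (illustrative): for a DRBD8 disk recorded at 1024 MiB whose data
  # child reports only 1000 MiB, _EnsureChildSizes() bumps the child's recorded
  # size to 1024 and returns True so the caller knows to update the config.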
601

    
602
  def Exec(self, feedback_fn):
603
    """Verify the size of cluster disks.
604

605
    """
606
    # TODO: check child disks too
607
    # TODO: check differences in size between primary/secondary nodes
608
    per_node_disks = {}
609
    for instance in self.wanted_instances:
610
      pnode = instance.primary_node
611
      if pnode not in per_node_disks:
612
        per_node_disks[pnode] = []
613
      for idx, disk in enumerate(instance.disks):
614
        per_node_disks[pnode].append((instance, idx, disk))
615

    
616
    assert not (frozenset(per_node_disks.keys()) -
617
                self.owned_locks(locking.LEVEL_NODE_RES)), \
618
      "Not owning correct locks"
619
    assert not self.owned_locks(locking.LEVEL_NODE)
620

    
621
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
622
                                               per_node_disks.keys())
623

    
624
    changed = []
625
    for node_uuid, dskl in per_node_disks.items():
626
      if not dskl:
627
        # no disks on the node
628
        continue
629

    
630
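      # Pair each disk (as a copy) with its owning instance for the
      # blockdev_getdimensions RPC below.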
      newl = [([v[2].Copy()], v[0]) for v in dskl]
631
      node_name = self.cfg.GetNodeName(node_uuid)
632
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
633
      if result.fail_msg:
634
        self.LogWarning("Failure in blockdev_getdimensions call to node"
635
                        " %s, ignoring", node_name)
636
        continue
637
      if len(result.payload) != len(dskl):
638
        logging.warning("Invalid result from node %s: len(dksl)=%d,"
639
                        " result.payload=%s", node_name, len(dskl),
640
                        result.payload)
641
        self.LogWarning("Invalid result from node %s, ignoring node results",
642
                        node_name)
643
        continue
644
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
645
        if dimensions is None:
646
          self.LogWarning("Disk %d of instance %s did not return size"
647
                          " information, ignoring", idx, instance.name)
648
          continue
649
        if not isinstance(dimensions, (tuple, list)):
650
          self.LogWarning("Disk %d of instance %s did not return valid"
651
                          " dimension information, ignoring", idx,
652
                          instance.name)
653
          continue
654
        (size, spindles) = dimensions
655
        if not isinstance(size, (int, long)):
656
          self.LogWarning("Disk %d of instance %s did not return valid"
657
                          " size information, ignoring", idx, instance.name)
658
          continue
659
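        # The RPC reports sizes in bytes; shift by 20 bits to get MiB, the
        # unit used for disk.size in the configuration.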
        size = size >> 20
660
        if size != disk.size:
661
          self.LogInfo("Disk %d of instance %s has mismatched size,"
662
                       " correcting: recorded %d, actual %d", idx,
663
                       instance.name, disk.size, size)
664
          disk.size = size
665
          self.cfg.Update(instance, feedback_fn)
666
          changed.append((instance.name, idx, "size", size))
667
        if es_flags[node_uuid]:
668
          if spindles is None:
669
            self.LogWarning("Disk %d of instance %s did not return valid"
670
                            " spindles information, ignoring", idx,
671
                            instance.name)
672
          elif disk.spindles is None or disk.spindles != spindles:
673
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
674
                         " correcting: recorded %s, actual %s",
675
                         idx, instance.name, disk.spindles, spindles)
676
            disk.spindles = spindles
677
            self.cfg.Update(instance, feedback_fn)
678
            changed.append((instance.name, idx, "spindles", disk.spindles))
679
        if self._EnsureChildSizes(disk):
680
          self.cfg.Update(instance, feedback_fn)
681
          changed.append((instance.name, idx, "size", disk.size))
682
    return changed
683

    
684

    
685
def _ValidateNetmask(cfg, netmask):
686
  """Checks if a netmask is valid.
687

688
  @type cfg: L{config.ConfigWriter}
689
  @param cfg: The cluster configuration
690
  @type netmask: int
691
  @param netmask: the netmask to be verified
692
  @raise errors.OpPrereqError: if the validation fails
693

694
  """
695
  ip_family = cfg.GetPrimaryIPFamily()
696
  try:
697
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
698
  except errors.ProgrammerError:
699
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
700
                               ip_family, errors.ECODE_INVAL)
701
  if not ipcls.ValidateNetmask(netmask):
702
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
703
                               (netmask), errors.ECODE_INVAL)
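# Example (illustrative, an assumption about netutils rather than code from
# this module): the netmask is a CIDR prefix length, so on an IPv4 cluster a
# value such as 24 passes validation while an out-of-range value such as 33
# raises OpPrereqError; for IPv6 the corresponding upper bound is 128.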
704

    
705

    
706
def CheckFileBasedStoragePathVsEnabledDiskTemplates(
707
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
708
    file_disk_template):
709
  """Checks whether the given file-based storage directory is acceptable.
710

711
  Note: This function is public, because it is also used in bootstrap.py.
712

713
  @type logging_warn_fn: function
714
  @param logging_warn_fn: function which accepts a string and logs it
715
  @type file_storage_dir: string
716
  @param file_storage_dir: the directory to be used for file-based instances
717
  @type enabled_disk_templates: list of string
718
  @param enabled_disk_templates: the list of enabled disk templates
719
  @type file_disk_template: string
720
  @param file_disk_template: the file-based disk template for which the
721
      path should be checked
722

723
  """
724
  assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
725
            constants.ST_FILE, constants.ST_SHARED_FILE
726
         ))
727
  file_storage_enabled = file_disk_template in enabled_disk_templates
728
  if file_storage_dir is not None:
729
    if file_storage_dir == "":
730
      if file_storage_enabled:
731
        raise errors.OpPrereqError(
732
            "Unsetting the '%s' storage directory while having '%s' storage"
733
            " enabled is not permitted." %
734
            (file_disk_template, file_disk_template))
735
    else:
736
      if not file_storage_enabled:
737
        logging_warn_fn(
738
            "Specified a %s storage directory, although %s storage is not"
739
            " enabled." % (file_disk_template, file_disk_template))
740
  else:
741
    raise errors.ProgrammerError("Received %s storage dir with value"
742
                                 " 'None'." % file_disk_template)
743

    
744

    
745
def CheckFileStoragePathVsEnabledDiskTemplates(
746
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
747
  """Checks whether the given file storage directory is acceptable.
748

749
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
750

751
  """
752
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
753
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
754
      constants.DT_FILE)
755

    
756

    
757
def CheckSharedFileStoragePathVsEnabledDiskTemplates(
758
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
759
  """Checks whether the given shared file storage directory is acceptable.
760

761
  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
762

763
  """
764
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
765
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
766
      constants.DT_SHARED_FILE)
767

    
768

    
769
class LUClusterSetParams(LogicalUnit):
770
  """Change the parameters of the cluster.
771

772
  """
773
  HPATH = "cluster-modify"
774
  HTYPE = constants.HTYPE_CLUSTER
775
  REQ_BGL = False
776

    
777
  def CheckArguments(self):
778
    """Check parameters
779

780
    """
781
    if self.op.uid_pool:
782
      uidpool.CheckUidPool(self.op.uid_pool)
783

    
784
    if self.op.add_uids:
785
      uidpool.CheckUidPool(self.op.add_uids)
786

    
787
    if self.op.remove_uids:
788
      uidpool.CheckUidPool(self.op.remove_uids)
789

    
790
    if self.op.master_netmask is not None:
791
      _ValidateNetmask(self.cfg, self.op.master_netmask)
792

    
793
    if self.op.diskparams:
794
      for dt_params in self.op.diskparams.values():
795
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
796
      try:
797
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
798
        CheckDiskAccessModeValidity(self.op.diskparams)
799
      except errors.OpPrereqError, err:
800
        raise errors.OpPrereqError("While verify diskparams options: %s" % err,
801
                                   errors.ECODE_INVAL)
802

    
803
  def ExpandNames(self):
804
    # FIXME: in the future maybe other cluster params won't require checking on
805
    # all nodes to be modified.
806
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
807
    # resource locks the right thing, shouldn't it be the BGL instead?
808
    self.needed_locks = {
809
      locking.LEVEL_NODE: locking.ALL_SET,
810
      locking.LEVEL_INSTANCE: locking.ALL_SET,
811
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
812
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
813
    }
814
    self.share_locks = ShareAll()
815

    
816
  def BuildHooksEnv(self):
817
    """Build hooks env.
818

819
    """
820
    return {
821
      "OP_TARGET": self.cfg.GetClusterName(),
822
      "NEW_VG_NAME": self.op.vg_name,
823
      }
824

    
825
  def BuildHooksNodes(self):
826
    """Build hooks nodes.
827

828
    """
829
    mn = self.cfg.GetMasterNode()
830
    return ([mn], [mn])
831

    
832
  def _CheckVgName(self, node_uuids, enabled_disk_templates,
833
                   new_enabled_disk_templates):
834
    """Check the consistency of the vg name on all nodes and in case it gets
835
       unset, whether there are instances still using it.
836

837
    """
838
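    # Summary (illustrative): an empty vg_name is refused while lvm-based
    # templates are or become enabled, or while plain-disk instances exist; an
    # unspecified vg_name is refused when lvm is enabled but no VG is set yet;
    # otherwise, where lvm is relevant, the VG is size-checked on every node.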
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
839
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
840
                                            new_enabled_disk_templates)
841
    current_vg_name = self.cfg.GetVGName()
842

    
843
    if self.op.vg_name == '':
844
      if lvm_is_enabled:
845
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
846
                                   " disk templates are or get enabled.")
847

    
848
    if self.op.vg_name is None:
849
      if current_vg_name is None and lvm_is_enabled:
850
        raise errors.OpPrereqError("Please specify a volume group when"
851
                                   " enabling lvm-based disk-templates.")
852

    
853
    if self.op.vg_name is not None and not self.op.vg_name:
854
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
855
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
856
                                   " instances exist", errors.ECODE_INVAL)
857

    
858
    if (self.op.vg_name is not None and lvm_is_enabled) or \
859
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
860
      self._CheckVgNameOnNodes(node_uuids)
861

    
862
  def _CheckVgNameOnNodes(self, node_uuids):
863
    """Check the status of the volume group on each node.
864

865
    """
866
    vglist = self.rpc.call_vg_list(node_uuids)
867
    for node_uuid in node_uuids:
868
      msg = vglist[node_uuid].fail_msg
869
      if msg:
870
        # ignoring down node
871
        self.LogWarning("Error while gathering data on node %s"
872
                        " (ignoring node): %s",
873
                        self.cfg.GetNodeName(node_uuid), msg)
874
        continue
875
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
876
                                            self.op.vg_name,
877
                                            constants.MIN_VG_SIZE)
878
      if vgstatus:
879
        raise errors.OpPrereqError("Error on node '%s': %s" %
880
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
881
                                   errors.ECODE_ENVIRON)
882

    
883
  @staticmethod
884
  def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
885
                                old_enabled_disk_templates):
886
    """Computes three sets of disk templates.
887

888
    @see: C{_GetDiskTemplateSets} for more details.
889

890
    """
891
    enabled_disk_templates = None
892
    new_enabled_disk_templates = []
893
    disabled_disk_templates = []
894
    if op_enabled_disk_templates:
895
      enabled_disk_templates = op_enabled_disk_templates
896
      new_enabled_disk_templates = \
897
        list(set(enabled_disk_templates)
898
             - set(old_enabled_disk_templates))
899
      disabled_disk_templates = \
900
        list(set(old_enabled_disk_templates)
901
             - set(enabled_disk_templates))
902
    else:
903
      enabled_disk_templates = old_enabled_disk_templates
904
    return (enabled_disk_templates, new_enabled_disk_templates,
905
            disabled_disk_templates)
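  # Example (illustrative): with op_enabled_disk_templates == ["drbd", "plain"]
  # and old_enabled_disk_templates == ["plain", "file"], the result is
  # (["drbd", "plain"], ["drbd"], ["file"]) up to ordering, since the newly
  # enabled and disabled lists are plain set differences.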
906

    
907
  def _GetDiskTemplateSets(self, cluster):
908
    """Computes three sets of disk templates.
909

910
    The three sets are:
911
      - disk templates that will be enabled after this operation (no matter if
912
        they were enabled before or not)
913
      - disk templates that get enabled by this operation (thus haven't been
914
        enabled before.)
915
      - disk templates that get disabled by this operation
916

917
    """
918
    return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
919
                                          cluster.enabled_disk_templates)
920

    
921
  def _CheckIpolicy(self, cluster, enabled_disk_templates):
922
    """Checks the ipolicy.
923

924
    @type cluster: C{objects.Cluster}
925
    @param cluster: the cluster's configuration
926
    @type enabled_disk_templates: list of string
927
    @param enabled_disk_templates: list of (possibly newly) enabled disk
928
      templates
929

930
    """
931
    # FIXME: write unit tests for this
932
    if self.op.ipolicy:
933
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
934
                                           group_policy=False)
935

    
936
      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
937
                                  enabled_disk_templates)
938

    
939
      all_instances = self.cfg.GetAllInstancesInfo().values()
940
      violations = set()
941
      for group in self.cfg.GetAllNodeGroupsInfo().values():
942
        instances = frozenset([inst for inst in all_instances
943
                               if compat.any(nuuid in group.members
944
                                             for nuuid in inst.all_nodes)])
945
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
946
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
947
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
948
                                           self.cfg)
949
        if new:
950
          violations.update(new)
951

    
952
      if violations:
953
        self.LogWarning("After the ipolicy change the following instances"
954
                        " violate them: %s",
955
                        utils.CommaJoin(utils.NiceSort(violations)))
956
    else:
957
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
958
                                  enabled_disk_templates)
959

    
960
  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
961
    """Checks whether the set DRBD helper actually exists on the nodes.
962

963
    @type drbd_helper: string
964
    @param drbd_helper: path of the drbd usermode helper binary
965
    @type node_uuids: list of strings
966
    @param node_uuids: list of node UUIDs to check for the helper
967

968
    """
969
    # checks given drbd helper on all nodes
970
    helpers = self.rpc.call_drbd_helper(node_uuids)
971
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
972
      if ninfo.offline:
973
        self.LogInfo("Not checking drbd helper on offline node %s",
974
                     ninfo.name)
975
        continue
976
      msg = helpers[ninfo.uuid].fail_msg
977
      if msg:
978
        raise errors.OpPrereqError("Error checking drbd helper on node"
979
                                   " '%s': %s" % (ninfo.name, msg),
980
                                   errors.ECODE_ENVIRON)
981
      node_helper = helpers[ninfo.uuid].payload
982
      if node_helper != drbd_helper:
983
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
984
                                   (ninfo.name, node_helper),
985
                                   errors.ECODE_ENVIRON)
986

    
987
  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
988
    """Check the DRBD usermode helper.
989

990
    @type node_uuids: list of strings
991
    @param node_uuids: a list of nodes' UUIDs
992
    @type drbd_enabled: boolean
993
    @param drbd_enabled: whether DRBD will be enabled after this operation
994
      (no matter if it was disabled before or not)
995
    @type drbd_gets_enabled: boolean
996
    @param drbd_gets_enabled: true if DRBD was disabled before this
997
      operation, but will be enabled afterwards
998

999
    """
1000
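    # Summary (illustrative): unsetting the helper ("") is refused while DRBD
    # is enabled or drbd-based instances exist; setting a helper while DRBD is
    # enabled verifies it on every node; enabling DRBD without specifying a
    # helper requires an already configured helper, which is then verified.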
    if self.op.drbd_helper == '':
1001
      if drbd_enabled:
1002
        raise errors.OpPrereqError("Cannot disable drbd helper while"
1003
                                   " DRBD is enabled.")
1004
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
1005
        raise errors.OpPrereqError("Cannot disable drbd helper while"
1006
                                   " drbd-based instances exist",
1007
                                   errors.ECODE_INVAL)
1008

    
1009
    else:
1010
      if self.op.drbd_helper is not None and drbd_enabled:
1011
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
1012
      else:
1013
        if drbd_gets_enabled:
1014
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
1015
          if current_drbd_helper is not None:
1016
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
1017
          else:
1018
            raise errors.OpPrereqError("Cannot enable DRBD without a"
1019
                                       " DRBD usermode helper set.")
1020

    
1021
  def _CheckInstancesOfDisabledDiskTemplates(
1022
      self, disabled_disk_templates):
1023
    """Check whether we try to disable a disk template that is in use.
1024

1025
    @type disabled_disk_templates: list of string
1026
    @param disabled_disk_templates: list of disk templates that are going to
1027
      be disabled by this operation
1028

1029
    """
1030
    for disk_template in disabled_disk_templates:
1031
      if self.cfg.HasAnyDiskOfType(disk_template):
1032
        raise errors.OpPrereqError(
1033
            "Cannot disable disk template '%s', because there is at least one"
1034
            " instance using it." % disk_template)
1035

    
1036
  def CheckPrereq(self):
1037
    """Check prerequisites.
1038

1039
    This checks whether the given params don't conflict and
1040
    if the given volume group is valid.
1041

1042
    """
1043
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
1044
    self.cluster = cluster = self.cfg.GetClusterInfo()
1045

    
1046
    vm_capable_node_uuids = [node.uuid
1047
                             for node in self.cfg.GetAllNodesInfo().values()
1048
                             if node.uuid in node_uuids and node.vm_capable]
1049

    
1050
    (enabled_disk_templates, new_enabled_disk_templates,
1051
      disabled_disk_templates) = self._GetDiskTemplateSets(cluster)
1052
    self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates)
1053

    
1054
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
1055
                      new_enabled_disk_templates)
1056

    
1057
    if self.op.file_storage_dir is not None:
1058
      CheckFileStoragePathVsEnabledDiskTemplates(
1059
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
1060

    
1061
    if self.op.shared_file_storage_dir is not None:
1062
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
1063
          self.LogWarning, self.op.shared_file_storage_dir,
1064
          enabled_disk_templates)
1065

    
1066
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
1067
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
1068
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)
1069

    
1070
    # validate params changes
1071
    if self.op.beparams:
1072
      objects.UpgradeBeParams(self.op.beparams)
1073
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1074
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
1075

    
1076
    if self.op.ndparams:
1077
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
1078
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
1079

    
1080
      # TODO: we need a more general way to handle resetting
1081
      # cluster-level parameters to default values
1082
      if self.new_ndparams["oob_program"] == "":
1083
        self.new_ndparams["oob_program"] = \
1084
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
1085

    
1086
    if self.op.hv_state:
1087
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
1088
                                           self.cluster.hv_state_static)
1089
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
1090
                               for hv, values in new_hv_state.items())
1091

    
1092
    if self.op.disk_state:
1093
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
1094
                                               self.cluster.disk_state_static)
1095
      self.new_disk_state = \
1096
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
1097
                            for name, values in svalues.items()))
1098
             for storage, svalues in new_disk_state.items())
1099

    
1100
    self._CheckIpolicy(cluster, enabled_disk_templates)
1101

    
1102
    if self.op.nicparams:
1103
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1104
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
1105
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1106
      nic_errors = []
1107

    
1108
      # check all instances for consistency
1109
      for instance in self.cfg.GetAllInstancesInfo().values():
1110
        for nic_idx, nic in enumerate(instance.nics):
1111
          params_copy = copy.deepcopy(nic.nicparams)
1112
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
1113

    
1114
          # check parameter syntax
1115
          try:
1116
            objects.NIC.CheckParameterSyntax(params_filled)
1117
          except errors.ConfigurationError, err:
1118
            nic_errors.append("Instance %s, nic/%d: %s" %
1119
                              (instance.name, nic_idx, err))
1120

    
1121
          # if we're moving instances to routed, check that they have an ip
1122
          target_mode = params_filled[constants.NIC_MODE]
1123
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
1124
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
1125
                              " address" % (instance.name, nic_idx))
1126
      if nic_errors:
1127
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
1128
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
1129

    
1130
    # hypervisor list/parameters
1131
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
1132
    if self.op.hvparams:
1133
      for hv_name, hv_dict in self.op.hvparams.items():
1134
        if hv_name not in self.new_hvparams:
1135
          self.new_hvparams[hv_name] = hv_dict
1136
        else:
1137
          self.new_hvparams[hv_name].update(hv_dict)
1138

    
1139
    # disk template parameters
1140
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
1141
    if self.op.diskparams:
1142
      for dt_name, dt_params in self.op.diskparams.items():
1143
        if dt_name not in self.new_diskparams:
1144
          self.new_diskparams[dt_name] = dt_params
1145
        else:
1146
          self.new_diskparams[dt_name].update(dt_params)
1147
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)
1148

    
1149
    # os hypervisor parameters
1150
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
1151
    if self.op.os_hvp:
1152
      for os_name, hvs in self.op.os_hvp.items():
1153
        if os_name not in self.new_os_hvp:
1154
          self.new_os_hvp[os_name] = hvs
1155
        else:
1156
          for hv_name, hv_dict in hvs.items():
1157
            if hv_dict is None:
1158
              # Delete if it exists
1159
              self.new_os_hvp[os_name].pop(hv_name, None)
1160
            elif hv_name not in self.new_os_hvp[os_name]:
1161
              self.new_os_hvp[os_name][hv_name] = hv_dict
1162
            else:
1163
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
1164

    
1165
    # os parameters
1166
    self._BuildOSParams(cluster)
1167

    
1168
    # changes to the hypervisor list
1169
    if self.op.enabled_hypervisors is not None:
1170
      self.hv_list = self.op.enabled_hypervisors
1171
      for hv in self.hv_list:
1172
        # if the hypervisor doesn't already exist in the cluster
1173
        # hvparams, we initialize it to empty, and then (in both
1174
        # cases) we make sure to fill the defaults, as we might not
1175
        # have a complete defaults list if the hypervisor wasn't
1176
        # enabled before
1177
        if hv not in new_hvp:
1178
          new_hvp[hv] = {}
1179
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1180
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1181
    else:
1182
      self.hv_list = cluster.enabled_hypervisors
1183

    
1184
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1185
      # either the enabled list has changed, or the parameters have, validate
1186
      for hv_name, hv_params in self.new_hvparams.items():
1187
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1188
            (self.op.enabled_hypervisors and
1189
             hv_name in self.op.enabled_hypervisors)):
1190
          # either this is a new hypervisor, or its parameters have changed
1191
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1192
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1193
          hv_class.CheckParameterSyntax(hv_params)
1194
          CheckHVParams(self, node_uuids, hv_name, hv_params)
1195

    
1196
    self._CheckDiskTemplateConsistency()
1197

    
1198
    if self.op.os_hvp:
1199
      # no need to check any newly-enabled hypervisors, since the
1200
      # defaults have already been checked in the above code-block
1201
      for os_name, os_hvp in self.new_os_hvp.items():
1202
        for hv_name, hv_params in os_hvp.items():
1203
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1204
          # we need to fill in the new os_hvp on top of the actual hv_p
1205
          cluster_defaults = self.new_hvparams.get(hv_name, {})
1206
          new_osp = objects.FillDict(cluster_defaults, hv_params)
1207
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1208
          hv_class.CheckParameterSyntax(new_osp)
1209
          CheckHVParams(self, node_uuids, hv_name, new_osp)
1210

    
1211
    if self.op.default_iallocator:
1212
      alloc_script = utils.FindFile(self.op.default_iallocator,
1213
                                    constants.IALLOCATOR_SEARCH_PATH,
1214
                                    os.path.isfile)
1215
      if alloc_script is None:
1216
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1217
                                   " specified" % self.op.default_iallocator,
1218
                                   errors.ECODE_INVAL)
1219

    
1220
  def _BuildOSParams(self, cluster):
1221
    "Calculate the new OS parameters for this operation."
1222

    
1223
    def _GetNewParams(source, new_params):
1224
      "Wrapper around GetUpdatedParams."
1225
      if new_params is None:
1226
        return source
1227
      result = objects.FillDict(source, {}) # deep copy of source
1228
      for os_name in new_params:
1229
        result[os_name] = GetUpdatedParams(result.get(os_name, {}),
1230
                                           new_params[os_name],
1231
                                           use_none=True)
1232
        if not result[os_name]:
1233
          del result[os_name] # we removed all parameters
1234
      return result
1235

    
1236
    self.new_osp = _GetNewParams(cluster.osparams,
1237
                                 self.op.osparams)
1238
    self.new_osp_private = _GetNewParams(cluster.osparams_private_cluster,
1239
                                         self.op.osparams_private_cluster)
1240

    
1241
    # Remove os validity check
1242
    changed_oses = (set(self.new_osp.keys()) | set(self.new_osp_private.keys()))
1243
    for os_name in changed_oses:
1244
      os_params = cluster.SimpleFillOS(
1245
        os_name,
1246
        self.new_osp.get(os_name, {}),
1247
        os_params_private=self.new_osp_private.get(os_name, {})
1248
      )
1249
      # check the parameter validity (remote check)
1250
      CheckOSParams(self, False, [self.cfg.GetMasterNode()],
1251
                    os_name, os_params)
1252

    
1253
  def _CheckDiskTemplateConsistency(self):
1254
    """Check whether the disk templates that are going to be disabled
1255
       are still in use by some instances.
1256

1257
    """
1258
    if self.op.enabled_disk_templates:
1259
      cluster = self.cfg.GetClusterInfo()
1260
      instances = self.cfg.GetAllInstancesInfo()
1261

    
1262
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1263
        - set(self.op.enabled_disk_templates)
1264
      for instance in instances.itervalues():
1265
        if instance.disk_template in disk_templates_to_remove:
1266
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1267
                                     " because instance '%s' is using it." %
1268
                                     (instance.disk_template, instance.name))
1269

    
1270
  def _SetVgName(self, feedback_fn):
1271
    """Determines and sets the new volume group name.
1272

1273
    """
1274
    if self.op.vg_name is not None:
1275
      new_volume = self.op.vg_name
1276
      if not new_volume:
1277
        new_volume = None
1278
      if new_volume != self.cfg.GetVGName():
1279
        self.cfg.SetVGName(new_volume)
1280
      else:
1281
        feedback_fn("Cluster LVM configuration already in desired"
1282
                    " state, not changing")
1283

    
1284
  def _SetFileStorageDir(self, feedback_fn):
1285
    """Set the file storage directory.
1286

1287
    """
1288
    if self.op.file_storage_dir is not None:
1289
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1290
        feedback_fn("Global file storage dir already set to value '%s'"
1291
                    % self.cluster.file_storage_dir)
1292
      else:
1293
        self.cluster.file_storage_dir = self.op.file_storage_dir
1294

    
1295
  def _SetDrbdHelper(self, feedback_fn):
1296
    """Set the DRBD usermode helper.
1297

1298
    """
1299
    if self.op.drbd_helper is not None:
1300
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1301
        feedback_fn("Note that you specified a drbd user helper, but did not"
1302
                    " enable the drbd disk template.")
1303
      new_helper = self.op.drbd_helper
1304
      if not new_helper:
1305
        new_helper = None
1306
      if new_helper != self.cfg.GetDRBDHelper():
1307
        self.cfg.SetDRBDHelper(new_helper)
1308
      else:
1309
        feedback_fn("Cluster DRBD helper already in desired state,"
1310
                    " not changing")
1311

    
1312
  def Exec(self, feedback_fn):
    """Change the parameters of the cluster.

    """
    if self.op.enabled_disk_templates:
      self.cluster.enabled_disk_templates = \
        list(self.op.enabled_disk_templates)

    self._SetVgName(feedback_fn)
    self._SetFileStorageDir(feedback_fn)
    self._SetDrbdHelper(feedback_fn)

    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.osparams_private_cluster:
      self.cluster.osparams_private_cluster = self.new_osp_private
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [], feedback_fn)

    if self.op.max_running_jobs is not None:
      self.cluster.max_running_jobs = self.op.max_running_jobs

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.default_iallocator_params is not None:
      self.cluster.default_iallocator_params = self.op.default_iallocator_params

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


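# Illustrative sketch of how the OS-list modifications handled by helper_os()
# in LUClusterSetParams.Exec above are expressed: the opcode carries
# (action, os_name) pairs, roughly
#
#   op = opcodes.OpClusterSetParams(
#     hidden_os=[(constants.DDM_ADD, "debian-image"),
#                (constants.DDM_REMOVE, "old-image")])
#
# The OS names here are placeholders; any action other than DDM_ADD or
# DDM_REMOVE is rejected with errors.ProgrammerError, as shown above.
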
class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]

    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


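# Illustrative sketch (group names are placeholders): for a cluster with two
# node groups and no group_name restriction, LUClusterVerify.Exec above
# returns a job list roughly like
#
#   [[OpClusterVerifyConfig(...)],
#    [OpClusterVerifyGroup(group_name="group1", depends=[(-1, [])], ...)],
#    [OpClusterVerifyGroup(group_name="group2", depends=[(-2, [])], ...)]]
#
# The relative job ids -1 and -2 both point back at the configuration
# verification job submitted first: depends_fn() is evaluated while
# jobs.extend() consumes the generator, so len(jobs) grows by one for each
# group job that has already been added.
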
class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = constants.CV_ERROR
  ETYPE_WARNING = constants.CV_WARNING

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # do not mark the operation as failed for WARN cases only
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


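# Usage sketch for the mix-in above, mirroring calls made elsewhere in this
# module:
#
#   self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
#                 "unable to verify node: no data returned")
#
# Depending on the opcode's error_codes setting, the message is emitted either
# as a colon-separated, machine-parseable line or as plain human-readable
# text, and self.bad is set unless the error code is listed in ignore_errors
# (which demotes it to a warning).
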
def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


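# Illustrative shape of the list returned above, with placeholder hypervisor,
# OS and instance names:
#
#   [("cluster", "kvm", {...}),
#    ("os debian-image", "kvm", {...}),
#    ("instance inst1.example.com", "kvm", {...})]
#
# Each parameter dict is the filled-out set of hypervisor parameters for that
# source, as produced by GetHVDefaults() or FillHV().
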
class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (item, hv_name))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = utils.VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(inst.name for
                         inst in dangling_instances.get(node.uuid, [])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(inst.name for
                                         inst in no_node_instances))

    return not self.bad


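# Note on the split of responsibilities: LUClusterVerify only submits jobs
# (optionally restricted to one group via its group_name field),
# LUClusterVerifyConfig above checks the cluster-wide configuration, and
# LUClusterVerifyGroup below performs the per-node-group checks (node RPC
# results, LVM and DRBD state, instances, distributed files).
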
class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1689
  """Verifies the status of a node group.
1690

1691
  """
1692
  HPATH = "cluster-verify"
1693
  HTYPE = constants.HTYPE_CLUSTER
1694
  REQ_BGL = False
1695

    
1696
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1697

    
1698
  class NodeImage(object):
1699
    """A class representing the logical and physical status of a node.
1700

1701
    @type uuid: string
1702
    @ivar uuid: the node UUID to which this object refers
1703
    @ivar volumes: a structure as returned from
1704
        L{ganeti.backend.GetVolumeList} (runtime)
1705
    @ivar instances: a list of running instances (runtime)
1706
    @ivar pinst: list of configured primary instances (config)
1707
    @ivar sinst: list of configured secondary instances (config)
1708
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1709
        instances for which this node is secondary (config)
1710
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1711
    @ivar dfree: free disk, as reported by the node (runtime)
1712
    @ivar offline: the offline status (config)
1713
    @type rpc_fail: boolean
1714
    @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1715
        not whether the individual keys were correct) (runtime)
1716
    @type lvm_fail: boolean
1717
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1718
    @type hyp_fail: boolean
1719
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1720
    @type ghost: boolean
1721
    @ivar ghost: whether this is a known node or not (config)
1722
    @type os_fail: boolean
1723
    @ivar os_fail: whether the RPC call didn't return valid OS data
1724
    @type oslist: list
1725
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1726
    @type vm_capable: boolean
1727
    @ivar vm_capable: whether the node can host instances
1728
    @type pv_min: float
1729
    @ivar pv_min: size in MiB of the smallest PVs
1730
    @type pv_max: float
1731
    @ivar pv_max: size in MiB of the biggest PVs
1732

1733
    """
1734
    def __init__(self, offline=False, uuid=None, vm_capable=True):
1735
      self.uuid = uuid
1736
      self.volumes = {}
1737
      self.instances = []
1738
      self.pinst = []
1739
      self.sinst = []
1740
      self.sbp = {}
1741
      self.mfree = 0
1742
      self.dfree = 0
1743
      self.offline = offline
1744
      self.vm_capable = vm_capable
1745
      self.rpc_fail = False
1746
      self.lvm_fail = False
1747
      self.hyp_fail = False
1748
      self.ghost = False
1749
      self.os_fail = False
1750
      self.oslist = {}
1751
      self.pv_min = None
1752
      self.pv_max = None
1753

    
1754
  def ExpandNames(self):
1755
    # This raises errors.OpPrereqError on its own:
1756
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1757

    
1758
    # Get instances in node group; this is unsafe and needs verification later
1759
    inst_uuids = \
1760
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1761

    
1762
    self.needed_locks = {
1763
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1764
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1765
      locking.LEVEL_NODE: [],
1766

    
1767
      # This opcode is run by watcher every five minutes and acquires all nodes
1768
      # for a group. It doesn't run for a long time, so it's better to acquire
1769
      # the node allocation lock as well.
1770
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1771
      }
1772

    
1773
    self.share_locks = ShareAll()
1774

    
1775
  def DeclareLocks(self, level):
1776
    if level == locking.LEVEL_NODE:
1777
      # Get members of node group; this is unsafe and needs verification later
1778
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1779

    
1780
      # In Exec(), we warn about mirrored instances that have primary and
1781
      # secondary living in separate node groups. To fully verify that
1782
      # volumes for these instances are healthy, we will need to do an
1783
      # extra call to their secondaries. We ensure here those nodes will
1784
      # be locked.
1785
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1786
        # Important: access only the instances whose lock is owned
1787
        instance = self.cfg.GetInstanceInfoByName(inst_name)
1788
        if instance.disk_template in constants.DTS_INT_MIRROR:
1789
          nodes.update(instance.secondary_nodes)
1790

    
1791
      self.needed_locks[locking.LEVEL_NODE] = nodes
1792

    
1793
  def CheckPrereq(self):
1794
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1795
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1796

    
1797
    group_node_uuids = set(self.group_info.members)
1798
    group_inst_uuids = \
1799
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1800

    
1801
    unlocked_node_uuids = \
1802
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1803

    
1804
    unlocked_inst_uuids = \
1805
        group_inst_uuids.difference(
1806
          [self.cfg.GetInstanceInfoByName(name).uuid
1807
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1808

    
1809
    if unlocked_node_uuids:
1810
      raise errors.OpPrereqError(
1811
        "Missing lock for nodes: %s" %
1812
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1813
        errors.ECODE_STATE)
1814

    
1815
    if unlocked_inst_uuids:
1816
      raise errors.OpPrereqError(
1817
        "Missing lock for instances: %s" %
1818
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1819
        errors.ECODE_STATE)
1820

    
1821
    self.all_node_info = self.cfg.GetAllNodesInfo()
1822
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1823

    
1824
    self.my_node_uuids = group_node_uuids
1825
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1826
                             for node_uuid in group_node_uuids)
1827

    
1828
    self.my_inst_uuids = group_inst_uuids
1829
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1830
                             for inst_uuid in group_inst_uuids)
1831

    
1832
    # We detect here the nodes that will need the extra RPC calls for verifying
1833
    # split LV volumes; they should be locked.
1834
    extra_lv_nodes = set()
1835

    
1836
    for inst in self.my_inst_info.values():
1837
      if inst.disk_template in constants.DTS_INT_MIRROR:
1838
        for nuuid in inst.all_nodes:
1839
          if self.all_node_info[nuuid].group != self.group_uuid:
1840
            extra_lv_nodes.add(nuuid)
1841

    
1842
    unlocked_lv_nodes = \
1843
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1844

    
1845
    if unlocked_lv_nodes:
1846
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1847
                                 utils.CommaJoin(unlocked_lv_nodes),
1848
                                 errors.ECODE_STATE)
1849
    self.extra_lv_nodes = list(extra_lv_nodes)
1850

    
1851
  def _VerifyNode(self, ninfo, nresult):
1852
    """Perform some basic validation on data returned from a node.
1853

1854
      - check the result data structure is well formed and has all the
1855
        mandatory fields
1856
      - check ganeti version
1857

1858
    @type ninfo: L{objects.Node}
1859
    @param ninfo: the node to check
1860
    @param nresult: the results from the node
1861
    @rtype: boolean
1862
    @return: whether overall this call was successful (and we can expect
1863
         reasonable values in the respose)
1864

1865
    """
1866
    # main result, nresult should be a non-empty dict
1867
    test = not nresult or not isinstance(nresult, dict)
1868
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1869
                  "unable to verify node: no data returned")
1870
    if test:
1871
      return False
1872

    
1873
    # compares ganeti version
1874
    local_version = constants.PROTOCOL_VERSION
1875
    remote_version = nresult.get("version", None)
1876
    test = not (remote_version and
1877
                isinstance(remote_version, (list, tuple)) and
1878
                len(remote_version) == 2)
1879
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1880
                  "connection to node returned invalid data")
1881
    if test:
1882
      return False
1883

    
1884
    test = local_version != remote_version[0]
1885
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1886
                  "incompatible protocol versions: master %s,"
1887
                  " node %s", local_version, remote_version[0])
1888
    if test:
1889
      return False
1890

    
1891
    # node seems compatible, we can actually try to look into its results
1892

    
1893
    # full package version
1894
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1895
                  constants.CV_ENODEVERSION, ninfo.name,
1896
                  "software version mismatch: master %s, node %s",
1897
                  constants.RELEASE_VERSION, remote_version[1],
1898
                  code=self.ETYPE_WARNING)
1899

    
1900
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1901
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1902
      for hv_name, hv_result in hyp_result.iteritems():
1903
        test = hv_result is not None
1904
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1905
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1906

    
1907
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1908
    if ninfo.vm_capable and isinstance(hvp_result, list):
1909
      for item, hv_name, hv_result in hvp_result:
1910
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1911
                      "hypervisor %s parameter verify failure (source %s): %s",
1912
                      hv_name, item, hv_result)
1913

    
1914
    test = nresult.get(constants.NV_NODESETUP,
1915
                       ["Missing NODESETUP results"])
1916
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1917
                  "node setup error: %s", "; ".join(test))
1918

    
1919
    return True
1920

    
1921
  def _VerifyNodeTime(self, ninfo, nresult,
1922
                      nvinfo_starttime, nvinfo_endtime):
1923
    """Check the node time.
1924

1925
    @type ninfo: L{objects.Node}
1926
    @param ninfo: the node to check
1927
    @param nresult: the remote results for the node
1928
    @param nvinfo_starttime: the start time of the RPC call
1929
    @param nvinfo_endtime: the end time of the RPC call
1930

1931
    """
1932
    ntime = nresult.get(constants.NV_TIME, None)
1933
    try:
1934
      ntime_merged = utils.MergeTime(ntime)
1935
    except (ValueError, TypeError):
1936
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1937
                    "Node returned invalid time")
1938
      return
1939

    
1940
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1941
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1942
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1943
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1944
    else:
1945
      ntime_diff = None
1946

    
1947
    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1948
                  "Node time diverges by at least %s from master node time",
1949
                  ntime_diff)
1950

    
1951
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1952
    """Check the node LVM results and update info for cross-node checks.
1953

1954
    @type ninfo: L{objects.Node}
1955
    @param ninfo: the node to check
1956
    @param nresult: the remote results for the node
1957
    @param vg_name: the configured VG name
1958
    @type nimg: L{NodeImage}
1959
    @param nimg: node image
1960

1961
    """
1962
    if vg_name is None:
1963
      return
1964

    
1965
    # checks vg existence and size > 20G
1966
    vglist = nresult.get(constants.NV_VGLIST, None)
1967
    test = not vglist
1968
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1969
                  "unable to check volume groups")
1970
    if not test:
1971
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1972
                                            constants.MIN_VG_SIZE)
1973
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1974

    
1975
    # Check PVs
1976
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1977
    for em in errmsgs:
1978
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
1979
    if pvminmax is not None:
1980
      (nimg.pv_min, nimg.pv_max) = pvminmax
1981

    
1982
  def _VerifyGroupDRBDVersion(self, node_verify_infos):
1983
    """Check cross-node DRBD version consistency.
1984

1985
    @type node_verify_infos: dict
1986
    @param node_verify_infos: infos about nodes as returned from the
1987
      node_verify call.
1988

1989
    """
1990
    node_versions = {}
1991
    for node_uuid, ndata in node_verify_infos.items():
1992
      nresult = ndata.payload
1993
      if nresult:
1994
        version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1995
        node_versions[node_uuid] = version
1996

    
1997
    if len(set(node_versions.values())) > 1:
1998
      for node_uuid, version in sorted(node_versions.items()):
1999
        msg = "DRBD version mismatch: %s" % version
2000
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
2001
                    code=self.ETYPE_WARNING)
2002

    
2003
  def _VerifyGroupLVM(self, node_image, vg_name):
2004
    """Check cross-node consistency in LVM.
2005

2006
    @type node_image: dict
2007
    @param node_image: info about nodes, mapping from node to names to
2008
      L{NodeImage} objects
2009
    @param vg_name: the configured VG name
2010

2011
    """
2012
    if vg_name is None:
2013
      return
2014

    
2015
    # Only exclusive storage needs this kind of checks
2016
    if not self._exclusive_storage:
2017
      return
2018

    
2019
    # exclusive_storage wants all PVs to have the same size (approximately),
2020
    # if the smallest and the biggest ones are okay, everything is fine.
2021
    # pv_min is None iff pv_max is None
2022
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
2023
    if not vals:
2024
      return
2025
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
2026
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
2027
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
2028
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
2029
                  "PV sizes differ too much in the group; smallest (%s MB) is"
2030
                  " on %s, biggest (%s MB) is on %s",
2031
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
2032
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))
2033

    
2034
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
2035
    """Check the node bridges.
2036

2037
    @type ninfo: L{objects.Node}
2038
    @param ninfo: the node to check
2039
    @param nresult: the remote results for the node
2040
    @param bridges: the expected list of bridges
2041

2042
    """
2043
    if not bridges:
2044
      return
2045

    
2046
    missing = nresult.get(constants.NV_BRIDGES, None)
2047
    test = not isinstance(missing, list)
2048
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2049
                  "did not return valid bridge information")
2050
    if not test:
2051
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
2052
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
2053

    
2054
  def _VerifyNodeUserScripts(self, ninfo, nresult):
2055
    """Check the results of user scripts presence and executability on the node
2056

2057
    @type ninfo: L{objects.Node}
2058
    @param ninfo: the node to check
2059
    @param nresult: the remote results for the node
2060

2061
    """
2062
    test = not constants.NV_USERSCRIPTS in nresult
2063
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2064
                  "did not return user scripts information")
2065

    
2066
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
2067
    if not test:
2068
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2069
                    "user scripts not present or not executable: %s" %
2070
                    utils.CommaJoin(sorted(broken_scripts)))
2071

    
2072
  def _VerifyNodeNetwork(self, ninfo, nresult):
2073
    """Check the node network connectivity results.
2074

2075
    @type ninfo: L{objects.Node}
2076
    @param ninfo: the node to check
2077
    @param nresult: the remote results for the node
2078

2079
    """
2080
    test = constants.NV_NODELIST not in nresult
2081
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
2082
                  "node hasn't returned node ssh connectivity data")
2083
    if not test:
2084
      if nresult[constants.NV_NODELIST]:
2085
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
2086
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
2087
                        "ssh communication with node '%s': %s", a_node, a_msg)
2088

    
2089
    test = constants.NV_NODENETTEST not in nresult
2090
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2091
                  "node hasn't returned node tcp connectivity data")
2092
    if not test:
2093
      if nresult[constants.NV_NODENETTEST]:
2094
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
2095
        for anode in nlist:
2096
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
2097
                        "tcp communication with node '%s': %s",
2098
                        anode, nresult[constants.NV_NODENETTEST][anode])
2099

    
2100
    test = constants.NV_MASTERIP not in nresult
2101
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2102
                  "node hasn't returned node master IP reachability data")
2103
    if not test:
2104
      if not nresult[constants.NV_MASTERIP]:
2105
        if ninfo.uuid == self.master_node:
2106
          msg = "the master node cannot reach the master IP (not configured?)"
2107
        else:
2108
          msg = "cannot reach the master IP"
2109
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
2110

    
2111
  def _VerifyInstance(self, instance, node_image, diskstatus):
2112
    """Verify an instance.
2113

2114
    This function checks to see if the required block devices are
2115
    available on the instance's node, and that the nodes are in the correct
2116
    state.
2117

2118
    """
2119
    pnode_uuid = instance.primary_node
2120
    pnode_img = node_image[pnode_uuid]
2121
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2122

    
2123
    node_vol_should = {}
2124
    instance.MapLVsByNode(node_vol_should)
2125

    
2126
    cluster = self.cfg.GetClusterInfo()
2127
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2128
                                                            self.group_info)
2129
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
2130
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
2131
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)
2132

    
2133
    for node_uuid in node_vol_should:
2134
      n_img = node_image[node_uuid]
2135
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2136
        # ignore missing volumes on offline or broken nodes
2137
        continue
2138
      for volume in node_vol_should[node_uuid]:
2139
        test = volume not in n_img.volumes
2140
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
2141
                      "volume %s missing on node %s", volume,
2142
                      self.cfg.GetNodeName(node_uuid))
2143

    
2144
    if instance.admin_state == constants.ADMINST_UP:
2145
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
2146
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
2147
                    "instance not running on its primary node %s",
2148
                     self.cfg.GetNodeName(pnode_uuid))
2149
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
2150
                    instance.name, "instance is marked as running and lives on"
2151
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))
2152

    
2153
    diskdata = [(nname, success, status, idx)
2154
                for (nname, disks) in diskstatus.items()
2155
                for idx, (success, status) in enumerate(disks)]
2156

    
2157
    for nname, success, bdev_status, idx in diskdata:
2158
      # the 'ghost node' construction in Exec() ensures that we have a
2159
      # node here
2160
      snode = node_image[nname]
2161
      bad_snode = snode.ghost or snode.offline
2162
      self._ErrorIf(instance.disks_active and
2163
                    not success and not bad_snode,
2164
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
2165
                    "couldn't retrieve status for disk/%s on %s: %s",
2166
                    idx, self.cfg.GetNodeName(nname), bdev_status)
2167

    
2168
      if instance.disks_active and success and \
2169
         (bdev_status.is_degraded or
2170
          bdev_status.ldisk_status != constants.LDS_OKAY):
2171
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2172
        if bdev_status.is_degraded:
2173
          msg += " is degraded"
2174
        if bdev_status.ldisk_status != constants.LDS_OKAY:
2175
          msg += "; state is '%s'" % \
2176
                 constants.LDS_NAMES[bdev_status.ldisk_status]
2177

    
2178
        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2179

    
2180
    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2181
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2182
                  "instance %s, connection to primary node failed",
2183
                  instance.name)
2184

    
2185
    self._ErrorIf(len(instance.secondary_nodes) > 1,
2186
                  constants.CV_EINSTANCELAYOUT, instance.name,
2187
                  "instance has multiple secondary nodes: %s",
2188
                  utils.CommaJoin(instance.secondary_nodes),
2189
                  code=self.ETYPE_WARNING)
2190

    
2191
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2192
    if any(es_flags.values()):
2193
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2194
        # Disk template not compatible with exclusive_storage: no instance
2195
        # node should have the flag set
2196
        es_nodes = [n
2197
                    for (n, es) in es_flags.items()
2198
                    if es]
2199
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2200
                    "instance has template %s, which is not supported on nodes"
2201
                    " that have exclusive storage set: %s",
2202
                    instance.disk_template,
2203
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2204
      for (idx, disk) in enumerate(instance.disks):
2205
        self._ErrorIf(disk.spindles is None,
2206
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2207
                      "number of spindles not configured for disk %s while"
2208
                      " exclusive storage is enabled, try running"
2209
                      " gnt-cluster repair-disk-sizes", idx)
2210

    
2211
    if instance.disk_template in constants.DTS_INT_MIRROR:
2212
      instance_nodes = utils.NiceSort(instance.all_nodes)
2213
      instance_groups = {}
2214

    
2215
      for node_uuid in instance_nodes:
2216
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
2217
                                   []).append(node_uuid)
2218

    
2219
      pretty_list = [
2220
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2221
                           groupinfo[group].name)
2222
        # Sort so that we always list the primary node first.
2223
        for group, nodes in sorted(instance_groups.items(),
2224
                                   key=lambda (_, nodes): pnode_uuid in nodes,
2225
                                   reverse=True)]
2226

    
2227
      self._ErrorIf(len(instance_groups) > 1,
2228
                    constants.CV_EINSTANCESPLITGROUPS,
2229
                    instance.name, "instance has primary and secondary nodes in"
2230
                    " different groups: %s", utils.CommaJoin(pretty_list),
2231
                    code=self.ETYPE_WARNING)
2232

    
2233
    inst_nodes_offline = []
2234
    for snode in instance.secondary_nodes:
2235
      s_img = node_image[snode]
2236
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2237
                    self.cfg.GetNodeName(snode),
2238
                    "instance %s, connection to secondary node failed",
2239
                    instance.name)
2240

    
2241
      if s_img.offline:
2242
        inst_nodes_offline.append(snode)
2243

    
2244
    # warn that the instance lives on offline nodes
2245
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2246
                  instance.name, "instance has offline secondary node(s) %s",
2247
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2248
    # ... or ghost/non-vm_capable nodes
2249
    for node_uuid in instance.all_nodes:
2250
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2251
                    instance.name, "instance lives on ghost node %s",
2252
                    self.cfg.GetNodeName(node_uuid))
2253
      self._ErrorIf(not node_image[node_uuid].vm_capable,
2254
                    constants.CV_EINSTANCEBADNODE, instance.name,
2255
                    "instance lives on non-vm_capable node %s",
2256
                    self.cfg.GetNodeName(node_uuid))
2257

    
2258
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2259
    """Verify if there are any unknown volumes in the cluster.
2260

2261
    The .os, .swap and backup volumes are ignored. All other volumes are
2262
    reported as unknown.
2263

2264
    @type reserved: L{ganeti.utils.FieldSet}
2265
    @param reserved: a FieldSet of reserved volume names
2266

2267
    """
2268
    for node_uuid, n_img in node_image.items():
2269
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2270
          self.all_node_info[node_uuid].group != self.group_uuid):
2271
        # skip non-healthy nodes
2272
        continue
2273
      for volume in n_img.volumes:
2274
        test = ((node_uuid not in node_vol_should or
2275
                volume not in node_vol_should[node_uuid]) and
2276
                not reserved.Matches(volume))
2277
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2278
                      self.cfg.GetNodeName(node_uuid),
2279
                      "volume %s is unknown", volume,
2280
                      code=_VerifyErrors.ETYPE_WARNING)
2281

    
2282
  def _VerifyNPlusOneMemory(self, node_image, all_insts):
2283
    """Verify N+1 Memory Resilience.
2284

2285
    Check that if one single node dies we can still start all the
2286
    instances it was primary for.
2287

2288
    """
2289
    cluster_info = self.cfg.GetClusterInfo()
2290
    for node_uuid, n_img in node_image.items():
2291
      # This code checks that every node which is now listed as
2292
      # secondary has enough memory to host all instances it is
2293
      # supposed to should a single other node in the cluster fail.
2294
      # FIXME: not ready for failover to an arbitrary node
2295
      # FIXME: does not support file-backed instances
2296
      # WARNING: we currently take into account down instances as well
2297
      # as up ones, considering that even if they're down someone
2298
      # might want to start them even in the event of a node failure.
2299
      if n_img.offline or \
2300
         self.all_node_info[node_uuid].group != self.group_uuid:
2301
        # we're skipping nodes marked offline and nodes in other groups from
2302
        # the N+1 warning, since most likely we don't have good memory
2303
        # information from them; we already list instances living on such
2304
        # nodes, and that's enough warning
2305
        continue
2306
      #TODO(dynmem): also consider ballooning out other instances
2307
      for prinode, inst_uuids in n_img.sbp.items():
2308
        needed_mem = 0
2309
        for inst_uuid in inst_uuids:
2310
          bep = cluster_info.FillBE(all_insts[inst_uuid])
2311
          if bep[constants.BE_AUTO_BALANCE]:
2312
            needed_mem += bep[constants.BE_MINMEM]
2313
        test = n_img.mfree < needed_mem
2314
        self._ErrorIf(test, constants.CV_ENODEN1,
2315
                      self.cfg.GetNodeName(node_uuid),
2316
                      "not enough memory to accomodate instance failovers"
2317
                      " should node %s fail (%dMiB needed, %dMiB available)",
2318
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2319

    
2320
  def _VerifyClientCertificates(self, nodes, all_nvinfo):
2321
    """Verifies the consistency of the client certificates.
2322

2323
    This includes several aspects:
2324
      - the individual validation of all nodes' certificates
2325
      - the consistency of the master candidate certificate map
2326
      - the consistency of the master candidate certificate map with the
2327
        certificates that the master candidates are actually using.
2328

2329
    @param nodes: the list of nodes to consider in this verification
2330
    @param all_nvinfo: the map of results of the verify_node call to
2331
      all nodes
2332

2333
    """
2334
    candidate_certs = self.cfg.GetClusterInfo().candidate_certs
2335
    if candidate_certs is None or len(candidate_certs) == 0:
2336
      self._ErrorIf(
2337
        True, constants.CV_ECLUSTERCLIENTCERT, None,
2338
        "The cluster's list of master candidate certificates is empty."
2339
        "If you just updated the cluster, please run"
2340
        " 'gnt-cluster renew-crypto --new-node-certificates'.")
2341
      return
2342

    
2343
    self._ErrorIf(
2344
      len(candidate_certs) != len(set(candidate_certs.values())),
2345
      constants.CV_ECLUSTERCLIENTCERT, None,
2346
      "There are at least two master candidates configured to use the same"
2347
      " certificate.")
2348

    
2349
    # collect the client certificate
2350
    for node in nodes:
2351
      if node.offline:
2352
        continue
2353

    
2354
      nresult = all_nvinfo[node.uuid]
2355
      if nresult.fail_msg or not nresult.payload:
2356
        continue
2357

    
2358
      (errcode, msg) = nresult.payload.get(constants.NV_CLIENT_CERT, None)
2359

    
2360
      self._ErrorIf(
2361
        errcode is not None, constants.CV_ECLUSTERCLIENTCERT, None,
2362
        "Client certificate of node '%s' failed validation: %s (code '%s')",
2363
        node.uuid, msg, errcode)
2364

    
2365
      if not errcode:
2366
        digest = msg
2367
        if node.master_candidate:
2368
          if node.uuid in candidate_certs:
2369
            self._ErrorIf(
2370
              digest != candidate_certs[node.uuid],
2371
              constants.CV_ECLUSTERCLIENTCERT, None,
2372
              "Client certificate digest of master candidate '%s' does not"
2373
              " match its entry in the cluster's map of master candidate"
2374
              " certificates. Expected: %s Got: %s", node.uuid,
2375
              digest, candidate_certs[node.uuid])
2376
          else:
2377
            self._ErrorIf(
2378
              True, constants.CV_ECLUSTERCLIENTCERT, None,
2379
              "The master candidate '%s' does not have an entry in the"
2380
              " map of candidate certificates.", node.uuid)
2381
            self._ErrorIf(
2382
              digest in candidate_certs.values(),
2383
              constants.CV_ECLUSTERCLIENTCERT, None,
2384
              "Master candidate '%s' is using a certificate of another node.",
2385
              node.uuid)
2386
        else:
2387
          self._ErrorIf(
2388
            node.uuid in candidate_certs,
2389
            constants.CV_ECLUSTERCLIENTCERT, None,
2390
            "Node '%s' is not a master candidate, but still listed in the"
2391
            " map of master candidate certificates.", node.uuid)
2392
          self._ErrorIf(
2393
            (node.uuid not in candidate_certs) and
2394
              (digest in candidate_certs.values()),
2395
            constants.CV_ECLUSTERCLIENTCERT, None,
2396
            "Node '%s' is not a master candidate and is incorrectly using a"
2397
            " certificate of another node which is master candidate.",
2398
            node.uuid)
2399

    
2400
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2401
                   (files_all, files_opt, files_mc, files_vm)):
2402
    """Verifies file checksums collected from all nodes.
2403

2404
    @param nodes: List of L{objects.Node} objects
2405
    @param master_node_uuid: UUID of master node
2406
    @param all_nvinfo: RPC results
2407

2408
    """
2409
    # Define functions determining which nodes to consider for a file
2410
    files2nodefn = [
2411
      (files_all, None),
2412
      (files_mc, lambda node: (node.master_candidate or
2413
                               node.uuid == master_node_uuid)),
2414
      (files_vm, lambda node: node.vm_capable),
2415
      ]
2416

    
2417
    # Build mapping from filename to list of nodes which should have the file
2418
    nodefiles = {}
2419
    for (files, fn) in files2nodefn:
2420
      if fn is None:
2421
        filenodes = nodes
2422
      else:
2423
        filenodes = filter(fn, nodes)
2424
      nodefiles.update((filename,
2425
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
2426
                       for filename in files)
2427

    
2428
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2429

    
2430
    fileinfo = dict((filename, {}) for filename in nodefiles)
2431
    ignore_nodes = set()
2432

    
2433
    for node in nodes:
2434
      if node.offline:
2435
        ignore_nodes.add(node.uuid)
2436
        continue
2437

    
2438
      nresult = all_nvinfo[node.uuid]
2439

    
2440
      if nresult.fail_msg or not nresult.payload:
2441
        node_files = None
2442
      else:
2443
        fingerprints = nresult.payload.get(constants.NV_FILELIST, {})
2444
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2445
                          for (key, value) in fingerprints.items())
2446
        del fingerprints
2447

    
2448
      test = not (node_files and isinstance(node_files, dict))
2449
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2450
                    "Node did not return file checksum data")
2451
      if test:
2452
        ignore_nodes.add(node.uuid)
2453
        continue
2454

    
2455
      # Build per-checksum mapping from filename to nodes having it
2456
      for (filename, checksum) in node_files.items():
2457
        assert filename in nodefiles
2458
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2459

    
2460
    for (filename, checksums) in fileinfo.items():
2461
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2462

    
2463
      # Nodes having the file
2464
      with_file = frozenset(node_uuid
2465
                            for node_uuids in fileinfo[filename].values()
2466
                            for node_uuid in node_uuids) - ignore_nodes
2467

    
2468
      expected_nodes = nodefiles[filename] - ignore_nodes
2469

    
2470
      # Nodes missing file
2471
      missing_file = expected_nodes - with_file
2472

    
2473
      if filename in files_opt:
2474
        # All or no nodes
2475
        self._ErrorIf(missing_file and missing_file != expected_nodes,
2476
                      constants.CV_ECLUSTERFILECHECK, None,
2477
                      "File %s is optional, but it must exist on all or no"
2478
                      " nodes (not found on %s)",
2479
                      filename,
2480
                      utils.CommaJoin(
2481
                        utils.NiceSort(
2482
                          map(self.cfg.GetNodeName, missing_file))))
2483
      else:
2484
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2485
                      "File %s is missing from node(s) %s", filename,
2486
                      utils.CommaJoin(
2487
                        utils.NiceSort(
2488
                          map(self.cfg.GetNodeName, missing_file))))
2489

    
2490
        # Warn if a node has a file it shouldn't
2491
        unexpected = with_file - expected_nodes
2492
        self._ErrorIf(unexpected,
2493
                      constants.CV_ECLUSTERFILECHECK, None,
2494
                      "File %s should not exist on node(s) %s",
2495
                      filename, utils.CommaJoin(
2496
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2497

    
2498
      # See if there are multiple versions of the file
2499
      test = len(checksums) > 1
2500
      if test:
2501
        variants = ["variant %s on %s" %
2502
                    (idx + 1,
2503
                     utils.CommaJoin(utils.NiceSort(
2504
                       map(self.cfg.GetNodeName, node_uuids))))
2505
                    for (idx, (checksum, node_uuids)) in
2506
                      enumerate(sorted(checksums.items()))]
2507
      else:
2508
        variants = []
2509

    
2510
      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2511
                    "File %s found with %s different checksums (%s)",
2512
                    filename, len(checksums), "; ".join(variants))
2513

    
2514
  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
2515
    """Verify the drbd helper.
2516

2517
    """
2518
    if drbd_helper:
2519
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2520
      test = (helper_result is None)
2521
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2522
                    "no drbd usermode helper returned")
2523
      if helper_result:
2524
        status, payload = helper_result
2525
        test = not status
2526
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2527
                      "drbd usermode helper check unsuccessful: %s", payload)
2528
        test = status and (payload != drbd_helper)
2529
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2530
                      "wrong drbd usermode helper: %s", payload)
2531

    
2532
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2533
                      drbd_map):
2534
    """Verifies and the node DRBD status.
2535

2536
    @type ninfo: L{objects.Node}
2537
    @param ninfo: the node to check
2538
    @param nresult: the remote results for the node
2539
    @param instanceinfo: the dict of instances
2540
    @param drbd_helper: the configured DRBD usermode helper
2541
    @param drbd_map: the DRBD map as returned by
2542
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2543

2544
    """
2545
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)
2546

    
2547
    # compute the DRBD minors
2548
    node_drbd = {}
2549
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2550
      test = inst_uuid not in instanceinfo
2551
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2552
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
2553
        # ghost instance should not be running, but otherwise we
2554
        # don't give double warnings (both ghost instance and
2555
        # unallocated minor in use)
2556
      if test:
2557
        node_drbd[minor] = (inst_uuid, False)
2558
      else:
2559
        instance = instanceinfo[inst_uuid]
2560
        node_drbd[minor] = (inst_uuid, instance.disks_active)
2561

    
2562
    # and now check them
2563
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2564
    test = not isinstance(used_minors, (tuple, list))
2565
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2566
                  "cannot parse drbd status file: %s", str(used_minors))
2567
    if test:
2568
      # we cannot check drbd status
2569
      return
2570

    
2571
    for minor, (inst_uuid, must_exist) in node_drbd.items():
2572
      test = minor not in used_minors and must_exist
2573
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2574
                    "drbd minor %d of instance %s is not active", minor,
2575
                    self.cfg.GetInstanceName(inst_uuid))
2576
    for minor in used_minors:
2577
      test = minor not in node_drbd
2578
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2579
                    "unallocated drbd minor %d is in use", minor)
2580

    
2581
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2582
    """Builds the node OS structures.
2583

2584
    @type ninfo: L{objects.Node}
2585
    @param ninfo: the node to check
2586
    @param nresult: the remote results for the node
2587
    @param nimg: the node image object
2588

2589
    """
2590
    remote_os = nresult.get(constants.NV_OSLIST, None)
2591
    test = (not isinstance(remote_os, list) or
2592
            not compat.all(isinstance(v, list) and len(v) == 7
2593
                           for v in remote_os))
2594

    
2595
    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2596
                  "node hasn't returned valid OS data")
2597

    
2598
    nimg.os_fail = test
2599

    
2600
    if test:
2601
      return
2602

    
2603
    os_dict = {}
2604

    
2605
    for (name, os_path, status, diagnose,
2606
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2607

    
2608
      if name not in os_dict:
2609
        os_dict[name] = []
2610

    
2611
      # parameters is a list of lists instead of list of tuples due to
2612
      # JSON lacking a real tuple type, fix it:
2613
      parameters = [tuple(v) for v in parameters]
2614
      os_dict[name].append((os_path, status, diagnose,
2615
                            set(variants), set(parameters), set(api_ver)))
2616

    
2617
    nimg.oslist = os_dict
2618

    
2619
  def _VerifyNodeOS(self, ninfo, nimg, base):
    """Verifies the node OS list.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nimg: the node image object
    @param base: the 'template' node we match against (e.g. from the master)

    """
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"

    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
    for os_name, os_data in nimg.oslist.items():
      assert os_data, "Empty OS status for OS %s?!" % os_name
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
                    "Invalid OS %s (located at %s): %s",
                    os_name, f_path, f_diag)
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
                    "OS '%s' has multiple entries"
                    " (first one shadows the rest): %s",
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
      # comparisons with the 'base' image
      test = os_name not in base.oslist
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
                    "Extra OS %s not present on reference node (%s)",
                    os_name, self.cfg.GetNodeName(base.uuid))
      if test:
        continue
      assert base.oslist[os_name], "Base node has empty OS status?"
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
      if not b_status:
        # base OS is invalid, skipping
        continue
      for kind, a, b in [("API version", f_api, b_api),
                         ("variants list", f_var, b_var),
                         ("parameters", beautify_params(f_param),
                          beautify_params(b_param))]:
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
                      "OS %s for %s differs from reference node %s:"
                      " [%s] vs. [%s]", kind, os_name,
                      self.cfg.GetNodeName(base.uuid),
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))

    # check any missing OSes
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
                  "OSes present on reference node %s"
                  " but missing on this node: %s",
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))

  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type is_master: bool
    @param is_master: Whether node is the master node

    """
    cluster = self.cfg.GetClusterInfo()
    if (is_master and
        (cluster.IsFileStorageEnabled() or
         cluster.IsSharedFileStorageEnabled())):
      try:
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
      except KeyError:
        # This should never happen
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Node did not return forbidden file storage paths")
      else:
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                      "Found forbidden file storage paths: %s",
                      utils.CommaJoin(fspaths))
    else:
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
                    "Node should not have returned forbidden file storage"
                    " paths")

  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
                          verify_key, error_key):
    """Verifies (file) storage paths.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @type file_disk_template: string
    @param file_disk_template: file-based disk template, whose directory
        is supposed to be verified
    @type verify_key: string
    @param verify_key: key for the verification map of this file
        verification step
    @param error_key: error key to be added to the verification results
        in case something goes wrong in this verification step

    """
    assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
              constants.ST_FILE, constants.ST_SHARED_FILE
           ))

    cluster = self.cfg.GetClusterInfo()
    if cluster.IsDiskTemplateEnabled(file_disk_template):
      self._ErrorIf(
          verify_key in nresult,
          error_key, ninfo.name,
          "The configured %s storage path is unusable: %s" %
          (file_disk_template, nresult.get(verify_key)))

  def _VerifyFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_FILE,
        constants.NV_FILE_STORAGE_PATH,
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)

  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
    """Verifies (file) storage paths.

    @see: C{_VerifyStoragePaths}

    """
    self._VerifyStoragePaths(
        ninfo, nresult, constants.DT_SHARED_FILE,
        constants.NV_SHARED_FILE_STORAGE_PATH,
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)

  def _VerifyOob(self, ninfo, nresult):
    """Verifies out-of-band functionality of a node.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node

    """
    # We just have to verify the paths on master and/or master candidates
    # as the oob helper is invoked on the master
    if ((ninfo.master_candidate or ninfo.master_capable) and
        constants.NV_OOB_PATHS in nresult):
      for path_result in nresult[constants.NV_OOB_PATHS]:
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
                      ninfo.name, path_result)

  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
    """Verifies and updates the node volume data.

    This function will update a L{NodeImage}'s internal structures
    with data from the remote call.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    nimg.lvm_fail = True
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
    if vg_name is None:
      pass
    elif isinstance(lvdata, basestring):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
    elif not isinstance(lvdata, dict):
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
                    "rpc call to node failed (lvlist)")
    else:
      nimg.volumes = lvdata
      nimg.lvm_fail = False

  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
    """Verifies and updates the node instance list.

    If the listing was successful, then updates this node's instance
    list. Otherwise, it marks the RPC call as failed for the instance
    list key.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object

    """
    idata = nresult.get(constants.NV_INSTANCELIST, None)
    test = not isinstance(idata, list)
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (instancelist): %s",
                  utils.SafeEncode(str(idata)))
    if test:
      nimg.hyp_fail = True
    else:
      nimg.instances = [inst.uuid for (_, inst) in
                        self.cfg.GetMultiInstanceInfoByName(idata)]

  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
    """Verifies and computes a node information map.

    @type ninfo: L{objects.Node}
    @param ninfo: the node to check
    @param nresult: the remote results for the node
    @param nimg: the node image object
    @param vg_name: the configured VG name

    """
    # try to read free memory (from the hypervisor)
    hv_info = nresult.get(constants.NV_HVINFO, None)
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
                  "rpc call to node failed (hvinfo)")
    if not test:
      try:
        nimg.mfree = int(hv_info["memory_free"])
      except (ValueError, TypeError):
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                      "node returned invalid nodeinfo, check hypervisor")

    # FIXME: devise a free space model for file based instances as well
    if vg_name is not None:
      test = (constants.NV_VGLIST not in nresult or
              vg_name not in nresult[constants.NV_VGLIST])
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
                    "node didn't return data for the volume group '%s'"
                    " - it is either missing or broken", vg_name)
      if not test:
        try:
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
        except (ValueError, TypeError):
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
                        "node returned invalid LVM info, check LVM status")

  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
    """Gets per-disk status information for all instances.

    @type node_uuids: list of strings
    @param node_uuids: Node UUIDs
    @type node_image: dict of (UUID, L{NodeImage})
    @param node_image: Node image objects
    @type instanceinfo: dict of (UUID, L{objects.Instance})
    @param instanceinfo: Instance objects
    @rtype: {instance: {node: [(success, payload)]}}
    @return: a dictionary of per-instance dictionaries with nodes as
        keys and disk information as values; the disk information is a
        list of tuples (success, payload)

    """
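    # Illustrative sketch only (hypothetical UUIDs): for a two-disk DRBD
    # instance the result could look roughly like
    #   {"inst-uuid": {"node-a-uuid": [(True, status0), (True, status1)],
    #                  "node-b-uuid": [(True, status0), (True, status1)]}},
    # while diskless instances end up with an empty inner dictionary.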
    node_disks = {}
    node_disks_dev_inst_only = {}
    diskless_instances = set()
    diskless = constants.DT_DISKLESS

    for nuuid in node_uuids:
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
                                             node_image[nuuid].sinst))
      diskless_instances.update(uuid for uuid in node_inst_uuids
                                if instanceinfo[uuid].disk_template == diskless)
      disks = [(inst_uuid, disk)
               for inst_uuid in node_inst_uuids
               for disk in instanceinfo[inst_uuid].disks]

      if not disks:
        # No need to collect data
        continue

      node_disks[nuuid] = disks

      # _AnnotateDiskParams already makes copies of the disks
      dev_inst_only = []
      for (inst_uuid, dev) in disks:
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
                                          self.cfg)
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))

      node_disks_dev_inst_only[nuuid] = dev_inst_only

    assert len(node_disks) == len(node_disks_dev_inst_only)

    # Collect data from all nodes with disks
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
               node_disks.keys(), node_disks_dev_inst_only)

    assert len(result) == len(node_disks)

    instdisk = {}

    for (nuuid, nres) in result.items():
      node = self.cfg.GetNodeInfo(nuuid)
      disks = node_disks[node.uuid]

      if nres.offline:
        # No data from this node
        data = len(disks) * [(False, "node offline")]
      else:
        msg = nres.fail_msg
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
                      "while getting disk information: %s", msg)
        if msg:
          # No data from this node
          data = len(disks) * [(False, msg)]
        else:
          data = []
          for idx, i in enumerate(nres.payload):
            if isinstance(i, (tuple, list)) and len(i) == 2:
              data.append(i)
            else:
              logging.warning("Invalid result from node %s, entry %d: %s",
                              node.name, idx, i)
              data.append((False, "Invalid result from the remote node"))

      for ((inst_uuid, _), status) in zip(disks, data):
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
          .append(status)

    # Add empty entries for diskless instances.
    for inst_uuid in diskless_instances:
      assert inst_uuid not in instdisk
      instdisk[inst_uuid] = {}

    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
                      compat.all(isinstance(s, (tuple, list)) and
                                 len(s) == 2 for s in statuses)
                      for inst, nuuids in instdisk.items()
                      for nuuid, statuses in nuuids.items())
    if __debug__:
      instdisk_keys = set(instdisk)
      instanceinfo_keys = set(instanceinfo)
      assert instdisk_keys == instanceinfo_keys, \
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
         (instdisk_keys, instanceinfo_keys))

    return instdisk

  @staticmethod
  def _SshNodeSelector(group_uuid, all_nodes):
    """Create endless iterators for all potential SSH check hosts.

    """
    nodes = [node for node in all_nodes
             if (node.group != group_uuid and
                 not node.offline)]
    keyfunc = operator.attrgetter("group")

    return map(itertools.cycle,
               [sorted(map(operator.attrgetter("name"), names))
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
                                                  keyfunc)])

  @classmethod
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
    """Choose which nodes should talk to which other nodes.

    We will make nodes contact all nodes in their group, and one node from
    every other group.

    @warning: This algorithm has a known issue if one node group is much
      smaller than others (e.g. just one node). In such a case all other
      nodes will talk to the single node.

    """
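    # Illustrative sketch only (hypothetical node names): if this group holds
    # node1 and node2 and the only other group holds node3, the result is
    # roughly (["node1", "node2"], {"node1": ["node3"], "node2": ["node3"]}),
    # i.e. the group's online nodes plus, per node, one peer picked
    # round-robin from every other group for the SSH reachability check.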
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
    sel = cls._SshNodeSelector(group_uuid, all_nodes)

    return (online_nodes,
            dict((name, sorted([i.next() for i in sel]))
                 for name in online_nodes))

  def BuildHooksEnv(self):
    """Build hooks env.

    Cluster-Verify hooks are only run in the post phase; if they fail, their
    output is logged in the verify output and the verification fails.

    """
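    # Illustrative sketch only (hypothetical tags and node names): the
    # resulting environment could look like {"CLUSTER_TAGS": "prod web",
    # "NODE_TAGS_node1.example.com": "rack1"}, with one NODE_TAGS_* entry
    # per node of this group.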
    env = {
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
      }

    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
               for node in self.my_node_info.values())

    return env

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], list(self.my_node_info.keys()))

  def Exec(self, feedback_fn):
    """Verify integrity of the node group, performing various tests on nodes.

    """
    # This method has too many local variables. pylint: disable=R0914
    feedback_fn("* Verifying group '%s'" % self.group_info.name)

    if not self.my_node_uuids:
      # empty node group
      feedback_fn("* Empty node group, skipping verification")
      return True

    self.bad = False
    verbose = self.op.verbose
    self._feedback_fn = feedback_fn

    vg_name = self.cfg.GetVGName()
    drbd_helper = self.cfg.GetDRBDHelper()
    cluster = self.cfg.GetClusterInfo()
    hypervisors = cluster.enabled_hypervisors
    node_data_list = self.my_node_info.values()

    i_non_redundant = [] # Non redundant instances
    i_non_a_balanced = [] # Non auto-balanced instances
    i_offline = 0 # Count of offline instances
    n_offline = 0 # Count of offline nodes
    n_drained = 0 # Count of nodes being drained
    node_vol_should = {}

    # FIXME: verify OS list

    # File verification
    filemap = ComputeAncillaryFiles(cluster, False)

    # do local checksums
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
    master_ip = self.cfg.GetMasterIP()

    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))

    user_scripts = []
    if self.cfg.GetUseExternalMipScript():
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)

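    # Illustrative note: node_verify_param is the request map sent to every
    # node via call_node_verify below; each NV_* key selects one check plus
    # its argument, and the nodes answer with a payload keyed by the same
    # NV_* constants (the nresult consumed by the _Verify*/_Update* helpers
    # above).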
    node_verify_param = {
      constants.NV_FILELIST:
        map(vcluster.MakeVirtualPath,
            utils.UniqueSequence(filename
                                 for files in filemap
                                 for filename in files)),
      constants.NV_NODELIST:
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
                                  self.all_node_info.values()),
      constants.NV_HYPERVISOR: hypervisors,
      constants.NV_HVPARAMS:
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
                                 for node in node_data_list
                                 if not node.offline],
      constants.NV_INSTANCELIST: hypervisors,
      constants.NV_VERSION: None,
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
      constants.NV_NODESETUP: None,
      constants.NV_TIME: None,
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
      constants.NV_OSLIST: None,
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
      constants.NV_USERSCRIPTS: user_scripts,
      constants.NV_CLIENT_CERT: None,
      }

    if vg_name is not None:
      node_verify_param[constants.NV_VGLIST] = None
      node_verify_param[constants.NV_LVLIST] = vg_name
      node_verify_param[constants.NV_PVLIST] = [vg_name]

    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
      if drbd_helper:
        node_verify_param[constants.NV_DRBDVERSION] = None
        node_verify_param[constants.NV_DRBDLIST] = None
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper

    if cluster.IsFileStorageEnabled() or \
        cluster.IsSharedFileStorageEnabled():
      # Load file storage paths only from master node
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
        self.cfg.GetMasterNodeName()
      if cluster.IsFileStorageEnabled():
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
          cluster.file_storage_dir

    # bridge checks
    # FIXME: this needs to be changed per node-group, not cluster-wide
    bridges = set()
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
      bridges.add(default_nicpp[constants.NIC_LINK])
    for instance in self.my_inst_info.values():
      for nic in instance.nics:
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
          bridges.add(full_nic[constants.NIC_LINK])

    if bridges:
      node_verify_param[constants.NV_BRIDGES] = list(bridges)

    # Build our expected cluster state
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
                                                 uuid=node.uuid,
                                                 vm_capable=node.vm_capable))
                      for node in node_data_list)
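    # Illustrative note: node_image maps each node UUID to a NodeImage holding
    # the state we expect to find (primary/secondary instances, volumes, OS
    # list, ...); entries for nodes outside this group that instances refer to
    # are added below, and flagged as "ghost" when the node is not in the
    # cluster configuration at all.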

    # Gather OOB paths
    oob_paths = []
    for node in self.all_node_info.values():
      path = SupportsOob(self.cfg, node)
      if path and path not in oob_paths:
        oob_paths.append(path)

    if oob_paths:
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths

    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if instance.admin_state == constants.ADMINST_OFFLINE:
        i_offline += 1

      for nuuid in instance.all_nodes:
        if nuuid not in node_image:
          gnode = self.NodeImage(uuid=nuuid)
          gnode.ghost = (nuuid not in self.all_node_info)
          node_image[nuuid] = gnode

      instance.MapLVsByNode(node_vol_should)

      pnode = instance.primary_node
      node_image[pnode].pinst.append(instance.uuid)

      for snode in instance.secondary_nodes:
        nimg = node_image[snode]
        nimg.sinst.append(instance.uuid)
        if pnode not in nimg.sbp:
          nimg.sbp[pnode] = []
        nimg.sbp[pnode].append(instance.uuid)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               self.my_node_info.keys())
    # The value of exclusive_storage should be the same across the group, so if
    # it's True for at least one node, we act as if it were set for all nodes
    self._exclusive_storage = compat.any(es_flags.values())
    if self._exclusive_storage:
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True

    node_group_uuids = dict(map(lambda n: (n.name, n.group),
                                self.cfg.GetAllNodesInfo().values()))
    groups_config = self.cfg.GetAllNodeGroupsInfoDict()

    # At this point, we have the in-memory data structures complete,
    # except for the runtime information, which we'll gather next

    # Due to the way our RPC system works, exact response times cannot be
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
    # time before and after executing the request, we can at least have a time
    # window.
    nvinfo_starttime = time.time()
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
                                           node_verify_param,
                                           self.cfg.GetClusterName(),
                                           self.cfg.GetClusterInfo().hvparams,
                                           node_group_uuids,
                                           groups_config)
    nvinfo_endtime = time.time()

    if self.extra_lv_nodes and vg_name is not None:
      extra_lv_nvinfo = \
          self.rpc.call_node_verify(self.extra_lv_nodes,
                                    {constants.NV_LVLIST: vg_name},
                                    self.cfg.GetClusterName(),
                                    self.cfg.GetClusterInfo().hvparams,
                                    node_group_uuids,
                                    groups_config)
    else:
      extra_lv_nvinfo = {}

    all_drbd_map = self.cfg.ComputeDRBDMap()

    feedback_fn("* Gathering disk information (%s nodes)" %
                len(self.my_node_uuids))
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
                                     self.my_inst_info)

    feedback_fn("* Verifying configuration file consistency")

    # If not all nodes are being checked, we need to make sure the master node
    # and a non-checked vm_capable node are in the list.
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
    if absent_node_uuids:
      vf_nvinfo = all_nvinfo.copy()
      vf_node_info = list(self.my_node_info.values())
      additional_node_uuids = []
      if master_node_uuid not in self.my_node_info:
        additional_node_uuids.append(master_node_uuid)
        vf_node_info.append(self.all_node_info[master_node_uuid])
      # Add the first vm_capable node we find which is not included,
      # excluding the master node (which we already have)
      for node_uuid in absent_node_uuids:
        nodeinfo = self.all_node_info[node_uuid]
        if (nodeinfo.vm_capable and not nodeinfo.offline and
            node_uuid != master_node_uuid):
          additional_node_uuids.append(node_uuid)
          vf_node_info.append(self.all_node_info[node_uuid])
          break
      key = constants.NV_FILELIST
      vf_nvinfo.update(self.rpc.call_node_verify(
         additional_node_uuids, {key: node_verify_param[key]},
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams,
         node_group_uuids,
         groups_config))
    else:
      vf_nvinfo = all_nvinfo
      vf_node_info = self.my_node_info.values()

    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)

    feedback_fn("* Verifying node status")

    refos_img = None

    for node_i in node_data_list:
      nimg = node_image[node_i.uuid]

      if node_i.offline:
        if verbose:
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
        n_offline += 1
        continue

      if node_i.uuid == master_node_uuid:
        ntype = "master"
      elif node_i.master_candidate:
        ntype = "master candidate"
      elif node_i.drained:
        ntype = "drained"
        n_drained += 1
      else:
        ntype = "regular"
      if verbose:
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))

      msg = all_nvinfo[node_i.uuid].fail_msg
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
                    "while contacting node: %s", msg)
      if msg:
        nimg.rpc_fail = True
        continue

      nresult = all_nvinfo[node_i.uuid].payload

      nimg.call_ok = self._VerifyNode(node_i, nresult)
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
      self._VerifyNodeNetwork(node_i, nresult)
      self._VerifyNodeUserScripts(node_i, nresult)
      self._VerifyOob(node_i, nresult)
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
                                           node_i.uuid == master_node_uuid)
      self._VerifyFileStoragePaths(node_i, nresult)
      self._VerifySharedFileStoragePaths(node_i, nresult)

      if nimg.vm_capable:
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
                             all_drbd_map)

        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
        self._UpdateNodeInstances(node_i, nresult, nimg)
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
        self._UpdateNodeOS(node_i, nresult, nimg)

        if not nimg.os_fail:
          if refos_img is None:
            refos_img = nimg
          self._VerifyNodeOS(node_i, nimg, refos_img)
        self._VerifyNodeBridges(node_i, nresult, bridges)

        # Check whether all running instances are primary for the node. (This
        # can no longer be done from _VerifyInstance below, since some of the
        # wrong instances could be from other node groups.)
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)

        for inst_uuid in non_primary_inst_uuids:
          test = inst_uuid in self.all_inst_info
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
                        self.cfg.GetInstanceName(inst_uuid),
                        "instance should not run on node %s", node_i.name)
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
                        "node is running unknown instance %s", inst_uuid)

    self._VerifyGroupDRBDVersion(all_nvinfo)
    self._VerifyGroupLVM(node_image, vg_name)

    for node_uuid, result in extra_lv_nvinfo.items():
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
                              node_image[node_uuid], vg_name)

    feedback_fn("* Verifying instance status")
    for inst_uuid in self.my_inst_uuids:
      instance = self.my_inst_info[inst_uuid]
      if verbose:
        feedback_fn("* Verifying instance %s" % instance.name)
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])

      # If the instance is non-redundant we cannot survive losing its primary
      # node, so we are not N+1 compliant.
      if instance.disk_template not in constants.DTS_MIRRORED:
        i_non_redundant.append(instance)

      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
        i_non_a_balanced.append(instance)

    feedback_fn("* Verifying orphan volumes")
    reserved = utils.FieldSet(*cluster.reserved_lvs)

    # We will get spurious "unknown volume" warnings if any node of this group
    # is secondary for an instance whose primary is in another group. To avoid
    # them, we find these instances and add their volumes to node_vol_should.
    for instance in self.all_inst_info.values():
      for secondary in instance.secondary_nodes:
        if (secondary in self.my_node_info
            and instance.name not in self.my_inst_info):
          instance.MapLVsByNode(node_vol_should)
          break

    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)

    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
      feedback_fn("* Verifying N+1 Memory redundancy")
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)

    feedback_fn("* Other Notes")
    if i_non_redundant:
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
                  % len(i_non_redundant))

    if i_non_a_balanced:
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
                  % len(i_non_a_balanced))

    if i_offline:
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)

    if n_offline:
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)

    if n_drained:
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)

    return not self.bad

  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
    """Analyze the post-hooks' result.

    This method analyses the hook result, handles it, and sends some
    nicely-formatted feedback back to the user.

    @param phase: one of L{constants.HOOKS_PHASE_POST} or
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
    @param hooks_results: the results of the multi-node hooks rpc call
    @param feedback_fn: function used to send feedback back to the caller
    @param lu_result: previous Exec result
    @return: the new Exec result, based on the previous result
        and hook results

    """
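    # Illustrative sketch only (hypothetical hook name and output): for each
    # node, res.payload is a list of (script, status, output) tuples such as
    # ("99-check-foo", constants.HKR_FAIL, "disk nearly full"); any HKR_FAIL
    # entry below marks the overall verification as failed.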
    # We only really run POST phase hooks, only for non-empty groups,
    # and are only interested in their results
    if not self.my_node_uuids:
      # empty node group
      pass
    elif phase == constants.HOOKS_PHASE_POST:
      # Used to change hooks' output to proper indentation
      feedback_fn("* Hooks Results")
      assert hooks_results, "invalid result from hooks"

      for node_name in hooks_results:
        res = hooks_results[node_name]
        msg = res.fail_msg
        test = msg and not res.offline
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                      "Communication failure in hooks execution: %s", msg)
        if res.offline or msg:
          # No need to investigate payload if node is offline or gave
          # an error.
          continue
        for script, hkr, output in res.payload:
          test = hkr == constants.HKR_FAIL
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
                        "Script %s failed, output:", script)
          if test:
            output = self._HOOKS_INDENT_RE.sub("      ", output)
            feedback_fn("%s" % output)
            lu_result = False

    return lu_result


class LUClusterVerifyDisks(NoHooksLU):
  """Verifies the cluster disks status.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.share_locks = ShareAll()
    self.needed_locks = {
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      }

  def Exec(self, feedback_fn):
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)

    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
                           for group in group_names])