
lib/cmdlib/cluster.py @ b3cc1646


#
#

# Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
#
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
# 02110-1301, USA.


"""Logical units dealing with the cluster."""

import OpenSSL

import copy
import itertools
import logging
import operator
import os
import re
import time

from ganeti import compat
from ganeti import constants
from ganeti import errors
from ganeti import hypervisor
from ganeti import locking
from ganeti import masterd
from ganeti import netutils
from ganeti import objects
from ganeti import opcodes
from ganeti import pathutils
from ganeti import query
import ganeti.rpc.node as rpc
from ganeti import runtime
from ganeti import ssh
from ganeti import uidpool
from ganeti import utils
from ganeti import vcluster

from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
  ResultWithJobs
from ganeti.cmdlib.common import ShareAll, RunPostHook, \
  ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
  GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
  GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
  CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
  ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
  CheckIpolicyVsDiskTemplates, CheckDiskAccessModeValidity, \
  CheckDiskAccessModeConsistency, CreateNewClientCert

import ganeti.masterd.instance

def _UpdateMasterClientCert(
    lu, master_uuid, cluster, feedback_fn,
    client_cert=pathutils.NODED_CLIENT_CERT_FILE,
    client_cert_tmp=pathutils.NODED_CLIENT_CERT_FILE_TMP):
  """Renews the master's client certificate and propagates the config.

  @type lu: C{LogicalUnit}
  @param lu: the logical unit holding the config
  @type master_uuid: string
  @param master_uuid: the master node's UUID
  @type cluster: C{objects.Cluster}
  @param cluster: the cluster's configuration
  @type feedback_fn: function
  @param feedback_fn: feedback functions for config updates
  @type client_cert: string
  @param client_cert: the path of the client certificate
  @type client_cert_tmp: string
  @param client_cert_tmp: the temporary path of the client certificate
  @rtype: string
  @return: the digest of the newly created client certificate

  """
  client_digest = CreateNewClientCert(lu, master_uuid, filename=client_cert_tmp)
  utils.AddNodeToCandidateCerts(master_uuid, client_digest,
                                cluster.candidate_certs)
  # This triggers an update of the config and distribution of it with the old
  # SSL certificate
  lu.cfg.Update(cluster, feedback_fn)

  utils.RemoveFile(client_cert)
  utils.RenameFile(client_cert_tmp, client_cert)
  return client_digest
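
# Usage sketch: LUClusterRenewCrypto.Exec below calls this helper and then
# resets the candidate certificate map so it contains only the master's new
# digest, roughly:
#   digest = _UpdateMasterClientCert(lu, master_uuid, cluster, feedback_fn)
#   cluster.candidate_certs = {master_uuid: digest}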


class LUClusterRenewCrypto(NoHooksLU):
  """Renew the cluster's crypto tokens.

  Note that most of this operation is done in gnt_cluster.py; this LU only
  takes care of the renewal of the client SSL certificates.

  """
  def Exec(self, feedback_fn):
    master_uuid = self.cfg.GetMasterNode()
    cluster = self.cfg.GetClusterInfo()

    server_digest = utils.GetCertificateDigest(
      cert_filename=pathutils.NODED_CERT_FILE)
    utils.AddNodeToCandidateCerts("%s-SERVER" % master_uuid,
                                  server_digest,
                                  cluster.candidate_certs)
    new_master_digest = _UpdateMasterClientCert(self, master_uuid, cluster,
                                                feedback_fn)

    cluster.candidate_certs = {master_uuid: new_master_digest}
    nodes = self.cfg.GetAllNodesInfo()
    for (node_uuid, node_info) in nodes.items():
      if node_uuid != master_uuid:
        new_digest = CreateNewClientCert(self, node_uuid)
        if node_info.master_candidate:
          cluster.candidate_certs[node_uuid] = new_digest
    # Trigger another update of the config now with the new master cert
    self.cfg.Update(cluster, feedback_fn)


class LUClusterActivateMasterIp(NoHooksLU):
  """Activate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Activate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                   master_params, ems)
    result.Raise("Could not activate the master IP")


class LUClusterDeactivateMasterIp(NoHooksLU):
  """Deactivate the master IP on the master node.

  """
  def Exec(self, feedback_fn):
    """Deactivate the master IP.

    """
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not deactivate the master IP")


class LUClusterConfigQuery(NoHooksLU):
  """Return configuration values.

  """
  REQ_BGL = False

  def CheckArguments(self):
    self.cq = ClusterQuery(None, self.op.output_fields, False)

  def ExpandNames(self):
    self.cq.ExpandNames(self)

  def DeclareLocks(self, level):
    self.cq.DeclareLocks(self, level)

  def Exec(self, feedback_fn):
    result = self.cq.OldStyleQuery(self)

    assert len(result) == 1

    return result[0]


class LUClusterDestroy(LogicalUnit):
  """Logical unit for destroying the cluster.

  """
  HPATH = "cluster-destroy"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [])

  def CheckPrereq(self):
    """Check prerequisites.

    This checks whether the cluster is empty.

    Any errors are signaled by raising errors.OpPrereqError.

    """
    master = self.cfg.GetMasterNode()

    nodelist = self.cfg.GetNodeList()
    if len(nodelist) != 1 or nodelist[0] != master:
      raise errors.OpPrereqError("There are still %d node(s) in"
                                 " this cluster." % (len(nodelist) - 1),
                                 errors.ECODE_INVAL)
    instancelist = self.cfg.GetInstanceList()
    if instancelist:
      raise errors.OpPrereqError("There are still %d instance(s) in"
                                 " this cluster." % len(instancelist),
                                 errors.ECODE_INVAL)

  def Exec(self, feedback_fn):
    """Destroys the cluster.

    """
    master_params = self.cfg.GetMasterNetworkParameters()

    # Run post hooks on master node before it's removed
    RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))

    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Warn("Error disabling the master IP address", self.LogWarning)
    return master_params.uuid


class LUClusterPostInit(LogicalUnit):
  """Logical unit for running hooks after cluster initialization.

  """
  HPATH = "cluster-init"
  HTYPE = constants.HTYPE_CLUSTER

  def CheckArguments(self):
    self.master_uuid = self.cfg.GetMasterNode()
    self.master_ndparams = self.cfg.GetNdParams(self.cfg.GetMasterNodeInfo())

    # TODO: When Issue 584 is solved, and None is properly parsed when used
    # as a default value, ndparams.get(.., None) can be changed to
    # ndparams[..] to access the values directly

    # OpenvSwitch: Warn user if link is missing
    if (self.master_ndparams[constants.ND_OVS] and not
        self.master_ndparams.get(constants.ND_OVS_LINK, None)):
      self.LogInfo("No physical interface for OpenvSwitch was given."
                   " OpenvSwitch will not have an outside connection. This"
                   " might not be what you want.")

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([], [self.cfg.GetMasterNode()])

  def Exec(self, feedback_fn):
    """Create and configure Open vSwitch

    """
    if self.master_ndparams[constants.ND_OVS]:
      result = self.rpc.call_node_configure_ovs(
                 self.master_uuid,
                 self.master_ndparams[constants.ND_OVS_NAME],
                 self.master_ndparams.get(constants.ND_OVS_LINK, None))
      result.Raise("Could not successfully configure Open vSwitch")

    cluster = self.cfg.GetClusterInfo()
    _UpdateMasterClientCert(self, self.master_uuid, cluster, feedback_fn)

    return True


class ClusterQuery(QueryBase):
  FIELDS = query.CLUSTER_FIELDS

  #: Do not sort (there is only one item)
  SORT_FIELD = None

  def ExpandNames(self, lu):
    lu.needed_locks = {}

    # The following variables interact with _QueryBase._GetNames
    self.wanted = locking.ALL_SET
    self.do_locking = self.use_locking

    if self.do_locking:
      raise errors.OpPrereqError("Can not use locking for cluster queries",
                                 errors.ECODE_INVAL)

  def DeclareLocks(self, lu, level):
    pass

  def _GetQueryData(self, lu):
    """Computes the list of nodes and their attributes.

    """
    # Locking is not used
    assert not (compat.any(lu.glm.is_owned(level)
                           for level in locking.LEVELS
                           if level != locking.LEVEL_CLUSTER) or
                self.do_locking or self.use_locking)

    if query.CQ_CONFIG in self.requested_data:
      cluster = lu.cfg.GetClusterInfo()
      nodes = lu.cfg.GetAllNodesInfo()
    else:
      cluster = NotImplemented
      nodes = NotImplemented

    if query.CQ_QUEUE_DRAINED in self.requested_data:
      drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
    else:
      drain_flag = NotImplemented

    if query.CQ_WATCHER_PAUSE in self.requested_data:
      master_node_uuid = lu.cfg.GetMasterNode()

      result = lu.rpc.call_get_watcher_pause(master_node_uuid)
      result.Raise("Can't retrieve watcher pause from master node '%s'" %
                   lu.cfg.GetMasterNodeName())

      watcher_pause = result.payload
    else:
      watcher_pause = NotImplemented

    return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)


class LUClusterQuery(NoHooksLU):
  """Query cluster configuration.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    """Return cluster config.

    """
    cluster = self.cfg.GetClusterInfo()
    os_hvp = {}

    # Filter just for enabled hypervisors
    for os_name, hv_dict in cluster.os_hvp.items():
      os_hvp[os_name] = {}
      for hv_name, hv_params in hv_dict.items():
        if hv_name in cluster.enabled_hypervisors:
          os_hvp[os_name][hv_name] = hv_params

    # Convert ip_family to ip_version
    primary_ip_version = constants.IP4_VERSION
    if cluster.primary_ip_family == netutils.IP6Address.family:
      primary_ip_version = constants.IP6_VERSION

    result = {
      "software_version": constants.RELEASE_VERSION,
      "protocol_version": constants.PROTOCOL_VERSION,
      "config_version": constants.CONFIG_VERSION,
      "os_api_version": max(constants.OS_API_VERSIONS),
      "export_version": constants.EXPORT_VERSION,
      "vcs_version": constants.VCS_VERSION,
      "architecture": runtime.GetArchInfo(),
      "name": cluster.cluster_name,
      "master": self.cfg.GetMasterNodeName(),
      "default_hypervisor": cluster.primary_hypervisor,
      "enabled_hypervisors": cluster.enabled_hypervisors,
      "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
                        for hypervisor_name in cluster.enabled_hypervisors]),
      "os_hvp": os_hvp,
      "beparams": cluster.beparams,
      "osparams": cluster.osparams,
      "ipolicy": cluster.ipolicy,
      "nicparams": cluster.nicparams,
      "ndparams": cluster.ndparams,
      "diskparams": cluster.diskparams,
      "candidate_pool_size": cluster.candidate_pool_size,
      "master_netdev": cluster.master_netdev,
      "master_netmask": cluster.master_netmask,
      "use_external_mip_script": cluster.use_external_mip_script,
      "volume_group_name": cluster.volume_group_name,
      "drbd_usermode_helper": cluster.drbd_usermode_helper,
      "file_storage_dir": cluster.file_storage_dir,
      "shared_file_storage_dir": cluster.shared_file_storage_dir,
      "maintain_node_health": cluster.maintain_node_health,
      "ctime": cluster.ctime,
      "mtime": cluster.mtime,
      "uuid": cluster.uuid,
      "tags": list(cluster.GetTags()),
      "uid_pool": cluster.uid_pool,
      "default_iallocator": cluster.default_iallocator,
      "default_iallocator_params": cluster.default_iallocator_params,
      "reserved_lvs": cluster.reserved_lvs,
      "primary_ip_version": primary_ip_version,
      "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
      "hidden_os": cluster.hidden_os,
      "blacklisted_os": cluster.blacklisted_os,
      "enabled_disk_templates": cluster.enabled_disk_templates,
      }

    return result
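
    # Illustration of the filtering above: with enabled_hypervisors == ["kvm"],
    # any "xen-pvm" entries in cluster.os_hvp are dropped, so "os_hvp" in the
    # returned dict only covers hypervisors that are actually enabled.  Higher
    # layers (e.g. the "gnt-cluster info" client) render this dictionary for
    # the user.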


class LUClusterRedistConf(NoHooksLU):
  """Force the redistribution of cluster configuration.

  This is a very simple LU.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def Exec(self, feedback_fn):
    """Redistribute the configuration.

    """
    self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
    RedistributeAncillaryFiles(self)


class LUClusterRename(LogicalUnit):
  """Rename the cluster.

  """
  HPATH = "cluster-rename"
  HTYPE = constants.HTYPE_CLUSTER

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_NAME": self.op.name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())

  def CheckPrereq(self):
    """Verify that the passed name is a valid one.

    """
    hostname = netutils.GetHostname(name=self.op.name,
                                    family=self.cfg.GetPrimaryIPFamily())

    new_name = hostname.name
    self.ip = new_ip = hostname.ip
    old_name = self.cfg.GetClusterName()
    old_ip = self.cfg.GetMasterIP()
    if new_name == old_name and new_ip == old_ip:
      raise errors.OpPrereqError("Neither the name nor the IP address of the"
                                 " cluster has changed",
                                 errors.ECODE_INVAL)
    if new_ip != old_ip:
      if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
        raise errors.OpPrereqError("The given cluster IP address (%s) is"
                                   " reachable on the network" %
                                   new_ip, errors.ECODE_NOTUNIQUE)

    self.op.name = new_name

  def Exec(self, feedback_fn):
    """Rename the cluster.

    """
    clustername = self.op.name
    new_ip = self.ip

    # shutdown the master IP
    master_params = self.cfg.GetMasterNetworkParameters()
    ems = self.cfg.GetUseExternalMipScript()
    result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                     master_params, ems)
    result.Raise("Could not disable the master role")

    try:
      cluster = self.cfg.GetClusterInfo()
      cluster.cluster_name = clustername
      cluster.master_ip = new_ip
      self.cfg.Update(cluster, feedback_fn)

      # update the known hosts file
      ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
      node_list = self.cfg.GetOnlineNodeList()
      try:
        node_list.remove(master_params.uuid)
      except ValueError:
        pass
      UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
    finally:
      master_params.ip = new_ip
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master role on the master,"
                  " please restart manually", self.LogWarning)

    return clustername
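
    # Note on the flow above: the config update and known_hosts distribution
    # run inside try/finally so the master IP is reactivated even if they
    # fail.  This LU is typically driven by the "gnt-cluster rename" command.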


class LUClusterRepairDiskSizes(NoHooksLU):
  """Verifies the cluster disks sizes.

  """
  REQ_BGL = False

  def ExpandNames(self):
    if self.op.instances:
      (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
      # Not getting the node allocation lock as only a specific set of
      # instances (and their nodes) is going to be acquired
      self.needed_locks = {
        locking.LEVEL_NODE_RES: [],
        locking.LEVEL_INSTANCE: self.wanted_names,
        }
      self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
    else:
      self.wanted_names = None
      self.needed_locks = {
        locking.LEVEL_NODE_RES: locking.ALL_SET,
        locking.LEVEL_INSTANCE: locking.ALL_SET,

        # This opcode acquires the node locks for all instances
        locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
        }

    self.share_locks = {
      locking.LEVEL_NODE_RES: 1,
      locking.LEVEL_INSTANCE: 0,
      locking.LEVEL_NODE_ALLOC: 1,
      }

  def DeclareLocks(self, level):
    if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
      self._LockInstancesNodes(primary_only=True, level=level)

  def CheckPrereq(self):
    """Check prerequisites.

    This only checks the optional instance list against the existing names.

    """
    if self.wanted_names is None:
      self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)

    self.wanted_instances = \
        map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))

  def _EnsureChildSizes(self, disk):
    """Ensure children of the disk have the needed disk size.

    This is valid mainly for DRBD8 and fixes an issue where the
    children have smaller disk size.

    @param disk: an L{ganeti.objects.Disk} object

    """
    if disk.dev_type == constants.DT_DRBD8:
      assert disk.children, "Empty children for DRBD8?"
      fchild = disk.children[0]
      mismatch = fchild.size < disk.size
      if mismatch:
        self.LogInfo("Child disk has size %d, parent %d, fixing",
                     fchild.size, disk.size)
        fchild.size = disk.size

      # and we recurse on this child only, not on the metadev
      return self._EnsureChildSizes(fchild) or mismatch
    else:
      return False
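
  # Example: for a DRBD8 disk recorded at 10240 MiB whose data child only
  # records 10236 MiB, the child's size is bumped to 10240 and True is
  # returned so the caller knows the configuration changed; the metadata
  # child is deliberately left untouched.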

  def Exec(self, feedback_fn):
    """Verify the size of cluster disks.

    """
    # TODO: check child disks too
    # TODO: check differences in size between primary/secondary nodes
    per_node_disks = {}
    for instance in self.wanted_instances:
      pnode = instance.primary_node
      if pnode not in per_node_disks:
        per_node_disks[pnode] = []
      for idx, disk in enumerate(instance.disks):
        per_node_disks[pnode].append((instance, idx, disk))

    assert not (frozenset(per_node_disks.keys()) -
                self.owned_locks(locking.LEVEL_NODE_RES)), \
      "Not owning correct locks"
    assert not self.owned_locks(locking.LEVEL_NODE)

    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
                                               per_node_disks.keys())

    changed = []
    for node_uuid, dskl in per_node_disks.items():
      if not dskl:
        # no disks on the node
        continue

      newl = [([v[2].Copy()], v[0]) for v in dskl]
      node_name = self.cfg.GetNodeName(node_uuid)
      result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
      if result.fail_msg:
        self.LogWarning("Failure in blockdev_getdimensions call to node"
                        " %s, ignoring", node_name)
        continue
      if len(result.payload) != len(dskl):
        logging.warning("Invalid result from node %s: len(dskl)=%d,"
                        " result.payload=%s", node_name, len(dskl),
                        result.payload)
        self.LogWarning("Invalid result from node %s, ignoring node results",
                        node_name)
        continue
      for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
        if dimensions is None:
          self.LogWarning("Disk %d of instance %s did not return size"
                          " information, ignoring", idx, instance.name)
          continue
        if not isinstance(dimensions, (tuple, list)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " dimension information, ignoring", idx,
                          instance.name)
          continue
        (size, spindles) = dimensions
        if not isinstance(size, (int, long)):
          self.LogWarning("Disk %d of instance %s did not return valid"
                          " size information, ignoring", idx, instance.name)
          continue
        size = size >> 20
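        # (blockdev_getdimensions reports sizes in bytes; shifting right by 20
        # bits converts to MiB, which is the unit recorded in disk.size and
        # compared below.)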
        if size != disk.size:
          self.LogInfo("Disk %d of instance %s has mismatched size,"
                       " correcting: recorded %d, actual %d", idx,
                       instance.name, disk.size, size)
          disk.size = size
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", size))
        if es_flags[node_uuid]:
          if spindles is None:
            self.LogWarning("Disk %d of instance %s did not return valid"
                            " spindles information, ignoring", idx,
                            instance.name)
          elif disk.spindles is None or disk.spindles != spindles:
            self.LogInfo("Disk %d of instance %s has mismatched spindles,"
                         " correcting: recorded %s, actual %s",
                         idx, instance.name, disk.spindles, spindles)
            disk.spindles = spindles
            self.cfg.Update(instance, feedback_fn)
            changed.append((instance.name, idx, "spindles", disk.spindles))
        if self._EnsureChildSizes(disk):
          self.cfg.Update(instance, feedback_fn)
          changed.append((instance.name, idx, "size", disk.size))
    return changed


def _ValidateNetmask(cfg, netmask):
  """Checks if a netmask is valid.

  @type cfg: L{config.ConfigWriter}
  @param cfg: The cluster configuration
  @type netmask: int
  @param netmask: the netmask to be verified
  @raise errors.OpPrereqError: if the validation fails

  """
  ip_family = cfg.GetPrimaryIPFamily()
  try:
    ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
  except errors.ProgrammerError:
    raise errors.OpPrereqError("Invalid primary ip family: %s." %
                               ip_family, errors.ECODE_INVAL)
  if not ipcls.ValidateNetmask(netmask):
    raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
                               (netmask), errors.ECODE_INVAL)
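
# Usage sketch: LUClusterSetParams.CheckArguments below calls
#   _ValidateNetmask(self.cfg, self.op.master_netmask)
# and relies on the OpPrereqError raised here to reject a prefix length that
# does not fit the cluster's primary IP family (e.g. 33 for an IPv4 cluster).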


def CheckFileBasedStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates,
    file_disk_template):
  """Checks whether the given file-based storage directory is acceptable.

  Note: This function is public, because it is also used in bootstrap.py.

  @type logging_warn_fn: function
  @param logging_warn_fn: function which accepts a string and logs it
  @type file_storage_dir: string
  @param file_storage_dir: the directory to be used for file-based instances
  @type enabled_disk_templates: list of string
  @param enabled_disk_templates: the list of enabled disk templates
  @type file_disk_template: string
  @param file_disk_template: the file-based disk template for which the
      path should be checked

  """
  assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
            constants.ST_FILE, constants.ST_SHARED_FILE
         ))
  file_storage_enabled = file_disk_template in enabled_disk_templates
  if file_storage_dir is not None:
    if file_storage_dir == "":
      if file_storage_enabled:
        raise errors.OpPrereqError(
            "Unsetting the '%s' storage directory while having '%s' storage"
            " enabled is not permitted." %
            (file_disk_template, file_disk_template))
    else:
      if not file_storage_enabled:
        logging_warn_fn(
            "Specified a %s storage directory, although %s storage is not"
            " enabled." % (file_disk_template, file_disk_template))
  else:
    raise errors.ProgrammerError("Received %s storage dir with value"
                                 " 'None'." % file_disk_template)


def CheckFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_FILE)


def CheckSharedFileStoragePathVsEnabledDiskTemplates(
    logging_warn_fn, file_storage_dir, enabled_disk_templates):
  """Checks whether the given shared file storage directory is acceptable.

  @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}

  """
  CheckFileBasedStoragePathVsEnabledDiskTemplates(
      logging_warn_fn, file_storage_dir, enabled_disk_templates,
      constants.DT_SHARED_FILE)
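
# Both wrappers above are used from LUClusterSetParams.CheckPrereq (and, as the
# base function's docstring notes, from bootstrap.py), e.g.:
#   CheckFileStoragePathVsEnabledDiskTemplates(
#       self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)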


class LUClusterSetParams(LogicalUnit):
  """Change the parameters of the cluster.

  """
  HPATH = "cluster-modify"
  HTYPE = constants.HTYPE_CLUSTER
  REQ_BGL = False

  def CheckArguments(self):
    """Check parameters

    """
    if self.op.uid_pool:
      uidpool.CheckUidPool(self.op.uid_pool)

    if self.op.add_uids:
      uidpool.CheckUidPool(self.op.add_uids)

    if self.op.remove_uids:
      uidpool.CheckUidPool(self.op.remove_uids)

    if self.op.master_netmask is not None:
      _ValidateNetmask(self.cfg, self.op.master_netmask)

    if self.op.diskparams:
      for dt_params in self.op.diskparams.values():
        utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
      try:
        utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
        CheckDiskAccessModeValidity(self.op.diskparams)
      except errors.OpPrereqError, err:
        raise errors.OpPrereqError("While verifying diskparams options: %s"
                                   % err, errors.ECODE_INVAL)

  def ExpandNames(self):
    # FIXME: in the future maybe other cluster params won't require checking on
    # all nodes to be modified.
    # FIXME: This opcode changes cluster-wide settings. Is acquiring all
    # resource locks the right thing, shouldn't it be the BGL instead?
    self.needed_locks = {
      locking.LEVEL_NODE: locking.ALL_SET,
      locking.LEVEL_INSTANCE: locking.ALL_SET,
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
    }
    self.share_locks = ShareAll()

  def BuildHooksEnv(self):
    """Build hooks env.

    """
    return {
      "OP_TARGET": self.cfg.GetClusterName(),
      "NEW_VG_NAME": self.op.vg_name,
      }

  def BuildHooksNodes(self):
    """Build hooks nodes.

    """
    mn = self.cfg.GetMasterNode()
    return ([mn], [mn])

  def _CheckVgName(self, node_uuids, enabled_disk_templates,
                   new_enabled_disk_templates):
    """Check the consistency of the vg name on all nodes and in case it gets
       unset whether there are instances still using it.

    """
    lvm_is_enabled = utils.IsLvmEnabled(enabled_disk_templates)
    lvm_gets_enabled = utils.LvmGetsEnabled(enabled_disk_templates,
                                            new_enabled_disk_templates)
    current_vg_name = self.cfg.GetVGName()

    if self.op.vg_name == '':
      if lvm_is_enabled:
        raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
                                   " disk templates are or get enabled.")

    if self.op.vg_name is None:
      if current_vg_name is None and lvm_is_enabled:
        raise errors.OpPrereqError("Please specify a volume group when"
                                   " enabling lvm-based disk-templates.")

    if self.op.vg_name is not None and not self.op.vg_name:
      if self.cfg.HasAnyDiskOfType(constants.DT_PLAIN):
        raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
                                   " instances exist", errors.ECODE_INVAL)

    if (self.op.vg_name is not None and lvm_is_enabled) or \
        (self.cfg.GetVGName() is not None and lvm_gets_enabled):
      self._CheckVgNameOnNodes(node_uuids)

  def _CheckVgNameOnNodes(self, node_uuids):
    """Check the status of the volume group on each node.

    """
    vglist = self.rpc.call_vg_list(node_uuids)
    for node_uuid in node_uuids:
      msg = vglist[node_uuid].fail_msg
      if msg:
        # ignoring down node
        self.LogWarning("Error while gathering data on node %s"
                        " (ignoring node): %s",
                        self.cfg.GetNodeName(node_uuid), msg)
        continue
      vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
                                            self.op.vg_name,
                                            constants.MIN_VG_SIZE)
      if vgstatus:
        raise errors.OpPrereqError("Error on node '%s': %s" %
                                   (self.cfg.GetNodeName(node_uuid), vgstatus),
                                   errors.ECODE_ENVIRON)

  @staticmethod
  def _GetDiskTemplateSetsInner(op_enabled_disk_templates,
                                old_enabled_disk_templates):
    """Computes three sets of disk templates.

    @see: C{_GetDiskTemplateSets} for more details.

    """
    enabled_disk_templates = None
    new_enabled_disk_templates = []
    disabled_disk_templates = []
    if op_enabled_disk_templates:
      enabled_disk_templates = op_enabled_disk_templates
      new_enabled_disk_templates = \
        list(set(enabled_disk_templates)
             - set(old_enabled_disk_templates))
      disabled_disk_templates = \
        list(set(old_enabled_disk_templates)
             - set(enabled_disk_templates))
    else:
      enabled_disk_templates = old_enabled_disk_templates
    return (enabled_disk_templates, new_enabled_disk_templates,
            disabled_disk_templates)
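
  # Worked example for the helper above (purely illustrative): with
  # old_enabled_disk_templates == ["plain", "drbd"] and
  # op_enabled_disk_templates == ["drbd", "file"], the result is
  # (["drbd", "file"], ["file"], ["plain"]): the templates enabled afterwards,
  # the newly enabled ones, and the newly disabled ones (list order may vary,
  # since sets are used internally).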
907

    
908
  def _GetDiskTemplateSets(self, cluster):
909
    """Computes three sets of disk templates.
910

911
    The three sets are:
912
      - disk templates that will be enabled after this operation (no matter if
913
        they were enabled before or not)
914
      - disk templates that get enabled by this operation (thus haven't been
915
        enabled before.)
916
      - disk templates that get disabled by this operation
917

918
    """
919
    return self._GetDiskTemplateSetsInner(self.op.enabled_disk_templates,
920
                                          cluster.enabled_disk_templates)
921

    
922
  def _CheckIpolicy(self, cluster, enabled_disk_templates):
923
    """Checks the ipolicy.
924

925
    @type cluster: C{objects.Cluster}
926
    @param cluster: the cluster's configuration
927
    @type enabled_disk_templates: list of string
928
    @param enabled_disk_templates: list of (possibly newly) enabled disk
929
      templates
930

931
    """
932
    # FIXME: write unit tests for this
933
    if self.op.ipolicy:
934
      self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
935
                                           group_policy=False)
936

    
937
      CheckIpolicyVsDiskTemplates(self.new_ipolicy,
938
                                  enabled_disk_templates)
939

    
940
      all_instances = self.cfg.GetAllInstancesInfo().values()
941
      violations = set()
942
      for group in self.cfg.GetAllNodeGroupsInfo().values():
943
        instances = frozenset([inst for inst in all_instances
944
                               if compat.any(nuuid in group.members
945
                                             for nuuid in inst.all_nodes)])
946
        new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
947
        ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
948
        new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
949
                                           self.cfg)
950
        if new:
951
          violations.update(new)
952

    
953
      if violations:
954
        self.LogWarning("After the ipolicy change the following instances"
955
                        " violate them: %s",
956
                        utils.CommaJoin(utils.NiceSort(violations)))
957
    else:
958
      CheckIpolicyVsDiskTemplates(cluster.ipolicy,
959
                                  enabled_disk_templates)
960

    
961
  def _CheckDrbdHelperOnNodes(self, drbd_helper, node_uuids):
962
    """Checks whether the set DRBD helper actually exists on the nodes.
963

964
    @type drbd_helper: string
965
    @param drbd_helper: path of the drbd usermode helper binary
966
    @type node_uuids: list of strings
967
    @param node_uuids: list of node UUIDs to check for the helper
968

969
    """
970
    # checks given drbd helper on all nodes
971
    helpers = self.rpc.call_drbd_helper(node_uuids)
972
    for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
973
      if ninfo.offline:
974
        self.LogInfo("Not checking drbd helper on offline node %s",
975
                     ninfo.name)
976
        continue
977
      msg = helpers[ninfo.uuid].fail_msg
978
      if msg:
979
        raise errors.OpPrereqError("Error checking drbd helper on node"
980
                                   " '%s': %s" % (ninfo.name, msg),
981
                                   errors.ECODE_ENVIRON)
982
      node_helper = helpers[ninfo.uuid].payload
983
      if node_helper != drbd_helper:
984
        raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
985
                                   (ninfo.name, node_helper),
986
                                   errors.ECODE_ENVIRON)
987

    
988
  def _CheckDrbdHelper(self, node_uuids, drbd_enabled, drbd_gets_enabled):
989
    """Check the DRBD usermode helper.
990

991
    @type node_uuids: list of strings
992
    @param node_uuids: a list of nodes' UUIDs
993
    @type drbd_enabled: boolean
994
    @param drbd_enabled: whether DRBD will be enabled after this operation
995
      (no matter if it was disabled before or not)
996
    @type drbd_gets_enabled: boolen
997
    @param drbd_gets_enabled: true if DRBD was disabled before this
998
      operation, but will be enabled afterwards
999

1000
    """
1001
    if self.op.drbd_helper == '':
1002
      if drbd_enabled:
1003
        raise errors.OpPrereqError("Cannot disable drbd helper while"
1004
                                   " DRBD is enabled.")
1005
      if self.cfg.HasAnyDiskOfType(constants.DT_DRBD8):
1006
        raise errors.OpPrereqError("Cannot disable drbd helper while"
1007
                                   " drbd-based instances exist",
1008
                                   errors.ECODE_INVAL)
1009

    
1010
    else:
1011
      if self.op.drbd_helper is not None and drbd_enabled:
1012
        self._CheckDrbdHelperOnNodes(self.op.drbd_helper, node_uuids)
1013
      else:
1014
        if drbd_gets_enabled:
1015
          current_drbd_helper = self.cfg.GetClusterInfo().drbd_usermode_helper
1016
          if current_drbd_helper is not None:
1017
            self._CheckDrbdHelperOnNodes(current_drbd_helper, node_uuids)
1018
          else:
1019
            raise errors.OpPrereqError("Cannot enable DRBD without a"
1020
                                       " DRBD usermode helper set.")
1021

    
1022
  def _CheckInstancesOfDisabledDiskTemplates(
1023
      self, disabled_disk_templates):
1024
    """Check whether we try to disable a disk template that is in use.
1025

1026
    @type disabled_disk_templates: list of string
1027
    @param disabled_disk_templates: list of disk templates that are going to
1028
      be disabled by this operation
1029

1030
    """
1031
    for disk_template in disabled_disk_templates:
1032
      if self.cfg.HasAnyDiskOfType(disk_template):
1033
        raise errors.OpPrereqError(
1034
            "Cannot disable disk template '%s', because there is at least one"
1035
            " instance using it." % disk_template)
1036

    
1037
  def CheckPrereq(self):
1038
    """Check prerequisites.
1039

1040
    This checks whether the given params don't conflict and
1041
    if the given volume group is valid.
1042

1043
    """
1044
    node_uuids = self.owned_locks(locking.LEVEL_NODE)
1045
    self.cluster = cluster = self.cfg.GetClusterInfo()
1046

    
1047
    vm_capable_node_uuids = [node.uuid
1048
                             for node in self.cfg.GetAllNodesInfo().values()
1049
                             if node.uuid in node_uuids and node.vm_capable]
1050

    
1051
    (enabled_disk_templates, new_enabled_disk_templates,
1052
      disabled_disk_templates) = self._GetDiskTemplateSets(cluster)
1053
    self._CheckInstancesOfDisabledDiskTemplates(disabled_disk_templates)
1054

    
1055
    self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
1056
                      new_enabled_disk_templates)
1057

    
1058
    if self.op.file_storage_dir is not None:
1059
      CheckFileStoragePathVsEnabledDiskTemplates(
1060
          self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
1061

    
1062
    if self.op.shared_file_storage_dir is not None:
1063
      CheckSharedFileStoragePathVsEnabledDiskTemplates(
1064
          self.LogWarning, self.op.shared_file_storage_dir,
1065
          enabled_disk_templates)
1066

    
1067
    drbd_enabled = constants.DT_DRBD8 in enabled_disk_templates
1068
    drbd_gets_enabled = constants.DT_DRBD8 in new_enabled_disk_templates
1069
    self._CheckDrbdHelper(node_uuids, drbd_enabled, drbd_gets_enabled)
1070

    
1071
    # validate params changes
1072
    if self.op.beparams:
1073
      objects.UpgradeBeParams(self.op.beparams)
1074
      utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
1075
      self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
1076

    
1077
    if self.op.ndparams:
1078
      utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
1079
      self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
1080

    
1081
      # TODO: we need a more general way to handle resetting
1082
      # cluster-level parameters to default values
1083
      if self.new_ndparams["oob_program"] == "":
1084
        self.new_ndparams["oob_program"] = \
1085
            constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
1086

    
1087
    if self.op.hv_state:
1088
      new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
1089
                                           self.cluster.hv_state_static)
1090
      self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
1091
                               for hv, values in new_hv_state.items())
1092

    
1093
    if self.op.disk_state:
1094
      new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
1095
                                               self.cluster.disk_state_static)
1096
      self.new_disk_state = \
1097
        dict((storage, dict((name, cluster.SimpleFillDiskState(values))
1098
                            for name, values in svalues.items()))
1099
             for storage, svalues in new_disk_state.items())
1100

    
1101
    self._CheckIpolicy(cluster, enabled_disk_templates)
1102

    
1103
    if self.op.nicparams:
1104
      utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
1105
      self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
1106
      objects.NIC.CheckParameterSyntax(self.new_nicparams)
1107
      nic_errors = []
1108

    
1109
      # check all instances for consistency
1110
      for instance in self.cfg.GetAllInstancesInfo().values():
1111
        for nic_idx, nic in enumerate(instance.nics):
1112
          params_copy = copy.deepcopy(nic.nicparams)
1113
          params_filled = objects.FillDict(self.new_nicparams, params_copy)
1114

    
1115
          # check parameter syntax
1116
          try:
1117
            objects.NIC.CheckParameterSyntax(params_filled)
1118
          except errors.ConfigurationError, err:
1119
            nic_errors.append("Instance %s, nic/%d: %s" %
1120
                              (instance.name, nic_idx, err))
1121

    
1122
          # if we're moving instances to routed, check that they have an ip
1123
          target_mode = params_filled[constants.NIC_MODE]
1124
          if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
1125
            nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
1126
                              " address" % (instance.name, nic_idx))
1127
      if nic_errors:
1128
        raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
1129
                                   "\n".join(nic_errors), errors.ECODE_INVAL)
1130

    
1131
    # hypervisor list/parameters
1132
    self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
1133
    if self.op.hvparams:
1134
      for hv_name, hv_dict in self.op.hvparams.items():
1135
        if hv_name not in self.new_hvparams:
1136
          self.new_hvparams[hv_name] = hv_dict
1137
        else:
1138
          self.new_hvparams[hv_name].update(hv_dict)
1139

    
1140
    # disk template parameters
1141
    self.new_diskparams = objects.FillDict(cluster.diskparams, {})
1142
    if self.op.diskparams:
1143
      for dt_name, dt_params in self.op.diskparams.items():
1144
        if dt_name not in self.new_diskparams:
1145
          self.new_diskparams[dt_name] = dt_params
1146
        else:
1147
          self.new_diskparams[dt_name].update(dt_params)
1148
      CheckDiskAccessModeConsistency(self.op.diskparams, self.cfg)
1149

    
1150
    # os hypervisor parameters
1151
    self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
1152
    if self.op.os_hvp:
1153
      for os_name, hvs in self.op.os_hvp.items():
1154
        if os_name not in self.new_os_hvp:
1155
          self.new_os_hvp[os_name] = hvs
1156
        else:
1157
          for hv_name, hv_dict in hvs.items():
1158
            if hv_dict is None:
1159
              # Delete if it exists
1160
              self.new_os_hvp[os_name].pop(hv_name, None)
1161
            elif hv_name not in self.new_os_hvp[os_name]:
1162
              self.new_os_hvp[os_name][hv_name] = hv_dict
1163
            else:
1164
              self.new_os_hvp[os_name][hv_name].update(hv_dict)
1165

    
1166
    # os parameters
1167
    self.new_osp = objects.FillDict(cluster.osparams, {})
1168
    if self.op.osparams:
1169
      for os_name, osp in self.op.osparams.items():
1170
        if os_name not in self.new_osp:
1171
          self.new_osp[os_name] = {}
1172

    
1173
        self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
1174
                                                 use_none=True)
1175

    
1176
        if not self.new_osp[os_name]:
1177
          # we removed all parameters
1178
          del self.new_osp[os_name]
1179
        else:
1180
          # check the parameter validity (remote check)
1181
          CheckOSParams(self, False, [self.cfg.GetMasterNode()],
1182
                        os_name, self.new_osp[os_name])
1183

    
1184
    # changes to the hypervisor list
1185
    if self.op.enabled_hypervisors is not None:
1186
      self.hv_list = self.op.enabled_hypervisors
1187
      for hv in self.hv_list:
1188
        # if the hypervisor doesn't already exist in the cluster
1189
        # hvparams, we initialize it to empty, and then (in both
1190
        # cases) we make sure to fill the defaults, as we might not
1191
        # have a complete defaults list if the hypervisor wasn't
1192
        # enabled before
1193
        if hv not in new_hvp:
1194
          new_hvp[hv] = {}
1195
        new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1196
        utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1197
    else:
1198
      self.hv_list = cluster.enabled_hypervisors
1199

    
1200
    if self.op.hvparams or self.op.enabled_hypervisors is not None:
1201
      # either the enabled list has changed, or the parameters have, validate
1202
      for hv_name, hv_params in self.new_hvparams.items():
1203
        if ((self.op.hvparams and hv_name in self.op.hvparams) or
1204
            (self.op.enabled_hypervisors and
1205
             hv_name in self.op.enabled_hypervisors)):
1206
          # either this is a new hypervisor, or its parameters have changed
1207
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1208
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1209
          hv_class.CheckParameterSyntax(hv_params)
1210
          CheckHVParams(self, node_uuids, hv_name, hv_params)
1211

    
1212
    self._CheckDiskTemplateConsistency()
1213

    
1214
    if self.op.os_hvp:
1215
      # no need to check any newly-enabled hypervisors, since the
1216
      # defaults have already been checked in the above code-block
1217
      for os_name, os_hvp in self.new_os_hvp.items():
1218
        for hv_name, hv_params in os_hvp.items():
1219
          utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1220
          # we need to fill in the new os_hvp on top of the actual hv_p
1221
          cluster_defaults = self.new_hvparams.get(hv_name, {})
1222
          new_osp = objects.FillDict(cluster_defaults, hv_params)
1223
          hv_class = hypervisor.GetHypervisorClass(hv_name)
1224
          hv_class.CheckParameterSyntax(new_osp)
1225
          CheckHVParams(self, node_uuids, hv_name, new_osp)
1226

    
1227
    if self.op.default_iallocator:
1228
      alloc_script = utils.FindFile(self.op.default_iallocator,
1229
                                    constants.IALLOCATOR_SEARCH_PATH,
1230
                                    os.path.isfile)
1231
      if alloc_script is None:
1232
        raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1233
                                   " specified" % self.op.default_iallocator,
1234
                                   errors.ECODE_INVAL)
1235

    
1236
  def _CheckDiskTemplateConsistency(self):
1237
    """Check whether the disk templates that are going to be disabled
1238
       are still in use by some instances.
1239

1240
    """
1241
    if self.op.enabled_disk_templates:
1242
      cluster = self.cfg.GetClusterInfo()
1243
      instances = self.cfg.GetAllInstancesInfo()
1244

    
1245
      disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1246
        - set(self.op.enabled_disk_templates)
1247
      for instance in instances.itervalues():
1248
        if instance.disk_template in disk_templates_to_remove:
1249
          raise errors.OpPrereqError("Cannot disable disk template '%s',"
1250
                                     " because instance '%s' is using it." %
1251
                                     (instance.disk_template, instance.name))
1252

    
1253
  def _SetVgName(self, feedback_fn):
1254
    """Determines and sets the new volume group name.
1255

1256
    """
1257
    if self.op.vg_name is not None:
1258
      new_volume = self.op.vg_name
1259
      if not new_volume:
1260
        new_volume = None
1261
      if new_volume != self.cfg.GetVGName():
1262
        self.cfg.SetVGName(new_volume)
1263
      else:
1264
        feedback_fn("Cluster LVM configuration already in desired"
1265
                    " state, not changing")
1266

    
1267
  def _SetFileStorageDir(self, feedback_fn):
1268
    """Set the file storage directory.
1269

1270
    """
1271
    if self.op.file_storage_dir is not None:
1272
      if self.cluster.file_storage_dir == self.op.file_storage_dir:
1273
        feedback_fn("Global file storage dir already set to value '%s'"
1274
                    % self.cluster.file_storage_dir)
1275
      else:
1276
        self.cluster.file_storage_dir = self.op.file_storage_dir
1277

    
1278
  def _SetDrbdHelper(self, feedback_fn):
1279
    """Set the DRBD usermode helper.
1280

1281
    """
1282
    if self.op.drbd_helper is not None:
1283
      if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1284
        feedback_fn("Note that you specified a drbd user helper, but did not"
1285
                    " enable the drbd disk template.")
1286
      new_helper = self.op.drbd_helper
1287
      if not new_helper:
1288
        new_helper = None
1289
      if new_helper != self.cfg.GetDRBDHelper():
1290
        self.cfg.SetDRBDHelper(new_helper)
1291
      else:
1292
        feedback_fn("Cluster DRBD helper already in desired state,"
1293
                    " not changing")
1294

    
1295
  def Exec(self, feedback_fn):
1296
    """Change the parameters of the cluster.
1297

1298
    """
1299
    if self.op.enabled_disk_templates:
1300
      self.cluster.enabled_disk_templates = \
1301
        list(self.op.enabled_disk_templates)
1302

    
1303
    self._SetVgName(feedback_fn)
1304
    self._SetFileStorageDir(feedback_fn)
1305
    self._SetDrbdHelper(feedback_fn)
1306

    
1307
    if self.op.hvparams:
      self.cluster.hvparams = self.new_hvparams
    if self.op.os_hvp:
      self.cluster.os_hvp = self.new_os_hvp
    if self.op.enabled_hypervisors is not None:
      self.cluster.hvparams = self.new_hvparams
      self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
    if self.op.beparams:
      self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
    if self.op.nicparams:
      self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
    if self.op.ipolicy:
      self.cluster.ipolicy = self.new_ipolicy
    if self.op.osparams:
      self.cluster.osparams = self.new_osp
    if self.op.ndparams:
      self.cluster.ndparams = self.new_ndparams
    if self.op.diskparams:
      self.cluster.diskparams = self.new_diskparams
    if self.op.hv_state:
      self.cluster.hv_state_static = self.new_hv_state
    if self.op.disk_state:
      self.cluster.disk_state_static = self.new_disk_state

    if self.op.candidate_pool_size is not None:
      self.cluster.candidate_pool_size = self.op.candidate_pool_size
      # we need to update the pool size here, otherwise the save will fail
      AdjustCandidatePool(self, [])

    if self.op.maintain_node_health is not None:
      if self.op.maintain_node_health and not constants.ENABLE_CONFD:
        feedback_fn("Note: CONFD was disabled at build time, node health"
                    " maintenance is not useful (still enabling it)")
      self.cluster.maintain_node_health = self.op.maintain_node_health

    if self.op.modify_etc_hosts is not None:
      self.cluster.modify_etc_hosts = self.op.modify_etc_hosts

    if self.op.prealloc_wipe_disks is not None:
      self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks

    if self.op.add_uids is not None:
      uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)

    if self.op.remove_uids is not None:
      uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)

    if self.op.uid_pool is not None:
      self.cluster.uid_pool = self.op.uid_pool

    if self.op.default_iallocator is not None:
      self.cluster.default_iallocator = self.op.default_iallocator

    if self.op.default_iallocator_params is not None:
      self.cluster.default_iallocator_params = self.op.default_iallocator_params

    if self.op.reserved_lvs is not None:
      self.cluster.reserved_lvs = self.op.reserved_lvs

    if self.op.use_external_mip_script is not None:
      self.cluster.use_external_mip_script = self.op.use_external_mip_script

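    # helper_os applies a list of (DDM_ADD/DDM_REMOVE, os_name) modifications
    # to one of the cluster-level OS lists (hidden or blacklisted), warning
    # about no-op changes instead of failing.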
    def helper_os(aname, mods, desc):
      desc += " OS list"
      lst = getattr(self.cluster, aname)
      for key, val in mods:
        if key == constants.DDM_ADD:
          if val in lst:
            feedback_fn("OS %s already in %s, ignoring" % (val, desc))
          else:
            lst.append(val)
        elif key == constants.DDM_REMOVE:
          if val in lst:
            lst.remove(val)
          else:
            feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
        else:
          raise errors.ProgrammerError("Invalid modification '%s'" % key)

    if self.op.hidden_os:
      helper_os("hidden_os", self.op.hidden_os, "hidden")

    if self.op.blacklisted_os:
      helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")

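    # Changing the master netdev requires shutting down the master IP on the
    # old device first; it is brought up again on the new device further
    # below, after the configuration has been written.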
    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      ems = self.cfg.GetUseExternalMipScript()
      feedback_fn("Shutting down master ip on the current netdev (%s)" %
                  self.cluster.master_netdev)
      result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
                                                       master_params, ems)
      if not self.op.force:
        result.Raise("Could not disable the master ip")
      else:
        if result.fail_msg:
          msg = ("Could not disable the master ip (continuing anyway): %s" %
                 result.fail_msg)
          feedback_fn(msg)
      feedback_fn("Changing master_netdev from %s to %s" %
                  (master_params.netdev, self.op.master_netdev))
      self.cluster.master_netdev = self.op.master_netdev

    if self.op.master_netmask:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
      result = self.rpc.call_node_change_master_netmask(
                 master_params.uuid, master_params.netmask,
                 self.op.master_netmask, master_params.ip,
                 master_params.netdev)
      result.Warn("Could not change the master IP netmask", feedback_fn)
      self.cluster.master_netmask = self.op.master_netmask

    self.cfg.Update(self.cluster, feedback_fn)

    if self.op.master_netdev:
      master_params = self.cfg.GetMasterNetworkParameters()
      feedback_fn("Starting the master ip on the new master netdev (%s)" %
                  self.op.master_netdev)
      ems = self.cfg.GetUseExternalMipScript()
      result = self.rpc.call_node_activate_master_ip(master_params.uuid,
                                                     master_params, ems)
      result.Warn("Could not re-enable the master ip on the master,"
                  " please restart manually", self.LogWarning)


class LUClusterVerify(NoHooksLU):
  """Submits all jobs necessary to verify the cluster.

  """
  REQ_BGL = False

  def ExpandNames(self):
    self.needed_locks = {}

  def Exec(self, feedback_fn):
    jobs = []

    if self.op.group_name:
      groups = [self.op.group_name]
      depends_fn = lambda: None
    else:
      groups = self.cfg.GetNodeGroupList()

      # Verify global configuration
      jobs.append([
        opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
        ])

      # Always depend on global verification
      depends_fn = lambda: [(-len(jobs), [])]
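      # (-len(jobs) is a relative job dependency: by the time each per-group
      # job is appended below, it counts back to the OpClusterVerifyConfig job
      # submitted above, so every group verification waits for it.)
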
    jobs.extend(
      [opcodes.OpClusterVerifyGroup(group_name=group,
                                    ignore_errors=self.op.ignore_errors,
                                    depends=depends_fn())]
      for group in groups)

    # Fix up all parameters
    for op in itertools.chain(*jobs): # pylint: disable=W0142
      op.debug_simulate_errors = self.op.debug_simulate_errors
      op.verbose = self.op.verbose
      op.error_codes = self.op.error_codes
      try:
        op.skip_checks = self.op.skip_checks
      except AttributeError:
        assert not isinstance(op, opcodes.OpClusterVerifyGroup)

    return ResultWithJobs(jobs)


class _VerifyErrors(object):
  """Mix-in for cluster/group verify LUs.

  It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
  self.op and self._feedback_fn to be available.)

  """

  ETYPE_FIELD = "code"
  ETYPE_ERROR = "ERROR"
  ETYPE_WARNING = "WARNING"

  def _Error(self, ecode, item, msg, *args, **kwargs):
    """Format an error message.

    Based on the opcode's error_codes parameter, either format a
    parseable error code, or a simpler error string.

    This must be called only from Exec and functions called from Exec.

    """
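    # Illustrative output (hypothetical values): with error_codes set, the
    # reported line looks like "ERROR:ENODESSH:node:node1.example.com:<msg>";
    # otherwise it is rendered as "ERROR: node node1.example.com: <msg>".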
    ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
    itype, etxt, _ = ecode
    # If the error code is in the list of ignored errors, demote the error to a
    # warning
    if etxt in self.op.ignore_errors:     # pylint: disable=E1101
      ltype = self.ETYPE_WARNING
    # first complete the msg
    if args:
      msg = msg % args
    # then format the whole message
    if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
      msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
    else:
      if item:
        item = " " + item
      else:
        item = ""
      msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
    # and finally report it via the feedback_fn
    self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
    # only ERROR-level messages mark the operation as failed; warnings do not
    if ltype == self.ETYPE_ERROR:
      self.bad = True

  def _ErrorIf(self, cond, *args, **kwargs):
    """Log an error message if the passed condition is True.

    """
    if (bool(cond)
        or self.op.debug_simulate_errors): # pylint: disable=E1101
      self._Error(*args, **kwargs)


def _VerifyCertificate(filename):
  """Verifies a certificate for L{LUClusterVerifyConfig}.

  @type filename: string
  @param filename: Path to PEM file

  """
  try:
    cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
                                           utils.ReadFile(filename))
  except Exception, err: # pylint: disable=W0703
    return (LUClusterVerifyConfig.ETYPE_ERROR,
            "Failed to load X509 certificate %s: %s" % (filename, err))

  (errcode, msg) = \
    utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
                                constants.SSL_CERT_EXPIRATION_ERROR)

  if msg:
    fnamemsg = "While verifying %s: %s" % (filename, msg)
  else:
    fnamemsg = None

  if errcode is None:
    return (None, fnamemsg)
  elif errcode == utils.CERT_WARNING:
    return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
  elif errcode == utils.CERT_ERROR:
    return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)

  raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)


def _GetAllHypervisorParameters(cluster, instances):
  """Compute the set of all hypervisor parameters.

  @type cluster: L{objects.Cluster}
  @param cluster: the cluster object
  @type instances: list of L{objects.Instance}
  @param instances: additional instances from which to obtain parameters
  @rtype: list of (origin, hypervisor, parameters)
  @return: a list with all parameters found, indicating the hypervisor they
       apply to, and the origin (can be "cluster", "os X", or "instance Y")

  """
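  # Each entry is an (origin, hypervisor, parameters) triple, for example
  # ("os debian-image", "xen-pvm", {...}); the example values are
  # illustrative only.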
  hvp_data = []

  for hv_name in cluster.enabled_hypervisors:
    hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))

  for os_name, os_hvp in cluster.os_hvp.items():
    for hv_name, hv_params in os_hvp.items():
      if hv_params:
        full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
        hvp_data.append(("os %s" % os_name, hv_name, full_params))

  # TODO: collapse identical parameter values in a single one
  for instance in instances:
    if instance.hvparams:
      hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
                       cluster.FillHV(instance)))

  return hvp_data


class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
  """Verifies the cluster config.

  """
  REQ_BGL = False

  def _VerifyHVP(self, hvp_data):
    """Verifies locally the syntax of the hypervisor parameters.

    """
    for item, hv_name, hv_params in hvp_data:
      msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
             (hv_name, item))
      try:
        hv_class = hypervisor.GetHypervisorClass(hv_name)
        utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
        hv_class.CheckParameterSyntax(hv_params)
      except errors.GenericError, err:
        self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))

  def ExpandNames(self):
    self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
    self.share_locks = ShareAll()

  def CheckPrereq(self):
    """Check prerequisites.

    """
    # Retrieve all information
    self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
    self.all_node_info = self.cfg.GetAllNodesInfo()
    self.all_inst_info = self.cfg.GetAllInstancesInfo()

  def Exec(self, feedback_fn):
    """Verify integrity of cluster, performing various tests on nodes.

    """
    self.bad = False
    self._feedback_fn = feedback_fn

    feedback_fn("* Verifying cluster config")

    for msg in self.cfg.VerifyConfig():
      self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)

    feedback_fn("* Verifying cluster certificate files")

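    # code=errcode demotes certificate warnings (e.g. approaching expiry) to
    # WARNING-level messages instead of marking the verification as failed.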
    for cert_filename in pathutils.ALL_CERT_FILES:
      (errcode, msg) = _VerifyCertificate(cert_filename)
      self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)

    self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
                                    pathutils.NODED_CERT_FILE),
                  constants.CV_ECLUSTERCERT,
                  None,
                  pathutils.NODED_CERT_FILE + " must be accessible by the " +
                    constants.LUXID_USER + " user")

    feedback_fn("* Verifying hypervisor parameters")

    self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
                                                self.all_inst_info.values()))

    feedback_fn("* Verifying all nodes belong to an existing group")

    # We do this verification here because, should this bogus circumstance
    # occur, it would never be caught by VerifyGroup, which only acts on
    # nodes/instances reachable from existing node groups.

    dangling_nodes = set(node for node in self.all_node_info.values()
                         if node.group not in self.all_group_info)

    dangling_instances = {}
    no_node_instances = []

    for inst in self.all_inst_info.values():
      if inst.primary_node in [node.uuid for node in dangling_nodes]:
        dangling_instances.setdefault(inst.primary_node, []).append(inst)
      elif inst.primary_node not in self.all_node_info:
        no_node_instances.append(inst)

    pretty_dangling = [
        "%s (%s)" %
        (node.name,
         utils.CommaJoin(inst.name for
                         inst in dangling_instances.get(node.uuid, [])))
        for node in dangling_nodes]

    self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
                  None,
                  "the following nodes (and their instances) belong to a non"
                  " existing group: %s", utils.CommaJoin(pretty_dangling))

    self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
                  None,
                  "the following instances have a non-existing primary-node:"
                  " %s", utils.CommaJoin(inst.name for
                                         inst in no_node_instances))

    return not self.bad


class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1700
  """Verifies the status of a node group.
1701

1702
  """
1703
  HPATH = "cluster-verify"
1704
  HTYPE = constants.HTYPE_CLUSTER
1705
  REQ_BGL = False
1706

    
1707
  _HOOKS_INDENT_RE = re.compile("^", re.M)
1708

    
1709
  class NodeImage(object):
1710
    """A class representing the logical and physical status of a node.
1711

1712
    @type uuid: string
1713
    @ivar uuid: the node UUID to which this object refers
1714
    @ivar volumes: a structure as returned from
1715
        L{ganeti.backend.GetVolumeList} (runtime)
1716
    @ivar instances: a list of running instances (runtime)
1717
    @ivar pinst: list of configured primary instances (config)
1718
    @ivar sinst: list of configured secondary instances (config)
1719
    @ivar sbp: dictionary of {primary-node: list of instances} for all
1720
        instances for which this node is secondary (config)
1721
    @ivar mfree: free memory, as reported by hypervisor (runtime)
1722
    @ivar dfree: free disk, as reported by the node (runtime)
1723
    @ivar offline: the offline status (config)
1724
    @type rpc_fail: boolean
1725
    @ivar rpc_fail: whether the RPC verify call was successful (overall,
1726
        not whether the individual keys were correct) (runtime)
1727
    @type lvm_fail: boolean
1728
    @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1729
    @type hyp_fail: boolean
1730
    @ivar hyp_fail: whether the RPC call didn't return the instance list
1731
    @type ghost: boolean
1732
    @ivar ghost: whether this is a known node or not (config)
1733
    @type os_fail: boolean
1734
    @ivar os_fail: whether the RPC call didn't return valid OS data
1735
    @type oslist: list
1736
    @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1737
    @type vm_capable: boolean
1738
    @ivar vm_capable: whether the node can host instances
1739
    @type pv_min: float
1740
    @ivar pv_min: size in MiB of the smallest PVs
1741
    @type pv_max: float
1742
    @ivar pv_max: size in MiB of the biggest PVs
1743

1744
    """
1745
    def __init__(self, offline=False, uuid=None, vm_capable=True):
1746
      self.uuid = uuid
1747
      self.volumes = {}
1748
      self.instances = []
1749
      self.pinst = []
1750
      self.sinst = []
1751
      self.sbp = {}
1752
      self.mfree = 0
1753
      self.dfree = 0
1754
      self.offline = offline
1755
      self.vm_capable = vm_capable
1756
      self.rpc_fail = False
1757
      self.lvm_fail = False
1758
      self.hyp_fail = False
1759
      self.ghost = False
1760
      self.os_fail = False
1761
      self.oslist = {}
1762
      self.pv_min = None
1763
      self.pv_max = None
1764

    
1765
  def ExpandNames(self):
1766
    # This raises errors.OpPrereqError on its own:
1767
    self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1768

    
1769
    # Get instances in node group; this is unsafe and needs verification later
1770
    inst_uuids = \
1771
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1772

    
1773
    self.needed_locks = {
1774
      locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1775
      locking.LEVEL_NODEGROUP: [self.group_uuid],
1776
      locking.LEVEL_NODE: [],
1777

    
1778
      # This opcode is run by watcher every five minutes and acquires all nodes
1779
      # for a group. It doesn't run for a long time, so it's better to acquire
1780
      # the node allocation lock as well.
1781
      locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1782
      }
1783

    
1784
    self.share_locks = ShareAll()
1785

    
1786
  def DeclareLocks(self, level):
1787
    if level == locking.LEVEL_NODE:
1788
      # Get members of node group; this is unsafe and needs verification later
1789
      nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1790

    
1791
      # In Exec(), we warn about mirrored instances that have primary and
1792
      # secondary living in separate node groups. To fully verify that
1793
      # volumes for these instances are healthy, we will need to do an
1794
      # extra call to their secondaries. We ensure here those nodes will
1795
      # be locked.
1796
      for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1797
        # Important: access only the instances whose lock is owned
1798
        instance = self.cfg.GetInstanceInfoByName(inst_name)
1799
        if instance.disk_template in constants.DTS_INT_MIRROR:
1800
          nodes.update(instance.secondary_nodes)
1801

    
1802
      self.needed_locks[locking.LEVEL_NODE] = nodes
1803

    
1804
  def CheckPrereq(self):
1805
    assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1806
    self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1807

    
1808
    group_node_uuids = set(self.group_info.members)
1809
    group_inst_uuids = \
1810
      self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1811

    
1812
    unlocked_node_uuids = \
1813
        group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1814

    
1815
    unlocked_inst_uuids = \
1816
        group_inst_uuids.difference(
1817
          [self.cfg.GetInstanceInfoByName(name).uuid
1818
           for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1819

    
1820
    if unlocked_node_uuids:
1821
      raise errors.OpPrereqError(
1822
        "Missing lock for nodes: %s" %
1823
        utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1824
        errors.ECODE_STATE)
1825

    
1826
    if unlocked_inst_uuids:
1827
      raise errors.OpPrereqError(
1828
        "Missing lock for instances: %s" %
1829
        utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1830
        errors.ECODE_STATE)
1831

    
1832
    self.all_node_info = self.cfg.GetAllNodesInfo()
1833
    self.all_inst_info = self.cfg.GetAllInstancesInfo()
1834

    
1835
    self.my_node_uuids = group_node_uuids
1836
    self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1837
                             for node_uuid in group_node_uuids)
1838

    
1839
    self.my_inst_uuids = group_inst_uuids
1840
    self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1841
                             for inst_uuid in group_inst_uuids)
1842

    
1843
    # We detect here the nodes that will need the extra RPC calls for verifying
1844
    # split LV volumes; they should be locked.
1845
    extra_lv_nodes = set()
1846

    
1847
    for inst in self.my_inst_info.values():
1848
      if inst.disk_template in constants.DTS_INT_MIRROR:
1849
        for nuuid in inst.all_nodes:
1850
          if self.all_node_info[nuuid].group != self.group_uuid:
1851
            extra_lv_nodes.add(nuuid)
1852

    
1853
    unlocked_lv_nodes = \
1854
        extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1855

    
1856
    if unlocked_lv_nodes:
1857
      raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1858
                                 utils.CommaJoin(unlocked_lv_nodes),
1859
                                 errors.ECODE_STATE)
1860
    self.extra_lv_nodes = list(extra_lv_nodes)
1861

    
1862
  def _VerifyNode(self, ninfo, nresult):
1863
    """Perform some basic validation on data returned from a node.
1864

1865
      - check the result data structure is well formed and has all the
1866
        mandatory fields
1867
      - check ganeti version
1868

1869
    @type ninfo: L{objects.Node}
1870
    @param ninfo: the node to check
1871
    @param nresult: the results from the node
1872
    @rtype: boolean
1873
    @return: whether overall this call was successful (and we can expect
1874
         reasonable values in the response)
1875

1876
    """
1877
    # main result, nresult should be a non-empty dict
1878
    test = not nresult or not isinstance(nresult, dict)
1879
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1880
                  "unable to verify node: no data returned")
1881
    if test:
1882
      return False
1883

    
1884
    # compares ganeti version
1885
    local_version = constants.PROTOCOL_VERSION
1886
    remote_version = nresult.get("version", None)
1887
    test = not (remote_version and
1888
                isinstance(remote_version, (list, tuple)) and
1889
                len(remote_version) == 2)
1890
    self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1891
                  "connection to node returned invalid data")
1892
    if test:
1893
      return False
1894

    
1895
    test = local_version != remote_version[0]
1896
    self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1897
                  "incompatible protocol versions: master %s,"
1898
                  " node %s", local_version, remote_version[0])
1899
    if test:
1900
      return False
1901

    
1902
    # node seems compatible, we can actually try to look into its results
1903

    
1904
    # full package version
1905
    self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1906
                  constants.CV_ENODEVERSION, ninfo.name,
1907
                  "software version mismatch: master %s, node %s",
1908
                  constants.RELEASE_VERSION, remote_version[1],
1909
                  code=self.ETYPE_WARNING)
1910

    
1911
    hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1912
    if ninfo.vm_capable and isinstance(hyp_result, dict):
1913
      for hv_name, hv_result in hyp_result.iteritems():
1914
        test = hv_result is not None
1915
        self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1916
                      "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1917

    
1918
    hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1919
    if ninfo.vm_capable and isinstance(hvp_result, list):
1920
      for item, hv_name, hv_result in hvp_result:
1921
        self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1922
                      "hypervisor %s parameter verify failure (source %s): %s",
1923
                      hv_name, item, hv_result)
1924

    
1925
    test = nresult.get(constants.NV_NODESETUP,
1926
                       ["Missing NODESETUP results"])
1927
    self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1928
                  "node setup error: %s", "; ".join(test))
1929

    
1930
    return True
1931

    
1932
  def _VerifyNodeTime(self, ninfo, nresult,
1933
                      nvinfo_starttime, nvinfo_endtime):
1934
    """Check the node time.
1935

1936
    @type ninfo: L{objects.Node}
1937
    @param ninfo: the node to check
1938
    @param nresult: the remote results for the node
1939
    @param nvinfo_starttime: the start time of the RPC call
1940
    @param nvinfo_endtime: the end time of the RPC call
1941

1942
    """
1943
    ntime = nresult.get(constants.NV_TIME, None)
1944
    try:
1945
      ntime_merged = utils.MergeTime(ntime)
1946
    except (ValueError, TypeError):
1947
      self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1948
                    "Node returned invalid time")
1949
      return
1950

    
1951
    if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1952
      ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1953
    elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1954
      ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1955
    else:
1956
      ntime_diff = None
1957

    
1958
    self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1959
                  "Node time diverges by at least %s from master node time",
1960
                  ntime_diff)
1961

    
1962
  def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1963
    """Check the node LVM results and update info for cross-node checks.
1964

1965
    @type ninfo: L{objects.Node}
1966
    @param ninfo: the node to check
1967
    @param nresult: the remote results for the node
1968
    @param vg_name: the configured VG name
1969
    @type nimg: L{NodeImage}
1970
    @param nimg: node image
1971

1972
    """
1973
    if vg_name is None:
1974
      return
1975

    
1976
    # checks vg existence and size > 20G
1977
    vglist = nresult.get(constants.NV_VGLIST, None)
1978
    test = not vglist
1979
    self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1980
                  "unable to check volume groups")
1981
    if not test:
1982
      vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1983
                                            constants.MIN_VG_SIZE)
1984
      self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1985

    
1986
    # Check PVs
1987
    (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1988
    for em in errmsgs:
1989
      self._Error(constants.CV_ENODELVM, ninfo.name, em)
1990
    if pvminmax is not None:
1991
      (nimg.pv_min, nimg.pv_max) = pvminmax
1992

    
1993
  def _VerifyGroupDRBDVersion(self, node_verify_infos):
1994
    """Check cross-node DRBD version consistency.
1995

1996
    @type node_verify_infos: dict
1997
    @param node_verify_infos: infos about nodes as returned from the
1998
      node_verify call.
1999

2000
    """
2001
    node_versions = {}
2002
    for node_uuid, ndata in node_verify_infos.items():
2003
      nresult = ndata.payload
2004
      if nresult:
2005
        version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
2006
        node_versions[node_uuid] = version
2007

    
2008
    if len(set(node_versions.values())) > 1:
2009
      for node_uuid, version in sorted(node_versions.items()):
2010
        msg = "DRBD version mismatch: %s" % version
2011
        self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
2012
                    code=self.ETYPE_WARNING)
2013

    
2014
  def _VerifyGroupLVM(self, node_image, vg_name):
2015
    """Check cross-node consistency in LVM.
2016

2017
    @type node_image: dict
2018
    @param node_image: info about nodes, mapping from node to names to
2019
      L{NodeImage} objects
2020
    @param vg_name: the configured VG name
2021

2022
    """
2023
    if vg_name is None:
2024
      return
2025

    
2026
    # Only exclusive storage needs this kind of checks
2027
    if not self._exclusive_storage:
2028
      return
2029

    
2030
    # exclusive_storage wants all PVs to have the same size (approximately),
2031
    # if the smallest and the biggest ones are okay, everything is fine.
2032
    # pv_min is None iff pv_max is None
2033
    vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
2034
    if not vals:
2035
      return
2036
    (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
2037
    (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
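    # Taking min/max over (size, uuid) pairs yields both the extreme PV size
    # and the node on which it was observed.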
2038
    bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
2039
    self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
2040
                  "PV sizes differ too much in the group; smallest (%s MB) is"
2041
                  " on %s, biggest (%s MB) is on %s",
2042
                  pvmin, self.cfg.GetNodeName(minnode_uuid),
2043
                  pvmax, self.cfg.GetNodeName(maxnode_uuid))
2044

    
2045
  def _VerifyNodeBridges(self, ninfo, nresult, bridges):
2046
    """Check the node bridges.
2047

2048
    @type ninfo: L{objects.Node}
2049
    @param ninfo: the node to check
2050
    @param nresult: the remote results for the node
2051
    @param bridges: the expected list of bridges
2052

2053
    """
2054
    if not bridges:
2055
      return
2056

    
2057
    missing = nresult.get(constants.NV_BRIDGES, None)
2058
    test = not isinstance(missing, list)
2059
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2060
                  "did not return valid bridge information")
2061
    if not test:
2062
      self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
2063
                    "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
2064

    
2065
  def _VerifyNodeUserScripts(self, ninfo, nresult):
2066
    """Check the results of user scripts presence and executability on the node
2067

2068
    @type ninfo: L{objects.Node}
2069
    @param ninfo: the node to check
2070
    @param nresult: the remote results for the node
2071

2072
    """
2073
    test = not constants.NV_USERSCRIPTS in nresult
2074
    self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2075
                  "did not return user scripts information")
2076

    
2077
    broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
2078
    if not test:
2079
      self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
2080
                    "user scripts not present or not executable: %s" %
2081
                    utils.CommaJoin(sorted(broken_scripts)))
2082

    
2083
  def _VerifyNodeNetwork(self, ninfo, nresult):
2084
    """Check the node network connectivity results.
2085

2086
    @type ninfo: L{objects.Node}
2087
    @param ninfo: the node to check
2088
    @param nresult: the remote results for the node
2089

2090
    """
2091
    test = constants.NV_NODELIST not in nresult
2092
    self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
2093
                  "node hasn't returned node ssh connectivity data")
2094
    if not test:
2095
      if nresult[constants.NV_NODELIST]:
2096
        for a_node, a_msg in nresult[constants.NV_NODELIST].items():
2097
          self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
2098
                        "ssh communication with node '%s': %s", a_node, a_msg)
2099

    
2100
    test = constants.NV_NODENETTEST not in nresult
2101
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2102
                  "node hasn't returned node tcp connectivity data")
2103
    if not test:
2104
      if nresult[constants.NV_NODENETTEST]:
2105
        nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
2106
        for anode in nlist:
2107
          self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
2108
                        "tcp communication with node '%s': %s",
2109
                        anode, nresult[constants.NV_NODENETTEST][anode])
2110

    
2111
    test = constants.NV_MASTERIP not in nresult
2112
    self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
2113
                  "node hasn't returned node master IP reachability data")
2114
    if not test:
2115
      if not nresult[constants.NV_MASTERIP]:
2116
        if ninfo.uuid == self.master_node:
2117
          msg = "the master node cannot reach the master IP (not configured?)"
2118
        else:
2119
          msg = "cannot reach the master IP"
2120
        self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
2121

    
2122
  def _VerifyInstance(self, instance, node_image, diskstatus):
2123
    """Verify an instance.
2124

2125
    This function checks to see if the required block devices are
2126
    available on the instance's node, and that the nodes are in the correct
2127
    state.
2128

2129
    """
2130
    pnode_uuid = instance.primary_node
2131
    pnode_img = node_image[pnode_uuid]
2132
    groupinfo = self.cfg.GetAllNodeGroupsInfo()
2133

    
2134
    node_vol_should = {}
2135
    instance.MapLVsByNode(node_vol_should)
2136

    
2137
    cluster = self.cfg.GetClusterInfo()
2138
    ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
2139
                                                            self.group_info)
2140
    err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
2141
    self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
2142
                  utils.CommaJoin(err), code=self.ETYPE_WARNING)
2143

    
2144
    for node_uuid in node_vol_should:
2145
      n_img = node_image[node_uuid]
2146
      if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
2147
        # ignore missing volumes on offline or broken nodes
2148
        continue
2149
      for volume in node_vol_should[node_uuid]:
2150
        test = volume not in n_img.volumes
2151
        self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
2152
                      "volume %s missing on node %s", volume,
2153
                      self.cfg.GetNodeName(node_uuid))
2154

    
2155
    if instance.admin_state == constants.ADMINST_UP:
2156
      test = instance.uuid not in pnode_img.instances and not pnode_img.offline
2157
      self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
2158
                    "instance not running on its primary node %s",
2159
                     self.cfg.GetNodeName(pnode_uuid))
2160
      self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
2161
                    instance.name, "instance is marked as running and lives on"
2162
                    " offline node %s", self.cfg.GetNodeName(pnode_uuid))
2163

    
2164
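    # Flatten diskstatus into (node_uuid, success, bdev_status, disk_index)
    # tuples so each disk can be checked individually below.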
    diskdata = [(nname, success, status, idx)
2165
                for (nname, disks) in diskstatus.items()
2166
                for idx, (success, status) in enumerate(disks)]
2167

    
2168
    for nname, success, bdev_status, idx in diskdata:
2169
      # the 'ghost node' construction in Exec() ensures that we have a
2170
      # node here
2171
      snode = node_image[nname]
2172
      bad_snode = snode.ghost or snode.offline
2173
      self._ErrorIf(instance.disks_active and
2174
                    not success and not bad_snode,
2175
                    constants.CV_EINSTANCEFAULTYDISK, instance.name,
2176
                    "couldn't retrieve status for disk/%s on %s: %s",
2177
                    idx, self.cfg.GetNodeName(nname), bdev_status)
2178

    
2179
      if instance.disks_active and success and \
2180
         (bdev_status.is_degraded or
2181
          bdev_status.ldisk_status != constants.LDS_OKAY):
2182
        msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
2183
        if bdev_status.is_degraded:
2184
          msg += " is degraded"
2185
        if bdev_status.ldisk_status != constants.LDS_OKAY:
2186
          msg += "; state is '%s'" % \
2187
                 constants.LDS_NAMES[bdev_status.ldisk_status]
2188

    
2189
        self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2190

    
2191
    self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2192
                  constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2193
                  "instance %s, connection to primary node failed",
2194
                  instance.name)
2195

    
2196
    self._ErrorIf(len(instance.secondary_nodes) > 1,
2197
                  constants.CV_EINSTANCELAYOUT, instance.name,
2198
                  "instance has multiple secondary nodes: %s",
2199
                  utils.CommaJoin(instance.secondary_nodes),
2200
                  code=self.ETYPE_WARNING)
2201

    
2202
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2203
    if any(es_flags.values()):
2204
      if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2205
        # Disk template not compatible with exclusive_storage: no instance
2206
        # node should have the flag set
2207
        es_nodes = [n
2208
                    for (n, es) in es_flags.items()
2209
                    if es]
2210
        self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2211
                    "instance has template %s, which is not supported on nodes"
2212
                    " that have exclusive storage set: %s",
2213
                    instance.disk_template,
2214
                    utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2215
      for (idx, disk) in enumerate(instance.disks):
2216
        self._ErrorIf(disk.spindles is None,
2217
                      constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2218
                      "number of spindles not configured for disk %s while"
2219
                      " exclusive storage is enabled, try running"
2220
                      " gnt-cluster repair-disk-sizes", idx)
2221

    
2222
    if instance.disk_template in constants.DTS_INT_MIRROR:
2223
      instance_nodes = utils.NiceSort(instance.all_nodes)
2224
      instance_groups = {}
2225

    
2226
      for node_uuid in instance_nodes:
2227
        instance_groups.setdefault(self.all_node_info[node_uuid].group,
2228
                                   []).append(node_uuid)
2229

    
2230
      pretty_list = [
2231
        "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2232
                           groupinfo[group].name)
2233
        # Sort so that we always list the primary node first.
2234
        for group, nodes in sorted(instance_groups.items(),
2235
                                   key=lambda (_, nodes): pnode_uuid in nodes,
2236
                                   reverse=True)]
2237

    
2238
      self._ErrorIf(len(instance_groups) > 1,
2239
                    constants.CV_EINSTANCESPLITGROUPS,
2240
                    instance.name, "instance has primary and secondary nodes in"
2241
                    " different groups: %s", utils.CommaJoin(pretty_list),
2242
                    code=self.ETYPE_WARNING)
2243

    
2244
    inst_nodes_offline = []
2245
    for snode in instance.secondary_nodes:
2246
      s_img = node_image[snode]
2247
      self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2248
                    self.cfg.GetNodeName(snode),
2249
                    "instance %s, connection to secondary node failed",
2250
                    instance.name)
2251

    
2252
      if s_img.offline:
2253
        inst_nodes_offline.append(snode)
2254

    
2255
    # warn that the instance lives on offline nodes
2256
    self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2257
                  instance.name, "instance has offline secondary node(s) %s",
2258
                  utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2259
    # ... or ghost/non-vm_capable nodes
2260
    for node_uuid in instance.all_nodes:
2261
      self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2262
                    instance.name, "instance lives on ghost node %s",
2263
                    self.cfg.GetNodeName(node_uuid))
2264
      self._ErrorIf(not node_image[node_uuid].vm_capable,
2265
                    constants.CV_EINSTANCEBADNODE, instance.name,
2266
                    "instance lives on non-vm_capable node %s",
2267
                    self.cfg.GetNodeName(node_uuid))
2268

    
2269
  def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2270
    """Verify if there are any unknown volumes in the cluster.
2271

2272
    The .os, .swap and backup volumes are ignored. All other volumes are
2273
    reported as unknown.
2274

2275
    @type reserved: L{ganeti.utils.FieldSet}
2276
    @param reserved: a FieldSet of reserved volume names
2277

2278
    """
2279
    for node_uuid, n_img in node_image.items():
2280
      if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2281
          self.all_node_info[node_uuid].group != self.group_uuid):
2282
        # skip non-healthy nodes
2283
        continue
2284
      for volume in n_img.volumes:
2285
        test = ((node_uuid not in node_vol_should or
2286
                volume not in node_vol_should[node_uuid]) and
2287
                not reserved.Matches(volume))
2288
        self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2289
                      self.cfg.GetNodeName(node_uuid),
2290
                      "volume %s is unknown", volume,
2291
                      code=_VerifyErrors.ETYPE_WARNING)
2292

    
2293
  def _VerifyNPlusOneMemory(self, node_image, all_insts):
2294
    """Verify N+1 Memory Resilience.
2295

2296
    Check that if one single node dies we can still start all the
2297
    instances it was primary for.
2298

2299
    """
2300
    cluster_info = self.cfg.GetClusterInfo()
2301
    for node_uuid, n_img in node_image.items():
2302
      # This code checks that every node which is now listed as
2303
      # secondary has enough memory to host all instances it is
2304
      # supposed to should a single other node in the cluster fail.
2305
      # FIXME: not ready for failover to an arbitrary node
2306
      # FIXME: does not support file-backed instances
2307
      # WARNING: we currently take into account down instances as well
2308
      # as up ones, considering that even if they're down someone
2309
      # might want to start them even in the event of a node failure.
2310
      if n_img.offline or \
2311
         self.all_node_info[node_uuid].group != self.group_uuid:
2312
        # we're skipping nodes marked offline and nodes in other groups from
2313
        # the N+1 warning, since most likely we don't have good memory
2314
        # information from them; we already list instances living on such
2315
        # nodes, and that's enough warning
2316
        continue
2317
      #TODO(dynmem): also consider ballooning out other instances
2318
      for prinode, inst_uuids in n_img.sbp.items():
2319
        needed_mem = 0
2320
        for inst_uuid in inst_uuids:
2321
          bep = cluster_info.FillBE(all_insts[inst_uuid])
2322
          if bep[constants.BE_AUTO_BALANCE]:
2323
            needed_mem += bep[constants.BE_MINMEM]
2324
        test = n_img.mfree < needed_mem
2325
        self._ErrorIf(test, constants.CV_ENODEN1,
2326
                      self.cfg.GetNodeName(node_uuid),
2327
                      "not enough memory to accomodate instance failovers"
2328
                      " should node %s fail (%dMiB needed, %dMiB available)",
2329
                      self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2330

    
2331
  def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2332
                   (files_all, files_opt, files_mc, files_vm)):
2333
    """Verifies file checksums collected from all nodes.
2334

2335
    @param nodes: List of L{objects.Node} objects
2336
    @param master_node_uuid: UUID of master node
2337
    @param all_nvinfo: RPC results
2338

2339
    """
2340
    # Define functions determining which nodes to consider for a file
2341
    files2nodefn = [
2342
      (files_all, None),
2343
      (files_mc, lambda node: (node.master_candidate or
2344
                               node.uuid == master_node_uuid)),
2345
      (files_vm, lambda node: node.vm_capable),
2346
      ]
2347

    
2348
    # Build mapping from filename to list of nodes which should have the file
2349
    nodefiles = {}
2350
    for (files, fn) in files2nodefn:
2351
      if fn is None:
2352
        filenodes = nodes
2353
      else:
2354
        filenodes = filter(fn, nodes)
2355
      nodefiles.update((filename,
2356
                        frozenset(map(operator.attrgetter("uuid"), filenodes)))
2357
                       for filename in files)
2358

    
2359
    assert set(nodefiles) == (files_all | files_mc | files_vm)
2360

    
2361
    fileinfo = dict((filename, {}) for filename in nodefiles)
2362
    ignore_nodes = set()
2363

    
2364
    for node in nodes:
2365
      if node.offline:
2366
        ignore_nodes.add(node.uuid)
2367
        continue
2368

    
2369
      nresult = all_nvinfo[node.uuid]
2370

    
2371
      if nresult.fail_msg or not nresult.payload:
2372
        node_files = None
2373
      else:
2374
        fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2375
        node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2376
                          for (key, value) in fingerprints.items())
2377
        del fingerprints
2378

    
2379
      test = not (node_files and isinstance(node_files, dict))
2380
      self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2381
                    "Node did not return file checksum data")
2382
      if test:
2383
        ignore_nodes.add(node.uuid)
2384
        continue
2385

    
2386
      # Build per-checksum mapping from filename to nodes having it
2387
      for (filename, checksum) in node_files.items():
2388
        assert filename in nodefiles
2389
        fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2390

    
2391
    for (filename, checksums) in fileinfo.items():
2392
      assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2393

    
2394
      # Nodes having the file
2395
      with_file = frozenset(node_uuid
2396
                            for node_uuids in fileinfo[filename].values()
2397
                            for node_uuid in node_uuids) - ignore_nodes
2398

    
2399
      expected_nodes = nodefiles[filename] - ignore_nodes
2400

    
2401
      # Nodes missing file
2402
      missing_file = expected_nodes - with_file
2403

    
2404
      if filename in files_opt:
2405
        # All or no nodes
2406
        self._ErrorIf(missing_file and missing_file != expected_nodes,
2407
                      constants.CV_ECLUSTERFILECHECK, None,
2408
                      "File %s is optional, but it must exist on all or no"
2409
                      " nodes (not found on %s)",
2410
                      filename,
2411
                      utils.CommaJoin(
2412
                        utils.NiceSort(
2413
                          map(self.cfg.GetNodeName, missing_file))))
2414
      else:
2415
        self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2416
                      "File %s is missing from node(s) %s", filename,
2417
                      utils.CommaJoin(
2418
                        utils.NiceSort(
2419
                          map(self.cfg.GetNodeName, missing_file))))
2420

    
2421
        # Warn if a node has a file it shouldn't
2422
        unexpected = with_file - expected_nodes
2423
        self._ErrorIf(unexpected,
2424
                      constants.CV_ECLUSTERFILECHECK, None,
2425
                      "File %s should not exist on node(s) %s",
2426
                      filename, utils.CommaJoin(
2427
                        utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2428

    
2429
      # See if there are multiple versions of the file
2430
      test = len(checksums) > 1
2431
      if test:
2432
        variants = ["variant %s on %s" %
2433
                    (idx + 1,
2434
                     utils.CommaJoin(utils.NiceSort(
2435
                       map(self.cfg.GetNodeName, node_uuids))))
2436
                    for (idx, (checksum, node_uuids)) in
2437
                      enumerate(sorted(checksums.items()))]
2438
      else:
2439
        variants = []
2440

    
2441
      self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2442
                    "File %s found with %s different checksums (%s)",
2443
                    filename, len(checksums), "; ".join(variants))
2444

    
2445
  def _VerifyNodeDrbdHelper(self, ninfo, nresult, drbd_helper):
2446
    """Verify the drbd helper.
2447

2448
    """
2449
    if drbd_helper:
2450
      helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2451
      test = (helper_result is None)
2452
      self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2453
                    "no drbd usermode helper returned")
2454
      if helper_result:
2455
        status, payload = helper_result
2456
        test = not status
2457
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2458
                      "drbd usermode helper check unsuccessful: %s", payload)
2459
        test = status and (payload != drbd_helper)
2460
        self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2461
                      "wrong drbd usermode helper: %s", payload)
2462

    
2463
  def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2464
                      drbd_map):
2465
    """Verifies and the node DRBD status.
2466

2467
    @type ninfo: L{objects.Node}
2468
    @param ninfo: the node to check
2469
    @param nresult: the remote results for the node
2470
    @param instanceinfo: the dict of instances
2471
    @param drbd_helper: the configured DRBD usermode helper
2472
    @param drbd_map: the DRBD map as returned by
2473
        L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2474

2475
    """
2476
    self._VerifyNodeDrbdHelper(ninfo, nresult, drbd_helper)
2477

    
2478
    # compute the DRBD minors
2479
    node_drbd = {}
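    # node_drbd maps each DRBD minor expected on this node to a tuple of
    # (instance UUID, whether that minor should currently be active).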
2480
    for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2481
      test = inst_uuid not in instanceinfo
2482
      self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2483
                    "ghost instance '%s' in temporary DRBD map", inst_uuid)
2484
        # ghost instance should not be running, but otherwise we
2485
        # don't give double warnings (both ghost instance and
2486
        # unallocated minor in use)
2487
      if test:
2488
        node_drbd[minor] = (inst_uuid, False)
2489
      else:
2490
        instance = instanceinfo[inst_uuid]
2491
        node_drbd[minor] = (inst_uuid, instance.disks_active)
2492

    
2493
    # and now check them
2494
    used_minors = nresult.get(constants.NV_DRBDLIST, [])
2495
    test = not isinstance(used_minors, (tuple, list))
2496
    self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2497
                  "cannot parse drbd status file: %s", str(used_minors))
2498
    if test:
2499
      # we cannot check drbd status
2500
      return
2501

    
2502
    for minor, (inst_uuid, must_exist) in node_drbd.items():
2503
      test = minor not in used_minors and must_exist
2504
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2505
                    "drbd minor %d of instance %s is not active", minor,
2506
                    self.cfg.GetInstanceName(inst_uuid))
2507
    for minor in used_minors:
2508
      test = minor not in node_drbd
2509
      self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2510
                    "unallocated drbd minor %d is in use", minor)
2511

    
2512
  def _UpdateNodeOS(self, ninfo, nresult, nimg):
2513
    """Builds the node OS structures.
2514

2515
    @type ninfo: L{objects.Node}
2516
    @param ninfo: the node to check
2517
    @param nresult: the remote results for the node
2518
    @param nimg: the node image object
2519

2520
    """
2521
    remote_os = nresult.get(constants.NV_OSLIST, None)
2522
    test = (not isinstance(remote_os, list) or
2523
            not compat.all(isinstance(v, list) and len(v) == 7
2524
                           for v in remote_os))
2525

    
2526
    self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2527
                  "node hasn't returned valid OS data")
2528

    
2529
    nimg.os_fail = test
2530

    
2531
    if test:
2532
      return
2533

    
2534
    os_dict = {}
2535

    
2536
    for (name, os_path, status, diagnose,
2537
         variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2538

    
2539
      if name not in os_dict:
2540
        os_dict[name] = []
2541

    
2542
      # parameters is a list of lists instead of list of tuples due to
2543
      # JSON lacking a real tuple type, fix it:
2544
      parameters = [tuple(v) for v in parameters]
2545
      os_dict[name].append((os_path, status, diagnose,
2546
                            set(variants), set(parameters), set(api_ver)))
2547

    
2548
    nimg.oslist = os_dict
2549

    
2550
  def _VerifyNodeOS(self, ninfo, nimg, base):
2551
    """Verifies the node OS list.
2552

2553
    @type ninfo: L{objects.Node}
2554
    @param ninfo: the node to check
2555
    @param nimg: the node image object
2556
    @param base: the 'template' node we match against (e.g. from the master)
2557

2558
    """
2559
    assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2560

    
2561
    beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2562
    for os_name, os_data in nimg.oslist.items():
2563
      assert os_data, "Empty OS status for OS %s?!" % os_name
2564
      f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2565
      self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2566
                    "Invalid OS %s (located at %s): %s",
2567
                    os_name, f_path, f_diag)
2568
      self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2569
                    "OS '%s' has multiple entries"
2570
                    " (first one shadows the rest): %s",
2571
                    os_name, utils.CommaJoin([v[0] for v in os_data]))
2572
      # comparisons with the 'base' image
2573
      test = os_name not in base.oslist
2574
      self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2575
                    "Extra OS %s not present on reference node (%s)",
2576
                    os_name, self.cfg.GetNodeName(base.uuid))
2577
      if test:
2578
        continue
2579
      assert base.oslist[os_name], "Base node has empty OS status?"
2580
      _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2581
      if not b_status:
2582
        # base OS is invalid, skipping
2583
        continue
2584
      for kind, a, b in [("API version", f_api, b_api),
2585
                         ("variants list", f_var, b_var),
2586
                         ("parameters", beautify_params(f_param),
2587
                          beautify_params(b_param))]:
2588
        self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2589
                      "OS %s for %s differs from reference node %s:"
2590
                      " [%s] vs. [%s]", kind, os_name,
2591
                      self.cfg.GetNodeName(base.uuid),
2592
                      utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2593

    
2594
    # check any missing OSes
2595
    missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2596
    self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2597
                  "OSes present on reference node %s"
2598
                  " but missing on this node: %s",
2599
                  self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2600

    
2601
  def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2602
    """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2603

2604
    @type ninfo: L{objects.Node}
2605
    @param ninfo: the node to check
2606
    @param nresult: the remote results for the node
2607
    @type is_master: bool
2608
    @param is_master: Whether node is the master node
2609

2610
    """
2611
    cluster = self.cfg.GetClusterInfo()
2612
    if (is_master and
2613
        (cluster.IsFileStorageEnabled() or
2614
         cluster.IsSharedFileStorageEnabled())):
2615
      try:
2616
        fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2617
      except KeyError:
2618
        # This should never happen
2619
        self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2620
                      "Node did not return forbidden file storage paths")
2621
      else:
2622
        self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2623
                      "Found forbidden file storage paths: %s",
2624
                      utils.CommaJoin(fspaths))
2625
    else:
2626
      self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2627
                    constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2628
                    "Node should not have returned forbidden file storage"
2629
                    " paths")
2630
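  # Hedged example (payload values assumed, not taken from upstream docs):
  # only the master is queried for NV_ACCEPTED_STORAGE_PATHS, and a clean
  # result is an empty list, e.g.
  #
  #   nresult[constants.NV_ACCEPTED_STORAGE_PATHS] = []
  #
  # while a non-empty list such as ["/", "/etc"] is reported above as
  # forbidden file storage paths.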

    
2631
  def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2632
                          verify_key, error_key):
2633
    """Verifies (file) storage paths.
2634

2635
    @type ninfo: L{objects.Node}
2636
    @param ninfo: the node to check
2637
    @param nresult: the remote results for the node
2638
    @type file_disk_template: string
2639
    @param file_disk_template: file-based disk template, whose directory
2640
        is supposed to be verified
2641
    @type verify_key: string
2642
    @param verify_key: key for the verification map of this file
2643
        verification step
2644
    @param error_key: error key to be added to the verification results
2645
        in case something goes wrong in this verification step
2646

2647
    """
2648
    assert (file_disk_template in utils.storage.GetDiskTemplatesOfStorageTypes(
2649
              constants.ST_FILE, constants.ST_SHARED_FILE
2650
           ))
2651

    
2652
    cluster = self.cfg.GetClusterInfo()
2653
    if cluster.IsDiskTemplateEnabled(file_disk_template):
2654
      self._ErrorIf(
2655
          verify_key in nresult,
2656
          error_key, ninfo.name,
2657
          "The configured %s storage path is unusable: %s" %
2658
          (file_disk_template, nresult.get(verify_key)))
2659

    
2660
  def _VerifyFileStoragePaths(self, ninfo, nresult):
2661
    """Verifies (file) storage paths.
2662

2663
    @see: C{_VerifyStoragePaths}
2664

2665
    """
2666
    self._VerifyStoragePaths(
2667
        ninfo, nresult, constants.DT_FILE,
2668
        constants.NV_FILE_STORAGE_PATH,
2669
        constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2670

    
2671
  def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2672
    """Verifies (file) storage paths.
2673

2674
    @see: C{_VerifyStoragePaths}
2675

2676
    """
2677
    self._VerifyStoragePaths(
2678
        ninfo, nresult, constants.DT_SHARED_FILE,
2679
        constants.NV_SHARED_FILE_STORAGE_PATH,
2680
        constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2681

    
2682
  def _VerifyOob(self, ninfo, nresult):
2683
    """Verifies out of band functionality of a node.
2684

2685
    @type ninfo: L{objects.Node}
2686
    @param ninfo: the node to check
2687
    @param nresult: the remote results for the node
2688

2689
    """
2690
    # We just have to verify the paths on master and/or master candidates
2691
    # as the oob helper is invoked on the master
2692
    if ((ninfo.master_candidate or ninfo.master_capable) and
2693
        constants.NV_OOB_PATHS in nresult):
2694
      for path_result in nresult[constants.NV_OOB_PATHS]:
2695
        self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2696
                      ninfo.name, path_result)
2697

    
2698
  def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2699
    """Verifies and updates the node volume data.
2700

2701
    This function will update a L{NodeImage}'s internal structures
2702
    with data from the remote call.
2703

2704
    @type ninfo: L{objects.Node}
2705
    @param ninfo: the node to check
2706
    @param nresult: the remote results for the node
2707
    @param nimg: the node image object
2708
    @param vg_name: the configured VG name
2709

2710
    """
2711
    nimg.lvm_fail = True
2712
    lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2713
    if vg_name is None:
2714
      pass
2715
    elif isinstance(lvdata, basestring):
2716
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2717
                    "LVM problem on node: %s", utils.SafeEncode(lvdata))
2718
    elif not isinstance(lvdata, dict):
2719
      self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2720
                    "rpc call to node failed (lvlist)")
2721
    else:
2722
      nimg.volumes = lvdata
2723
      nimg.lvm_fail = False
2724
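  # Hedged sketch of the NV_LVLIST payload handled above (the value format is
  # an assumption; only the dict-vs-string distinction is taken from the
  # code, lv_attrs is a placeholder):
  #
  #   nresult[constants.NV_LVLIST] = {"xenvg/disk0.data": lv_attrs}
  #   nresult[constants.NV_LVLIST] = "Volume group xenvg not found"  # error
  #
  # A string is treated as an LVM error message, any other non-dict value as
  # a failed RPC, and a dict is stored as nimg.volumes.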

    
2725
  def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2726
    """Verifies and updates the node instance list.
2727

2728
    If the listing was successful, then updates this node's instance
2729
    list. Otherwise, it marks the RPC call as failed for the instance
2730
    list key.
2731

2732
    @type ninfo: L{objects.Node}
2733
    @param ninfo: the node to check
2734
    @param nresult: the remote results for the node
2735
    @param nimg: the node image object
2736

2737
    """
2738
    idata = nresult.get(constants.NV_INSTANCELIST, None)
2739
    test = not isinstance(idata, list)
2740
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2741
                  "rpc call to node failed (instancelist): %s",
2742
                  utils.SafeEncode(str(idata)))
2743
    if test:
2744
      nimg.hyp_fail = True
2745
    else:
2746
      nimg.instances = [inst.uuid for (_, inst) in
2747
                        self.cfg.GetMultiInstanceInfoByName(idata)]
2748
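  # Example of the expected NV_INSTANCELIST payload (instance names invented):
  # a flat list of instance names as seen by the hypervisor(s), which is then
  # mapped back to configuration UUIDs via GetMultiInstanceInfoByName, e.g.
  #
  #   nresult[constants.NV_INSTANCELIST] = ["inst1.example", "inst2.example"]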

    
2749
  def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2750
    """Verifies and computes a node information map
2751

2752
    @type ninfo: L{objects.Node}
2753
    @param ninfo: the node to check
2754
    @param nresult: the remote results for the node
2755
    @param nimg: the node image object
2756
    @param vg_name: the configured VG name
2757

2758
    """
2759
    # try to read free memory (from the hypervisor)
2760
    hv_info = nresult.get(constants.NV_HVINFO, None)
2761
    test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2762
    self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2763
                  "rpc call to node failed (hvinfo)")
2764
    if not test:
2765
      try:
2766
        nimg.mfree = int(hv_info["memory_free"])
2767
      except (ValueError, TypeError):
2768
        self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2769
                      "node returned invalid nodeinfo, check hypervisor")
2770

    
2771
    # FIXME: devise a free space model for file based instances as well
2772
    if vg_name is not None:
2773
      test = (constants.NV_VGLIST not in nresult or
2774
              vg_name not in nresult[constants.NV_VGLIST])
2775
      self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2776
                    "node didn't return data for the volume group '%s'"
2777
                    " - it is either missing or broken", vg_name)
2778
      if not test:
2779
        try:
2780
          nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2781
        except (ValueError, TypeError):
2782
          self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2783
                        "node returned invalid LVM info, check LVM status")
2784
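  # Hedged sketch of the node info payloads consumed above (numbers invented,
  # units assumed to be MiB):
  #
  #   nresult[constants.NV_HVINFO] = {"memory_free": 24576}
  #   nresult[constants.NV_VGLIST] = {"xenvg": 102400}
  #
  # Values that cannot be converted to int are flagged as invalid hypervisor
  # or LVM information by the checks above.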

    
2785
  def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2786
    """Gets per-disk status information for all instances.
2787

2788
    @type node_uuids: list of strings
2789
    @param node_uuids: Node UUIDs
2790
    @type node_image: dict of (UUID, L{NodeImage})
2791
    @param node_image: Node image objects
2792
    @type instanceinfo: dict of (UUID, L{objects.Instance})
2793
    @param instanceinfo: Instance objects
2794
    @rtype: {instance: {node: [(success, payload)]}}
2795
    @return: a dictionary of per-instance dictionaries with nodes as
2796
        keys and disk information as values; the disk information is a
2797
        list of tuples (success, payload)
2798

2799
    """
2800
    node_disks = {}
2801
    node_disks_dev_inst_only = {}
2802
    diskless_instances = set()
2803
    diskless = constants.DT_DISKLESS
2804

    
2805
    for nuuid in node_uuids:
2806
      node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
2807
                                             node_image[nuuid].sinst))
2808
      diskless_instances.update(uuid for uuid in node_inst_uuids
2809
                                if instanceinfo[uuid].disk_template == diskless)
2810
      disks = [(inst_uuid, disk)
2811
               for inst_uuid in node_inst_uuids
2812
               for disk in instanceinfo[inst_uuid].disks]
2813

    
2814
      if not disks:
2815
        # No need to collect data
2816
        continue
2817

    
2818
      node_disks[nuuid] = disks
2819

    
2820
      # _AnnotateDiskParams makes already copies of the disks
2821
      dev_inst_only = []
2822
      for (inst_uuid, dev) in disks:
2823
        (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
2824
                                          self.cfg)
2825
        dev_inst_only.append((anno_disk, instanceinfo[inst_uuid]))
2826

    
2827
      node_disks_dev_inst_only[nuuid] = dev_inst_only
2828

    
2829
    assert len(node_disks) == len(node_disks_dev_inst_only)
2830

    
2831
    # Collect data from all nodes with disks
2832
    result = self.rpc.call_blockdev_getmirrorstatus_multi(
2833
               node_disks.keys(), node_disks_dev_inst_only)
2834

    
2835
    assert len(result) == len(node_disks)
2836

    
2837
    instdisk = {}
2838

    
2839
    for (nuuid, nres) in result.items():
2840
      node = self.cfg.GetNodeInfo(nuuid)
2841
      disks = node_disks[node.uuid]
2842

    
2843
      if nres.offline:
2844
        # No data from this node
2845
        data = len(disks) * [(False, "node offline")]
2846
      else:
2847
        msg = nres.fail_msg
2848
        self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
2849
                      "while getting disk information: %s", msg)
2850
        if msg:
2851
          # No data from this node
2852
          data = len(disks) * [(False, msg)]
2853
        else:
2854
          data = []
2855
          for idx, i in enumerate(nres.payload):
2856
            if isinstance(i, (tuple, list)) and len(i) == 2:
2857
              data.append(i)
2858
            else:
2859
              logging.warning("Invalid result from node %s, entry %d: %s",
2860
                              node.name, idx, i)
2861
              data.append((False, "Invalid result from the remote node"))
2862

    
2863
      for ((inst_uuid, _), status) in zip(disks, data):
2864
        instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
2865
          .append(status)
2866

    
2867
    # Add empty entries for diskless instances.
2868
    for inst_uuid in diskless_instances:
2869
      assert inst_uuid not in instdisk
2870
      instdisk[inst_uuid] = {}
2871

    
2872
    assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2873
                      len(nuuids) <= len(instanceinfo[inst].all_nodes) and
2874
                      compat.all(isinstance(s, (tuple, list)) and
2875
                                 len(s) == 2 for s in statuses)
2876
                      for inst, nuuids in instdisk.items()
2877
                      for nuuid, statuses in nuuids.items())
2878
    if __debug__:
2879
      instdisk_keys = set(instdisk)
2880
      instanceinfo_keys = set(instanceinfo)
2881
      assert instdisk_keys == instanceinfo_keys, \
2882
        ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2883
         (instdisk_keys, instanceinfo_keys))
2884

    
2885
    return instdisk
2886
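  # Hedged illustration of the mapping built above (UUIDs and payloads are
  # invented; only the nesting follows the code):
  #
  #   instdisk = {
  #     "inst-uuid-1": {
  #       "node-uuid-a": [(True, disk0_status), (True, disk1_status)],
  #       "node-uuid-b": [(False, "node offline"), (False, "node offline")],
  #     },
  #     "diskless-inst-uuid": {},  # diskless instances get an empty dict
  #   }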

    
2887
  @staticmethod
2888
  def _SshNodeSelector(group_uuid, all_nodes):
2889
    """Create endless iterators for all potential SSH check hosts.
2890

2891
    """
2892
    nodes = [node for node in all_nodes
2893
             if (node.group != group_uuid and
2894
                 not node.offline)]
2895
    keyfunc = operator.attrgetter("group")
2896

    
2897
    return map(itertools.cycle,
2898
               [sorted(map(operator.attrgetter("name"), names))
2899
                for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2900
                                                  keyfunc)])
2901

    
2902
  @classmethod
2903
  def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2904
    """Choose which nodes should talk to which other nodes.
2905

2906
    We will make nodes contact all nodes in their group, and one node from
2907
    every other group.
2908

2909
    @warning: This algorithm has a known issue if one node group is much
2910
      smaller than others (e.g. just one node). In such a case all other
2911
      nodes will talk to the single node.
2912

2913
    """
2914
    online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2915
    sel = cls._SshNodeSelector(group_uuid, all_nodes)
2916

    
2917
    return (online_nodes,
2918
            dict((name, sorted([i.next() for i in sel]))
2919
                 for name in online_nodes))
2920
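  # Hedged worked example (group and node names invented): verifying a group
  # that contains nodeA and nodeB, with a second group containing only nodeC,
  # returns roughly
  #
  #   (["nodeA", "nodeB"], {"nodeA": ["nodeC"], "nodeB": ["nodeC"]})
  #
  # The first element lists the verified group's own online nodes (every node
  # checks all of them), the second adds one round-robin representative per
  # other group, as produced by _SshNodeSelector.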

    
2921
  def BuildHooksEnv(self):
2922
    """Build hooks env.
2923

2924
    Cluster-Verify hooks are only run in the post phase; if they fail, their
2925
    output is logged in the verify output and the verification fails.
2926

2927
    """
2928
    env = {
2929
      "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2930
      }
2931

    
2932
    env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2933
               for node in self.my_node_info.values())
2934

    
2935
    return env
2936

    
2937
  def BuildHooksNodes(self):
2938
    """Build hooks nodes.
2939

2940
    """
2941
    return ([], list(self.my_node_info.keys()))
2942

    
2943
  def Exec(self, feedback_fn):
2944
    """Verify integrity of the node group, performing various test on nodes.
2945

2946
    """
2947
    # This method has too many local variables. pylint: disable=R0914
2948
    feedback_fn("* Verifying group '%s'" % self.group_info.name)
2949

    
2950
    if not self.my_node_uuids:
2951
      # empty node group
2952
      feedback_fn("* Empty node group, skipping verification")
2953
      return True
2954

    
2955
    self.bad = False
2956
    verbose = self.op.verbose
2957
    self._feedback_fn = feedback_fn
2958

    
2959
    vg_name = self.cfg.GetVGName()
2960
    drbd_helper = self.cfg.GetDRBDHelper()
2961
    cluster = self.cfg.GetClusterInfo()
2962
    hypervisors = cluster.enabled_hypervisors
2963
    node_data_list = self.my_node_info.values()
2964

    
2965
    i_non_redundant = [] # Non redundant instances
2966
    i_non_a_balanced = [] # Non auto-balanced instances
2967
    i_offline = 0 # Count of offline instances
2968
    n_offline = 0 # Count of offline nodes
2969
    n_drained = 0 # Count of nodes being drained
2970
    node_vol_should = {}
2971

    
2972
    # FIXME: verify OS list
2973

    
2974
    # File verification
2975
    filemap = ComputeAncillaryFiles(cluster, False)
2976

    
2977
    # do local checksums
2978
    master_node_uuid = self.master_node = self.cfg.GetMasterNode()
2979
    master_ip = self.cfg.GetMasterIP()
2980

    
2981
    feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))
2982

    
2983
    user_scripts = []
2984
    if self.cfg.GetUseExternalMipScript():
2985
      user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2986

    
2987
    node_verify_param = {
2988
      constants.NV_FILELIST:
2989
        map(vcluster.MakeVirtualPath,
2990
            utils.UniqueSequence(filename
2991
                                 for files in filemap
2992
                                 for filename in files)),
2993
      constants.NV_NODELIST:
2994
        self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2995
                                  self.all_node_info.values()),
2996
      constants.NV_HYPERVISOR: hypervisors,
2997
      constants.NV_HVPARAMS:
2998
        _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2999
      constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
3000
                                 for node in node_data_list
3001
                                 if not node.offline],
3002
      constants.NV_INSTANCELIST: hypervisors,
3003
      constants.NV_VERSION: None,
3004
      constants.NV_HVINFO: self.cfg.GetHypervisorType(),
3005
      constants.NV_NODESETUP: None,
3006
      constants.NV_TIME: None,
3007
      constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
3008
      constants.NV_OSLIST: None,
3009
      constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
3010
      constants.NV_USERSCRIPTS: user_scripts,
3011
      }
3012

    
3013
    if vg_name is not None:
3014
      node_verify_param[constants.NV_VGLIST] = None
3015
      node_verify_param[constants.NV_LVLIST] = vg_name
3016
      node_verify_param[constants.NV_PVLIST] = [vg_name]
3017

    
3018
    if cluster.IsDiskTemplateEnabled(constants.DT_DRBD8):
3019
      if drbd_helper:
3020
        node_verify_param[constants.NV_DRBDVERSION] = None
3021
        node_verify_param[constants.NV_DRBDLIST] = None
3022
        node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
3023

    
3024
    if cluster.IsFileStorageEnabled() or \
3025
        cluster.IsSharedFileStorageEnabled():
3026
      # Load file storage paths only from master node
3027
      node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
3028
        self.cfg.GetMasterNodeName()
3029
      if cluster.IsFileStorageEnabled():
3030
        node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
3031
          cluster.file_storage_dir
3032

    
3033
    # bridge checks
3034
    # FIXME: this needs to be changed per node-group, not cluster-wide
3035
    bridges = set()
3036
    default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
3037
    if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
3038
      bridges.add(default_nicpp[constants.NIC_LINK])
3039
    for instance in self.my_inst_info.values():
3040
      for nic in instance.nics:
3041
        full_nic = cluster.SimpleFillNIC(nic.nicparams)
3042
        if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
3043
          bridges.add(full_nic[constants.NIC_LINK])
3044

    
3045
    if bridges:
3046
      node_verify_param[constants.NV_BRIDGES] = list(bridges)
3047

    
3048
    # Build our expected cluster state
3049
    node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
3050
                                                 uuid=node.uuid,
3051
                                                 vm_capable=node.vm_capable))
3052
                      for node in node_data_list)
3053

    
3054
    # Gather OOB paths
3055
    oob_paths = []
3056
    for node in self.all_node_info.values():
3057
      path = SupportsOob(self.cfg, node)
3058
      if path and path not in oob_paths:
3059
        oob_paths.append(path)
3060

    
3061
    if oob_paths:
3062
      node_verify_param[constants.NV_OOB_PATHS] = oob_paths
3063

    
3064
    for inst_uuid in self.my_inst_uuids:
3065
      instance = self.my_inst_info[inst_uuid]
3066
      if instance.admin_state == constants.ADMINST_OFFLINE:
3067
        i_offline += 1
3068

    
3069
      for nuuid in instance.all_nodes:
3070
        if nuuid not in node_image:
3071
          gnode = self.NodeImage(uuid=nuuid)
3072
          gnode.ghost = (nuuid not in self.all_node_info)
3073
          node_image[nuuid] = gnode
3074

    
3075
      instance.MapLVsByNode(node_vol_should)
3076

    
3077
      pnode = instance.primary_node
3078
      node_image[pnode].pinst.append(instance.uuid)
3079

    
3080
      for snode in instance.secondary_nodes:
3081
        nimg = node_image[snode]
3082
        nimg.sinst.append(instance.uuid)
3083
        if pnode not in nimg.sbp:
3084
          nimg.sbp[pnode] = []
3085
        nimg.sbp[pnode].append(instance.uuid)
3086

    
3087
    es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
3088
                                               self.my_node_info.keys())
3089
    # The value of exclusive_storage should be the same across the group, so if
3090
    # it's True for at least one node, we act as if it were set for all nodes
3091
    self._exclusive_storage = compat.any(es_flags.values())
3092
    if self._exclusive_storage:
3093
      node_verify_param[constants.NV_EXCLUSIVEPVS] = True
3094

    
3095
    node_group_uuids = dict(map(lambda n: (n.name, n.group),
3096
                                self.cfg.GetAllNodesInfo().values()))
3097
    groups_config = self.cfg.GetAllNodeGroupsInfoDict()
3098

    
3099
    # At this point, we have the in-memory data structures complete,
3100
    # except for the runtime information, which we'll gather next
3101

    
3102
    # Due to the way our RPC system works, exact response times cannot be
3103
    # guaranteed (e.g. a broken node could run into a timeout). By keeping the
3104
    # time before and after executing the request, we can at least have a time
3105
    # window.
3106
    nvinfo_starttime = time.time()
3107
    all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
3108
                                           node_verify_param,
3109
                                           self.cfg.GetClusterName(),
3110
                                           self.cfg.GetClusterInfo().hvparams,
3111
                                           node_group_uuids,
3112
                                           groups_config)
3113
    nvinfo_endtime = time.time()
3114

    
3115
    if self.extra_lv_nodes and vg_name is not None:
3116
      extra_lv_nvinfo = \
3117
          self.rpc.call_node_verify(self.extra_lv_nodes,
3118
                                    {constants.NV_LVLIST: vg_name},
3119
                                    self.cfg.GetClusterName(),
3120
                                    self.cfg.GetClusterInfo().hvparams,
3121
                                    node_group_uuids,
3122
                                    groups_config)
3123
    else:
3124
      extra_lv_nvinfo = {}
3125

    
3126
    all_drbd_map = self.cfg.ComputeDRBDMap()
3127

    
3128
    feedback_fn("* Gathering disk information (%s nodes)" %
3129
                len(self.my_node_uuids))
3130
    instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
3131
                                     self.my_inst_info)
3132

    
3133
    feedback_fn("* Verifying configuration file consistency")
3134

    
3135
    # If not all nodes are being checked, we need to make sure the master node
3136
    # and a non-checked vm_capable node are in the list.
3137
    absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
3138
    if absent_node_uuids:
3139
      vf_nvinfo = all_nvinfo.copy()
3140
      vf_node_info = list(self.my_node_info.values())
3141
      additional_node_uuids = []
3142
      if master_node_uuid not in self.my_node_info:
3143
        additional_node_uuids.append(master_node_uuid)
3144
        vf_node_info.append(self.all_node_info[master_node_uuid])
3145
      # Add the first vm_capable node we find which is not included,
3146
      # excluding the master node (which we already have)
3147
      for node_uuid in absent_node_uuids:
3148
        nodeinfo = self.all_node_info[node_uuid]
3149
        if (nodeinfo.vm_capable and not nodeinfo.offline and
3150
            node_uuid != master_node_uuid):
3151
          additional_node_uuids.append(node_uuid)
3152
          vf_node_info.append(self.all_node_info[node_uuid])
3153
          break
3154
      key = constants.NV_FILELIST
3155
      vf_nvinfo.update(self.rpc.call_node_verify(
3156
         additional_node_uuids, {key: node_verify_param[key]},
3157
         self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams,
3158
         node_group_uuids,
3159
         groups_config))
3160
    else:
3161
      vf_nvinfo = all_nvinfo
3162
      vf_node_info = self.my_node_info.values()
3163

    
3164
    self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)
3165

    
3166
    feedback_fn("* Verifying node status")
3167

    
3168
    refos_img = None
3169

    
3170
    for node_i in node_data_list:
3171
      nimg = node_image[node_i.uuid]
3172

    
3173
      if node_i.offline:
3174
        if verbose:
3175
          feedback_fn("* Skipping offline node %s" % (node_i.name,))
3176
        n_offline += 1
3177
        continue
3178

    
3179
      if node_i.uuid == master_node_uuid:
3180
        ntype = "master"
3181
      elif node_i.master_candidate:
3182
        ntype = "master candidate"
3183
      elif node_i.drained:
3184
        ntype = "drained"
3185
        n_drained += 1
3186
      else:
3187
        ntype = "regular"
3188
      if verbose:
3189
        feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))
3190

    
3191
      msg = all_nvinfo[node_i.uuid].fail_msg
3192
      self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
3193
                    "while contacting node: %s", msg)
3194
      if msg:
3195
        nimg.rpc_fail = True
3196
        continue
3197

    
3198
      nresult = all_nvinfo[node_i.uuid].payload
3199

    
3200
      nimg.call_ok = self._VerifyNode(node_i, nresult)
3201
      self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
3202
      self._VerifyNodeNetwork(node_i, nresult)
3203
      self._VerifyNodeUserScripts(node_i, nresult)
3204
      self._VerifyOob(node_i, nresult)
3205
      self._VerifyAcceptedFileStoragePaths(node_i, nresult,
3206
                                           node_i.uuid == master_node_uuid)
3207
      self._VerifyFileStoragePaths(node_i, nresult)
3208
      self._VerifySharedFileStoragePaths(node_i, nresult)
3209

    
3210
      if nimg.vm_capable:
3211
        self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
3212
        self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
3213
                             all_drbd_map)
3214

    
3215
        self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
3216
        self._UpdateNodeInstances(node_i, nresult, nimg)
3217
        self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
3218
        self._UpdateNodeOS(node_i, nresult, nimg)
3219

    
3220
        if not nimg.os_fail:
3221
          if refos_img is None:
3222
            refos_img = nimg
3223
          self._VerifyNodeOS(node_i, nimg, refos_img)
3224
        self._VerifyNodeBridges(node_i, nresult, bridges)
3225

    
3226
        # Check whether all running instances are primary for the node. (This
3227
        # can no longer be done from _VerifyInstance below, since some of the
3228
        # wrong instances could be from other node groups.)
3229
        non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)
3230

    
3231
        for inst_uuid in non_primary_inst_uuids:
3232
          test = inst_uuid in self.all_inst_info
3233
          self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
3234
                        self.cfg.GetInstanceName(inst_uuid),
3235
                        "instance should not run on node %s", node_i.name)
3236
          self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
3237
                        "node is running unknown instance %s", inst_uuid)
3238

    
3239
    self._VerifyGroupDRBDVersion(all_nvinfo)
3240
    self._VerifyGroupLVM(node_image, vg_name)
3241

    
3242
    for node_uuid, result in extra_lv_nvinfo.items():
3243
      self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
3244
                              node_image[node_uuid], vg_name)
3245

    
3246
    feedback_fn("* Verifying instance status")
3247
    for inst_uuid in self.my_inst_uuids:
3248
      instance = self.my_inst_info[inst_uuid]
3249
      if verbose:
3250
        feedback_fn("* Verifying instance %s" % instance.name)
3251
      self._VerifyInstance(instance, node_image, instdisk[inst_uuid])
3252

    
3253
      # If the instance is non-redundant we cannot survive losing its primary
3254
      # node, so we are not N+1 compliant.
3255
      if instance.disk_template not in constants.DTS_MIRRORED:
3256
        i_non_redundant.append(instance)
3257

    
3258
      if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
3259
        i_non_a_balanced.append(instance)
3260

    
3261
    feedback_fn("* Verifying orphan volumes")
3262
    reserved = utils.FieldSet(*cluster.reserved_lvs)
3263

    
3264
    # We will get spurious "unknown volume" warnings if any node of this group
3265
    # is secondary for an instance whose primary is in another group. To avoid
3266
    # them, we find these instances and add their volumes to node_vol_should.
3267
    for instance in self.all_inst_info.values():
3268
      for secondary in instance.secondary_nodes:
3269
        if (secondary in self.my_node_info
3270
            and instance.uuid not in self.my_inst_info):
3271
          instance.MapLVsByNode(node_vol_should)
3272
          break
3273

    
3274
    self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
3275

    
3276
    if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
3277
      feedback_fn("* Verifying N+1 Memory redundancy")
3278
      self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
3279

    
3280
    feedback_fn("* Other Notes")
3281
    if i_non_redundant:
3282
      feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
3283
                  % len(i_non_redundant))
3284

    
3285
    if i_non_a_balanced:
3286
      feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
3287
                  % len(i_non_a_balanced))
3288

    
3289
    if i_offline:
3290
      feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
3291

    
3292
    if n_offline:
3293
      feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
3294

    
3295
    if n_drained:
3296
      feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
3297

    
3298
    return not self.bad
3299

    
3300
  def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
3301
    """Analyze the post-hooks' result
3302

3303
    This method analyses the hook result, handles it, and sends some
3304
    nicely-formatted feedback back to the user.
3305

3306
    @param phase: one of L{constants.HOOKS_PHASE_POST} or
3307
        L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
3308
    @param hooks_results: the results of the multi-node hooks rpc call
3309
    @param feedback_fn: function used to send feedback back to the caller
3310
    @param lu_result: previous Exec result
3311
    @return: the new Exec result, based on the previous result
3312
        and hook results
3313

3314
    """
3315
    # We only really run POST phase hooks, only for non-empty groups,
3316
    # and are only interested in their results
3317
    if not self.my_node_uuids:
3318
      # empty node group
3319
      pass
3320
    elif phase == constants.HOOKS_PHASE_POST:
3321
      # Used to change hooks' output to proper indentation
3322
      feedback_fn("* Hooks Results")
3323
      assert hooks_results, "invalid result from hooks"
3324

    
3325
      for node_name in hooks_results:
3326
        res = hooks_results[node_name]
3327
        msg = res.fail_msg
3328
        test = msg and not res.offline
3329
        self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3330
                      "Communication failure in hooks execution: %s", msg)
3331
        if res.offline or msg:
3332
          # No need to investigate payload if node is offline or gave
3333
          # an error.
3334
          continue
3335
        for script, hkr, output in res.payload:
3336
          test = hkr == constants.HKR_FAIL
3337
          self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3338
                        "Script %s failed, output:", script)
3339
          if test:
3340
            output = self._HOOKS_INDENT_RE.sub("      ", output)
3341
            feedback_fn("%s" % output)
3342
            lu_result = False
3343

    
3344
    return lu_result
3345
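  # Hedged example of one hooks_results payload entry as consumed above
  # (script name and output invented):
  #
  #   res.payload = [("50check-disk-space", constants.HKR_FAIL, "disk full")]
  #
  # Every HKR_FAIL entry re-indents and echoes the script output through
  # feedback_fn and downgrades the overall verify result to failure.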

    
3346

    
3347
class LUClusterVerifyDisks(NoHooksLU):
3348
  """Verifies the cluster disks status.
3349

3350
  """
3351
  REQ_BGL = False
3352

    
3353
  def ExpandNames(self):
3354
    self.share_locks = ShareAll()
3355
    self.needed_locks = {
3356
      locking.LEVEL_NODEGROUP: locking.ALL_SET,
3357
      }
3358

    
3359
  def Exec(self, feedback_fn):
3360
    group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
3361

    
3362
    # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
3363
    return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
3364
                           for group in group_names])
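  # Hedged usage note: "gnt-cluster verify-disks" is expected to end up in
  # this LU, which, as the comment above says, merely expands into one
  # OpGroupVerifyDisks job per node group and lets those jobs do the work.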