cluster verify: adjust path verification
[ganeti-local] / lib / cmdlib / cluster.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with the cluster."""
23
24 import OpenSSL
25
26 import copy
27 import itertools
28 import logging
29 import operator
30 import os
31 import re
32 import time
33
34 from ganeti import compat
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import query
45 from ganeti import rpc
46 from ganeti import runtime
47 from ganeti import ssh
48 from ganeti import uidpool
49 from ganeti import utils
50 from ganeti import vcluster
51
52 from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53   ResultWithJobs
54 from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55   ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56   GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57   GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58   CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59   ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob
60
61 import ganeti.masterd.instance
62
63
64 class LUClusterActivateMasterIp(NoHooksLU):
65   """Activate the master IP on the master node.
66
67   """
68   def Exec(self, feedback_fn):
69     """Activate the master IP.
70
71     """
72     master_params = self.cfg.GetMasterNetworkParameters()
73     ems = self.cfg.GetUseExternalMipScript()
74     result = self.rpc.call_node_activate_master_ip(master_params.uuid,
75                                                    master_params, ems)
76     result.Raise("Could not activate the master IP")
77
78
79 class LUClusterDeactivateMasterIp(NoHooksLU):
80   """Deactivate the master IP on the master node.
81
82   """
83   def Exec(self, feedback_fn):
84     """Deactivate the master IP.
85
86     """
87     master_params = self.cfg.GetMasterNetworkParameters()
88     ems = self.cfg.GetUseExternalMipScript()
89     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
90                                                      master_params, ems)
91     result.Raise("Could not deactivate the master IP")
92
93
94 class LUClusterConfigQuery(NoHooksLU):
95   """Return configuration values.
96
97   """
98   REQ_BGL = False
99
100   def CheckArguments(self):
101     self.cq = ClusterQuery(None, self.op.output_fields, False)
102
103   def ExpandNames(self):
104     self.cq.ExpandNames(self)
105
106   def DeclareLocks(self, level):
107     self.cq.DeclareLocks(self, level)
108
109   def Exec(self, feedback_fn):
110     result = self.cq.OldStyleQuery(self)
111
112     assert len(result) == 1
113
114     return result[0]
115
116
117 class LUClusterDestroy(LogicalUnit):
118   """Logical unit for destroying the cluster.
119
120   """
121   HPATH = "cluster-destroy"
122   HTYPE = constants.HTYPE_CLUSTER
123
124   def BuildHooksEnv(self):
125     """Build hooks env.
126
127     """
128     return {
129       "OP_TARGET": self.cfg.GetClusterName(),
130       }
131
132   def BuildHooksNodes(self):
133     """Build hooks nodes.
134
135     """
136     return ([], [])
137
138   def CheckPrereq(self):
139     """Check prerequisites.
140
141     This checks whether the cluster is empty.
142
143     Any errors are signaled by raising errors.OpPrereqError.
144
145     """
146     master = self.cfg.GetMasterNode()
147
148     nodelist = self.cfg.GetNodeList()
149     if len(nodelist) != 1 or nodelist[0] != master:
150       raise errors.OpPrereqError("There are still %d node(s) in"
151                                  " this cluster." % (len(nodelist) - 1),
152                                  errors.ECODE_INVAL)
153     instancelist = self.cfg.GetInstanceList()
154     if instancelist:
155       raise errors.OpPrereqError("There are still %d instance(s) in"
156                                  " this cluster." % len(instancelist),
157                                  errors.ECODE_INVAL)
158
159   def Exec(self, feedback_fn):
160     """Destroys the cluster.
161
162     """
163     master_params = self.cfg.GetMasterNetworkParameters()
164
165     # Run post hooks on master node before it's removed
166     RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
167
168     ems = self.cfg.GetUseExternalMipScript()
169     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
170                                                      master_params, ems)
171     result.Warn("Error disabling the master IP address", self.LogWarning)
172     return master_params.uuid
173
174
175 class LUClusterPostInit(LogicalUnit):
176   """Logical unit for running hooks after cluster initialization.
177
178   """
179   HPATH = "cluster-init"
180   HTYPE = constants.HTYPE_CLUSTER
181
182   def BuildHooksEnv(self):
183     """Build hooks env.
184
185     """
186     return {
187       "OP_TARGET": self.cfg.GetClusterName(),
188       }
189
190   def BuildHooksNodes(self):
191     """Build hooks nodes.
192
193     """
194     return ([], [self.cfg.GetMasterNode()])
195
196   def Exec(self, feedback_fn):
197     """Nothing to do.
198
199     """
200     return True
201
202
203 class ClusterQuery(QueryBase):
204   FIELDS = query.CLUSTER_FIELDS
205
206   #: Do not sort (there is only one item)
207   SORT_FIELD = None
208
209   def ExpandNames(self, lu):
210     lu.needed_locks = {}
211
212     # The following variables interact with _QueryBase._GetNames
213     self.wanted = locking.ALL_SET
214     self.do_locking = self.use_locking
215
216     if self.do_locking:
217       raise errors.OpPrereqError("Can not use locking for cluster queries",
218                                  errors.ECODE_INVAL)
219
220   def DeclareLocks(self, lu, level):
221     pass
222
223   def _GetQueryData(self, lu):
224     """Computes the list of nodes and their attributes.
225
226     """
227     # Locking is not used
228     assert not (compat.any(lu.glm.is_owned(level)
229                            for level in locking.LEVELS
230                            if level != locking.LEVEL_CLUSTER) or
231                 self.do_locking or self.use_locking)
232
233     if query.CQ_CONFIG in self.requested_data:
234       cluster = lu.cfg.GetClusterInfo()
235       nodes = lu.cfg.GetAllNodesInfo()
236     else:
237       cluster = NotImplemented
238       nodes = NotImplemented
239
240     if query.CQ_QUEUE_DRAINED in self.requested_data:
241       drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
242     else:
243       drain_flag = NotImplemented
244
245     if query.CQ_WATCHER_PAUSE in self.requested_data:
246       master_node_uuid = lu.cfg.GetMasterNode()
247
248       result = lu.rpc.call_get_watcher_pause(master_node_uuid)
249       result.Raise("Can't retrieve watcher pause from master node '%s'" %
250                    lu.cfg.GetMasterNodeName())
251
252       watcher_pause = result.payload
253     else:
254       watcher_pause = NotImplemented
255
256     return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
257
258
259 class LUClusterQuery(NoHooksLU):
260   """Query cluster configuration.
261
262   """
263   REQ_BGL = False
264
265   def ExpandNames(self):
266     self.needed_locks = {}
267
268   def Exec(self, feedback_fn):
269     """Return cluster config.
270
271     """
272     cluster = self.cfg.GetClusterInfo()
273     os_hvp = {}
274
275     # Filter just for enabled hypervisors
276     for os_name, hv_dict in cluster.os_hvp.items():
277       os_hvp[os_name] = {}
278       for hv_name, hv_params in hv_dict.items():
279         if hv_name in cluster.enabled_hypervisors:
280           os_hvp[os_name][hv_name] = hv_params
281
282     # Convert ip_family to ip_version
283     primary_ip_version = constants.IP4_VERSION
284     if cluster.primary_ip_family == netutils.IP6Address.family:
285       primary_ip_version = constants.IP6_VERSION
286
287     result = {
288       "software_version": constants.RELEASE_VERSION,
289       "protocol_version": constants.PROTOCOL_VERSION,
290       "config_version": constants.CONFIG_VERSION,
291       "os_api_version": max(constants.OS_API_VERSIONS),
292       "export_version": constants.EXPORT_VERSION,
293       "architecture": runtime.GetArchInfo(),
294       "name": cluster.cluster_name,
295       "master": self.cfg.GetMasterNodeName(),
296       "default_hypervisor": cluster.primary_hypervisor,
297       "enabled_hypervisors": cluster.enabled_hypervisors,
298       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
299                         for hypervisor_name in cluster.enabled_hypervisors]),
300       "os_hvp": os_hvp,
301       "beparams": cluster.beparams,
302       "osparams": cluster.osparams,
303       "ipolicy": cluster.ipolicy,
304       "nicparams": cluster.nicparams,
305       "ndparams": cluster.ndparams,
306       "diskparams": cluster.diskparams,
307       "candidate_pool_size": cluster.candidate_pool_size,
308       "master_netdev": cluster.master_netdev,
309       "master_netmask": cluster.master_netmask,
310       "use_external_mip_script": cluster.use_external_mip_script,
311       "volume_group_name": cluster.volume_group_name,
312       "drbd_usermode_helper": cluster.drbd_usermode_helper,
313       "file_storage_dir": cluster.file_storage_dir,
314       "shared_file_storage_dir": cluster.shared_file_storage_dir,
315       "maintain_node_health": cluster.maintain_node_health,
316       "ctime": cluster.ctime,
317       "mtime": cluster.mtime,
318       "uuid": cluster.uuid,
319       "tags": list(cluster.GetTags()),
320       "uid_pool": cluster.uid_pool,
321       "default_iallocator": cluster.default_iallocator,
322       "reserved_lvs": cluster.reserved_lvs,
323       "primary_ip_version": primary_ip_version,
324       "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
325       "hidden_os": cluster.hidden_os,
326       "blacklisted_os": cluster.blacklisted_os,
327       "enabled_disk_templates": cluster.enabled_disk_templates,
328       }
329
330     return result
331
332
333 class LUClusterRedistConf(NoHooksLU):
334   """Force the redistribution of cluster configuration.
335
336   This is a very simple LU.
337
338   """
339   REQ_BGL = False
340
341   def ExpandNames(self):
342     self.needed_locks = {
343       locking.LEVEL_NODE: locking.ALL_SET,
344       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
345     }
346     self.share_locks = ShareAll()
347
348   def Exec(self, feedback_fn):
349     """Redistribute the configuration.
350
351     """
352     self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
353     RedistributeAncillaryFiles(self)
354
355
356 class LUClusterRename(LogicalUnit):
357   """Rename the cluster.
358
359   """
360   HPATH = "cluster-rename"
361   HTYPE = constants.HTYPE_CLUSTER
362
363   def BuildHooksEnv(self):
364     """Build hooks env.
365
366     """
367     return {
368       "OP_TARGET": self.cfg.GetClusterName(),
369       "NEW_NAME": self.op.name,
370       }
371
372   def BuildHooksNodes(self):
373     """Build hooks nodes.
374
375     """
376     return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
377
378   def CheckPrereq(self):
379     """Verify that the passed name is a valid one.
380
381     """
382     hostname = netutils.GetHostname(name=self.op.name,
383                                     family=self.cfg.GetPrimaryIPFamily())
384
385     new_name = hostname.name
386     self.ip = new_ip = hostname.ip
387     old_name = self.cfg.GetClusterName()
388     old_ip = self.cfg.GetMasterIP()
389     if new_name == old_name and new_ip == old_ip:
390       raise errors.OpPrereqError("Neither the name nor the IP address of the"
391                                  " cluster has changed",
392                                  errors.ECODE_INVAL)
393     if new_ip != old_ip:
394       if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
395         raise errors.OpPrereqError("The given cluster IP address (%s) is"
396                                    " reachable on the network" %
397                                    new_ip, errors.ECODE_NOTUNIQUE)
398
399     self.op.name = new_name
400
401   def Exec(self, feedback_fn):
402     """Rename the cluster.
403
404     """
405     clustername = self.op.name
406     new_ip = self.ip
407
408     # shutdown the master IP
409     master_params = self.cfg.GetMasterNetworkParameters()
410     ems = self.cfg.GetUseExternalMipScript()
411     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
412                                                      master_params, ems)
413     result.Raise("Could not disable the master role")
414
415     try:
416       cluster = self.cfg.GetClusterInfo()
417       cluster.cluster_name = clustername
418       cluster.master_ip = new_ip
419       self.cfg.Update(cluster, feedback_fn)
420
421       # update the known hosts file
422       ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
423       node_list = self.cfg.GetOnlineNodeList()
424       try:
425         node_list.remove(master_params.uuid)
426       except ValueError:
427         pass
428       UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
429     finally:
430       master_params.ip = new_ip
431       result = self.rpc.call_node_activate_master_ip(master_params.uuid,
432                                                      master_params, ems)
433       result.Warn("Could not re-enable the master role on the master,"
434                   " please restart manually", self.LogWarning)
435
436     return clustername
437
438
439 class LUClusterRepairDiskSizes(NoHooksLU):
440   """Verifies the cluster disks sizes.
441
442   """
443   REQ_BGL = False
444
445   def ExpandNames(self):
446     if self.op.instances:
447       (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
448       # Not getting the node allocation lock as only a specific set of
449       # instances (and their nodes) is going to be acquired
450       self.needed_locks = {
451         locking.LEVEL_NODE_RES: [],
452         locking.LEVEL_INSTANCE: self.wanted_names,
453         }
454       self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
455     else:
456       self.wanted_names = None
457       self.needed_locks = {
458         locking.LEVEL_NODE_RES: locking.ALL_SET,
459         locking.LEVEL_INSTANCE: locking.ALL_SET,
460
461         # This opcode is acquires the node locks for all instances
462         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
463         }
464
465     self.share_locks = {
466       locking.LEVEL_NODE_RES: 1,
467       locking.LEVEL_INSTANCE: 0,
468       locking.LEVEL_NODE_ALLOC: 1,
469       }
470
471   def DeclareLocks(self, level):
472     if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
473       self._LockInstancesNodes(primary_only=True, level=level)
474
475   def CheckPrereq(self):
476     """Check prerequisites.
477
478     This only checks the optional instance list against the existing names.
479
480     """
481     if self.wanted_names is None:
482       self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
483
484     self.wanted_instances = \
485         map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
486
487   def _EnsureChildSizes(self, disk):
488     """Ensure children of the disk have the needed disk size.
489
490     This is valid mainly for DRBD8 and fixes an issue where the
491     children have smaller disk size.
492
493     @param disk: an L{ganeti.objects.Disk} object
494
495     """
496     if disk.dev_type == constants.LD_DRBD8:
497       assert disk.children, "Empty children for DRBD8?"
498       fchild = disk.children[0]
499       mismatch = fchild.size < disk.size
500       if mismatch:
501         self.LogInfo("Child disk has size %d, parent %d, fixing",
502                      fchild.size, disk.size)
503         fchild.size = disk.size
504
505       # and we recurse on this child only, not on the metadev
506       return self._EnsureChildSizes(fchild) or mismatch
507     else:
508       return False
509
510   def Exec(self, feedback_fn):
511     """Verify the size of cluster disks.
512
513     """
514     # TODO: check child disks too
515     # TODO: check differences in size between primary/secondary nodes
516     per_node_disks = {}
517     for instance in self.wanted_instances:
518       pnode = instance.primary_node
519       if pnode not in per_node_disks:
520         per_node_disks[pnode] = []
521       for idx, disk in enumerate(instance.disks):
522         per_node_disks[pnode].append((instance, idx, disk))
523
524     assert not (frozenset(per_node_disks.keys()) -
525                 self.owned_locks(locking.LEVEL_NODE_RES)), \
526       "Not owning correct locks"
527     assert not self.owned_locks(locking.LEVEL_NODE)
528
529     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
530                                                per_node_disks.keys())
531
532     changed = []
533     for node_uuid, dskl in per_node_disks.items():
534       newl = [v[2].Copy() for v in dskl]
535       for dsk in newl:
536         self.cfg.SetDiskID(dsk, node_uuid)
537       node_name = self.cfg.GetNodeName(node_uuid)
538       result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
539       if result.fail_msg:
540         self.LogWarning("Failure in blockdev_getdimensions call to node"
541                         " %s, ignoring", node_name)
542         continue
543       if len(result.payload) != len(dskl):
544         logging.warning("Invalid result from node %s: len(dksl)=%d,"
545                         " result.payload=%s", node_name, len(dskl),
546                         result.payload)
547         self.LogWarning("Invalid result from node %s, ignoring node results",
548                         node_name)
549         continue
550       for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
551         if dimensions is None:
552           self.LogWarning("Disk %d of instance %s did not return size"
553                           " information, ignoring", idx, instance.name)
554           continue
555         if not isinstance(dimensions, (tuple, list)):
556           self.LogWarning("Disk %d of instance %s did not return valid"
557                           " dimension information, ignoring", idx,
558                           instance.name)
559           continue
560         (size, spindles) = dimensions
561         if not isinstance(size, (int, long)):
562           self.LogWarning("Disk %d of instance %s did not return valid"
563                           " size information, ignoring", idx, instance.name)
564           continue
565         size = size >> 20
566         if size != disk.size:
567           self.LogInfo("Disk %d of instance %s has mismatched size,"
568                        " correcting: recorded %d, actual %d", idx,
569                        instance.name, disk.size, size)
570           disk.size = size
571           self.cfg.Update(instance, feedback_fn)
572           changed.append((instance.name, idx, "size", size))
573         if es_flags[node_uuid]:
574           if spindles is None:
575             self.LogWarning("Disk %d of instance %s did not return valid"
576                             " spindles information, ignoring", idx,
577                             instance.name)
578           elif disk.spindles is None or disk.spindles != spindles:
579             self.LogInfo("Disk %d of instance %s has mismatched spindles,"
580                          " correcting: recorded %s, actual %s",
581                          idx, instance.name, disk.spindles, spindles)
582             disk.spindles = spindles
583             self.cfg.Update(instance, feedback_fn)
584             changed.append((instance.name, idx, "spindles", disk.spindles))
585         if self._EnsureChildSizes(disk):
586           self.cfg.Update(instance, feedback_fn)
587           changed.append((instance.name, idx, "size", disk.size))
588     return changed
589
590
591 def _ValidateNetmask(cfg, netmask):
592   """Checks if a netmask is valid.
593
594   @type cfg: L{config.ConfigWriter}
595   @param cfg: The cluster configuration
596   @type netmask: int
597   @param netmask: the netmask to be verified
598   @raise errors.OpPrereqError: if the validation fails
599
600   """
601   ip_family = cfg.GetPrimaryIPFamily()
602   try:
603     ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
604   except errors.ProgrammerError:
605     raise errors.OpPrereqError("Invalid primary ip family: %s." %
606                                ip_family, errors.ECODE_INVAL)
607   if not ipcls.ValidateNetmask(netmask):
608     raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
609                                (netmask), errors.ECODE_INVAL)
610
611
612 class LUClusterSetParams(LogicalUnit):
613   """Change the parameters of the cluster.
614
615   """
616   HPATH = "cluster-modify"
617   HTYPE = constants.HTYPE_CLUSTER
618   REQ_BGL = False
619
620   def CheckArguments(self):
621     """Check parameters
622
623     """
624     if self.op.uid_pool:
625       uidpool.CheckUidPool(self.op.uid_pool)
626
627     if self.op.add_uids:
628       uidpool.CheckUidPool(self.op.add_uids)
629
630     if self.op.remove_uids:
631       uidpool.CheckUidPool(self.op.remove_uids)
632
633     if self.op.master_netmask is not None:
634       _ValidateNetmask(self.cfg, self.op.master_netmask)
635
636     if self.op.diskparams:
637       for dt_params in self.op.diskparams.values():
638         utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
639       try:
640         utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
641       except errors.OpPrereqError, err:
642         raise errors.OpPrereqError("While verify diskparams options: %s" % err,
643                                    errors.ECODE_INVAL)
644
645   def ExpandNames(self):
646     # FIXME: in the future maybe other cluster params won't require checking on
647     # all nodes to be modified.
648     # FIXME: This opcode changes cluster-wide settings. Is acquiring all
649     # resource locks the right thing, shouldn't it be the BGL instead?
650     self.needed_locks = {
651       locking.LEVEL_NODE: locking.ALL_SET,
652       locking.LEVEL_INSTANCE: locking.ALL_SET,
653       locking.LEVEL_NODEGROUP: locking.ALL_SET,
654       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
655     }
656     self.share_locks = ShareAll()
657
658   def BuildHooksEnv(self):
659     """Build hooks env.
660
661     """
662     return {
663       "OP_TARGET": self.cfg.GetClusterName(),
664       "NEW_VG_NAME": self.op.vg_name,
665       }
666
667   def BuildHooksNodes(self):
668     """Build hooks nodes.
669
670     """
671     mn = self.cfg.GetMasterNode()
672     return ([mn], [mn])
673
674   def _CheckVgName(self, node_uuids, enabled_disk_templates,
675                    new_enabled_disk_templates):
676     """Check the consistency of the vg name on all nodes and in case it gets
677        unset whether there are instances still using it.
678
679     """
680     if self.op.vg_name is not None and not self.op.vg_name:
681       if self.cfg.HasAnyDiskOfType(constants.LD_LV):
682         raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
683                                    " instances exist", errors.ECODE_INVAL)
684
685     if (self.op.vg_name is not None and
686         utils.IsLvmEnabled(enabled_disk_templates)) or \
687            (self.cfg.GetVGName() is not None and
688             utils.LvmGetsEnabled(enabled_disk_templates,
689                                  new_enabled_disk_templates)):
690       self._CheckVgNameOnNodes(node_uuids)
691
692   def _CheckVgNameOnNodes(self, node_uuids):
693     """Check the status of the volume group on each node.
694
695     """
696     vglist = self.rpc.call_vg_list(node_uuids)
697     for node_uuid in node_uuids:
698       msg = vglist[node_uuid].fail_msg
699       if msg:
700         # ignoring down node
701         self.LogWarning("Error while gathering data on node %s"
702                         " (ignoring node): %s",
703                         self.cfg.GetNodeName(node_uuid), msg)
704         continue
705       vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
706                                             self.op.vg_name,
707                                             constants.MIN_VG_SIZE)
708       if vgstatus:
709         raise errors.OpPrereqError("Error on node '%s': %s" %
710                                    (self.cfg.GetNodeName(node_uuid), vgstatus),
711                                    errors.ECODE_ENVIRON)
712
713   def _GetEnabledDiskTemplates(self, cluster):
714     """Determines the enabled disk templates and the subset of disk templates
715        that are newly enabled by this operation.
716
717     """
718     enabled_disk_templates = None
719     new_enabled_disk_templates = []
720     if self.op.enabled_disk_templates:
721       enabled_disk_templates = self.op.enabled_disk_templates
722       new_enabled_disk_templates = \
723         list(set(enabled_disk_templates)
724              - set(cluster.enabled_disk_templates))
725     else:
726       enabled_disk_templates = cluster.enabled_disk_templates
727     return (enabled_disk_templates, new_enabled_disk_templates)
728
729   def CheckPrereq(self):
730     """Check prerequisites.
731
732     This checks whether the given params don't conflict and
733     if the given volume group is valid.
734
735     """
736     if self.op.drbd_helper is not None and not self.op.drbd_helper:
737       if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
738         raise errors.OpPrereqError("Cannot disable drbd helper while"
739                                    " drbd-based instances exist",
740                                    errors.ECODE_INVAL)
741
742     node_uuids = self.owned_locks(locking.LEVEL_NODE)
743     self.cluster = cluster = self.cfg.GetClusterInfo()
744
745     vm_capable_node_uuids = [node.uuid
746                              for node in self.cfg.GetAllNodesInfo().values()
747                              if node.uuid in node_uuids and node.vm_capable]
748
749     (enabled_disk_templates, new_enabled_disk_templates) = \
750       self._GetEnabledDiskTemplates(cluster)
751
752     self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
753                       new_enabled_disk_templates)
754
755     if self.op.drbd_helper:
756       # checks given drbd helper on all nodes
757       helpers = self.rpc.call_drbd_helper(node_uuids)
758       for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
759         if ninfo.offline:
760           self.LogInfo("Not checking drbd helper on offline node %s",
761                        ninfo.name)
762           continue
763         msg = helpers[ninfo.uuid].fail_msg
764         if msg:
765           raise errors.OpPrereqError("Error checking drbd helper on node"
766                                      " '%s': %s" % (ninfo.name, msg),
767                                      errors.ECODE_ENVIRON)
768         node_helper = helpers[ninfo.uuid].payload
769         if node_helper != self.op.drbd_helper:
770           raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
771                                      (ninfo.name, node_helper),
772                                      errors.ECODE_ENVIRON)
773
774     # validate params changes
775     if self.op.beparams:
776       objects.UpgradeBeParams(self.op.beparams)
777       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
778       self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
779
780     if self.op.ndparams:
781       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
782       self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
783
784       # TODO: we need a more general way to handle resetting
785       # cluster-level parameters to default values
786       if self.new_ndparams["oob_program"] == "":
787         self.new_ndparams["oob_program"] = \
788             constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
789
790     if self.op.hv_state:
791       new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
792                                            self.cluster.hv_state_static)
793       self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
794                                for hv, values in new_hv_state.items())
795
796     if self.op.disk_state:
797       new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
798                                                self.cluster.disk_state_static)
799       self.new_disk_state = \
800         dict((storage, dict((name, cluster.SimpleFillDiskState(values))
801                             for name, values in svalues.items()))
802              for storage, svalues in new_disk_state.items())
803
804     if self.op.ipolicy:
805       self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
806                                            group_policy=False)
807
808       all_instances = self.cfg.GetAllInstancesInfo().values()
809       violations = set()
810       for group in self.cfg.GetAllNodeGroupsInfo().values():
811         instances = frozenset([inst for inst in all_instances
812                                if compat.any(nuuid in group.members
813                                              for nuuid in inst.all_nodes)])
814         new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
815         ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
816         new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
817                                            self.cfg)
818         if new:
819           violations.update(new)
820
821       if violations:
822         self.LogWarning("After the ipolicy change the following instances"
823                         " violate them: %s",
824                         utils.CommaJoin(utils.NiceSort(violations)))
825
826     if self.op.nicparams:
827       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
828       self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
829       objects.NIC.CheckParameterSyntax(self.new_nicparams)
830       nic_errors = []
831
832       # check all instances for consistency
833       for instance in self.cfg.GetAllInstancesInfo().values():
834         for nic_idx, nic in enumerate(instance.nics):
835           params_copy = copy.deepcopy(nic.nicparams)
836           params_filled = objects.FillDict(self.new_nicparams, params_copy)
837
838           # check parameter syntax
839           try:
840             objects.NIC.CheckParameterSyntax(params_filled)
841           except errors.ConfigurationError, err:
842             nic_errors.append("Instance %s, nic/%d: %s" %
843                               (instance.name, nic_idx, err))
844
845           # if we're moving instances to routed, check that they have an ip
846           target_mode = params_filled[constants.NIC_MODE]
847           if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
848             nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
849                               " address" % (instance.name, nic_idx))
850       if nic_errors:
851         raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
852                                    "\n".join(nic_errors), errors.ECODE_INVAL)
853
854     # hypervisor list/parameters
855     self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
856     if self.op.hvparams:
857       for hv_name, hv_dict in self.op.hvparams.items():
858         if hv_name not in self.new_hvparams:
859           self.new_hvparams[hv_name] = hv_dict
860         else:
861           self.new_hvparams[hv_name].update(hv_dict)
862
863     # disk template parameters
864     self.new_diskparams = objects.FillDict(cluster.diskparams, {})
865     if self.op.diskparams:
866       for dt_name, dt_params in self.op.diskparams.items():
867         if dt_name not in self.op.diskparams:
868           self.new_diskparams[dt_name] = dt_params
869         else:
870           self.new_diskparams[dt_name].update(dt_params)
871
872     # os hypervisor parameters
873     self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
874     if self.op.os_hvp:
875       for os_name, hvs in self.op.os_hvp.items():
876         if os_name not in self.new_os_hvp:
877           self.new_os_hvp[os_name] = hvs
878         else:
879           for hv_name, hv_dict in hvs.items():
880             if hv_dict is None:
881               # Delete if it exists
882               self.new_os_hvp[os_name].pop(hv_name, None)
883             elif hv_name not in self.new_os_hvp[os_name]:
884               self.new_os_hvp[os_name][hv_name] = hv_dict
885             else:
886               self.new_os_hvp[os_name][hv_name].update(hv_dict)
887
888     # os parameters
889     self.new_osp = objects.FillDict(cluster.osparams, {})
890     if self.op.osparams:
891       for os_name, osp in self.op.osparams.items():
892         if os_name not in self.new_osp:
893           self.new_osp[os_name] = {}
894
895         self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
896                                                  use_none=True)
897
898         if not self.new_osp[os_name]:
899           # we removed all parameters
900           del self.new_osp[os_name]
901         else:
902           # check the parameter validity (remote check)
903           CheckOSParams(self, False, [self.cfg.GetMasterNode()],
904                         os_name, self.new_osp[os_name])
905
906     # changes to the hypervisor list
907     if self.op.enabled_hypervisors is not None:
908       self.hv_list = self.op.enabled_hypervisors
909       for hv in self.hv_list:
910         # if the hypervisor doesn't already exist in the cluster
911         # hvparams, we initialize it to empty, and then (in both
912         # cases) we make sure to fill the defaults, as we might not
913         # have a complete defaults list if the hypervisor wasn't
914         # enabled before
915         if hv not in new_hvp:
916           new_hvp[hv] = {}
917         new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
918         utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
919     else:
920       self.hv_list = cluster.enabled_hypervisors
921
922     if self.op.hvparams or self.op.enabled_hypervisors is not None:
923       # either the enabled list has changed, or the parameters have, validate
924       for hv_name, hv_params in self.new_hvparams.items():
925         if ((self.op.hvparams and hv_name in self.op.hvparams) or
926             (self.op.enabled_hypervisors and
927              hv_name in self.op.enabled_hypervisors)):
928           # either this is a new hypervisor, or its parameters have changed
929           hv_class = hypervisor.GetHypervisorClass(hv_name)
930           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
931           hv_class.CheckParameterSyntax(hv_params)
932           CheckHVParams(self, node_uuids, hv_name, hv_params)
933
934     self._CheckDiskTemplateConsistency()
935
936     if self.op.os_hvp:
937       # no need to check any newly-enabled hypervisors, since the
938       # defaults have already been checked in the above code-block
939       for os_name, os_hvp in self.new_os_hvp.items():
940         for hv_name, hv_params in os_hvp.items():
941           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
942           # we need to fill in the new os_hvp on top of the actual hv_p
943           cluster_defaults = self.new_hvparams.get(hv_name, {})
944           new_osp = objects.FillDict(cluster_defaults, hv_params)
945           hv_class = hypervisor.GetHypervisorClass(hv_name)
946           hv_class.CheckParameterSyntax(new_osp)
947           CheckHVParams(self, node_uuids, hv_name, new_osp)
948
949     if self.op.default_iallocator:
950       alloc_script = utils.FindFile(self.op.default_iallocator,
951                                     constants.IALLOCATOR_SEARCH_PATH,
952                                     os.path.isfile)
953       if alloc_script is None:
954         raise errors.OpPrereqError("Invalid default iallocator script '%s'"
955                                    " specified" % self.op.default_iallocator,
956                                    errors.ECODE_INVAL)
957
958   def _CheckDiskTemplateConsistency(self):
959     """Check whether the disk templates that are going to be disabled
960        are still in use by some instances.
961
962     """
963     if self.op.enabled_disk_templates:
964       cluster = self.cfg.GetClusterInfo()
965       instances = self.cfg.GetAllInstancesInfo()
966
967       disk_templates_to_remove = set(cluster.enabled_disk_templates) \
968         - set(self.op.enabled_disk_templates)
969       for instance in instances.itervalues():
970         if instance.disk_template in disk_templates_to_remove:
971           raise errors.OpPrereqError("Cannot disable disk template '%s',"
972                                      " because instance '%s' is using it." %
973                                      (instance.disk_template, instance.name))
974
975   def _SetVgName(self, feedback_fn):
976     """Determines and sets the new volume group name.
977
978     """
979     if self.op.vg_name is not None:
980       if self.op.vg_name and not \
981            utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
982         feedback_fn("Note that you specified a volume group, but did not"
983                     " enable any lvm disk template.")
984       new_volume = self.op.vg_name
985       if not new_volume:
986         if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
987           raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
988                                      " disk templates are enabled.")
989         new_volume = None
990       if new_volume != self.cfg.GetVGName():
991         self.cfg.SetVGName(new_volume)
992       else:
993         feedback_fn("Cluster LVM configuration already in desired"
994                     " state, not changing")
995     else:
996       if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
997           not self.cfg.GetVGName():
998         raise errors.OpPrereqError("Please specify a volume group when"
999                                    " enabling lvm-based disk-templates.")
1000
1001   def Exec(self, feedback_fn):
1002     """Change the parameters of the cluster.
1003
1004     """
1005     if self.op.enabled_disk_templates:
1006       self.cluster.enabled_disk_templates = \
1007         list(set(self.op.enabled_disk_templates))
1008
1009     self._SetVgName(feedback_fn)
1010
1011     if self.op.drbd_helper is not None:
1012       if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1013         feedback_fn("Note that you specified a drbd user helper, but did"
1014                     " enabled the drbd disk template.")
1015       new_helper = self.op.drbd_helper
1016       if not new_helper:
1017         new_helper = None
1018       if new_helper != self.cfg.GetDRBDHelper():
1019         self.cfg.SetDRBDHelper(new_helper)
1020       else:
1021         feedback_fn("Cluster DRBD helper already in desired state,"
1022                     " not changing")
1023     if self.op.hvparams:
1024       self.cluster.hvparams = self.new_hvparams
1025     if self.op.os_hvp:
1026       self.cluster.os_hvp = self.new_os_hvp
1027     if self.op.enabled_hypervisors is not None:
1028       self.cluster.hvparams = self.new_hvparams
1029       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1030     if self.op.beparams:
1031       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1032     if self.op.nicparams:
1033       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1034     if self.op.ipolicy:
1035       self.cluster.ipolicy = self.new_ipolicy
1036     if self.op.osparams:
1037       self.cluster.osparams = self.new_osp
1038     if self.op.ndparams:
1039       self.cluster.ndparams = self.new_ndparams
1040     if self.op.diskparams:
1041       self.cluster.diskparams = self.new_diskparams
1042     if self.op.hv_state:
1043       self.cluster.hv_state_static = self.new_hv_state
1044     if self.op.disk_state:
1045       self.cluster.disk_state_static = self.new_disk_state
1046
1047     if self.op.candidate_pool_size is not None:
1048       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1049       # we need to update the pool size here, otherwise the save will fail
1050       AdjustCandidatePool(self, [])
1051
1052     if self.op.maintain_node_health is not None:
1053       if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1054         feedback_fn("Note: CONFD was disabled at build time, node health"
1055                     " maintenance is not useful (still enabling it)")
1056       self.cluster.maintain_node_health = self.op.maintain_node_health
1057
1058     if self.op.prealloc_wipe_disks is not None:
1059       self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1060
1061     if self.op.add_uids is not None:
1062       uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1063
1064     if self.op.remove_uids is not None:
1065       uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1066
1067     if self.op.uid_pool is not None:
1068       self.cluster.uid_pool = self.op.uid_pool
1069
1070     if self.op.default_iallocator is not None:
1071       self.cluster.default_iallocator = self.op.default_iallocator
1072
1073     if self.op.reserved_lvs is not None:
1074       self.cluster.reserved_lvs = self.op.reserved_lvs
1075
1076     if self.op.use_external_mip_script is not None:
1077       self.cluster.use_external_mip_script = self.op.use_external_mip_script
1078
1079     def helper_os(aname, mods, desc):
1080       desc += " OS list"
1081       lst = getattr(self.cluster, aname)
1082       for key, val in mods:
1083         if key == constants.DDM_ADD:
1084           if val in lst:
1085             feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1086           else:
1087             lst.append(val)
1088         elif key == constants.DDM_REMOVE:
1089           if val in lst:
1090             lst.remove(val)
1091           else:
1092             feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1093         else:
1094           raise errors.ProgrammerError("Invalid modification '%s'" % key)
1095
1096     if self.op.hidden_os:
1097       helper_os("hidden_os", self.op.hidden_os, "hidden")
1098
1099     if self.op.blacklisted_os:
1100       helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1101
1102     if self.op.master_netdev:
1103       master_params = self.cfg.GetMasterNetworkParameters()
1104       ems = self.cfg.GetUseExternalMipScript()
1105       feedback_fn("Shutting down master ip on the current netdev (%s)" %
1106                   self.cluster.master_netdev)
1107       result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1108                                                        master_params, ems)
1109       if not self.op.force:
1110         result.Raise("Could not disable the master ip")
1111       else:
1112         if result.fail_msg:
1113           msg = ("Could not disable the master ip (continuing anyway): %s" %
1114                  result.fail_msg)
1115           feedback_fn(msg)
1116       feedback_fn("Changing master_netdev from %s to %s" %
1117                   (master_params.netdev, self.op.master_netdev))
1118       self.cluster.master_netdev = self.op.master_netdev
1119
1120     if self.op.master_netmask:
1121       master_params = self.cfg.GetMasterNetworkParameters()
1122       feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1123       result = self.rpc.call_node_change_master_netmask(
1124                  master_params.uuid, master_params.netmask,
1125                  self.op.master_netmask, master_params.ip,
1126                  master_params.netdev)
1127       result.Warn("Could not change the master IP netmask", feedback_fn)
1128       self.cluster.master_netmask = self.op.master_netmask
1129
1130     self.cfg.Update(self.cluster, feedback_fn)
1131
1132     if self.op.master_netdev:
1133       master_params = self.cfg.GetMasterNetworkParameters()
1134       feedback_fn("Starting the master ip on the new master netdev (%s)" %
1135                   self.op.master_netdev)
1136       ems = self.cfg.GetUseExternalMipScript()
1137       result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1138                                                      master_params, ems)
1139       result.Warn("Could not re-enable the master ip on the master,"
1140                   " please restart manually", self.LogWarning)
1141
1142
1143 class LUClusterVerify(NoHooksLU):
1144   """Submits all jobs necessary to verify the cluster.
1145
1146   """
1147   REQ_BGL = False
1148
1149   def ExpandNames(self):
1150     self.needed_locks = {}
1151
1152   def Exec(self, feedback_fn):
1153     jobs = []
1154
1155     if self.op.group_name:
1156       groups = [self.op.group_name]
1157       depends_fn = lambda: None
1158     else:
1159       groups = self.cfg.GetNodeGroupList()
1160
1161       # Verify global configuration
1162       jobs.append([
1163         opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1164         ])
1165
1166       # Always depend on global verification
1167       depends_fn = lambda: [(-len(jobs), [])]
1168
1169     jobs.extend(
1170       [opcodes.OpClusterVerifyGroup(group_name=group,
1171                                     ignore_errors=self.op.ignore_errors,
1172                                     depends=depends_fn())]
1173       for group in groups)
1174
1175     # Fix up all parameters
1176     for op in itertools.chain(*jobs): # pylint: disable=W0142
1177       op.debug_simulate_errors = self.op.debug_simulate_errors
1178       op.verbose = self.op.verbose
1179       op.error_codes = self.op.error_codes
1180       try:
1181         op.skip_checks = self.op.skip_checks
1182       except AttributeError:
1183         assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1184
1185     return ResultWithJobs(jobs)
1186
1187
1188 class _VerifyErrors(object):
1189   """Mix-in for cluster/group verify LUs.
1190
1191   It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1192   self.op and self._feedback_fn to be available.)
1193
1194   """
1195
1196   ETYPE_FIELD = "code"
1197   ETYPE_ERROR = "ERROR"
1198   ETYPE_WARNING = "WARNING"
1199
1200   def _Error(self, ecode, item, msg, *args, **kwargs):
1201     """Format an error message.
1202
1203     Based on the opcode's error_codes parameter, either format a
1204     parseable error code, or a simpler error string.
1205
1206     This must be called only from Exec and functions called from Exec.
1207
1208     """
1209     ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1210     itype, etxt, _ = ecode
1211     # If the error code is in the list of ignored errors, demote the error to a
1212     # warning
1213     if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1214       ltype = self.ETYPE_WARNING
1215     # first complete the msg
1216     if args:
1217       msg = msg % args
1218     # then format the whole message
1219     if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1220       msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1221     else:
1222       if item:
1223         item = " " + item
1224       else:
1225         item = ""
1226       msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1227     # and finally report it via the feedback_fn
1228     self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1229     # do not mark the operation as failed for WARN cases only
1230     if ltype == self.ETYPE_ERROR:
1231       self.bad = True
1232
1233   def _ErrorIf(self, cond, *args, **kwargs):
1234     """Log an error message if the passed condition is True.
1235
1236     """
1237     if (bool(cond)
1238         or self.op.debug_simulate_errors): # pylint: disable=E1101
1239       self._Error(*args, **kwargs)
1240
1241
1242 def _VerifyCertificate(filename):
1243   """Verifies a certificate for L{LUClusterVerifyConfig}.
1244
1245   @type filename: string
1246   @param filename: Path to PEM file
1247
1248   """
1249   try:
1250     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1251                                            utils.ReadFile(filename))
1252   except Exception, err: # pylint: disable=W0703
1253     return (LUClusterVerifyConfig.ETYPE_ERROR,
1254             "Failed to load X509 certificate %s: %s" % (filename, err))
1255
1256   (errcode, msg) = \
1257     utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1258                                 constants.SSL_CERT_EXPIRATION_ERROR)
1259
1260   if msg:
1261     fnamemsg = "While verifying %s: %s" % (filename, msg)
1262   else:
1263     fnamemsg = None
1264
1265   if errcode is None:
1266     return (None, fnamemsg)
1267   elif errcode == utils.CERT_WARNING:
1268     return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1269   elif errcode == utils.CERT_ERROR:
1270     return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1271
1272   raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1273
1274
1275 def _GetAllHypervisorParameters(cluster, instances):
1276   """Compute the set of all hypervisor parameters.
1277
1278   @type cluster: L{objects.Cluster}
1279   @param cluster: the cluster object
1280   @param instances: list of L{objects.Instance}
1281   @param instances: additional instances from which to obtain parameters
1282   @rtype: list of (origin, hypervisor, parameters)
1283   @return: a list with all parameters found, indicating the hypervisor they
1284        apply to, and the origin (can be "cluster", "os X", or "instance Y")
1285
1286   """
1287   hvp_data = []
1288
1289   for hv_name in cluster.enabled_hypervisors:
1290     hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1291
1292   for os_name, os_hvp in cluster.os_hvp.items():
1293     for hv_name, hv_params in os_hvp.items():
1294       if hv_params:
1295         full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1296         hvp_data.append(("os %s" % os_name, hv_name, full_params))
1297
1298   # TODO: collapse identical parameter values in a single one
1299   for instance in instances:
1300     if instance.hvparams:
1301       hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1302                        cluster.FillHV(instance)))
1303
1304   return hvp_data
1305
1306
1307 class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1308   """Verifies the cluster config.
1309
1310   """
1311   REQ_BGL = False
1312
1313   def _VerifyHVP(self, hvp_data):
1314     """Verifies locally the syntax of the hypervisor parameters.
1315
1316     """
1317     for item, hv_name, hv_params in hvp_data:
1318       msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1319              (item, hv_name))
1320       try:
1321         hv_class = hypervisor.GetHypervisorClass(hv_name)
1322         utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1323         hv_class.CheckParameterSyntax(hv_params)
1324       except errors.GenericError, err:
1325         self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1326
1327   def ExpandNames(self):
1328     self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1329     self.share_locks = ShareAll()
1330
1331   def CheckPrereq(self):
1332     """Check prerequisites.
1333
1334     """
1335     # Retrieve all information
1336     self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1337     self.all_node_info = self.cfg.GetAllNodesInfo()
1338     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1339
1340   def Exec(self, feedback_fn):
1341     """Verify integrity of cluster, performing various test on nodes.
1342
1343     """
1344     self.bad = False
1345     self._feedback_fn = feedback_fn
1346
1347     feedback_fn("* Verifying cluster config")
1348
1349     for msg in self.cfg.VerifyConfig():
1350       self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1351
1352     feedback_fn("* Verifying cluster certificate files")
1353
1354     for cert_filename in pathutils.ALL_CERT_FILES:
1355       (errcode, msg) = _VerifyCertificate(cert_filename)
1356       self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1357
1358     feedback_fn("* Verifying hypervisor parameters")
1359
1360     self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1361                                                 self.all_inst_info.values()))
1362
1363     feedback_fn("* Verifying all nodes belong to an existing group")
1364
1365     # We do this verification here because, should this bogus circumstance
1366     # occur, it would never be caught by VerifyGroup, which only acts on
1367     # nodes/instances reachable from existing node groups.
1368
1369     dangling_nodes = set(node for node in self.all_node_info.values()
1370                          if node.group not in self.all_group_info)
1371
1372     dangling_instances = {}
1373     no_node_instances = []
1374
1375     for inst in self.all_inst_info.values():
1376       if inst.primary_node in [node.uuid for node in dangling_nodes]:
1377         dangling_instances.setdefault(inst.primary_node, []).append(inst)
1378       elif inst.primary_node not in self.all_node_info:
1379         no_node_instances.append(inst)
1380
1381     pretty_dangling = [
1382         "%s (%s)" %
1383         (node.name,
1384          utils.CommaJoin(
1385            self.cfg.GetInstanceNames(
1386              dangling_instances.get(node.uuid, ["no instances"]))))
1387         for node in dangling_nodes]
1388
1389     self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1390                   None,
1391                   "the following nodes (and their instances) belong to a non"
1392                   " existing group: %s", utils.CommaJoin(pretty_dangling))
1393
1394     self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1395                   None,
1396                   "the following instances have a non-existing primary-node:"
1397                   " %s", utils.CommaJoin(
1398                            self.cfg.GetInstanceNames(no_node_instances)))
1399
1400     return not self.bad
1401
1402
1403 class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1404   """Verifies the status of a node group.
1405
1406   """
1407   HPATH = "cluster-verify"
1408   HTYPE = constants.HTYPE_CLUSTER
1409   REQ_BGL = False
1410
1411   _HOOKS_INDENT_RE = re.compile("^", re.M)
1412
1413   class NodeImage(object):
1414     """A class representing the logical and physical status of a node.
1415
1416     @type uuid: string
1417     @ivar uuid: the node UUID to which this object refers
1418     @ivar volumes: a structure as returned from
1419         L{ganeti.backend.GetVolumeList} (runtime)
1420     @ivar instances: a list of running instances (runtime)
1421     @ivar pinst: list of configured primary instances (config)
1422     @ivar sinst: list of configured secondary instances (config)
1423     @ivar sbp: dictionary of {primary-node: list of instances} for all
1424         instances for which this node is secondary (config)
1425     @ivar mfree: free memory, as reported by hypervisor (runtime)
1426     @ivar dfree: free disk, as reported by the node (runtime)
1427     @ivar offline: the offline status (config)
1428     @type rpc_fail: boolean
1429     @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1430         not whether the individual keys were correct) (runtime)
1431     @type lvm_fail: boolean
1432     @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1433     @type hyp_fail: boolean
1434     @ivar hyp_fail: whether the RPC call didn't return the instance list
1435     @type ghost: boolean
1436     @ivar ghost: whether this is a known node or not (config)
1437     @type os_fail: boolean
1438     @ivar os_fail: whether the RPC call didn't return valid OS data
1439     @type oslist: list
1440     @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1441     @type vm_capable: boolean
1442     @ivar vm_capable: whether the node can host instances
1443     @type pv_min: float
1444     @ivar pv_min: size in MiB of the smallest PVs
1445     @type pv_max: float
1446     @ivar pv_max: size in MiB of the biggest PVs
1447
1448     """
1449     def __init__(self, offline=False, uuid=None, vm_capable=True):
1450       self.uuid = uuid
1451       self.volumes = {}
1452       self.instances = []
1453       self.pinst = []
1454       self.sinst = []
1455       self.sbp = {}
1456       self.mfree = 0
1457       self.dfree = 0
1458       self.offline = offline
1459       self.vm_capable = vm_capable
1460       self.rpc_fail = False
1461       self.lvm_fail = False
1462       self.hyp_fail = False
1463       self.ghost = False
1464       self.os_fail = False
1465       self.oslist = {}
1466       self.pv_min = None
1467       self.pv_max = None
1468
1469   def ExpandNames(self):
1470     # This raises errors.OpPrereqError on its own:
1471     self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1472
1473     # Get instances in node group; this is unsafe and needs verification later
1474     inst_uuids = \
1475       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1476
1477     self.needed_locks = {
1478       locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1479       locking.LEVEL_NODEGROUP: [self.group_uuid],
1480       locking.LEVEL_NODE: [],
1481
1482       # This opcode is run by watcher every five minutes and acquires all nodes
1483       # for a group. It doesn't run for a long time, so it's better to acquire
1484       # the node allocation lock as well.
1485       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1486       }
1487
1488     self.share_locks = ShareAll()
1489
1490   def DeclareLocks(self, level):
1491     if level == locking.LEVEL_NODE:
1492       # Get members of node group; this is unsafe and needs verification later
1493       nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1494
1495       # In Exec(), we warn about mirrored instances that have primary and
1496       # secondary living in separate node groups. To fully verify that
1497       # volumes for these instances are healthy, we will need to do an
1498       # extra call to their secondaries. We ensure here those nodes will
1499       # be locked.
1500       for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1501         # Important: access only the instances whose lock is owned
1502         instance = self.cfg.GetInstanceInfoByName(inst_name)
1503         if instance.disk_template in constants.DTS_INT_MIRROR:
1504           nodes.update(instance.secondary_nodes)
1505
1506       self.needed_locks[locking.LEVEL_NODE] = nodes
1507
1508   def CheckPrereq(self):
1509     assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1510     self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1511
1512     group_node_uuids = set(self.group_info.members)
1513     group_inst_uuids = \
1514       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1515
1516     unlocked_node_uuids = \
1517         group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1518
1519     unlocked_inst_uuids = \
1520         group_inst_uuids.difference(
1521           [self.cfg.GetInstanceInfoByName(name).uuid
1522            for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1523
1524     if unlocked_node_uuids:
1525       raise errors.OpPrereqError(
1526         "Missing lock for nodes: %s" %
1527         utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1528         errors.ECODE_STATE)
1529
1530     if unlocked_inst_uuids:
1531       raise errors.OpPrereqError(
1532         "Missing lock for instances: %s" %
1533         utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1534         errors.ECODE_STATE)
1535
1536     self.all_node_info = self.cfg.GetAllNodesInfo()
1537     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1538
1539     self.my_node_uuids = group_node_uuids
1540     self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1541                              for node_uuid in group_node_uuids)
1542
1543     self.my_inst_uuids = group_inst_uuids
1544     self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1545                              for inst_uuid in group_inst_uuids)
1546
1547     # We detect here the nodes that will need the extra RPC calls for verifying
1548     # split LV volumes; they should be locked.
1549     extra_lv_nodes = set()
1550
1551     for inst in self.my_inst_info.values():
1552       if inst.disk_template in constants.DTS_INT_MIRROR:
1553         for nuuid in inst.all_nodes:
1554           if self.all_node_info[nuuid].group != self.group_uuid:
1555             extra_lv_nodes.add(nuuid)
1556
1557     unlocked_lv_nodes = \
1558         extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1559
1560     if unlocked_lv_nodes:
1561       raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1562                                  utils.CommaJoin(unlocked_lv_nodes),
1563                                  errors.ECODE_STATE)
1564     self.extra_lv_nodes = list(extra_lv_nodes)
1565
1566   def _VerifyNode(self, ninfo, nresult):
1567     """Perform some basic validation on data returned from a node.
1568
1569       - check the result data structure is well formed and has all the
1570         mandatory fields
1571       - check ganeti version
1572
1573     @type ninfo: L{objects.Node}
1574     @param ninfo: the node to check
1575     @param nresult: the results from the node
1576     @rtype: boolean
1577     @return: whether overall this call was successful (and we can expect
1578          reasonable values in the respose)
1579
1580     """
1581     # main result, nresult should be a non-empty dict
1582     test = not nresult or not isinstance(nresult, dict)
1583     self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1584                   "unable to verify node: no data returned")
1585     if test:
1586       return False
1587
1588     # compares ganeti version
1589     local_version = constants.PROTOCOL_VERSION
1590     remote_version = nresult.get("version", None)
1591     test = not (remote_version and
1592                 isinstance(remote_version, (list, tuple)) and
1593                 len(remote_version) == 2)
1594     self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1595                   "connection to node returned invalid data")
1596     if test:
1597       return False
1598
1599     test = local_version != remote_version[0]
1600     self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1601                   "incompatible protocol versions: master %s,"
1602                   " node %s", local_version, remote_version[0])
1603     if test:
1604       return False
1605
1606     # node seems compatible, we can actually try to look into its results
1607
1608     # full package version
1609     self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1610                   constants.CV_ENODEVERSION, ninfo.name,
1611                   "software version mismatch: master %s, node %s",
1612                   constants.RELEASE_VERSION, remote_version[1],
1613                   code=self.ETYPE_WARNING)
1614
1615     hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1616     if ninfo.vm_capable and isinstance(hyp_result, dict):
1617       for hv_name, hv_result in hyp_result.iteritems():
1618         test = hv_result is not None
1619         self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1620                       "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1621
1622     hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1623     if ninfo.vm_capable and isinstance(hvp_result, list):
1624       for item, hv_name, hv_result in hvp_result:
1625         self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1626                       "hypervisor %s parameter verify failure (source %s): %s",
1627                       hv_name, item, hv_result)
1628
1629     test = nresult.get(constants.NV_NODESETUP,
1630                        ["Missing NODESETUP results"])
1631     self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1632                   "node setup error: %s", "; ".join(test))
1633
1634     return True
1635
1636   def _VerifyNodeTime(self, ninfo, nresult,
1637                       nvinfo_starttime, nvinfo_endtime):
1638     """Check the node time.
1639
1640     @type ninfo: L{objects.Node}
1641     @param ninfo: the node to check
1642     @param nresult: the remote results for the node
1643     @param nvinfo_starttime: the start time of the RPC call
1644     @param nvinfo_endtime: the end time of the RPC call
1645
1646     """
1647     ntime = nresult.get(constants.NV_TIME, None)
1648     try:
1649       ntime_merged = utils.MergeTime(ntime)
1650     except (ValueError, TypeError):
1651       self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1652                     "Node returned invalid time")
1653       return
1654
1655     if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1656       ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1657     elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1658       ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1659     else:
1660       ntime_diff = None
1661
1662     self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1663                   "Node time diverges by at least %s from master node time",
1664                   ntime_diff)
1665
1666   def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1667     """Check the node LVM results and update info for cross-node checks.
1668
1669     @type ninfo: L{objects.Node}
1670     @param ninfo: the node to check
1671     @param nresult: the remote results for the node
1672     @param vg_name: the configured VG name
1673     @type nimg: L{NodeImage}
1674     @param nimg: node image
1675
1676     """
1677     if vg_name is None:
1678       return
1679
1680     # checks vg existence and size > 20G
1681     vglist = nresult.get(constants.NV_VGLIST, None)
1682     test = not vglist
1683     self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1684                   "unable to check volume groups")
1685     if not test:
1686       vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1687                                             constants.MIN_VG_SIZE)
1688       self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1689
1690     # Check PVs
1691     (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1692     for em in errmsgs:
1693       self._Error(constants.CV_ENODELVM, ninfo.name, em)
1694     if pvminmax is not None:
1695       (nimg.pv_min, nimg.pv_max) = pvminmax
1696
1697   def _VerifyGroupDRBDVersion(self, node_verify_infos):
1698     """Check cross-node DRBD version consistency.
1699
1700     @type node_verify_infos: dict
1701     @param node_verify_infos: infos about nodes as returned from the
1702       node_verify call.
1703
1704     """
1705     node_versions = {}
1706     for node_uuid, ndata in node_verify_infos.items():
1707       nresult = ndata.payload
1708       version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1709       node_versions[node_uuid] = version
1710
1711     if len(set(node_versions.values())) > 1:
1712       for node_uuid, version in sorted(node_versions.items()):
1713         msg = "DRBD version mismatch: %s" % version
1714         self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1715                     code=self.ETYPE_WARNING)
1716
1717   def _VerifyGroupLVM(self, node_image, vg_name):
1718     """Check cross-node consistency in LVM.
1719
1720     @type node_image: dict
1721     @param node_image: info about nodes, mapping from node to names to
1722       L{NodeImage} objects
1723     @param vg_name: the configured VG name
1724
1725     """
1726     if vg_name is None:
1727       return
1728
1729     # Only exclusive storage needs this kind of checks
1730     if not self._exclusive_storage:
1731       return
1732
1733     # exclusive_storage wants all PVs to have the same size (approximately),
1734     # if the smallest and the biggest ones are okay, everything is fine.
1735     # pv_min is None iff pv_max is None
1736     vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1737     if not vals:
1738       return
1739     (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1740     (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1741     bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1742     self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1743                   "PV sizes differ too much in the group; smallest (%s MB) is"
1744                   " on %s, biggest (%s MB) is on %s",
1745                   pvmin, self.cfg.GetNodeName(minnode_uuid),
1746                   pvmax, self.cfg.GetNodeName(maxnode_uuid))
1747
1748   def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1749     """Check the node bridges.
1750
1751     @type ninfo: L{objects.Node}
1752     @param ninfo: the node to check
1753     @param nresult: the remote results for the node
1754     @param bridges: the expected list of bridges
1755
1756     """
1757     if not bridges:
1758       return
1759
1760     missing = nresult.get(constants.NV_BRIDGES, None)
1761     test = not isinstance(missing, list)
1762     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1763                   "did not return valid bridge information")
1764     if not test:
1765       self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1766                     "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1767
1768   def _VerifyNodeUserScripts(self, ninfo, nresult):
1769     """Check the results of user scripts presence and executability on the node
1770
1771     @type ninfo: L{objects.Node}
1772     @param ninfo: the node to check
1773     @param nresult: the remote results for the node
1774
1775     """
1776     test = not constants.NV_USERSCRIPTS in nresult
1777     self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1778                   "did not return user scripts information")
1779
1780     broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1781     if not test:
1782       self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1783                     "user scripts not present or not executable: %s" %
1784                     utils.CommaJoin(sorted(broken_scripts)))
1785
1786   def _VerifyNodeNetwork(self, ninfo, nresult):
1787     """Check the node network connectivity results.
1788
1789     @type ninfo: L{objects.Node}
1790     @param ninfo: the node to check
1791     @param nresult: the remote results for the node
1792
1793     """
1794     test = constants.NV_NODELIST not in nresult
1795     self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1796                   "node hasn't returned node ssh connectivity data")
1797     if not test:
1798       if nresult[constants.NV_NODELIST]:
1799         for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1800           self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
1801                         "ssh communication with node '%s': %s", a_node, a_msg)
1802
1803     test = constants.NV_NODENETTEST not in nresult
1804     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1805                   "node hasn't returned node tcp connectivity data")
1806     if not test:
1807       if nresult[constants.NV_NODENETTEST]:
1808         nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1809         for anode in nlist:
1810           self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
1811                         "tcp communication with node '%s': %s",
1812                         anode, nresult[constants.NV_NODENETTEST][anode])
1813
1814     test = constants.NV_MASTERIP not in nresult
1815     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1816                   "node hasn't returned node master IP reachability data")
1817     if not test:
1818       if not nresult[constants.NV_MASTERIP]:
1819         if ninfo.uuid == self.master_node:
1820           msg = "the master node cannot reach the master IP (not configured?)"
1821         else:
1822           msg = "cannot reach the master IP"
1823         self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
1824
1825   def _VerifyInstance(self, instance, node_image, diskstatus):
1826     """Verify an instance.
1827
1828     This function checks to see if the required block devices are
1829     available on the instance's node, and that the nodes are in the correct
1830     state.
1831
1832     """
1833     pnode_uuid = instance.primary_node
1834     pnode_img = node_image[pnode_uuid]
1835     groupinfo = self.cfg.GetAllNodeGroupsInfo()
1836
1837     node_vol_should = {}
1838     instance.MapLVsByNode(node_vol_should)
1839
1840     cluster = self.cfg.GetClusterInfo()
1841     ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1842                                                             self.group_info)
1843     err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
1844     self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
1845                   utils.CommaJoin(err), code=self.ETYPE_WARNING)
1846
1847     for node_uuid in node_vol_should:
1848       n_img = node_image[node_uuid]
1849       if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1850         # ignore missing volumes on offline or broken nodes
1851         continue
1852       for volume in node_vol_should[node_uuid]:
1853         test = volume not in n_img.volumes
1854         self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
1855                       "volume %s missing on node %s", volume,
1856                       self.cfg.GetNodeName(node_uuid))
1857
1858     if instance.admin_state == constants.ADMINST_UP:
1859       test = instance.uuid not in pnode_img.instances and not pnode_img.offline
1860       self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
1861                     "instance not running on its primary node %s",
1862                      self.cfg.GetNodeName(pnode_uuid))
1863       self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
1864                     instance.name, "instance is marked as running and lives on"
1865                     " offline node %s", self.cfg.GetNodeName(pnode_uuid))
1866
1867     diskdata = [(nname, success, status, idx)
1868                 for (nname, disks) in diskstatus.items()
1869                 for idx, (success, status) in enumerate(disks)]
1870
1871     for nname, success, bdev_status, idx in diskdata:
1872       # the 'ghost node' construction in Exec() ensures that we have a
1873       # node here
1874       snode = node_image[nname]
1875       bad_snode = snode.ghost or snode.offline
1876       self._ErrorIf(instance.disks_active and
1877                     not success and not bad_snode,
1878                     constants.CV_EINSTANCEFAULTYDISK, instance.name,
1879                     "couldn't retrieve status for disk/%s on %s: %s",
1880                     idx, self.cfg.GetNodeName(nname), bdev_status)
1881
1882       if instance.disks_active and success and \
1883          (bdev_status.is_degraded or
1884           bdev_status.ldisk_status != constants.LDS_OKAY):
1885         msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
1886         if bdev_status.is_degraded:
1887           msg += " is degraded"
1888         if bdev_status.ldisk_status != constants.LDS_OKAY:
1889           msg += "; state is '%s'" % \
1890                  constants.LDS_NAMES[bdev_status.ldisk_status]
1891
1892         self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
1893
1894     self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1895                   constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
1896                   "instance %s, connection to primary node failed",
1897                   instance.name)
1898
1899     self._ErrorIf(len(instance.secondary_nodes) > 1,
1900                   constants.CV_EINSTANCELAYOUT, instance.name,
1901                   "instance has multiple secondary nodes: %s",
1902                   utils.CommaJoin(instance.secondary_nodes),
1903                   code=self.ETYPE_WARNING)
1904
1905     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
1906     if any(es_flags.values()):
1907       if instance.disk_template not in constants.DTS_EXCL_STORAGE:
1908         # Disk template not compatible with exclusive_storage: no instance
1909         # node should have the flag set
1910         es_nodes = [n
1911                     for (n, es) in es_flags.items()
1912                     if es]
1913         self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
1914                     "instance has template %s, which is not supported on nodes"
1915                     " that have exclusive storage set: %s",
1916                     instance.disk_template,
1917                     utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
1918       for (idx, disk) in enumerate(instance.disks):
1919         self._ErrorIf(disk.spindles is None,
1920                       constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
1921                       "number of spindles not configured for disk %s while"
1922                       " exclusive storage is enabled, try running"
1923                       " gnt-cluster repair-disk-sizes", idx)
1924
1925     if instance.disk_template in constants.DTS_INT_MIRROR:
1926       instance_nodes = utils.NiceSort(instance.all_nodes)
1927       instance_groups = {}
1928
1929       for node_uuid in instance_nodes:
1930         instance_groups.setdefault(self.all_node_info[node_uuid].group,
1931                                    []).append(node_uuid)
1932
1933       pretty_list = [
1934         "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
1935                            groupinfo[group].name)
1936         # Sort so that we always list the primary node first.
1937         for group, nodes in sorted(instance_groups.items(),
1938                                    key=lambda (_, nodes): pnode_uuid in nodes,
1939                                    reverse=True)]
1940
1941       self._ErrorIf(len(instance_groups) > 1,
1942                     constants.CV_EINSTANCESPLITGROUPS,
1943                     instance.name, "instance has primary and secondary nodes in"
1944                     " different groups: %s", utils.CommaJoin(pretty_list),
1945                     code=self.ETYPE_WARNING)
1946
1947     inst_nodes_offline = []
1948     for snode in instance.secondary_nodes:
1949       s_img = node_image[snode]
1950       self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
1951                     self.cfg.GetNodeName(snode),
1952                     "instance %s, connection to secondary node failed",
1953                     instance.name)
1954
1955       if s_img.offline:
1956         inst_nodes_offline.append(snode)
1957
1958     # warn that the instance lives on offline nodes
1959     self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
1960                   instance.name, "instance has offline secondary node(s) %s",
1961                   utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
1962     # ... or ghost/non-vm_capable nodes
1963     for node_uuid in instance.all_nodes:
1964       self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
1965                     instance.name, "instance lives on ghost node %s",
1966                     self.cfg.GetNodeName(node_uuid))
1967       self._ErrorIf(not node_image[node_uuid].vm_capable,
1968                     constants.CV_EINSTANCEBADNODE, instance.name,
1969                     "instance lives on non-vm_capable node %s",
1970                     self.cfg.GetNodeName(node_uuid))
1971
1972   def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
1973     """Verify if there are any unknown volumes in the cluster.
1974
1975     The .os, .swap and backup volumes are ignored. All other volumes are
1976     reported as unknown.
1977
1978     @type reserved: L{ganeti.utils.FieldSet}
1979     @param reserved: a FieldSet of reserved volume names
1980
1981     """
1982     for node_uuid, n_img in node_image.items():
1983       if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
1984           self.all_node_info[node_uuid].group != self.group_uuid):
1985         # skip non-healthy nodes
1986         continue
1987       for volume in n_img.volumes:
1988         test = ((node_uuid not in node_vol_should or
1989                 volume not in node_vol_should[node_uuid]) and
1990                 not reserved.Matches(volume))
1991         self._ErrorIf(test, constants.CV_ENODEORPHANLV,
1992                       self.cfg.GetNodeName(node_uuid),
1993                       "volume %s is unknown", volume)
1994
1995   def _VerifyNPlusOneMemory(self, node_image, all_insts):
1996     """Verify N+1 Memory Resilience.
1997
1998     Check that if one single node dies we can still start all the
1999     instances it was primary for.
2000
2001     """
2002     cluster_info = self.cfg.GetClusterInfo()
2003     for node_uuid, n_img in node_image.items():
2004       # This code checks that every node which is now listed as
2005       # secondary has enough memory to host all instances it is
2006       # supposed to should a single other node in the cluster fail.
2007       # FIXME: not ready for failover to an arbitrary node
2008       # FIXME: does not support file-backed instances
2009       # WARNING: we currently take into account down instances as well
2010       # as up ones, considering that even if they're down someone
2011       # might want to start them even in the event of a node failure.
2012       if n_img.offline or \
2013          self.all_node_info[node_uuid].group != self.group_uuid:
2014         # we're skipping nodes marked offline and nodes in other groups from
2015         # the N+1 warning, since most likely we don't have good memory
2016         # infromation from them; we already list instances living on such
2017         # nodes, and that's enough warning
2018         continue
2019       #TODO(dynmem): also consider ballooning out other instances
2020       for prinode, inst_uuids in n_img.sbp.items():
2021         needed_mem = 0
2022         for inst_uuid in inst_uuids:
2023           bep = cluster_info.FillBE(all_insts[inst_uuid])
2024           if bep[constants.BE_AUTO_BALANCE]:
2025             needed_mem += bep[constants.BE_MINMEM]
2026         test = n_img.mfree < needed_mem
2027         self._ErrorIf(test, constants.CV_ENODEN1,
2028                       self.cfg.GetNodeName(node_uuid),
2029                       "not enough memory to accomodate instance failovers"
2030                       " should node %s fail (%dMiB needed, %dMiB available)",
2031                       self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2032
2033   def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2034                    (files_all, files_opt, files_mc, files_vm)):
2035     """Verifies file checksums collected from all nodes.
2036
2037     @param nodes: List of L{objects.Node} objects
2038     @param master_node_uuid: UUID of master node
2039     @param all_nvinfo: RPC results
2040
2041     """
2042     # Define functions determining which nodes to consider for a file
2043     files2nodefn = [
2044       (files_all, None),
2045       (files_mc, lambda node: (node.master_candidate or
2046                                node.uuid == master_node_uuid)),
2047       (files_vm, lambda node: node.vm_capable),
2048       ]
2049
2050     # Build mapping from filename to list of nodes which should have the file
2051     nodefiles = {}
2052     for (files, fn) in files2nodefn:
2053       if fn is None:
2054         filenodes = nodes
2055       else:
2056         filenodes = filter(fn, nodes)
2057       nodefiles.update((filename,
2058                         frozenset(map(operator.attrgetter("uuid"), filenodes)))
2059                        for filename in files)
2060
2061     assert set(nodefiles) == (files_all | files_mc | files_vm)
2062
2063     fileinfo = dict((filename, {}) for filename in nodefiles)
2064     ignore_nodes = set()
2065
2066     for node in nodes:
2067       if node.offline:
2068         ignore_nodes.add(node.uuid)
2069         continue
2070
2071       nresult = all_nvinfo[node.uuid]
2072
2073       if nresult.fail_msg or not nresult.payload:
2074         node_files = None
2075       else:
2076         fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2077         node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2078                           for (key, value) in fingerprints.items())
2079         del fingerprints
2080
2081       test = not (node_files and isinstance(node_files, dict))
2082       self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2083                     "Node did not return file checksum data")
2084       if test:
2085         ignore_nodes.add(node.uuid)
2086         continue
2087
2088       # Build per-checksum mapping from filename to nodes having it
2089       for (filename, checksum) in node_files.items():
2090         assert filename in nodefiles
2091         fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2092
2093     for (filename, checksums) in fileinfo.items():
2094       assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2095
2096       # Nodes having the file
2097       with_file = frozenset(node_uuid
2098                             for node_uuids in fileinfo[filename].values()
2099                             for node_uuid in node_uuids) - ignore_nodes
2100
2101       expected_nodes = nodefiles[filename] - ignore_nodes
2102
2103       # Nodes missing file
2104       missing_file = expected_nodes - with_file
2105
2106       if filename in files_opt:
2107         # All or no nodes
2108         self._ErrorIf(missing_file and missing_file != expected_nodes,
2109                       constants.CV_ECLUSTERFILECHECK, None,
2110                       "File %s is optional, but it must exist on all or no"
2111                       " nodes (not found on %s)",
2112                       filename,
2113                       utils.CommaJoin(
2114                         utils.NiceSort(
2115                           map(self.cfg.GetNodeName, missing_file))))
2116       else:
2117         self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2118                       "File %s is missing from node(s) %s", filename,
2119                       utils.CommaJoin(
2120                         utils.NiceSort(
2121                           map(self.cfg.GetNodeName, missing_file))))
2122
2123         # Warn if a node has a file it shouldn't
2124         unexpected = with_file - expected_nodes
2125         self._ErrorIf(unexpected,
2126                       constants.CV_ECLUSTERFILECHECK, None,
2127                       "File %s should not exist on node(s) %s",
2128                       filename, utils.CommaJoin(
2129                         utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2130
2131       # See if there are multiple versions of the file
2132       test = len(checksums) > 1
2133       if test:
2134         variants = ["variant %s on %s" %
2135                     (idx + 1,
2136                      utils.CommaJoin(utils.NiceSort(
2137                        map(self.cfg.GetNodeName, node_uuids))))
2138                     for (idx, (checksum, node_uuids)) in
2139                       enumerate(sorted(checksums.items()))]
2140       else:
2141         variants = []
2142
2143       self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2144                     "File %s found with %s different checksums (%s)",
2145                     filename, len(checksums), "; ".join(variants))
2146
2147   def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2148                       drbd_map):
2149     """Verifies and the node DRBD status.
2150
2151     @type ninfo: L{objects.Node}
2152     @param ninfo: the node to check
2153     @param nresult: the remote results for the node
2154     @param instanceinfo: the dict of instances
2155     @param drbd_helper: the configured DRBD usermode helper
2156     @param drbd_map: the DRBD map as returned by
2157         L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2158
2159     """
2160     if drbd_helper:
2161       helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2162       test = (helper_result is None)
2163       self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2164                     "no drbd usermode helper returned")
2165       if helper_result:
2166         status, payload = helper_result
2167         test = not status
2168         self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2169                       "drbd usermode helper check unsuccessful: %s", payload)
2170         test = status and (payload != drbd_helper)
2171         self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2172                       "wrong drbd usermode helper: %s", payload)
2173
2174     # compute the DRBD minors
2175     node_drbd = {}
2176     for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2177       test = inst_uuid not in instanceinfo
2178       self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2179                     "ghost instance '%s' in temporary DRBD map", inst_uuid)
2180         # ghost instance should not be running, but otherwise we
2181         # don't give double warnings (both ghost instance and
2182         # unallocated minor in use)
2183       if test:
2184         node_drbd[minor] = (inst_uuid, False)
2185       else:
2186         instance = instanceinfo[inst_uuid]
2187         node_drbd[minor] = (inst_uuid, instance.disks_active)
2188
2189     # and now check them
2190     used_minors = nresult.get(constants.NV_DRBDLIST, [])
2191     test = not isinstance(used_minors, (tuple, list))
2192     self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2193                   "cannot parse drbd status file: %s", str(used_minors))
2194     if test:
2195       # we cannot check drbd status
2196       return
2197
2198     for minor, (inst_uuid, must_exist) in node_drbd.items():
2199       test = minor not in used_minors and must_exist
2200       self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2201                     "drbd minor %d of instance %s is not active", minor,
2202                     self.cfg.GetInstanceName(inst_uuid))
2203     for minor in used_minors:
2204       test = minor not in node_drbd
2205       self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2206                     "unallocated drbd minor %d is in use", minor)
2207
2208   def _UpdateNodeOS(self, ninfo, nresult, nimg):
2209     """Builds the node OS structures.
2210
2211     @type ninfo: L{objects.Node}
2212     @param ninfo: the node to check
2213     @param nresult: the remote results for the node
2214     @param nimg: the node image object
2215
2216     """
2217     remote_os = nresult.get(constants.NV_OSLIST, None)
2218     test = (not isinstance(remote_os, list) or
2219             not compat.all(isinstance(v, list) and len(v) == 7
2220                            for v in remote_os))
2221
2222     self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2223                   "node hasn't returned valid OS data")
2224
2225     nimg.os_fail = test
2226
2227     if test:
2228       return
2229
2230     os_dict = {}
2231
2232     for (name, os_path, status, diagnose,
2233          variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2234
2235       if name not in os_dict:
2236         os_dict[name] = []
2237
2238       # parameters is a list of lists instead of list of tuples due to
2239       # JSON lacking a real tuple type, fix it:
2240       parameters = [tuple(v) for v in parameters]
2241       os_dict[name].append((os_path, status, diagnose,
2242                             set(variants), set(parameters), set(api_ver)))
2243
2244     nimg.oslist = os_dict
2245
2246   def _VerifyNodeOS(self, ninfo, nimg, base):
2247     """Verifies the node OS list.
2248
2249     @type ninfo: L{objects.Node}
2250     @param ninfo: the node to check
2251     @param nimg: the node image object
2252     @param base: the 'template' node we match against (e.g. from the master)
2253
2254     """
2255     assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2256
2257     beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2258     for os_name, os_data in nimg.oslist.items():
2259       assert os_data, "Empty OS status for OS %s?!" % os_name
2260       f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2261       self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2262                     "Invalid OS %s (located at %s): %s",
2263                     os_name, f_path, f_diag)
2264       self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2265                     "OS '%s' has multiple entries"
2266                     " (first one shadows the rest): %s",
2267                     os_name, utils.CommaJoin([v[0] for v in os_data]))
2268       # comparisons with the 'base' image
2269       test = os_name not in base.oslist
2270       self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2271                     "Extra OS %s not present on reference node (%s)",
2272                     os_name, self.cfg.GetNodeName(base.uuid))
2273       if test:
2274         continue
2275       assert base.oslist[os_name], "Base node has empty OS status?"
2276       _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2277       if not b_status:
2278         # base OS is invalid, skipping
2279         continue
2280       for kind, a, b in [("API version", f_api, b_api),
2281                          ("variants list", f_var, b_var),
2282                          ("parameters", beautify_params(f_param),
2283                           beautify_params(b_param))]:
2284         self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2285                       "OS %s for %s differs from reference node %s:"
2286                       " [%s] vs. [%s]", kind, os_name,
2287                       self.cfg.GetNodeName(base.uuid),
2288                       utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2289
2290     # check any missing OSes
2291     missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2292     self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2293                   "OSes present on reference node %s"
2294                   " but missing on this node: %s",
2295                   self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2296
2297   def _VerifyFileStoragePaths(self, ninfo, nresult, is_master,
2298                               enabled_disk_templates):
2299     """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2300
2301     @type ninfo: L{objects.Node}
2302     @param ninfo: the node to check
2303     @param nresult: the remote results for the node
2304     @type is_master: bool
2305     @param is_master: Whether node is the master node
2306
2307     """
2308     if (is_master and
2309         (utils.storage.IsFileStorageEnabled(enabled_disk_templates) or
2310          utils.storage.IsSharedFileStorageEnabled(enabled_disk_templates))):
2311       try:
2312         fspaths = nresult[constants.NV_FILE_STORAGE_PATHS]
2313       except KeyError:
2314         # This should never happen
2315         self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2316                       "Node did not return forbidden file storage paths")
2317       else:
2318         self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2319                       "Found forbidden file storage paths: %s",
2320                       utils.CommaJoin(fspaths))
2321     else:
2322       self._ErrorIf(constants.NV_FILE_STORAGE_PATHS in nresult,
2323                     constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2324                     "Node should not have returned forbidden file storage"
2325                     " paths")
2326
2327   def _VerifyOob(self, ninfo, nresult):
2328     """Verifies out of band functionality of a node.
2329
2330     @type ninfo: L{objects.Node}
2331     @param ninfo: the node to check
2332     @param nresult: the remote results for the node
2333
2334     """
2335     # We just have to verify the paths on master and/or master candidates
2336     # as the oob helper is invoked on the master
2337     if ((ninfo.master_candidate or ninfo.master_capable) and
2338         constants.NV_OOB_PATHS in nresult):
2339       for path_result in nresult[constants.NV_OOB_PATHS]:
2340         self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2341                       ninfo.name, path_result)
2342
2343   def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2344     """Verifies and updates the node volume data.
2345
2346     This function will update a L{NodeImage}'s internal structures
2347     with data from the remote call.
2348
2349     @type ninfo: L{objects.Node}
2350     @param ninfo: the node to check
2351     @param nresult: the remote results for the node
2352     @param nimg: the node image object
2353     @param vg_name: the configured VG name
2354
2355     """
2356     nimg.lvm_fail = True
2357     lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2358     if vg_name is None:
2359       pass
2360     elif isinstance(lvdata, basestring):
2361       self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2362                     "LVM problem on node: %s", utils.SafeEncode(lvdata))
2363     elif not isinstance(lvdata, dict):
2364       self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2365                     "rpc call to node failed (lvlist)")
2366     else:
2367       nimg.volumes = lvdata
2368       nimg.lvm_fail = False
2369
2370   def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2371     """Verifies and updates the node instance list.
2372
2373     If the listing was successful, then updates this node's instance
2374     list. Otherwise, it marks the RPC call as failed for the instance
2375     list key.
2376
2377     @type ninfo: L{objects.Node}
2378     @param ninfo: the node to check
2379     @param nresult: the remote results for the node
2380     @param nimg: the node image object
2381
2382     """
2383     idata = nresult.get(constants.NV_INSTANCELIST, None)
2384     test = not isinstance(idata, list)
2385     self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2386                   "rpc call to node failed (instancelist): %s",
2387                   utils.SafeEncode(str(idata)))
2388     if test:
2389       nimg.hyp_fail = True
2390     else:
2391       nimg.instances = [inst.uuid for (_, inst) in
2392                         self.cfg.GetMultiInstanceInfoByName(idata)]
2393
2394   def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2395     """Verifies and computes a node information map
2396
2397     @type ninfo: L{objects.Node}
2398     @param ninfo: the node to check
2399     @param nresult: the remote results for the node
2400     @param nimg: the node image object
2401     @param vg_name: the configured VG name
2402
2403     """
2404     # try to read free memory (from the hypervisor)
2405     hv_info = nresult.get(constants.NV_HVINFO, None)
2406     test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2407     self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2408                   "rpc call to node failed (hvinfo)")
2409     if not test:
2410       try:
2411         nimg.mfree = int(hv_info["memory_free"])
2412       except (ValueError, TypeError):
2413         self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2414                       "node returned invalid nodeinfo, check hypervisor")
2415
2416     # FIXME: devise a free space model for file based instances as well
2417     if vg_name is not None:
2418       test = (constants.NV_VGLIST not in nresult or
2419               vg_name not in nresult[constants.NV_VGLIST])
2420       self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2421                     "node didn't return data for the volume group '%s'"
2422                     " - it is either missing or broken", vg_name)
2423       if not test:
2424         try:
2425           nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2426         except (ValueError, TypeError):
2427           self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2428                         "node returned invalid LVM info, check LVM status")
2429
2430   def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2431     """Gets per-disk status information for all instances.
2432
2433     @type node_uuids: list of strings
2434     @param node_uuids: Node UUIDs
2435     @type node_image: dict of (UUID, L{objects.Node})
2436     @param node_image: Node objects
2437     @type instanceinfo: dict of (UUID, L{objects.Instance})
2438     @param instanceinfo: Instance objects
2439     @rtype: {instance: {node: [(succes, payload)]}}
2440     @return: a dictionary of per-instance dictionaries with nodes as
2441         keys and disk information as values; the disk information is a
2442         list of tuples (success, payload)
2443
2444     """
2445     node_disks = {}
2446     node_disks_devonly = {}
2447     diskless_instances = set()
2448     diskless = constants.DT_DISKLESS
2449
2450     for nuuid in node_uuids:
2451       node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
2452                                              node_image[nuuid].sinst))
2453       diskless_instances.update(uuid for uuid in node_inst_uuids
2454                                 if instanceinfo[uuid].disk_template == diskless)
2455       disks = [(inst_uuid, disk)
2456                for inst_uuid in node_inst_uuids
2457                for disk in instanceinfo[inst_uuid].disks]
2458
2459       if not disks:
2460         # No need to collect data
2461         continue
2462
2463       node_disks[nuuid] = disks
2464
2465       # _AnnotateDiskParams makes already copies of the disks
2466       devonly = []
2467       for (inst_uuid, dev) in disks:
2468         (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
2469                                           self.cfg)
2470         self.cfg.SetDiskID(anno_disk, nuuid)
2471         devonly.append(anno_disk)
2472
2473       node_disks_devonly[nuuid] = devonly
2474
2475     assert len(node_disks) == len(node_disks_devonly)
2476
2477     # Collect data from all nodes with disks
2478     result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2479                                                           node_disks_devonly)
2480
2481     assert len(result) == len(node_disks)
2482
2483     instdisk = {}
2484
2485     for (nuuid, nres) in result.items():
2486       node = self.cfg.GetNodeInfo(nuuid)
2487       disks = node_disks[node.uuid]
2488
2489       if nres.offline:
2490         # No data from this node
2491         data = len(disks) * [(False, "node offline")]
2492       else:
2493         msg = nres.fail_msg
2494         self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
2495                       "while getting disk information: %s", msg)
2496         if msg:
2497           # No data from this node
2498           data = len(disks) * [(False, msg)]
2499         else:
2500           data = []
2501           for idx, i in enumerate(nres.payload):
2502             if isinstance(i, (tuple, list)) and len(i) == 2:
2503               data.append(i)
2504             else:
2505               logging.warning("Invalid result from node %s, entry %d: %s",
2506                               node.name, idx, i)
2507               data.append((False, "Invalid result from the remote node"))
2508
2509       for ((inst_uuid, _), status) in zip(disks, data):
2510         instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
2511           .append(status)
2512
2513     # Add empty entries for diskless instances.
2514     for inst_uuid in diskless_instances:
2515       assert inst_uuid not in instdisk
2516       instdisk[inst_uuid] = {}
2517
2518     assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2519                       len(nuuids) <= len(instanceinfo[inst].all_nodes) and
2520                       compat.all(isinstance(s, (tuple, list)) and
2521                                  len(s) == 2 for s in statuses)
2522                       for inst, nuuids in instdisk.items()
2523                       for nuuid, statuses in nuuids.items())
2524     if __debug__:
2525       instdisk_keys = set(instdisk)
2526       instanceinfo_keys = set(instanceinfo)
2527       assert instdisk_keys == instanceinfo_keys, \
2528         ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2529          (instdisk_keys, instanceinfo_keys))
2530
2531     return instdisk
2532
2533   @staticmethod
2534   def _SshNodeSelector(group_uuid, all_nodes):
2535     """Create endless iterators for all potential SSH check hosts.
2536
2537     """
2538     nodes = [node for node in all_nodes
2539              if (node.group != group_uuid and
2540                  not node.offline)]
2541     keyfunc = operator.attrgetter("group")
2542
2543     return map(itertools.cycle,
2544                [sorted(map(operator.attrgetter("name"), names))
2545                 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2546                                                   keyfunc)])
2547
2548   @classmethod
2549   def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2550     """Choose which nodes should talk to which other nodes.
2551
2552     We will make nodes contact all nodes in their group, and one node from
2553     every other group.
2554
2555     @warning: This algorithm has a known issue if one node group is much
2556       smaller than others (e.g. just one node). In such a case all other
2557       nodes will talk to the single node.
2558
2559     """
2560     online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2561     sel = cls._SshNodeSelector(group_uuid, all_nodes)
2562
2563     return (online_nodes,
2564             dict((name, sorted([i.next() for i in sel]))
2565                  for name in online_nodes))
2566
2567   def BuildHooksEnv(self):
2568     """Build hooks env.
2569
2570     Cluster-Verify hooks just ran in the post phase and their failure makes
2571     the output be logged in the verify output and the verification to fail.
2572
2573     """
2574     env = {
2575       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2576       }
2577
2578     env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2579                for node in self.my_node_info.values())
2580
2581     return env
2582
2583   def BuildHooksNodes(self):
2584     """Build hooks nodes.
2585
2586     """
2587     return ([], list(self.my_node_info.keys()))
2588
2589   def Exec(self, feedback_fn):
2590     """Verify integrity of the node group, performing various test on nodes.
2591
2592     """
2593     # This method has too many local variables. pylint: disable=R0914
2594     feedback_fn("* Verifying group '%s'" % self.group_info.name)
2595
2596     if not self.my_node_uuids:
2597       # empty node group
2598       feedback_fn("* Empty node group, skipping verification")
2599       return True
2600
2601     self.bad = False
2602     verbose = self.op.verbose
2603     self._feedback_fn = feedback_fn
2604
2605     vg_name = self.cfg.GetVGName()
2606     drbd_helper = self.cfg.GetDRBDHelper()
2607     cluster = self.cfg.GetClusterInfo()
2608     hypervisors = cluster.enabled_hypervisors
2609     node_data_list = self.my_node_info.values()
2610
2611     i_non_redundant = [] # Non redundant instances
2612     i_non_a_balanced = [] # Non auto-balanced instances
2613     i_offline = 0 # Count of offline instances
2614     n_offline = 0 # Count of offline nodes
2615     n_drained = 0 # Count of nodes being drained
2616     node_vol_should = {}
2617
2618     # FIXME: verify OS list
2619
2620     # File verification
2621     filemap = ComputeAncillaryFiles(cluster, False)
2622
2623     # do local checksums
2624     master_node_uuid = self.master_node = self.cfg.GetMasterNode()
2625     master_ip = self.cfg.GetMasterIP()
2626
2627     feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))
2628
2629     user_scripts = []
2630     if self.cfg.GetUseExternalMipScript():
2631       user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2632
2633     node_verify_param = {
2634       constants.NV_FILELIST:
2635         map(vcluster.MakeVirtualPath,
2636             utils.UniqueSequence(filename
2637                                  for files in filemap
2638                                  for filename in files)),
2639       constants.NV_NODELIST:
2640         self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2641                                   self.all_node_info.values()),
2642       constants.NV_HYPERVISOR: hypervisors,
2643       constants.NV_HVPARAMS:
2644         _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2645       constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2646                                  for node in node_data_list
2647                                  if not node.offline],
2648       constants.NV_INSTANCELIST: hypervisors,
2649       constants.NV_VERSION: None,
2650       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2651       constants.NV_NODESETUP: None,
2652       constants.NV_TIME: None,
2653       constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
2654       constants.NV_OSLIST: None,
2655       constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2656       constants.NV_USERSCRIPTS: user_scripts,
2657       }
2658
2659     if vg_name is not None:
2660       node_verify_param[constants.NV_VGLIST] = None
2661       node_verify_param[constants.NV_LVLIST] = vg_name
2662       node_verify_param[constants.NV_PVLIST] = [vg_name]
2663
2664     if drbd_helper:
2665       node_verify_param[constants.NV_DRBDVERSION] = None
2666       node_verify_param[constants.NV_DRBDLIST] = None
2667       node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2668
2669     if constants.ENABLE_FILE_STORAGE or constants.ENABLE_SHARED_FILE_STORAGE:
2670       # Load file storage paths only from master node
2671       node_verify_param[constants.NV_FILE_STORAGE_PATHS] = \
2672         self.cfg.GetMasterNodeName()
2673
2674     # bridge checks
2675     # FIXME: this needs to be changed per node-group, not cluster-wide
2676     bridges = set()
2677     default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2678     if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2679       bridges.add(default_nicpp[constants.NIC_LINK])
2680     for inst_uuid in self.my_inst_info.values():
2681       for nic in inst_uuid.nics:
2682         full_nic = cluster.SimpleFillNIC(nic.nicparams)
2683         if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2684           bridges.add(full_nic[constants.NIC_LINK])
2685
2686     if bridges:
2687       node_verify_param[constants.NV_BRIDGES] = list(bridges)
2688
2689     # Build our expected cluster state
2690     node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
2691                                                  uuid=node.uuid,
2692                                                  vm_capable=node.vm_capable))
2693                       for node in node_data_list)
2694
2695     # Gather OOB paths
2696     oob_paths = []
2697     for node in self.all_node_info.values():
2698       path = SupportsOob(self.cfg, node)
2699       if path and path not in oob_paths:
2700         oob_paths.append(path)
2701
2702     if oob_paths:
2703       node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2704
2705     for inst_uuid in self.my_inst_uuids:
2706       instance = self.my_inst_info[inst_uuid]
2707       if instance.admin_state == constants.ADMINST_OFFLINE:
2708         i_offline += 1
2709
2710       for nuuid in instance.all_nodes:
2711         if nuuid not in node_image:
2712           gnode = self.NodeImage(uuid=nuuid)
2713           gnode.ghost = (nuuid not in self.all_node_info)
2714           node_image[nuuid] = gnode
2715
2716       instance.MapLVsByNode(node_vol_should)
2717
2718       pnode = instance.primary_node
2719       node_image[pnode].pinst.append(instance.uuid)
2720
2721       for snode in instance.secondary_nodes:
2722         nimg = node_image[snode]
2723         nimg.sinst.append(instance.uuid)
2724         if pnode not in nimg.sbp:
2725           nimg.sbp[pnode] = []
2726         nimg.sbp[pnode].append(instance.uuid)
2727
2728     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
2729                                                self.my_node_info.keys())
2730     # The value of exclusive_storage should be the same across the group, so if
2731     # it's True for at least a node, we act as if it were set for all the nodes
2732     self._exclusive_storage = compat.any(es_flags.values())
2733     if self._exclusive_storage:
2734       node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2735
2736     # At this point, we have the in-memory data structures complete,
2737     # except for the runtime information, which we'll gather next
2738
2739     # Due to the way our RPC system works, exact response times cannot be
2740     # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2741     # time before and after executing the request, we can at least have a time
2742     # window.
2743     nvinfo_starttime = time.time()
2744     all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
2745                                            node_verify_param,
2746                                            self.cfg.GetClusterName(),
2747                                            self.cfg.GetClusterInfo().hvparams)
2748     nvinfo_endtime = time.time()
2749
2750     if self.extra_lv_nodes and vg_name is not None:
2751       extra_lv_nvinfo = \
2752           self.rpc.call_node_verify(self.extra_lv_nodes,
2753                                     {constants.NV_LVLIST: vg_name},
2754                                     self.cfg.GetClusterName(),
2755                                     self.cfg.GetClusterInfo().hvparams)
2756     else:
2757       extra_lv_nvinfo = {}
2758
2759     all_drbd_map = self.cfg.ComputeDRBDMap()
2760
2761     feedback_fn("* Gathering disk information (%s nodes)" %
2762                 len(self.my_node_uuids))
2763     instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
2764                                      self.my_inst_info)
2765
2766     feedback_fn("* Verifying configuration file consistency")
2767
2768     # If not all nodes are being checked, we need to make sure the master node
2769     # and a non-checked vm_capable node are in the list.
2770     absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
2771     if absent_node_uuids:
2772       vf_nvinfo = all_nvinfo.copy()
2773       vf_node_info = list(self.my_node_info.values())
2774       additional_node_uuids = []
2775       if master_node_uuid not in self.my_node_info:
2776         additional_node_uuids.append(master_node_uuid)
2777         vf_node_info.append(self.all_node_info[master_node_uuid])
2778       # Add the first vm_capable node we find which is not included,
2779       # excluding the master node (which we already have)
2780       for node_uuid in absent_node_uuids:
2781         nodeinfo = self.all_node_info[node_uuid]
2782         if (nodeinfo.vm_capable and not nodeinfo.offline and
2783             node_uuid != master_node_uuid):
2784           additional_node_uuids.append(node_uuid)
2785           vf_node_info.append(self.all_node_info[node_uuid])
2786           break
2787       key = constants.NV_FILELIST
2788       vf_nvinfo.update(self.rpc.call_node_verify(
2789          additional_node_uuids, {key: node_verify_param[key]},
2790          self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
2791     else:
2792       vf_nvinfo = all_nvinfo
2793       vf_node_info = self.my_node_info.values()
2794
2795     self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)
2796
2797     feedback_fn("* Verifying node status")
2798
2799     refos_img = None
2800
2801     for node_i in node_data_list:
2802       nimg = node_image[node_i.uuid]
2803
2804       if node_i.offline:
2805         if verbose:
2806           feedback_fn("* Skipping offline node %s" % (node_i.name,))
2807         n_offline += 1
2808         continue
2809
2810       if node_i.uuid == master_node_uuid:
2811         ntype = "master"
2812       elif node_i.master_candidate:
2813         ntype = "master candidate"
2814       elif node_i.drained:
2815         ntype = "drained"
2816         n_drained += 1
2817       else:
2818         ntype = "regular"
2819       if verbose:
2820         feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))
2821
2822       msg = all_nvinfo[node_i.uuid].fail_msg
2823       self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
2824                     "while contacting node: %s", msg)
2825       if msg:
2826         nimg.rpc_fail = True
2827         continue
2828
2829       nresult = all_nvinfo[node_i.uuid].payload
2830
2831       nimg.call_ok = self._VerifyNode(node_i, nresult)
2832       self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2833       self._VerifyNodeNetwork(node_i, nresult)
2834       self._VerifyNodeUserScripts(node_i, nresult)
2835       self._VerifyOob(node_i, nresult)
2836       self._VerifyFileStoragePaths(node_i, nresult,
2837                                    node_i.uuid == master_node_uuid,
2838                                    cluster.enabled_disk_templates)
2839
2840       if nimg.vm_capable:
2841         self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
2842         self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2843                              all_drbd_map)
2844
2845         self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2846         self._UpdateNodeInstances(node_i, nresult, nimg)
2847         self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2848         self._UpdateNodeOS(node_i, nresult, nimg)
2849
2850         if not nimg.os_fail:
2851           if refos_img is None:
2852             refos_img = nimg
2853           self._VerifyNodeOS(node_i, nimg, refos_img)
2854         self._VerifyNodeBridges(node_i, nresult, bridges)
2855
2856         # Check whether all running instances are primary for the node. (This
2857         # can no longer be done from _VerifyInstance below, since some of the
2858         # wrong instances could be from other node groups.)
2859         non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)
2860
2861         for inst_uuid in non_primary_inst_uuids:
2862           test = inst_uuid in self.all_inst_info
2863           self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
2864                         self.cfg.GetInstanceName(inst_uuid),
2865                         "instance should not run on node %s", node_i.name)
2866           self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
2867                         "node is running unknown instance %s", inst_uuid)
2868
2869     self._VerifyGroupDRBDVersion(all_nvinfo)
2870     self._VerifyGroupLVM(node_image, vg_name)
2871
2872     for node_uuid, result in extra_lv_nvinfo.items():
2873       self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
2874                               node_image[node_uuid], vg_name)
2875
2876     feedback_fn("* Verifying instance status")
2877     for inst_uuid in self.my_inst_uuids:
2878       instance = self.my_inst_info[inst_uuid]
2879       if verbose:
2880         feedback_fn("* Verifying instance %s" % instance.name)
2881       self._VerifyInstance(instance, node_image, instdisk[inst_uuid])
2882
2883       # If the instance is non-redundant we cannot survive losing its primary
2884       # node, so we are not N+1 compliant.
2885       if instance.disk_template not in constants.DTS_MIRRORED:
2886         i_non_redundant.append(instance)
2887
2888       if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
2889         i_non_a_balanced.append(instance)
2890
2891     feedback_fn("* Verifying orphan volumes")
2892     reserved = utils.FieldSet(*cluster.reserved_lvs)
2893
2894     # We will get spurious "unknown volume" warnings if any node of this group
2895     # is secondary for an instance whose primary is in another group. To avoid
2896     # them, we find these instances and add their volumes to node_vol_should.
2897     for instance in self.all_inst_info.values():
2898       for secondary in instance.secondary_nodes:
2899         if (secondary in self.my_node_info
2900             and instance.name not in self.my_inst_info):
2901           instance.MapLVsByNode(node_vol_should)
2902           break
2903
2904     self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
2905
2906     if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
2907       feedback_fn("* Verifying N+1 Memory redundancy")
2908       self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
2909
2910     feedback_fn("* Other Notes")
2911     if i_non_redundant:
2912       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
2913                   % len(i_non_redundant))
2914
2915     if i_non_a_balanced:
2916       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
2917                   % len(i_non_a_balanced))
2918
2919     if i_offline:
2920       feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
2921
2922     if n_offline:
2923       feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
2924
2925     if n_drained:
2926       feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
2927
2928     return not self.bad
2929
2930   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
2931     """Analyze the post-hooks' result
2932
2933     This method analyses the hook result, handles it, and sends some
2934     nicely-formatted feedback back to the user.
2935
2936     @param phase: one of L{constants.HOOKS_PHASE_POST} or
2937         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
2938     @param hooks_results: the results of the multi-node hooks rpc call
2939     @param feedback_fn: function used send feedback back to the caller
2940     @param lu_result: previous Exec result
2941     @return: the new Exec result, based on the previous result
2942         and hook results
2943
2944     """
2945     # We only really run POST phase hooks, only for non-empty groups,
2946     # and are only interested in their results
2947     if not self.my_node_uuids:
2948       # empty node group
2949       pass
2950     elif phase == constants.HOOKS_PHASE_POST:
2951       # Used to change hooks' output to proper indentation
2952       feedback_fn("* Hooks Results")
2953       assert hooks_results, "invalid result from hooks"
2954
2955       for node_name in hooks_results:
2956         res = hooks_results[node_name]
2957         msg = res.fail_msg
2958         test = msg and not res.offline
2959         self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2960                       "Communication failure in hooks execution: %s", msg)
2961         if res.offline or msg:
2962           # No need to investigate payload if node is offline or gave
2963           # an error.
2964           continue
2965         for script, hkr, output in res.payload:
2966           test = hkr == constants.HKR_FAIL
2967           self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
2968                         "Script %s failed, output:", script)
2969           if test:
2970             output = self._HOOKS_INDENT_RE.sub("      ", output)
2971             feedback_fn("%s" % output)
2972             lu_result = False
2973
2974     return lu_result
2975
2976
2977 class LUClusterVerifyDisks(NoHooksLU):
2978   """Verifies the cluster disks status.
2979
2980   """
2981   REQ_BGL = False
2982
2983   def ExpandNames(self):
2984     self.share_locks = ShareAll()
2985     self.needed_locks = {
2986       locking.LEVEL_NODEGROUP: locking.ALL_SET,
2987       }
2988
2989   def Exec(self, feedback_fn):
2990     group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
2991
2992     # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
2993     return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
2994                            for group in group_names])