Extend unit tests for LUClusterVerifyGroup
[ganeti-local] / lib / cmdlib / cluster.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with the cluster."""
23
24 import OpenSSL
25
26 import copy
27 import itertools
28 import logging
29 import operator
30 import os
31 import re
32 import time
33
34 from ganeti import compat
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import query
45 from ganeti import rpc
46 from ganeti import runtime
47 from ganeti import ssh
48 from ganeti import uidpool
49 from ganeti import utils
50 from ganeti import vcluster
51
52 from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53   ResultWithJobs
54 from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55   ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56   GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57   GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58   CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59   ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob
60
61 import ganeti.masterd.instance
62
63
64 class LUClusterActivateMasterIp(NoHooksLU):
65   """Activate the master IP on the master node.
66
67   """
68   def Exec(self, feedback_fn):
69     """Activate the master IP.
70
71     """
72     master_params = self.cfg.GetMasterNetworkParameters()
73     ems = self.cfg.GetUseExternalMipScript()
74     result = self.rpc.call_node_activate_master_ip(master_params.uuid,
75                                                    master_params, ems)
76     result.Raise("Could not activate the master IP")
77
78
79 class LUClusterDeactivateMasterIp(NoHooksLU):
80   """Deactivate the master IP on the master node.
81
82   """
83   def Exec(self, feedback_fn):
84     """Deactivate the master IP.
85
86     """
87     master_params = self.cfg.GetMasterNetworkParameters()
88     ems = self.cfg.GetUseExternalMipScript()
89     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
90                                                      master_params, ems)
91     result.Raise("Could not deactivate the master IP")
92
93
94 class LUClusterConfigQuery(NoHooksLU):
95   """Return configuration values.
96
97   """
98   REQ_BGL = False
99
100   def CheckArguments(self):
101     self.cq = ClusterQuery(None, self.op.output_fields, False)
102
103   def ExpandNames(self):
104     self.cq.ExpandNames(self)
105
106   def DeclareLocks(self, level):
107     self.cq.DeclareLocks(self, level)
108
109   def Exec(self, feedback_fn):
110     result = self.cq.OldStyleQuery(self)
111
112     assert len(result) == 1
113
114     return result[0]
115
116
117 class LUClusterDestroy(LogicalUnit):
118   """Logical unit for destroying the cluster.
119
120   """
121   HPATH = "cluster-destroy"
122   HTYPE = constants.HTYPE_CLUSTER
123
124   def BuildHooksEnv(self):
125     """Build hooks env.
126
127     """
128     return {
129       "OP_TARGET": self.cfg.GetClusterName(),
130       }
131
132   def BuildHooksNodes(self):
133     """Build hooks nodes.
134
135     """
136     return ([], [])
137
138   def CheckPrereq(self):
139     """Check prerequisites.
140
141     This checks whether the cluster is empty.
142
143     Any errors are signaled by raising errors.OpPrereqError.
144
145     """
146     master = self.cfg.GetMasterNode()
147
148     nodelist = self.cfg.GetNodeList()
149     if len(nodelist) != 1 or nodelist[0] != master:
150       raise errors.OpPrereqError("There are still %d node(s) in"
151                                  " this cluster." % (len(nodelist) - 1),
152                                  errors.ECODE_INVAL)
153     instancelist = self.cfg.GetInstanceList()
154     if instancelist:
155       raise errors.OpPrereqError("There are still %d instance(s) in"
156                                  " this cluster." % len(instancelist),
157                                  errors.ECODE_INVAL)
158
159   def Exec(self, feedback_fn):
160     """Destroys the cluster.
161
162     """
163     master_params = self.cfg.GetMasterNetworkParameters()
164
165     # Run post hooks on master node before it's removed
166     RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
167
168     ems = self.cfg.GetUseExternalMipScript()
169     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
170                                                      master_params, ems)
171     result.Warn("Error disabling the master IP address", self.LogWarning)
172     return master_params.uuid
173
174
175 class LUClusterPostInit(LogicalUnit):
176   """Logical unit for running hooks after cluster initialization.
177
178   """
179   HPATH = "cluster-init"
180   HTYPE = constants.HTYPE_CLUSTER
181
182   def BuildHooksEnv(self):
183     """Build hooks env.
184
185     """
186     return {
187       "OP_TARGET": self.cfg.GetClusterName(),
188       }
189
190   def BuildHooksNodes(self):
191     """Build hooks nodes.
192
193     """
194     return ([], [self.cfg.GetMasterNode()])
195
196   def Exec(self, feedback_fn):
197     """Nothing to do.
198
199     """
200     return True
201
202
203 class ClusterQuery(QueryBase):
204   FIELDS = query.CLUSTER_FIELDS
205
206   #: Do not sort (there is only one item)
207   SORT_FIELD = None
208
209   def ExpandNames(self, lu):
210     lu.needed_locks = {}
211
212     # The following variables interact with _QueryBase._GetNames
213     self.wanted = locking.ALL_SET
214     self.do_locking = self.use_locking
215
216     if self.do_locking:
217       raise errors.OpPrereqError("Can not use locking for cluster queries",
218                                  errors.ECODE_INVAL)
219
220   def DeclareLocks(self, lu, level):
221     pass
222
223   def _GetQueryData(self, lu):
224     """Computes the list of nodes and their attributes.
225
226     """
227     # Locking is not used
228     assert not (compat.any(lu.glm.is_owned(level)
229                            for level in locking.LEVELS
230                            if level != locking.LEVEL_CLUSTER) or
231                 self.do_locking or self.use_locking)
232
233     if query.CQ_CONFIG in self.requested_data:
234       cluster = lu.cfg.GetClusterInfo()
235       nodes = lu.cfg.GetAllNodesInfo()
236     else:
237       cluster = NotImplemented
238       nodes = NotImplemented
239
240     if query.CQ_QUEUE_DRAINED in self.requested_data:
241       drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
242     else:
243       drain_flag = NotImplemented
244
245     if query.CQ_WATCHER_PAUSE in self.requested_data:
246       master_node_uuid = lu.cfg.GetMasterNode()
247
248       result = lu.rpc.call_get_watcher_pause(master_node_uuid)
249       result.Raise("Can't retrieve watcher pause from master node '%s'" %
250                    lu.cfg.GetMasterNodeName())
251
252       watcher_pause = result.payload
253     else:
254       watcher_pause = NotImplemented
255
256     return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
257
258
259 class LUClusterQuery(NoHooksLU):
260   """Query cluster configuration.
261
262   """
263   REQ_BGL = False
264
265   def ExpandNames(self):
266     self.needed_locks = {}
267
268   def Exec(self, feedback_fn):
269     """Return cluster config.
270
271     """
272     cluster = self.cfg.GetClusterInfo()
273     os_hvp = {}
274
275     # Filter just for enabled hypervisors
276     for os_name, hv_dict in cluster.os_hvp.items():
277       os_hvp[os_name] = {}
278       for hv_name, hv_params in hv_dict.items():
279         if hv_name in cluster.enabled_hypervisors:
280           os_hvp[os_name][hv_name] = hv_params
281
282     # Convert ip_family to ip_version
283     primary_ip_version = constants.IP4_VERSION
284     if cluster.primary_ip_family == netutils.IP6Address.family:
285       primary_ip_version = constants.IP6_VERSION
286
287     result = {
288       "software_version": constants.RELEASE_VERSION,
289       "protocol_version": constants.PROTOCOL_VERSION,
290       "config_version": constants.CONFIG_VERSION,
291       "os_api_version": max(constants.OS_API_VERSIONS),
292       "export_version": constants.EXPORT_VERSION,
293       "vcs_version": constants.VCS_VERSION,
294       "architecture": runtime.GetArchInfo(),
295       "name": cluster.cluster_name,
296       "master": self.cfg.GetMasterNodeName(),
297       "default_hypervisor": cluster.primary_hypervisor,
298       "enabled_hypervisors": cluster.enabled_hypervisors,
299       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
300                         for hypervisor_name in cluster.enabled_hypervisors]),
301       "os_hvp": os_hvp,
302       "beparams": cluster.beparams,
303       "osparams": cluster.osparams,
304       "ipolicy": cluster.ipolicy,
305       "nicparams": cluster.nicparams,
306       "ndparams": cluster.ndparams,
307       "diskparams": cluster.diskparams,
308       "candidate_pool_size": cluster.candidate_pool_size,
309       "master_netdev": cluster.master_netdev,
310       "master_netmask": cluster.master_netmask,
311       "use_external_mip_script": cluster.use_external_mip_script,
312       "volume_group_name": cluster.volume_group_name,
313       "drbd_usermode_helper": cluster.drbd_usermode_helper,
314       "file_storage_dir": cluster.file_storage_dir,
315       "shared_file_storage_dir": cluster.shared_file_storage_dir,
316       "maintain_node_health": cluster.maintain_node_health,
317       "ctime": cluster.ctime,
318       "mtime": cluster.mtime,
319       "uuid": cluster.uuid,
320       "tags": list(cluster.GetTags()),
321       "uid_pool": cluster.uid_pool,
322       "default_iallocator": cluster.default_iallocator,
323       "reserved_lvs": cluster.reserved_lvs,
324       "primary_ip_version": primary_ip_version,
325       "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
326       "hidden_os": cluster.hidden_os,
327       "blacklisted_os": cluster.blacklisted_os,
328       "enabled_disk_templates": cluster.enabled_disk_templates,
329       }
330
331     return result
332
333
334 class LUClusterRedistConf(NoHooksLU):
335   """Force the redistribution of cluster configuration.
336
337   This is a very simple LU.
338
339   """
340   REQ_BGL = False
341
342   def ExpandNames(self):
343     self.needed_locks = {
344       locking.LEVEL_NODE: locking.ALL_SET,
345       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
346     }
347     self.share_locks = ShareAll()
348
349   def Exec(self, feedback_fn):
350     """Redistribute the configuration.
351
352     """
353     self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
354     RedistributeAncillaryFiles(self)
355
356
357 class LUClusterRename(LogicalUnit):
358   """Rename the cluster.
359
360   """
361   HPATH = "cluster-rename"
362   HTYPE = constants.HTYPE_CLUSTER
363
364   def BuildHooksEnv(self):
365     """Build hooks env.
366
367     """
368     return {
369       "OP_TARGET": self.cfg.GetClusterName(),
370       "NEW_NAME": self.op.name,
371       }
372
373   def BuildHooksNodes(self):
374     """Build hooks nodes.
375
376     """
377     return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
378
379   def CheckPrereq(self):
380     """Verify that the passed name is a valid one.
381
382     """
383     hostname = netutils.GetHostname(name=self.op.name,
384                                     family=self.cfg.GetPrimaryIPFamily())
385
386     new_name = hostname.name
387     self.ip = new_ip = hostname.ip
388     old_name = self.cfg.GetClusterName()
389     old_ip = self.cfg.GetMasterIP()
390     if new_name == old_name and new_ip == old_ip:
391       raise errors.OpPrereqError("Neither the name nor the IP address of the"
392                                  " cluster has changed",
393                                  errors.ECODE_INVAL)
394     if new_ip != old_ip:
395       if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
396         raise errors.OpPrereqError("The given cluster IP address (%s) is"
397                                    " reachable on the network" %
398                                    new_ip, errors.ECODE_NOTUNIQUE)
399
400     self.op.name = new_name
401
402   def Exec(self, feedback_fn):
403     """Rename the cluster.
404
405     """
406     clustername = self.op.name
407     new_ip = self.ip
408
409     # shutdown the master IP
410     master_params = self.cfg.GetMasterNetworkParameters()
411     ems = self.cfg.GetUseExternalMipScript()
412     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
413                                                      master_params, ems)
414     result.Raise("Could not disable the master role")
415
416     try:
417       cluster = self.cfg.GetClusterInfo()
418       cluster.cluster_name = clustername
419       cluster.master_ip = new_ip
420       self.cfg.Update(cluster, feedback_fn)
421
422       # update the known hosts file
423       ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
424       node_list = self.cfg.GetOnlineNodeList()
425       try:
426         node_list.remove(master_params.uuid)
427       except ValueError:
428         pass
429       UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
430     finally:
431       master_params.ip = new_ip
432       result = self.rpc.call_node_activate_master_ip(master_params.uuid,
433                                                      master_params, ems)
434       result.Warn("Could not re-enable the master role on the master,"
435                   " please restart manually", self.LogWarning)
436
437     return clustername
438
439
440 class LUClusterRepairDiskSizes(NoHooksLU):
441   """Verifies the cluster disks sizes.
442
443   """
444   REQ_BGL = False
445
446   def ExpandNames(self):
447     if self.op.instances:
448       (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
449       # Not getting the node allocation lock as only a specific set of
450       # instances (and their nodes) is going to be acquired
451       self.needed_locks = {
452         locking.LEVEL_NODE_RES: [],
453         locking.LEVEL_INSTANCE: self.wanted_names,
454         }
455       self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
456     else:
457       self.wanted_names = None
458       self.needed_locks = {
459         locking.LEVEL_NODE_RES: locking.ALL_SET,
460         locking.LEVEL_INSTANCE: locking.ALL_SET,
461
462         # This opcode is acquires the node locks for all instances
463         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
464         }
465
466     self.share_locks = {
467       locking.LEVEL_NODE_RES: 1,
468       locking.LEVEL_INSTANCE: 0,
469       locking.LEVEL_NODE_ALLOC: 1,
470       }
471
472   def DeclareLocks(self, level):
473     if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
474       self._LockInstancesNodes(primary_only=True, level=level)
475
476   def CheckPrereq(self):
477     """Check prerequisites.
478
479     This only checks the optional instance list against the existing names.
480
481     """
482     if self.wanted_names is None:
483       self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
484
485     self.wanted_instances = \
486         map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
487
488   def _EnsureChildSizes(self, disk):
489     """Ensure children of the disk have the needed disk size.
490
491     This is valid mainly for DRBD8 and fixes an issue where the
492     children have smaller disk size.
493
494     @param disk: an L{ganeti.objects.Disk} object
495
496     """
497     if disk.dev_type == constants.LD_DRBD8:
498       assert disk.children, "Empty children for DRBD8?"
499       fchild = disk.children[0]
500       mismatch = fchild.size < disk.size
501       if mismatch:
502         self.LogInfo("Child disk has size %d, parent %d, fixing",
503                      fchild.size, disk.size)
504         fchild.size = disk.size
505
506       # and we recurse on this child only, not on the metadev
507       return self._EnsureChildSizes(fchild) or mismatch
508     else:
509       return False
510
511   def Exec(self, feedback_fn):
512     """Verify the size of cluster disks.
513
514     """
515     # TODO: check child disks too
516     # TODO: check differences in size between primary/secondary nodes
517     per_node_disks = {}
518     for instance in self.wanted_instances:
519       pnode = instance.primary_node
520       if pnode not in per_node_disks:
521         per_node_disks[pnode] = []
522       for idx, disk in enumerate(instance.disks):
523         per_node_disks[pnode].append((instance, idx, disk))
524
525     assert not (frozenset(per_node_disks.keys()) -
526                 self.owned_locks(locking.LEVEL_NODE_RES)), \
527       "Not owning correct locks"
528     assert not self.owned_locks(locking.LEVEL_NODE)
529
530     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
531                                                per_node_disks.keys())
532
533     changed = []
534     for node_uuid, dskl in per_node_disks.items():
535       newl = [v[2].Copy() for v in dskl]
536       for dsk in newl:
537         self.cfg.SetDiskID(dsk, node_uuid)
538       node_name = self.cfg.GetNodeName(node_uuid)
539       result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
540       if result.fail_msg:
541         self.LogWarning("Failure in blockdev_getdimensions call to node"
542                         " %s, ignoring", node_name)
543         continue
544       if len(result.payload) != len(dskl):
545         logging.warning("Invalid result from node %s: len(dksl)=%d,"
546                         " result.payload=%s", node_name, len(dskl),
547                         result.payload)
548         self.LogWarning("Invalid result from node %s, ignoring node results",
549                         node_name)
550         continue
551       for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
552         if dimensions is None:
553           self.LogWarning("Disk %d of instance %s did not return size"
554                           " information, ignoring", idx, instance.name)
555           continue
556         if not isinstance(dimensions, (tuple, list)):
557           self.LogWarning("Disk %d of instance %s did not return valid"
558                           " dimension information, ignoring", idx,
559                           instance.name)
560           continue
561         (size, spindles) = dimensions
562         if not isinstance(size, (int, long)):
563           self.LogWarning("Disk %d of instance %s did not return valid"
564                           " size information, ignoring", idx, instance.name)
565           continue
566         size = size >> 20
567         if size != disk.size:
568           self.LogInfo("Disk %d of instance %s has mismatched size,"
569                        " correcting: recorded %d, actual %d", idx,
570                        instance.name, disk.size, size)
571           disk.size = size
572           self.cfg.Update(instance, feedback_fn)
573           changed.append((instance.name, idx, "size", size))
574         if es_flags[node_uuid]:
575           if spindles is None:
576             self.LogWarning("Disk %d of instance %s did not return valid"
577                             " spindles information, ignoring", idx,
578                             instance.name)
579           elif disk.spindles is None or disk.spindles != spindles:
580             self.LogInfo("Disk %d of instance %s has mismatched spindles,"
581                          " correcting: recorded %s, actual %s",
582                          idx, instance.name, disk.spindles, spindles)
583             disk.spindles = spindles
584             self.cfg.Update(instance, feedback_fn)
585             changed.append((instance.name, idx, "spindles", disk.spindles))
586         if self._EnsureChildSizes(disk):
587           self.cfg.Update(instance, feedback_fn)
588           changed.append((instance.name, idx, "size", disk.size))
589     return changed
590
591
592 def _ValidateNetmask(cfg, netmask):
593   """Checks if a netmask is valid.
594
595   @type cfg: L{config.ConfigWriter}
596   @param cfg: The cluster configuration
597   @type netmask: int
598   @param netmask: the netmask to be verified
599   @raise errors.OpPrereqError: if the validation fails
600
601   """
602   ip_family = cfg.GetPrimaryIPFamily()
603   try:
604     ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
605   except errors.ProgrammerError:
606     raise errors.OpPrereqError("Invalid primary ip family: %s." %
607                                ip_family, errors.ECODE_INVAL)
608   if not ipcls.ValidateNetmask(netmask):
609     raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
610                                (netmask), errors.ECODE_INVAL)
611
612
613 def CheckFileBasedStoragePathVsEnabledDiskTemplates(
614     logging_warn_fn, file_storage_dir, enabled_disk_templates,
615     file_disk_template):
616   """Checks whether the given file-based storage directory is acceptable.
617
618   Note: This function is public, because it is also used in bootstrap.py.
619
620   @type logging_warn_fn: function
621   @param logging_warn_fn: function which accepts a string and logs it
622   @type file_storage_dir: string
623   @param file_storage_dir: the directory to be used for file-based instances
624   @type enabled_disk_templates: list of string
625   @param enabled_disk_templates: the list of enabled disk templates
626   @type file_disk_template: string
627   @param file_disk_template: the file-based disk template for which the
628       path should be checked
629
630   """
631   assert (file_disk_template in
632           utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
633   file_storage_enabled = file_disk_template in enabled_disk_templates
634   if file_storage_dir is not None:
635     if file_storage_dir == "":
636       if file_storage_enabled:
637         raise errors.OpPrereqError(
638             "Unsetting the '%s' storage directory while having '%s' storage"
639             " enabled is not permitted." %
640             (file_disk_template, file_disk_template))
641     else:
642       if not file_storage_enabled:
643         logging_warn_fn(
644             "Specified a %s storage directory, although %s storage is not"
645             " enabled." % (file_disk_template, file_disk_template))
646   else:
647     raise errors.ProgrammerError("Received %s storage dir with value"
648                                  " 'None'." % file_disk_template)
649
650
651 def CheckFileStoragePathVsEnabledDiskTemplates(
652     logging_warn_fn, file_storage_dir, enabled_disk_templates):
653   """Checks whether the given file storage directory is acceptable.
654
655   @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
656
657   """
658   CheckFileBasedStoragePathVsEnabledDiskTemplates(
659       logging_warn_fn, file_storage_dir, enabled_disk_templates,
660       constants.DT_FILE)
661
662
663 def CheckSharedFileStoragePathVsEnabledDiskTemplates(
664     logging_warn_fn, file_storage_dir, enabled_disk_templates):
665   """Checks whether the given shared file storage directory is acceptable.
666
667   @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
668
669   """
670   CheckFileBasedStoragePathVsEnabledDiskTemplates(
671       logging_warn_fn, file_storage_dir, enabled_disk_templates,
672       constants.DT_SHARED_FILE)
673
674
675 class LUClusterSetParams(LogicalUnit):
676   """Change the parameters of the cluster.
677
678   """
679   HPATH = "cluster-modify"
680   HTYPE = constants.HTYPE_CLUSTER
681   REQ_BGL = False
682
683   def CheckArguments(self):
684     """Check parameters
685
686     """
687     if self.op.uid_pool:
688       uidpool.CheckUidPool(self.op.uid_pool)
689
690     if self.op.add_uids:
691       uidpool.CheckUidPool(self.op.add_uids)
692
693     if self.op.remove_uids:
694       uidpool.CheckUidPool(self.op.remove_uids)
695
696     if self.op.master_netmask is not None:
697       _ValidateNetmask(self.cfg, self.op.master_netmask)
698
699     if self.op.diskparams:
700       for dt_params in self.op.diskparams.values():
701         utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
702       try:
703         utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
704       except errors.OpPrereqError, err:
705         raise errors.OpPrereqError("While verify diskparams options: %s" % err,
706                                    errors.ECODE_INVAL)
707
708   def ExpandNames(self):
709     # FIXME: in the future maybe other cluster params won't require checking on
710     # all nodes to be modified.
711     # FIXME: This opcode changes cluster-wide settings. Is acquiring all
712     # resource locks the right thing, shouldn't it be the BGL instead?
713     self.needed_locks = {
714       locking.LEVEL_NODE: locking.ALL_SET,
715       locking.LEVEL_INSTANCE: locking.ALL_SET,
716       locking.LEVEL_NODEGROUP: locking.ALL_SET,
717       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
718     }
719     self.share_locks = ShareAll()
720
721   def BuildHooksEnv(self):
722     """Build hooks env.
723
724     """
725     return {
726       "OP_TARGET": self.cfg.GetClusterName(),
727       "NEW_VG_NAME": self.op.vg_name,
728       }
729
730   def BuildHooksNodes(self):
731     """Build hooks nodes.
732
733     """
734     mn = self.cfg.GetMasterNode()
735     return ([mn], [mn])
736
737   def _CheckVgName(self, node_uuids, enabled_disk_templates,
738                    new_enabled_disk_templates):
739     """Check the consistency of the vg name on all nodes and in case it gets
740        unset whether there are instances still using it.
741
742     """
743     if self.op.vg_name is not None and not self.op.vg_name:
744       if self.cfg.HasAnyDiskOfType(constants.LD_LV):
745         raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
746                                    " instances exist", errors.ECODE_INVAL)
747
748     if (self.op.vg_name is not None and
749         utils.IsLvmEnabled(enabled_disk_templates)) or \
750            (self.cfg.GetVGName() is not None and
751             utils.LvmGetsEnabled(enabled_disk_templates,
752                                  new_enabled_disk_templates)):
753       self._CheckVgNameOnNodes(node_uuids)
754
755   def _CheckVgNameOnNodes(self, node_uuids):
756     """Check the status of the volume group on each node.
757
758     """
759     vglist = self.rpc.call_vg_list(node_uuids)
760     for node_uuid in node_uuids:
761       msg = vglist[node_uuid].fail_msg
762       if msg:
763         # ignoring down node
764         self.LogWarning("Error while gathering data on node %s"
765                         " (ignoring node): %s",
766                         self.cfg.GetNodeName(node_uuid), msg)
767         continue
768       vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
769                                             self.op.vg_name,
770                                             constants.MIN_VG_SIZE)
771       if vgstatus:
772         raise errors.OpPrereqError("Error on node '%s': %s" %
773                                    (self.cfg.GetNodeName(node_uuid), vgstatus),
774                                    errors.ECODE_ENVIRON)
775
776   def _GetEnabledDiskTemplates(self, cluster):
777     """Determines the enabled disk templates and the subset of disk templates
778        that are newly enabled by this operation.
779
780     """
781     enabled_disk_templates = None
782     new_enabled_disk_templates = []
783     if self.op.enabled_disk_templates:
784       enabled_disk_templates = self.op.enabled_disk_templates
785       new_enabled_disk_templates = \
786         list(set(enabled_disk_templates)
787              - set(cluster.enabled_disk_templates))
788     else:
789       enabled_disk_templates = cluster.enabled_disk_templates
790     return (enabled_disk_templates, new_enabled_disk_templates)
791
792   def CheckPrereq(self):
793     """Check prerequisites.
794
795     This checks whether the given params don't conflict and
796     if the given volume group is valid.
797
798     """
799     if self.op.drbd_helper is not None and not self.op.drbd_helper:
800       if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
801         raise errors.OpPrereqError("Cannot disable drbd helper while"
802                                    " drbd-based instances exist",
803                                    errors.ECODE_INVAL)
804
805     node_uuids = self.owned_locks(locking.LEVEL_NODE)
806     self.cluster = cluster = self.cfg.GetClusterInfo()
807
808     vm_capable_node_uuids = [node.uuid
809                              for node in self.cfg.GetAllNodesInfo().values()
810                              if node.uuid in node_uuids and node.vm_capable]
811
812     (enabled_disk_templates, new_enabled_disk_templates) = \
813       self._GetEnabledDiskTemplates(cluster)
814
815     self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
816                       new_enabled_disk_templates)
817
818     if self.op.file_storage_dir is not None:
819       CheckFileStoragePathVsEnabledDiskTemplates(
820           self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
821
822     if self.op.shared_file_storage_dir is not None:
823       CheckSharedFileStoragePathVsEnabledDiskTemplates(
824           self.LogWarning, self.op.shared_file_storage_dir,
825           enabled_disk_templates)
826
827     if self.op.drbd_helper:
828       # checks given drbd helper on all nodes
829       helpers = self.rpc.call_drbd_helper(node_uuids)
830       for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
831         if ninfo.offline:
832           self.LogInfo("Not checking drbd helper on offline node %s",
833                        ninfo.name)
834           continue
835         msg = helpers[ninfo.uuid].fail_msg
836         if msg:
837           raise errors.OpPrereqError("Error checking drbd helper on node"
838                                      " '%s': %s" % (ninfo.name, msg),
839                                      errors.ECODE_ENVIRON)
840         node_helper = helpers[ninfo.uuid].payload
841         if node_helper != self.op.drbd_helper:
842           raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
843                                      (ninfo.name, node_helper),
844                                      errors.ECODE_ENVIRON)
845
846     # validate params changes
847     if self.op.beparams:
848       objects.UpgradeBeParams(self.op.beparams)
849       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
850       self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
851
852     if self.op.ndparams:
853       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
854       self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
855
856       # TODO: we need a more general way to handle resetting
857       # cluster-level parameters to default values
858       if self.new_ndparams["oob_program"] == "":
859         self.new_ndparams["oob_program"] = \
860             constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
861
862     if self.op.hv_state:
863       new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
864                                            self.cluster.hv_state_static)
865       self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
866                                for hv, values in new_hv_state.items())
867
868     if self.op.disk_state:
869       new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
870                                                self.cluster.disk_state_static)
871       self.new_disk_state = \
872         dict((storage, dict((name, cluster.SimpleFillDiskState(values))
873                             for name, values in svalues.items()))
874              for storage, svalues in new_disk_state.items())
875
876     if self.op.ipolicy:
877       self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
878                                            group_policy=False)
879
880       all_instances = self.cfg.GetAllInstancesInfo().values()
881       violations = set()
882       for group in self.cfg.GetAllNodeGroupsInfo().values():
883         instances = frozenset([inst for inst in all_instances
884                                if compat.any(nuuid in group.members
885                                              for nuuid in inst.all_nodes)])
886         new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
887         ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
888         new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
889                                            self.cfg)
890         if new:
891           violations.update(new)
892
893       if violations:
894         self.LogWarning("After the ipolicy change the following instances"
895                         " violate them: %s",
896                         utils.CommaJoin(utils.NiceSort(violations)))
897
898     if self.op.nicparams:
899       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
900       self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
901       objects.NIC.CheckParameterSyntax(self.new_nicparams)
902       nic_errors = []
903
904       # check all instances for consistency
905       for instance in self.cfg.GetAllInstancesInfo().values():
906         for nic_idx, nic in enumerate(instance.nics):
907           params_copy = copy.deepcopy(nic.nicparams)
908           params_filled = objects.FillDict(self.new_nicparams, params_copy)
909
910           # check parameter syntax
911           try:
912             objects.NIC.CheckParameterSyntax(params_filled)
913           except errors.ConfigurationError, err:
914             nic_errors.append("Instance %s, nic/%d: %s" %
915                               (instance.name, nic_idx, err))
916
917           # if we're moving instances to routed, check that they have an ip
918           target_mode = params_filled[constants.NIC_MODE]
919           if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
920             nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
921                               " address" % (instance.name, nic_idx))
922       if nic_errors:
923         raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
924                                    "\n".join(nic_errors), errors.ECODE_INVAL)
925
926     # hypervisor list/parameters
927     self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
928     if self.op.hvparams:
929       for hv_name, hv_dict in self.op.hvparams.items():
930         if hv_name not in self.new_hvparams:
931           self.new_hvparams[hv_name] = hv_dict
932         else:
933           self.new_hvparams[hv_name].update(hv_dict)
934
935     # disk template parameters
936     self.new_diskparams = objects.FillDict(cluster.diskparams, {})
937     if self.op.diskparams:
938       for dt_name, dt_params in self.op.diskparams.items():
939         if dt_name not in self.new_diskparams:
940           self.new_diskparams[dt_name] = dt_params
941         else:
942           self.new_diskparams[dt_name].update(dt_params)
943
944     # os hypervisor parameters
945     self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
946     if self.op.os_hvp:
947       for os_name, hvs in self.op.os_hvp.items():
948         if os_name not in self.new_os_hvp:
949           self.new_os_hvp[os_name] = hvs
950         else:
951           for hv_name, hv_dict in hvs.items():
952             if hv_dict is None:
953               # Delete if it exists
954               self.new_os_hvp[os_name].pop(hv_name, None)
955             elif hv_name not in self.new_os_hvp[os_name]:
956               self.new_os_hvp[os_name][hv_name] = hv_dict
957             else:
958               self.new_os_hvp[os_name][hv_name].update(hv_dict)
959
960     # os parameters
961     self.new_osp = objects.FillDict(cluster.osparams, {})
962     if self.op.osparams:
963       for os_name, osp in self.op.osparams.items():
964         if os_name not in self.new_osp:
965           self.new_osp[os_name] = {}
966
967         self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
968                                                  use_none=True)
969
970         if not self.new_osp[os_name]:
971           # we removed all parameters
972           del self.new_osp[os_name]
973         else:
974           # check the parameter validity (remote check)
975           CheckOSParams(self, False, [self.cfg.GetMasterNode()],
976                         os_name, self.new_osp[os_name])
977
978     # changes to the hypervisor list
979     if self.op.enabled_hypervisors is not None:
980       self.hv_list = self.op.enabled_hypervisors
981       for hv in self.hv_list:
982         # if the hypervisor doesn't already exist in the cluster
983         # hvparams, we initialize it to empty, and then (in both
984         # cases) we make sure to fill the defaults, as we might not
985         # have a complete defaults list if the hypervisor wasn't
986         # enabled before
987         if hv not in new_hvp:
988           new_hvp[hv] = {}
989         new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
990         utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
991     else:
992       self.hv_list = cluster.enabled_hypervisors
993
994     if self.op.hvparams or self.op.enabled_hypervisors is not None:
995       # either the enabled list has changed, or the parameters have, validate
996       for hv_name, hv_params in self.new_hvparams.items():
997         if ((self.op.hvparams and hv_name in self.op.hvparams) or
998             (self.op.enabled_hypervisors and
999              hv_name in self.op.enabled_hypervisors)):
1000           # either this is a new hypervisor, or its parameters have changed
1001           hv_class = hypervisor.GetHypervisorClass(hv_name)
1002           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1003           hv_class.CheckParameterSyntax(hv_params)
1004           CheckHVParams(self, node_uuids, hv_name, hv_params)
1005
1006     self._CheckDiskTemplateConsistency()
1007
1008     if self.op.os_hvp:
1009       # no need to check any newly-enabled hypervisors, since the
1010       # defaults have already been checked in the above code-block
1011       for os_name, os_hvp in self.new_os_hvp.items():
1012         for hv_name, hv_params in os_hvp.items():
1013           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1014           # we need to fill in the new os_hvp on top of the actual hv_p
1015           cluster_defaults = self.new_hvparams.get(hv_name, {})
1016           new_osp = objects.FillDict(cluster_defaults, hv_params)
1017           hv_class = hypervisor.GetHypervisorClass(hv_name)
1018           hv_class.CheckParameterSyntax(new_osp)
1019           CheckHVParams(self, node_uuids, hv_name, new_osp)
1020
1021     if self.op.default_iallocator:
1022       alloc_script = utils.FindFile(self.op.default_iallocator,
1023                                     constants.IALLOCATOR_SEARCH_PATH,
1024                                     os.path.isfile)
1025       if alloc_script is None:
1026         raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1027                                    " specified" % self.op.default_iallocator,
1028                                    errors.ECODE_INVAL)
1029
1030   def _CheckDiskTemplateConsistency(self):
1031     """Check whether the disk templates that are going to be disabled
1032        are still in use by some instances.
1033
1034     """
1035     if self.op.enabled_disk_templates:
1036       cluster = self.cfg.GetClusterInfo()
1037       instances = self.cfg.GetAllInstancesInfo()
1038
1039       disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1040         - set(self.op.enabled_disk_templates)
1041       for instance in instances.itervalues():
1042         if instance.disk_template in disk_templates_to_remove:
1043           raise errors.OpPrereqError("Cannot disable disk template '%s',"
1044                                      " because instance '%s' is using it." %
1045                                      (instance.disk_template, instance.name))
1046
1047   def _SetVgName(self, feedback_fn):
1048     """Determines and sets the new volume group name.
1049
1050     """
1051     if self.op.vg_name is not None:
1052       if self.op.vg_name and not \
1053            utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
1054         feedback_fn("Note that you specified a volume group, but did not"
1055                     " enable any lvm disk template.")
1056       new_volume = self.op.vg_name
1057       if not new_volume:
1058         if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
1059           raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
1060                                      " disk templates are enabled.")
1061         new_volume = None
1062       if new_volume != self.cfg.GetVGName():
1063         self.cfg.SetVGName(new_volume)
1064       else:
1065         feedback_fn("Cluster LVM configuration already in desired"
1066                     " state, not changing")
1067     else:
1068       if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
1069           not self.cfg.GetVGName():
1070         raise errors.OpPrereqError("Please specify a volume group when"
1071                                    " enabling lvm-based disk-templates.")
1072
1073   def _SetFileStorageDir(self, feedback_fn):
1074     """Set the file storage directory.
1075
1076     """
1077     if self.op.file_storage_dir is not None:
1078       if self.cluster.file_storage_dir == self.op.file_storage_dir:
1079         feedback_fn("Global file storage dir already set to value '%s'"
1080                     % self.cluster.file_storage_dir)
1081       else:
1082         self.cluster.file_storage_dir = self.op.file_storage_dir
1083
1084   def Exec(self, feedback_fn):
1085     """Change the parameters of the cluster.
1086
1087     """
1088     if self.op.enabled_disk_templates:
1089       self.cluster.enabled_disk_templates = \
1090         list(set(self.op.enabled_disk_templates))
1091
1092     self._SetVgName(feedback_fn)
1093     self._SetFileStorageDir(feedback_fn)
1094
1095     if self.op.drbd_helper is not None:
1096       if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1097         feedback_fn("Note that you specified a drbd user helper, but did not"
1098                     " enable the drbd disk template.")
1099       new_helper = self.op.drbd_helper
1100       if not new_helper:
1101         new_helper = None
1102       if new_helper != self.cfg.GetDRBDHelper():
1103         self.cfg.SetDRBDHelper(new_helper)
1104       else:
1105         feedback_fn("Cluster DRBD helper already in desired state,"
1106                     " not changing")
1107     if self.op.hvparams:
1108       self.cluster.hvparams = self.new_hvparams
1109     if self.op.os_hvp:
1110       self.cluster.os_hvp = self.new_os_hvp
1111     if self.op.enabled_hypervisors is not None:
1112       self.cluster.hvparams = self.new_hvparams
1113       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1114     if self.op.beparams:
1115       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1116     if self.op.nicparams:
1117       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1118     if self.op.ipolicy:
1119       self.cluster.ipolicy = self.new_ipolicy
1120     if self.op.osparams:
1121       self.cluster.osparams = self.new_osp
1122     if self.op.ndparams:
1123       self.cluster.ndparams = self.new_ndparams
1124     if self.op.diskparams:
1125       self.cluster.diskparams = self.new_diskparams
1126     if self.op.hv_state:
1127       self.cluster.hv_state_static = self.new_hv_state
1128     if self.op.disk_state:
1129       self.cluster.disk_state_static = self.new_disk_state
1130
1131     if self.op.candidate_pool_size is not None:
1132       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1133       # we need to update the pool size here, otherwise the save will fail
1134       AdjustCandidatePool(self, [])
1135
1136     if self.op.maintain_node_health is not None:
1137       if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1138         feedback_fn("Note: CONFD was disabled at build time, node health"
1139                     " maintenance is not useful (still enabling it)")
1140       self.cluster.maintain_node_health = self.op.maintain_node_health
1141
1142     if self.op.modify_etc_hosts is not None:
1143       self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1144
1145     if self.op.prealloc_wipe_disks is not None:
1146       self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1147
1148     if self.op.add_uids is not None:
1149       uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1150
1151     if self.op.remove_uids is not None:
1152       uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1153
1154     if self.op.uid_pool is not None:
1155       self.cluster.uid_pool = self.op.uid_pool
1156
1157     if self.op.default_iallocator is not None:
1158       self.cluster.default_iallocator = self.op.default_iallocator
1159
1160     if self.op.reserved_lvs is not None:
1161       self.cluster.reserved_lvs = self.op.reserved_lvs
1162
1163     if self.op.use_external_mip_script is not None:
1164       self.cluster.use_external_mip_script = self.op.use_external_mip_script
1165
1166     def helper_os(aname, mods, desc):
1167       desc += " OS list"
1168       lst = getattr(self.cluster, aname)
1169       for key, val in mods:
1170         if key == constants.DDM_ADD:
1171           if val in lst:
1172             feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1173           else:
1174             lst.append(val)
1175         elif key == constants.DDM_REMOVE:
1176           if val in lst:
1177             lst.remove(val)
1178           else:
1179             feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1180         else:
1181           raise errors.ProgrammerError("Invalid modification '%s'" % key)
1182
1183     if self.op.hidden_os:
1184       helper_os("hidden_os", self.op.hidden_os, "hidden")
1185
1186     if self.op.blacklisted_os:
1187       helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1188
1189     if self.op.master_netdev:
1190       master_params = self.cfg.GetMasterNetworkParameters()
1191       ems = self.cfg.GetUseExternalMipScript()
1192       feedback_fn("Shutting down master ip on the current netdev (%s)" %
1193                   self.cluster.master_netdev)
1194       result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1195                                                        master_params, ems)
1196       if not self.op.force:
1197         result.Raise("Could not disable the master ip")
1198       else:
1199         if result.fail_msg:
1200           msg = ("Could not disable the master ip (continuing anyway): %s" %
1201                  result.fail_msg)
1202           feedback_fn(msg)
1203       feedback_fn("Changing master_netdev from %s to %s" %
1204                   (master_params.netdev, self.op.master_netdev))
1205       self.cluster.master_netdev = self.op.master_netdev
1206
1207     if self.op.master_netmask:
1208       master_params = self.cfg.GetMasterNetworkParameters()
1209       feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1210       result = self.rpc.call_node_change_master_netmask(
1211                  master_params.uuid, master_params.netmask,
1212                  self.op.master_netmask, master_params.ip,
1213                  master_params.netdev)
1214       result.Warn("Could not change the master IP netmask", feedback_fn)
1215       self.cluster.master_netmask = self.op.master_netmask
1216
1217     self.cfg.Update(self.cluster, feedback_fn)
1218
1219     if self.op.master_netdev:
1220       master_params = self.cfg.GetMasterNetworkParameters()
1221       feedback_fn("Starting the master ip on the new master netdev (%s)" %
1222                   self.op.master_netdev)
1223       ems = self.cfg.GetUseExternalMipScript()
1224       result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1225                                                      master_params, ems)
1226       result.Warn("Could not re-enable the master ip on the master,"
1227                   " please restart manually", self.LogWarning)
1228
1229
1230 class LUClusterVerify(NoHooksLU):
1231   """Submits all jobs necessary to verify the cluster.
1232
1233   """
1234   REQ_BGL = False
1235
1236   def ExpandNames(self):
1237     self.needed_locks = {}
1238
1239   def Exec(self, feedback_fn):
1240     jobs = []
1241
1242     if self.op.group_name:
1243       groups = [self.op.group_name]
1244       depends_fn = lambda: None
1245     else:
1246       groups = self.cfg.GetNodeGroupList()
1247
1248       # Verify global configuration
1249       jobs.append([
1250         opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1251         ])
1252
1253       # Always depend on global verification
1254       depends_fn = lambda: [(-len(jobs), [])]
1255
1256     jobs.extend(
1257       [opcodes.OpClusterVerifyGroup(group_name=group,
1258                                     ignore_errors=self.op.ignore_errors,
1259                                     depends=depends_fn())]
1260       for group in groups)
1261
1262     # Fix up all parameters
1263     for op in itertools.chain(*jobs): # pylint: disable=W0142
1264       op.debug_simulate_errors = self.op.debug_simulate_errors
1265       op.verbose = self.op.verbose
1266       op.error_codes = self.op.error_codes
1267       try:
1268         op.skip_checks = self.op.skip_checks
1269       except AttributeError:
1270         assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1271
1272     return ResultWithJobs(jobs)
1273
1274
1275 class _VerifyErrors(object):
1276   """Mix-in for cluster/group verify LUs.
1277
1278   It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1279   self.op and self._feedback_fn to be available.)
1280
1281   """
1282
1283   ETYPE_FIELD = "code"
1284   ETYPE_ERROR = "ERROR"
1285   ETYPE_WARNING = "WARNING"
1286
1287   def _Error(self, ecode, item, msg, *args, **kwargs):
1288     """Format an error message.
1289
1290     Based on the opcode's error_codes parameter, either format a
1291     parseable error code, or a simpler error string.
1292
1293     This must be called only from Exec and functions called from Exec.
1294
1295     """
1296     ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1297     itype, etxt, _ = ecode
1298     # If the error code is in the list of ignored errors, demote the error to a
1299     # warning
1300     if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1301       ltype = self.ETYPE_WARNING
1302     # first complete the msg
1303     if args:
1304       msg = msg % args
1305     # then format the whole message
1306     if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1307       msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1308     else:
1309       if item:
1310         item = " " + item
1311       else:
1312         item = ""
1313       msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1314     # and finally report it via the feedback_fn
1315     self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1316     # do not mark the operation as failed for WARN cases only
1317     if ltype == self.ETYPE_ERROR:
1318       self.bad = True
1319
1320   def _ErrorIf(self, cond, *args, **kwargs):
1321     """Log an error message if the passed condition is True.
1322
1323     """
1324     if (bool(cond)
1325         or self.op.debug_simulate_errors): # pylint: disable=E1101
1326       self._Error(*args, **kwargs)
1327
1328
1329 def _VerifyCertificate(filename):
1330   """Verifies a certificate for L{LUClusterVerifyConfig}.
1331
1332   @type filename: string
1333   @param filename: Path to PEM file
1334
1335   """
1336   try:
1337     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1338                                            utils.ReadFile(filename))
1339   except Exception, err: # pylint: disable=W0703
1340     return (LUClusterVerifyConfig.ETYPE_ERROR,
1341             "Failed to load X509 certificate %s: %s" % (filename, err))
1342
1343   (errcode, msg) = \
1344     utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1345                                 constants.SSL_CERT_EXPIRATION_ERROR)
1346
1347   if msg:
1348     fnamemsg = "While verifying %s: %s" % (filename, msg)
1349   else:
1350     fnamemsg = None
1351
1352   if errcode is None:
1353     return (None, fnamemsg)
1354   elif errcode == utils.CERT_WARNING:
1355     return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1356   elif errcode == utils.CERT_ERROR:
1357     return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1358
1359   raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1360
1361
1362 def _GetAllHypervisorParameters(cluster, instances):
1363   """Compute the set of all hypervisor parameters.
1364
1365   @type cluster: L{objects.Cluster}
1366   @param cluster: the cluster object
1367   @param instances: list of L{objects.Instance}
1368   @param instances: additional instances from which to obtain parameters
1369   @rtype: list of (origin, hypervisor, parameters)
1370   @return: a list with all parameters found, indicating the hypervisor they
1371        apply to, and the origin (can be "cluster", "os X", or "instance Y")
1372
1373   """
1374   hvp_data = []
1375
1376   for hv_name in cluster.enabled_hypervisors:
1377     hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1378
1379   for os_name, os_hvp in cluster.os_hvp.items():
1380     for hv_name, hv_params in os_hvp.items():
1381       if hv_params:
1382         full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1383         hvp_data.append(("os %s" % os_name, hv_name, full_params))
1384
1385   # TODO: collapse identical parameter values in a single one
1386   for instance in instances:
1387     if instance.hvparams:
1388       hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1389                        cluster.FillHV(instance)))
1390
1391   return hvp_data
1392
1393
1394 class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1395   """Verifies the cluster config.
1396
1397   """
1398   REQ_BGL = False
1399
1400   def _VerifyHVP(self, hvp_data):
1401     """Verifies locally the syntax of the hypervisor parameters.
1402
1403     """
1404     for item, hv_name, hv_params in hvp_data:
1405       msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1406              (item, hv_name))
1407       try:
1408         hv_class = hypervisor.GetHypervisorClass(hv_name)
1409         utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1410         hv_class.CheckParameterSyntax(hv_params)
1411       except errors.GenericError, err:
1412         self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1413
1414   def ExpandNames(self):
1415     self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1416     self.share_locks = ShareAll()
1417
1418   def CheckPrereq(self):
1419     """Check prerequisites.
1420
1421     """
1422     # Retrieve all information
1423     self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1424     self.all_node_info = self.cfg.GetAllNodesInfo()
1425     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1426
1427   def Exec(self, feedback_fn):
1428     """Verify integrity of cluster, performing various test on nodes.
1429
1430     """
1431     self.bad = False
1432     self._feedback_fn = feedback_fn
1433
1434     feedback_fn("* Verifying cluster config")
1435
1436     for msg in self.cfg.VerifyConfig():
1437       self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1438
1439     feedback_fn("* Verifying cluster certificate files")
1440
1441     for cert_filename in pathutils.ALL_CERT_FILES:
1442       (errcode, msg) = _VerifyCertificate(cert_filename)
1443       self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1444
1445     self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
1446                                     pathutils.NODED_CERT_FILE),
1447                   constants.CV_ECLUSTERCERT,
1448                   None,
1449                   pathutils.NODED_CERT_FILE + " must be accessible by the " +
1450                     constants.LUXID_USER + " user")
1451
1452     feedback_fn("* Verifying hypervisor parameters")
1453
1454     self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1455                                                 self.all_inst_info.values()))
1456
1457     feedback_fn("* Verifying all nodes belong to an existing group")
1458
1459     # We do this verification here because, should this bogus circumstance
1460     # occur, it would never be caught by VerifyGroup, which only acts on
1461     # nodes/instances reachable from existing node groups.
1462
1463     dangling_nodes = set(node for node in self.all_node_info.values()
1464                          if node.group not in self.all_group_info)
1465
1466     dangling_instances = {}
1467     no_node_instances = []
1468
1469     for inst in self.all_inst_info.values():
1470       if inst.primary_node in [node.uuid for node in dangling_nodes]:
1471         dangling_instances.setdefault(inst.primary_node, []).append(inst)
1472       elif inst.primary_node not in self.all_node_info:
1473         no_node_instances.append(inst)
1474
1475     pretty_dangling = [
1476         "%s (%s)" %
1477         (node.name,
1478          utils.CommaJoin(inst.name for
1479                          inst in dangling_instances.get(node.uuid, [])))
1480         for node in dangling_nodes]
1481
1482     self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1483                   None,
1484                   "the following nodes (and their instances) belong to a non"
1485                   " existing group: %s", utils.CommaJoin(pretty_dangling))
1486
1487     self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1488                   None,
1489                   "the following instances have a non-existing primary-node:"
1490                   " %s", utils.CommaJoin(inst.name for
1491                                          inst in no_node_instances))
1492
1493     return not self.bad
1494
1495
1496 class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1497   """Verifies the status of a node group.
1498
1499   """
1500   HPATH = "cluster-verify"
1501   HTYPE = constants.HTYPE_CLUSTER
1502   REQ_BGL = False
1503
1504   _HOOKS_INDENT_RE = re.compile("^", re.M)
1505
1506   class NodeImage(object):
1507     """A class representing the logical and physical status of a node.
1508
1509     @type uuid: string
1510     @ivar uuid: the node UUID to which this object refers
1511     @ivar volumes: a structure as returned from
1512         L{ganeti.backend.GetVolumeList} (runtime)
1513     @ivar instances: a list of running instances (runtime)
1514     @ivar pinst: list of configured primary instances (config)
1515     @ivar sinst: list of configured secondary instances (config)
1516     @ivar sbp: dictionary of {primary-node: list of instances} for all
1517         instances for which this node is secondary (config)
1518     @ivar mfree: free memory, as reported by hypervisor (runtime)
1519     @ivar dfree: free disk, as reported by the node (runtime)
1520     @ivar offline: the offline status (config)
1521     @type rpc_fail: boolean
1522     @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1523         not whether the individual keys were correct) (runtime)
1524     @type lvm_fail: boolean
1525     @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1526     @type hyp_fail: boolean
1527     @ivar hyp_fail: whether the RPC call didn't return the instance list
1528     @type ghost: boolean
1529     @ivar ghost: whether this is a known node or not (config)
1530     @type os_fail: boolean
1531     @ivar os_fail: whether the RPC call didn't return valid OS data
1532     @type oslist: list
1533     @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1534     @type vm_capable: boolean
1535     @ivar vm_capable: whether the node can host instances
1536     @type pv_min: float
1537     @ivar pv_min: size in MiB of the smallest PVs
1538     @type pv_max: float
1539     @ivar pv_max: size in MiB of the biggest PVs
1540
1541     """
1542     def __init__(self, offline=False, uuid=None, vm_capable=True):
1543       self.uuid = uuid
1544       self.volumes = {}
1545       self.instances = []
1546       self.pinst = []
1547       self.sinst = []
1548       self.sbp = {}
1549       self.mfree = 0
1550       self.dfree = 0
1551       self.offline = offline
1552       self.vm_capable = vm_capable
1553       self.rpc_fail = False
1554       self.lvm_fail = False
1555       self.hyp_fail = False
1556       self.ghost = False
1557       self.os_fail = False
1558       self.oslist = {}
1559       self.pv_min = None
1560       self.pv_max = None
1561
1562   def ExpandNames(self):
1563     # This raises errors.OpPrereqError on its own:
1564     self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1565
1566     # Get instances in node group; this is unsafe and needs verification later
1567     inst_uuids = \
1568       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1569
1570     self.needed_locks = {
1571       locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1572       locking.LEVEL_NODEGROUP: [self.group_uuid],
1573       locking.LEVEL_NODE: [],
1574
1575       # This opcode is run by watcher every five minutes and acquires all nodes
1576       # for a group. It doesn't run for a long time, so it's better to acquire
1577       # the node allocation lock as well.
1578       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1579       }
1580
1581     self.share_locks = ShareAll()
1582
1583   def DeclareLocks(self, level):
1584     if level == locking.LEVEL_NODE:
1585       # Get members of node group; this is unsafe and needs verification later
1586       nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1587
1588       # In Exec(), we warn about mirrored instances that have primary and
1589       # secondary living in separate node groups. To fully verify that
1590       # volumes for these instances are healthy, we will need to do an
1591       # extra call to their secondaries. We ensure here those nodes will
1592       # be locked.
1593       for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1594         # Important: access only the instances whose lock is owned
1595         instance = self.cfg.GetInstanceInfoByName(inst_name)
1596         if instance.disk_template in constants.DTS_INT_MIRROR:
1597           nodes.update(instance.secondary_nodes)
1598
1599       self.needed_locks[locking.LEVEL_NODE] = nodes
1600
1601   def CheckPrereq(self):
1602     assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1603     self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1604
1605     group_node_uuids = set(self.group_info.members)
1606     group_inst_uuids = \
1607       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1608
1609     unlocked_node_uuids = \
1610         group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1611
1612     unlocked_inst_uuids = \
1613         group_inst_uuids.difference(
1614           [self.cfg.GetInstanceInfoByName(name).uuid
1615            for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1616
1617     if unlocked_node_uuids:
1618       raise errors.OpPrereqError(
1619         "Missing lock for nodes: %s" %
1620         utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1621         errors.ECODE_STATE)
1622
1623     if unlocked_inst_uuids:
1624       raise errors.OpPrereqError(
1625         "Missing lock for instances: %s" %
1626         utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1627         errors.ECODE_STATE)
1628
1629     self.all_node_info = self.cfg.GetAllNodesInfo()
1630     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1631
1632     self.my_node_uuids = group_node_uuids
1633     self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1634                              for node_uuid in group_node_uuids)
1635
1636     self.my_inst_uuids = group_inst_uuids
1637     self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1638                              for inst_uuid in group_inst_uuids)
1639
1640     # We detect here the nodes that will need the extra RPC calls for verifying
1641     # split LV volumes; they should be locked.
1642     extra_lv_nodes = set()
1643
1644     for inst in self.my_inst_info.values():
1645       if inst.disk_template in constants.DTS_INT_MIRROR:
1646         for nuuid in inst.all_nodes:
1647           if self.all_node_info[nuuid].group != self.group_uuid:
1648             extra_lv_nodes.add(nuuid)
1649
1650     unlocked_lv_nodes = \
1651         extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1652
1653     if unlocked_lv_nodes:
1654       raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1655                                  utils.CommaJoin(unlocked_lv_nodes),
1656                                  errors.ECODE_STATE)
1657     self.extra_lv_nodes = list(extra_lv_nodes)
1658
1659   def _VerifyNode(self, ninfo, nresult):
1660     """Perform some basic validation on data returned from a node.
1661
1662       - check the result data structure is well formed and has all the
1663         mandatory fields
1664       - check ganeti version
1665
1666     @type ninfo: L{objects.Node}
1667     @param ninfo: the node to check
1668     @param nresult: the results from the node
1669     @rtype: boolean
1670     @return: whether overall this call was successful (and we can expect
1671          reasonable values in the respose)
1672
1673     """
1674     # main result, nresult should be a non-empty dict
1675     test = not nresult or not isinstance(nresult, dict)
1676     self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1677                   "unable to verify node: no data returned")
1678     if test:
1679       return False
1680
1681     # compares ganeti version
1682     local_version = constants.PROTOCOL_VERSION
1683     remote_version = nresult.get("version", None)
1684     test = not (remote_version and
1685                 isinstance(remote_version, (list, tuple)) and
1686                 len(remote_version) == 2)
1687     self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1688                   "connection to node returned invalid data")
1689     if test:
1690       return False
1691
1692     test = local_version != remote_version[0]
1693     self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1694                   "incompatible protocol versions: master %s,"
1695                   " node %s", local_version, remote_version[0])
1696     if test:
1697       return False
1698
1699     # node seems compatible, we can actually try to look into its results
1700
1701     # full package version
1702     self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1703                   constants.CV_ENODEVERSION, ninfo.name,
1704                   "software version mismatch: master %s, node %s",
1705                   constants.RELEASE_VERSION, remote_version[1],
1706                   code=self.ETYPE_WARNING)
1707
1708     hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1709     if ninfo.vm_capable and isinstance(hyp_result, dict):
1710       for hv_name, hv_result in hyp_result.iteritems():
1711         test = hv_result is not None
1712         self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1713                       "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1714
1715     hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1716     if ninfo.vm_capable and isinstance(hvp_result, list):
1717       for item, hv_name, hv_result in hvp_result:
1718         self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1719                       "hypervisor %s parameter verify failure (source %s): %s",
1720                       hv_name, item, hv_result)
1721
1722     test = nresult.get(constants.NV_NODESETUP,
1723                        ["Missing NODESETUP results"])
1724     self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1725                   "node setup error: %s", "; ".join(test))
1726
1727     return True
1728
1729   def _VerifyNodeTime(self, ninfo, nresult,
1730                       nvinfo_starttime, nvinfo_endtime):
1731     """Check the node time.
1732
1733     @type ninfo: L{objects.Node}
1734     @param ninfo: the node to check
1735     @param nresult: the remote results for the node
1736     @param nvinfo_starttime: the start time of the RPC call
1737     @param nvinfo_endtime: the end time of the RPC call
1738
1739     """
1740     ntime = nresult.get(constants.NV_TIME, None)
1741     try:
1742       ntime_merged = utils.MergeTime(ntime)
1743     except (ValueError, TypeError):
1744       self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1745                     "Node returned invalid time")
1746       return
1747
1748     if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1749       ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1750     elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1751       ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1752     else:
1753       ntime_diff = None
1754
1755     self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1756                   "Node time diverges by at least %s from master node time",
1757                   ntime_diff)
1758
1759   def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1760     """Check the node LVM results and update info for cross-node checks.
1761
1762     @type ninfo: L{objects.Node}
1763     @param ninfo: the node to check
1764     @param nresult: the remote results for the node
1765     @param vg_name: the configured VG name
1766     @type nimg: L{NodeImage}
1767     @param nimg: node image
1768
1769     """
1770     if vg_name is None:
1771       return
1772
1773     # checks vg existence and size > 20G
1774     vglist = nresult.get(constants.NV_VGLIST, None)
1775     test = not vglist
1776     self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1777                   "unable to check volume groups")
1778     if not test:
1779       vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1780                                             constants.MIN_VG_SIZE)
1781       self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1782
1783     # Check PVs
1784     (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1785     for em in errmsgs:
1786       self._Error(constants.CV_ENODELVM, ninfo.name, em)
1787     if pvminmax is not None:
1788       (nimg.pv_min, nimg.pv_max) = pvminmax
1789
1790   def _VerifyGroupDRBDVersion(self, node_verify_infos):
1791     """Check cross-node DRBD version consistency.
1792
1793     @type node_verify_infos: dict
1794     @param node_verify_infos: infos about nodes as returned from the
1795       node_verify call.
1796
1797     """
1798     node_versions = {}
1799     for node_uuid, ndata in node_verify_infos.items():
1800       nresult = ndata.payload
1801       version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1802       node_versions[node_uuid] = version
1803
1804     if len(set(node_versions.values())) > 1:
1805       for node_uuid, version in sorted(node_versions.items()):
1806         msg = "DRBD version mismatch: %s" % version
1807         self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1808                     code=self.ETYPE_WARNING)
1809
1810   def _VerifyGroupLVM(self, node_image, vg_name):
1811     """Check cross-node consistency in LVM.
1812
1813     @type node_image: dict
1814     @param node_image: info about nodes, mapping from node to names to
1815       L{NodeImage} objects
1816     @param vg_name: the configured VG name
1817
1818     """
1819     if vg_name is None:
1820       return
1821
1822     # Only exclusive storage needs this kind of checks
1823     if not self._exclusive_storage:
1824       return
1825
1826     # exclusive_storage wants all PVs to have the same size (approximately),
1827     # if the smallest and the biggest ones are okay, everything is fine.
1828     # pv_min is None iff pv_max is None
1829     vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1830     if not vals:
1831       return
1832     (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1833     (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1834     bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1835     self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1836                   "PV sizes differ too much in the group; smallest (%s MB) is"
1837                   " on %s, biggest (%s MB) is on %s",
1838                   pvmin, self.cfg.GetNodeName(minnode_uuid),
1839                   pvmax, self.cfg.GetNodeName(maxnode_uuid))
1840
1841   def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1842     """Check the node bridges.
1843
1844     @type ninfo: L{objects.Node}
1845     @param ninfo: the node to check
1846     @param nresult: the remote results for the node
1847     @param bridges: the expected list of bridges
1848
1849     """
1850     if not bridges:
1851       return
1852
1853     missing = nresult.get(constants.NV_BRIDGES, None)
1854     test = not isinstance(missing, list)
1855     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1856                   "did not return valid bridge information")
1857     if not test:
1858       self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1859                     "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1860
1861   def _VerifyNodeUserScripts(self, ninfo, nresult):
1862     """Check the results of user scripts presence and executability on the node
1863
1864     @type ninfo: L{objects.Node}
1865     @param ninfo: the node to check
1866     @param nresult: the remote results for the node
1867
1868     """
1869     test = not constants.NV_USERSCRIPTS in nresult
1870     self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1871                   "did not return user scripts information")
1872
1873     broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1874     if not test:
1875       self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1876                     "user scripts not present or not executable: %s" %
1877                     utils.CommaJoin(sorted(broken_scripts)))
1878
1879   def _VerifyNodeNetwork(self, ninfo, nresult):
1880     """Check the node network connectivity results.
1881
1882     @type ninfo: L{objects.Node}
1883     @param ninfo: the node to check
1884     @param nresult: the remote results for the node
1885
1886     """
1887     test = constants.NV_NODELIST not in nresult
1888     self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1889                   "node hasn't returned node ssh connectivity data")
1890     if not test:
1891       if nresult[constants.NV_NODELIST]:
1892         for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1893           self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
1894                         "ssh communication with node '%s': %s", a_node, a_msg)
1895
1896     test = constants.NV_NODENETTEST not in nresult
1897     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1898                   "node hasn't returned node tcp connectivity data")
1899     if not test:
1900       if nresult[constants.NV_NODENETTEST]:
1901         nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1902         for anode in nlist:
1903           self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
1904                         "tcp communication with node '%s': %s",
1905                         anode, nresult[constants.NV_NODENETTEST][anode])
1906
1907     test = constants.NV_MASTERIP not in nresult
1908     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1909                   "node hasn't returned node master IP reachability data")
1910     if not test:
1911       if not nresult[constants.NV_MASTERIP]:
1912         if ninfo.uuid == self.master_node:
1913           msg = "the master node cannot reach the master IP (not configured?)"
1914         else:
1915           msg = "cannot reach the master IP"
1916         self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
1917
1918   def _VerifyInstance(self, instance, node_image, diskstatus):
1919     """Verify an instance.
1920
1921     This function checks to see if the required block devices are
1922     available on the instance's node, and that the nodes are in the correct
1923     state.
1924
1925     """
1926     pnode_uuid = instance.primary_node
1927     pnode_img = node_image[pnode_uuid]
1928     groupinfo = self.cfg.GetAllNodeGroupsInfo()
1929
1930     node_vol_should = {}
1931     instance.MapLVsByNode(node_vol_should)
1932
1933     cluster = self.cfg.GetClusterInfo()
1934     ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1935                                                             self.group_info)
1936     err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
1937     self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
1938                   utils.CommaJoin(err), code=self.ETYPE_WARNING)
1939
1940     for node_uuid in node_vol_should:
1941       n_img = node_image[node_uuid]
1942       if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1943         # ignore missing volumes on offline or broken nodes
1944         continue
1945       for volume in node_vol_should[node_uuid]:
1946         test = volume not in n_img.volumes
1947         self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
1948                       "volume %s missing on node %s", volume,
1949                       self.cfg.GetNodeName(node_uuid))
1950
1951     if instance.admin_state == constants.ADMINST_UP:
1952       test = instance.uuid not in pnode_img.instances and not pnode_img.offline
1953       self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
1954                     "instance not running on its primary node %s",
1955                      self.cfg.GetNodeName(pnode_uuid))
1956       self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
1957                     instance.name, "instance is marked as running and lives on"
1958                     " offline node %s", self.cfg.GetNodeName(pnode_uuid))
1959
1960     diskdata = [(nname, success, status, idx)
1961                 for (nname, disks) in diskstatus.items()
1962                 for idx, (success, status) in enumerate(disks)]
1963
1964     for nname, success, bdev_status, idx in diskdata:
1965       # the 'ghost node' construction in Exec() ensures that we have a
1966       # node here
1967       snode = node_image[nname]
1968       bad_snode = snode.ghost or snode.offline
1969       self._ErrorIf(instance.disks_active and
1970                     not success and not bad_snode,
1971                     constants.CV_EINSTANCEFAULTYDISK, instance.name,
1972                     "couldn't retrieve status for disk/%s on %s: %s",
1973                     idx, self.cfg.GetNodeName(nname), bdev_status)
1974
1975       if instance.disks_active and success and \
1976          (bdev_status.is_degraded or
1977           bdev_status.ldisk_status != constants.LDS_OKAY):
1978         msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
1979         if bdev_status.is_degraded:
1980           msg += " is degraded"
1981         if bdev_status.ldisk_status != constants.LDS_OKAY:
1982           msg += "; state is '%s'" % \
1983                  constants.LDS_NAMES[bdev_status.ldisk_status]
1984
1985         self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
1986
1987     self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
1988                   constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
1989                   "instance %s, connection to primary node failed",
1990                   instance.name)
1991
1992     self._ErrorIf(len(instance.secondary_nodes) > 1,
1993                   constants.CV_EINSTANCELAYOUT, instance.name,
1994                   "instance has multiple secondary nodes: %s",
1995                   utils.CommaJoin(instance.secondary_nodes),
1996                   code=self.ETYPE_WARNING)
1997
1998     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
1999     if any(es_flags.values()):
2000       if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2001         # Disk template not compatible with exclusive_storage: no instance
2002         # node should have the flag set
2003         es_nodes = [n
2004                     for (n, es) in es_flags.items()
2005                     if es]
2006         self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2007                     "instance has template %s, which is not supported on nodes"
2008                     " that have exclusive storage set: %s",
2009                     instance.disk_template,
2010                     utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2011       for (idx, disk) in enumerate(instance.disks):
2012         self._ErrorIf(disk.spindles is None,
2013                       constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2014                       "number of spindles not configured for disk %s while"
2015                       " exclusive storage is enabled, try running"
2016                       " gnt-cluster repair-disk-sizes", idx)
2017
2018     if instance.disk_template in constants.DTS_INT_MIRROR:
2019       instance_nodes = utils.NiceSort(instance.all_nodes)
2020       instance_groups = {}
2021
2022       for node_uuid in instance_nodes:
2023         instance_groups.setdefault(self.all_node_info[node_uuid].group,
2024                                    []).append(node_uuid)
2025
2026       pretty_list = [
2027         "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2028                            groupinfo[group].name)
2029         # Sort so that we always list the primary node first.
2030         for group, nodes in sorted(instance_groups.items(),
2031                                    key=lambda (_, nodes): pnode_uuid in nodes,
2032                                    reverse=True)]
2033
2034       self._ErrorIf(len(instance_groups) > 1,
2035                     constants.CV_EINSTANCESPLITGROUPS,
2036                     instance.name, "instance has primary and secondary nodes in"
2037                     " different groups: %s", utils.CommaJoin(pretty_list),
2038                     code=self.ETYPE_WARNING)
2039
2040     inst_nodes_offline = []
2041     for snode in instance.secondary_nodes:
2042       s_img = node_image[snode]
2043       self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2044                     self.cfg.GetNodeName(snode),
2045                     "instance %s, connection to secondary node failed",
2046                     instance.name)
2047
2048       if s_img.offline:
2049         inst_nodes_offline.append(snode)
2050
2051     # warn that the instance lives on offline nodes
2052     self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2053                   instance.name, "instance has offline secondary node(s) %s",
2054                   utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2055     # ... or ghost/non-vm_capable nodes
2056     for node_uuid in instance.all_nodes:
2057       self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2058                     instance.name, "instance lives on ghost node %s",
2059                     self.cfg.GetNodeName(node_uuid))
2060       self._ErrorIf(not node_image[node_uuid].vm_capable,
2061                     constants.CV_EINSTANCEBADNODE, instance.name,
2062                     "instance lives on non-vm_capable node %s",
2063                     self.cfg.GetNodeName(node_uuid))
2064
2065   def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2066     """Verify if there are any unknown volumes in the cluster.
2067
2068     The .os, .swap and backup volumes are ignored. All other volumes are
2069     reported as unknown.
2070
2071     @type reserved: L{ganeti.utils.FieldSet}
2072     @param reserved: a FieldSet of reserved volume names
2073
2074     """
2075     for node_uuid, n_img in node_image.items():
2076       if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2077           self.all_node_info[node_uuid].group != self.group_uuid):
2078         # skip non-healthy nodes
2079         continue
2080       for volume in n_img.volumes:
2081         test = ((node_uuid not in node_vol_should or
2082                 volume not in node_vol_should[node_uuid]) and
2083                 not reserved.Matches(volume))
2084         self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2085                       self.cfg.GetNodeName(node_uuid),
2086                       "volume %s is unknown", volume)
2087
2088   def _VerifyNPlusOneMemory(self, node_image, all_insts):
2089     """Verify N+1 Memory Resilience.
2090
2091     Check that if one single node dies we can still start all the
2092     instances it was primary for.
2093
2094     """
2095     cluster_info = self.cfg.GetClusterInfo()
2096     for node_uuid, n_img in node_image.items():
2097       # This code checks that every node which is now listed as
2098       # secondary has enough memory to host all instances it is
2099       # supposed to should a single other node in the cluster fail.
2100       # FIXME: not ready for failover to an arbitrary node
2101       # FIXME: does not support file-backed instances
2102       # WARNING: we currently take into account down instances as well
2103       # as up ones, considering that even if they're down someone
2104       # might want to start them even in the event of a node failure.
2105       if n_img.offline or \
2106          self.all_node_info[node_uuid].group != self.group_uuid:
2107         # we're skipping nodes marked offline and nodes in other groups from
2108         # the N+1 warning, since most likely we don't have good memory
2109         # information from them; we already list instances living on such
2110         # nodes, and that's enough warning
2111         continue
2112       #TODO(dynmem): also consider ballooning out other instances
2113       for prinode, inst_uuids in n_img.sbp.items():
2114         needed_mem = 0
2115         for inst_uuid in inst_uuids:
2116           bep = cluster_info.FillBE(all_insts[inst_uuid])
2117           if bep[constants.BE_AUTO_BALANCE]:
2118             needed_mem += bep[constants.BE_MINMEM]
2119         test = n_img.mfree < needed_mem
2120         self._ErrorIf(test, constants.CV_ENODEN1,
2121                       self.cfg.GetNodeName(node_uuid),
2122                       "not enough memory to accomodate instance failovers"
2123                       " should node %s fail (%dMiB needed, %dMiB available)",
2124                       self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2125
2126   def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2127                    (files_all, files_opt, files_mc, files_vm)):
2128     """Verifies file checksums collected from all nodes.
2129
2130     @param nodes: List of L{objects.Node} objects
2131     @param master_node_uuid: UUID of master node
2132     @param all_nvinfo: RPC results
2133
2134     """
2135     # Define functions determining which nodes to consider for a file
2136     files2nodefn = [
2137       (files_all, None),
2138       (files_mc, lambda node: (node.master_candidate or
2139                                node.uuid == master_node_uuid)),
2140       (files_vm, lambda node: node.vm_capable),
2141       ]
2142
2143     # Build mapping from filename to list of nodes which should have the file
2144     nodefiles = {}
2145     for (files, fn) in files2nodefn:
2146       if fn is None:
2147         filenodes = nodes
2148       else:
2149         filenodes = filter(fn, nodes)
2150       nodefiles.update((filename,
2151                         frozenset(map(operator.attrgetter("uuid"), filenodes)))
2152                        for filename in files)
2153
2154     assert set(nodefiles) == (files_all | files_mc | files_vm)
2155
2156     fileinfo = dict((filename, {}) for filename in nodefiles)
2157     ignore_nodes = set()
2158
2159     for node in nodes:
2160       if node.offline:
2161         ignore_nodes.add(node.uuid)
2162         continue
2163
2164       nresult = all_nvinfo[node.uuid]
2165
2166       if nresult.fail_msg or not nresult.payload:
2167         node_files = None
2168       else:
2169         fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2170         node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2171                           for (key, value) in fingerprints.items())
2172         del fingerprints
2173
2174       test = not (node_files and isinstance(node_files, dict))
2175       self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2176                     "Node did not return file checksum data")
2177       if test:
2178         ignore_nodes.add(node.uuid)
2179         continue
2180
2181       # Build per-checksum mapping from filename to nodes having it
2182       for (filename, checksum) in node_files.items():
2183         assert filename in nodefiles
2184         fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2185
2186     for (filename, checksums) in fileinfo.items():
2187       assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2188
2189       # Nodes having the file
2190       with_file = frozenset(node_uuid
2191                             for node_uuids in fileinfo[filename].values()
2192                             for node_uuid in node_uuids) - ignore_nodes
2193
2194       expected_nodes = nodefiles[filename] - ignore_nodes
2195
2196       # Nodes missing file
2197       missing_file = expected_nodes - with_file
2198
2199       if filename in files_opt:
2200         # All or no nodes
2201         self._ErrorIf(missing_file and missing_file != expected_nodes,
2202                       constants.CV_ECLUSTERFILECHECK, None,
2203                       "File %s is optional, but it must exist on all or no"
2204                       " nodes (not found on %s)",
2205                       filename,
2206                       utils.CommaJoin(
2207                         utils.NiceSort(
2208                           map(self.cfg.GetNodeName, missing_file))))
2209       else:
2210         self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2211                       "File %s is missing from node(s) %s", filename,
2212                       utils.CommaJoin(
2213                         utils.NiceSort(
2214                           map(self.cfg.GetNodeName, missing_file))))
2215
2216         # Warn if a node has a file it shouldn't
2217         unexpected = with_file - expected_nodes
2218         self._ErrorIf(unexpected,
2219                       constants.CV_ECLUSTERFILECHECK, None,
2220                       "File %s should not exist on node(s) %s",
2221                       filename, utils.CommaJoin(
2222                         utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2223
2224       # See if there are multiple versions of the file
2225       test = len(checksums) > 1
2226       if test:
2227         variants = ["variant %s on %s" %
2228                     (idx + 1,
2229                      utils.CommaJoin(utils.NiceSort(
2230                        map(self.cfg.GetNodeName, node_uuids))))
2231                     for (idx, (checksum, node_uuids)) in
2232                       enumerate(sorted(checksums.items()))]
2233       else:
2234         variants = []
2235
2236       self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2237                     "File %s found with %s different checksums (%s)",
2238                     filename, len(checksums), "; ".join(variants))
2239
2240   def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2241                       drbd_map):
2242     """Verifies and the node DRBD status.
2243
2244     @type ninfo: L{objects.Node}
2245     @param ninfo: the node to check
2246     @param nresult: the remote results for the node
2247     @param instanceinfo: the dict of instances
2248     @param drbd_helper: the configured DRBD usermode helper
2249     @param drbd_map: the DRBD map as returned by
2250         L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2251
2252     """
2253     if drbd_helper:
2254       helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2255       test = (helper_result is None)
2256       self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2257                     "no drbd usermode helper returned")
2258       if helper_result:
2259         status, payload = helper_result
2260         test = not status
2261         self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2262                       "drbd usermode helper check unsuccessful: %s", payload)
2263         test = status and (payload != drbd_helper)
2264         self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2265                       "wrong drbd usermode helper: %s", payload)
2266
2267     # compute the DRBD minors
2268     node_drbd = {}
2269     for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2270       test = inst_uuid not in instanceinfo
2271       self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2272                     "ghost instance '%s' in temporary DRBD map", inst_uuid)
2273         # ghost instance should not be running, but otherwise we
2274         # don't give double warnings (both ghost instance and
2275         # unallocated minor in use)
2276       if test:
2277         node_drbd[minor] = (inst_uuid, False)
2278       else:
2279         instance = instanceinfo[inst_uuid]
2280         node_drbd[minor] = (inst_uuid, instance.disks_active)
2281
2282     # and now check them
2283     used_minors = nresult.get(constants.NV_DRBDLIST, [])
2284     test = not isinstance(used_minors, (tuple, list))
2285     self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2286                   "cannot parse drbd status file: %s", str(used_minors))
2287     if test:
2288       # we cannot check drbd status
2289       return
2290
2291     for minor, (inst_uuid, must_exist) in node_drbd.items():
2292       test = minor not in used_minors and must_exist
2293       self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2294                     "drbd minor %d of instance %s is not active", minor,
2295                     self.cfg.GetInstanceName(inst_uuid))
2296     for minor in used_minors:
2297       test = minor not in node_drbd
2298       self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2299                     "unallocated drbd minor %d is in use", minor)
2300
2301   def _UpdateNodeOS(self, ninfo, nresult, nimg):
2302     """Builds the node OS structures.
2303
2304     @type ninfo: L{objects.Node}
2305     @param ninfo: the node to check
2306     @param nresult: the remote results for the node
2307     @param nimg: the node image object
2308
2309     """
2310     remote_os = nresult.get(constants.NV_OSLIST, None)
2311     test = (not isinstance(remote_os, list) or
2312             not compat.all(isinstance(v, list) and len(v) == 7
2313                            for v in remote_os))
2314
2315     self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2316                   "node hasn't returned valid OS data")
2317
2318     nimg.os_fail = test
2319
2320     if test:
2321       return
2322
2323     os_dict = {}
2324
2325     for (name, os_path, status, diagnose,
2326          variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2327
2328       if name not in os_dict:
2329         os_dict[name] = []
2330
2331       # parameters is a list of lists instead of list of tuples due to
2332       # JSON lacking a real tuple type, fix it:
2333       parameters = [tuple(v) for v in parameters]
2334       os_dict[name].append((os_path, status, diagnose,
2335                             set(variants), set(parameters), set(api_ver)))
2336
2337     nimg.oslist = os_dict
2338
2339   def _VerifyNodeOS(self, ninfo, nimg, base):
2340     """Verifies the node OS list.
2341
2342     @type ninfo: L{objects.Node}
2343     @param ninfo: the node to check
2344     @param nimg: the node image object
2345     @param base: the 'template' node we match against (e.g. from the master)
2346
2347     """
2348     assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2349
2350     beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2351     for os_name, os_data in nimg.oslist.items():
2352       assert os_data, "Empty OS status for OS %s?!" % os_name
2353       f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2354       self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2355                     "Invalid OS %s (located at %s): %s",
2356                     os_name, f_path, f_diag)
2357       self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2358                     "OS '%s' has multiple entries"
2359                     " (first one shadows the rest): %s",
2360                     os_name, utils.CommaJoin([v[0] for v in os_data]))
2361       # comparisons with the 'base' image
2362       test = os_name not in base.oslist
2363       self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2364                     "Extra OS %s not present on reference node (%s)",
2365                     os_name, self.cfg.GetNodeName(base.uuid))
2366       if test:
2367         continue
2368       assert base.oslist[os_name], "Base node has empty OS status?"
2369       _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2370       if not b_status:
2371         # base OS is invalid, skipping
2372         continue
2373       for kind, a, b in [("API version", f_api, b_api),
2374                          ("variants list", f_var, b_var),
2375                          ("parameters", beautify_params(f_param),
2376                           beautify_params(b_param))]:
2377         self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2378                       "OS %s for %s differs from reference node %s:"
2379                       " [%s] vs. [%s]", kind, os_name,
2380                       self.cfg.GetNodeName(base.uuid),
2381                       utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2382
2383     # check any missing OSes
2384     missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2385     self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2386                   "OSes present on reference node %s"
2387                   " but missing on this node: %s",
2388                   self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2389
2390   def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2391     """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2392
2393     @type ninfo: L{objects.Node}
2394     @param ninfo: the node to check
2395     @param nresult: the remote results for the node
2396     @type is_master: bool
2397     @param is_master: Whether node is the master node
2398
2399     """
2400     cluster = self.cfg.GetClusterInfo()
2401     if (is_master and
2402         (cluster.IsFileStorageEnabled() or
2403          cluster.IsSharedFileStorageEnabled())):
2404       try:
2405         fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2406       except KeyError:
2407         # This should never happen
2408         self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2409                       "Node did not return forbidden file storage paths")
2410       else:
2411         self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2412                       "Found forbidden file storage paths: %s",
2413                       utils.CommaJoin(fspaths))
2414     else:
2415       self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2416                     constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2417                     "Node should not have returned forbidden file storage"
2418                     " paths")
2419
2420   def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2421                           verify_key, error_key):
2422     """Verifies (file) storage paths.
2423
2424     @type ninfo: L{objects.Node}
2425     @param ninfo: the node to check
2426     @param nresult: the remote results for the node
2427     @type file_disk_template: string
2428     @param file_disk_template: file-based disk template, whose directory
2429         is supposed to be verified
2430     @type verify_key: string
2431     @param verify_key: key for the verification map of this file
2432         verification step
2433     @param error_key: error key to be added to the verification results
2434         in case something goes wrong in this verification step
2435
2436     """
2437     assert (file_disk_template in
2438             utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2439     cluster = self.cfg.GetClusterInfo()
2440     if cluster.IsDiskTemplateEnabled(file_disk_template):
2441       self._ErrorIf(
2442           verify_key in nresult,
2443           error_key, ninfo.name,
2444           "The configured %s storage path is unusable: %s" %
2445           (file_disk_template, nresult.get(verify_key)))
2446
2447   def _VerifyFileStoragePaths(self, ninfo, nresult):
2448     """Verifies (file) storage paths.
2449
2450     @see: C{_VerifyStoragePaths}
2451
2452     """
2453     self._VerifyStoragePaths(
2454         ninfo, nresult, constants.DT_FILE,
2455         constants.NV_FILE_STORAGE_PATH,
2456         constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2457
2458   def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2459     """Verifies (file) storage paths.
2460
2461     @see: C{_VerifyStoragePaths}
2462
2463     """
2464     self._VerifyStoragePaths(
2465         ninfo, nresult, constants.DT_SHARED_FILE,
2466         constants.NV_SHARED_FILE_STORAGE_PATH,
2467         constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2468
2469   def _VerifyOob(self, ninfo, nresult):
2470     """Verifies out of band functionality of a node.
2471
2472     @type ninfo: L{objects.Node}
2473     @param ninfo: the node to check
2474     @param nresult: the remote results for the node
2475
2476     """
2477     # We just have to verify the paths on master and/or master candidates
2478     # as the oob helper is invoked on the master
2479     if ((ninfo.master_candidate or ninfo.master_capable) and
2480         constants.NV_OOB_PATHS in nresult):
2481       for path_result in nresult[constants.NV_OOB_PATHS]:
2482         self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2483                       ninfo.name, path_result)
2484
2485   def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2486     """Verifies and updates the node volume data.
2487
2488     This function will update a L{NodeImage}'s internal structures
2489     with data from the remote call.
2490
2491     @type ninfo: L{objects.Node}
2492     @param ninfo: the node to check
2493     @param nresult: the remote results for the node
2494     @param nimg: the node image object
2495     @param vg_name: the configured VG name
2496
2497     """
2498     nimg.lvm_fail = True
2499     lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2500     if vg_name is None:
2501       pass
2502     elif isinstance(lvdata, basestring):
2503       self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2504                     "LVM problem on node: %s", utils.SafeEncode(lvdata))
2505     elif not isinstance(lvdata, dict):
2506       self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2507                     "rpc call to node failed (lvlist)")
2508     else:
2509       nimg.volumes = lvdata
2510       nimg.lvm_fail = False
2511
2512   def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2513     """Verifies and updates the node instance list.
2514
2515     If the listing was successful, then updates this node's instance
2516     list. Otherwise, it marks the RPC call as failed for the instance
2517     list key.
2518
2519     @type ninfo: L{objects.Node}
2520     @param ninfo: the node to check
2521     @param nresult: the remote results for the node
2522     @param nimg: the node image object
2523
2524     """
2525     idata = nresult.get(constants.NV_INSTANCELIST, None)
2526     test = not isinstance(idata, list)
2527     self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2528                   "rpc call to node failed (instancelist): %s",
2529                   utils.SafeEncode(str(idata)))
2530     if test:
2531       nimg.hyp_fail = True
2532     else:
2533       nimg.instances = [inst.uuid for (_, inst) in
2534                         self.cfg.GetMultiInstanceInfoByName(idata)]
2535
2536   def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2537     """Verifies and computes a node information map
2538
2539     @type ninfo: L{objects.Node}
2540     @param ninfo: the node to check
2541     @param nresult: the remote results for the node
2542     @param nimg: the node image object
2543     @param vg_name: the configured VG name
2544
2545     """
2546     # try to read free memory (from the hypervisor)
2547     hv_info = nresult.get(constants.NV_HVINFO, None)
2548     test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2549     self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2550                   "rpc call to node failed (hvinfo)")
2551     if not test:
2552       try:
2553         nimg.mfree = int(hv_info["memory_free"])
2554       except (ValueError, TypeError):
2555         self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2556                       "node returned invalid nodeinfo, check hypervisor")
2557
2558     # FIXME: devise a free space model for file based instances as well
2559     if vg_name is not None:
2560       test = (constants.NV_VGLIST not in nresult or
2561               vg_name not in nresult[constants.NV_VGLIST])
2562       self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2563                     "node didn't return data for the volume group '%s'"
2564                     " - it is either missing or broken", vg_name)
2565       if not test:
2566         try:
2567           nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2568         except (ValueError, TypeError):
2569           self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2570                         "node returned invalid LVM info, check LVM status")
2571
2572   def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2573     """Gets per-disk status information for all instances.
2574
2575     @type node_uuids: list of strings
2576     @param node_uuids: Node UUIDs
2577     @type node_image: dict of (UUID, L{objects.Node})
2578     @param node_image: Node objects
2579     @type instanceinfo: dict of (UUID, L{objects.Instance})
2580     @param instanceinfo: Instance objects
2581     @rtype: {instance: {node: [(succes, payload)]}}
2582     @return: a dictionary of per-instance dictionaries with nodes as
2583         keys and disk information as values; the disk information is a
2584         list of tuples (success, payload)
2585
2586     """
2587     node_disks = {}
2588     node_disks_devonly = {}
2589     diskless_instances = set()
2590     diskless = constants.DT_DISKLESS
2591
2592     for nuuid in node_uuids:
2593       node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
2594                                              node_image[nuuid].sinst))
2595       diskless_instances.update(uuid for uuid in node_inst_uuids
2596                                 if instanceinfo[uuid].disk_template == diskless)
2597       disks = [(inst_uuid, disk)
2598                for inst_uuid in node_inst_uuids
2599                for disk in instanceinfo[inst_uuid].disks]
2600
2601       if not disks:
2602         # No need to collect data
2603         continue
2604
2605       node_disks[nuuid] = disks
2606
2607       # _AnnotateDiskParams makes already copies of the disks
2608       devonly = []
2609       for (inst_uuid, dev) in disks:
2610         (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
2611                                           self.cfg)
2612         self.cfg.SetDiskID(anno_disk, nuuid)
2613         devonly.append(anno_disk)
2614
2615       node_disks_devonly[nuuid] = devonly
2616
2617     assert len(node_disks) == len(node_disks_devonly)
2618
2619     # Collect data from all nodes with disks
2620     result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2621                                                           node_disks_devonly)
2622
2623     assert len(result) == len(node_disks)
2624
2625     instdisk = {}
2626
2627     for (nuuid, nres) in result.items():
2628       node = self.cfg.GetNodeInfo(nuuid)
2629       disks = node_disks[node.uuid]
2630
2631       if nres.offline:
2632         # No data from this node
2633         data = len(disks) * [(False, "node offline")]
2634       else:
2635         msg = nres.fail_msg
2636         self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
2637                       "while getting disk information: %s", msg)
2638         if msg:
2639           # No data from this node
2640           data = len(disks) * [(False, msg)]
2641         else:
2642           data = []
2643           for idx, i in enumerate(nres.payload):
2644             if isinstance(i, (tuple, list)) and len(i) == 2:
2645               data.append(i)
2646             else:
2647               logging.warning("Invalid result from node %s, entry %d: %s",
2648                               node.name, idx, i)
2649               data.append((False, "Invalid result from the remote node"))
2650
2651       for ((inst_uuid, _), status) in zip(disks, data):
2652         instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
2653           .append(status)
2654
2655     # Add empty entries for diskless instances.
2656     for inst_uuid in diskless_instances:
2657       assert inst_uuid not in instdisk
2658       instdisk[inst_uuid] = {}
2659
2660     assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2661                       len(nuuids) <= len(instanceinfo[inst].all_nodes) and
2662                       compat.all(isinstance(s, (tuple, list)) and
2663                                  len(s) == 2 for s in statuses)
2664                       for inst, nuuids in instdisk.items()
2665                       for nuuid, statuses in nuuids.items())
2666     if __debug__:
2667       instdisk_keys = set(instdisk)
2668       instanceinfo_keys = set(instanceinfo)
2669       assert instdisk_keys == instanceinfo_keys, \
2670         ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2671          (instdisk_keys, instanceinfo_keys))
2672
2673     return instdisk
2674
2675   @staticmethod
2676   def _SshNodeSelector(group_uuid, all_nodes):
2677     """Create endless iterators for all potential SSH check hosts.
2678
2679     """
2680     nodes = [node for node in all_nodes
2681              if (node.group != group_uuid and
2682                  not node.offline)]
2683     keyfunc = operator.attrgetter("group")
2684
2685     return map(itertools.cycle,
2686                [sorted(map(operator.attrgetter("name"), names))
2687                 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2688                                                   keyfunc)])
2689
2690   @classmethod
2691   def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2692     """Choose which nodes should talk to which other nodes.
2693
2694     We will make nodes contact all nodes in their group, and one node from
2695     every other group.
2696
2697     @warning: This algorithm has a known issue if one node group is much
2698       smaller than others (e.g. just one node). In such a case all other
2699       nodes will talk to the single node.
2700
2701     """
2702     online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2703     sel = cls._SshNodeSelector(group_uuid, all_nodes)
2704
2705     return (online_nodes,
2706             dict((name, sorted([i.next() for i in sel]))
2707                  for name in online_nodes))
2708
2709   def BuildHooksEnv(self):
2710     """Build hooks env.
2711
2712     Cluster-Verify hooks just ran in the post phase and their failure makes
2713     the output be logged in the verify output and the verification to fail.
2714
2715     """
2716     env = {
2717       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2718       }
2719
2720     env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2721                for node in self.my_node_info.values())
2722
2723     return env
2724
2725   def BuildHooksNodes(self):
2726     """Build hooks nodes.
2727
2728     """
2729     return ([], list(self.my_node_info.keys()))
2730
2731   def Exec(self, feedback_fn):
2732     """Verify integrity of the node group, performing various test on nodes.
2733
2734     """
2735     # This method has too many local variables. pylint: disable=R0914
2736     feedback_fn("* Verifying group '%s'" % self.group_info.name)
2737
2738     if not self.my_node_uuids:
2739       # empty node group
2740       feedback_fn("* Empty node group, skipping verification")
2741       return True
2742
2743     self.bad = False
2744     verbose = self.op.verbose
2745     self._feedback_fn = feedback_fn
2746
2747     vg_name = self.cfg.GetVGName()
2748     drbd_helper = self.cfg.GetDRBDHelper()
2749     cluster = self.cfg.GetClusterInfo()
2750     hypervisors = cluster.enabled_hypervisors
2751     node_data_list = self.my_node_info.values()
2752
2753     i_non_redundant = [] # Non redundant instances
2754     i_non_a_balanced = [] # Non auto-balanced instances
2755     i_offline = 0 # Count of offline instances
2756     n_offline = 0 # Count of offline nodes
2757     n_drained = 0 # Count of nodes being drained
2758     node_vol_should = {}
2759
2760     # FIXME: verify OS list
2761
2762     # File verification
2763     filemap = ComputeAncillaryFiles(cluster, False)
2764
2765     # do local checksums
2766     master_node_uuid = self.master_node = self.cfg.GetMasterNode()
2767     master_ip = self.cfg.GetMasterIP()
2768
2769     feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))
2770
2771     user_scripts = []
2772     if self.cfg.GetUseExternalMipScript():
2773       user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2774
2775     node_verify_param = {
2776       constants.NV_FILELIST:
2777         map(vcluster.MakeVirtualPath,
2778             utils.UniqueSequence(filename
2779                                  for files in filemap
2780                                  for filename in files)),
2781       constants.NV_NODELIST:
2782         self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2783                                   self.all_node_info.values()),
2784       constants.NV_HYPERVISOR: hypervisors,
2785       constants.NV_HVPARAMS:
2786         _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2787       constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2788                                  for node in node_data_list
2789                                  if not node.offline],
2790       constants.NV_INSTANCELIST: hypervisors,
2791       constants.NV_VERSION: None,
2792       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2793       constants.NV_NODESETUP: None,
2794       constants.NV_TIME: None,
2795       constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
2796       constants.NV_OSLIST: None,
2797       constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2798       constants.NV_USERSCRIPTS: user_scripts,
2799       }
2800
2801     if vg_name is not None:
2802       node_verify_param[constants.NV_VGLIST] = None
2803       node_verify_param[constants.NV_LVLIST] = vg_name
2804       node_verify_param[constants.NV_PVLIST] = [vg_name]
2805
2806     if drbd_helper:
2807       node_verify_param[constants.NV_DRBDVERSION] = None
2808       node_verify_param[constants.NV_DRBDLIST] = None
2809       node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2810
2811     if cluster.IsFileStorageEnabled() or \
2812         cluster.IsSharedFileStorageEnabled():
2813       # Load file storage paths only from master node
2814       node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
2815         self.cfg.GetMasterNodeName()
2816       if cluster.IsFileStorageEnabled():
2817         node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
2818           cluster.file_storage_dir
2819
2820     # bridge checks
2821     # FIXME: this needs to be changed per node-group, not cluster-wide
2822     bridges = set()
2823     default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2824     if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2825       bridges.add(default_nicpp[constants.NIC_LINK])
2826     for inst_uuid in self.my_inst_info.values():
2827       for nic in inst_uuid.nics:
2828         full_nic = cluster.SimpleFillNIC(nic.nicparams)
2829         if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2830           bridges.add(full_nic[constants.NIC_LINK])
2831
2832     if bridges:
2833       node_verify_param[constants.NV_BRIDGES] = list(bridges)
2834
2835     # Build our expected cluster state
2836     node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
2837                                                  uuid=node.uuid,
2838                                                  vm_capable=node.vm_capable))
2839                       for node in node_data_list)
2840
2841     # Gather OOB paths
2842     oob_paths = []
2843     for node in self.all_node_info.values():
2844       path = SupportsOob(self.cfg, node)
2845       if path and path not in oob_paths:
2846         oob_paths.append(path)
2847
2848     if oob_paths:
2849       node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2850
2851     for inst_uuid in self.my_inst_uuids:
2852       instance = self.my_inst_info[inst_uuid]
2853       if instance.admin_state == constants.ADMINST_OFFLINE:
2854         i_offline += 1
2855
2856       for nuuid in instance.all_nodes:
2857         if nuuid not in node_image:
2858           gnode = self.NodeImage(uuid=nuuid)
2859           gnode.ghost = (nuuid not in self.all_node_info)
2860           node_image[nuuid] = gnode
2861
2862       instance.MapLVsByNode(node_vol_should)
2863
2864       pnode = instance.primary_node
2865       node_image[pnode].pinst.append(instance.uuid)
2866
2867       for snode in instance.secondary_nodes:
2868         nimg = node_image[snode]
2869         nimg.sinst.append(instance.uuid)
2870         if pnode not in nimg.sbp:
2871           nimg.sbp[pnode] = []
2872         nimg.sbp[pnode].append(instance.uuid)
2873
2874     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
2875                                                self.my_node_info.keys())
2876     # The value of exclusive_storage should be the same across the group, so if
2877     # it's True for at least a node, we act as if it were set for all the nodes
2878     self._exclusive_storage = compat.any(es_flags.values())
2879     if self._exclusive_storage:
2880       node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2881
2882     # At this point, we have the in-memory data structures complete,
2883     # except for the runtime information, which we'll gather next
2884
2885     # Due to the way our RPC system works, exact response times cannot be
2886     # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2887     # time before and after executing the request, we can at least have a time
2888     # window.
2889     nvinfo_starttime = time.time()
2890     all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
2891                                            node_verify_param,
2892                                            self.cfg.GetClusterName(),
2893                                            self.cfg.GetClusterInfo().hvparams)
2894     nvinfo_endtime = time.time()
2895
2896     if self.extra_lv_nodes and vg_name is not None:
2897       extra_lv_nvinfo = \
2898           self.rpc.call_node_verify(self.extra_lv_nodes,
2899                                     {constants.NV_LVLIST: vg_name},
2900                                     self.cfg.GetClusterName(),
2901                                     self.cfg.GetClusterInfo().hvparams)
2902     else:
2903       extra_lv_nvinfo = {}
2904
2905     all_drbd_map = self.cfg.ComputeDRBDMap()
2906
2907     feedback_fn("* Gathering disk information (%s nodes)" %
2908                 len(self.my_node_uuids))
2909     instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
2910                                      self.my_inst_info)
2911
2912     feedback_fn("* Verifying configuration file consistency")
2913
2914     # If not all nodes are being checked, we need to make sure the master node
2915     # and a non-checked vm_capable node are in the list.
2916     absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
2917     if absent_node_uuids:
2918       vf_nvinfo = all_nvinfo.copy()
2919       vf_node_info = list(self.my_node_info.values())
2920       additional_node_uuids = []
2921       if master_node_uuid not in self.my_node_info:
2922         additional_node_uuids.append(master_node_uuid)
2923         vf_node_info.append(self.all_node_info[master_node_uuid])
2924       # Add the first vm_capable node we find which is not included,
2925       # excluding the master node (which we already have)
2926       for node_uuid in absent_node_uuids:
2927         nodeinfo = self.all_node_info[node_uuid]
2928         if (nodeinfo.vm_capable and not nodeinfo.offline and
2929             node_uuid != master_node_uuid):
2930           additional_node_uuids.append(node_uuid)
2931           vf_node_info.append(self.all_node_info[node_uuid])
2932           break
2933       key = constants.NV_FILELIST
2934       vf_nvinfo.update(self.rpc.call_node_verify(
2935          additional_node_uuids, {key: node_verify_param[key]},
2936          self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
2937     else:
2938       vf_nvinfo = all_nvinfo
2939       vf_node_info = self.my_node_info.values()
2940
2941     self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)
2942
2943     feedback_fn("* Verifying node status")
2944
2945     refos_img = None
2946
2947     for node_i in node_data_list:
2948       nimg = node_image[node_i.uuid]
2949
2950       if node_i.offline:
2951         if verbose:
2952           feedback_fn("* Skipping offline node %s" % (node_i.name,))
2953         n_offline += 1
2954         continue
2955
2956       if node_i.uuid == master_node_uuid:
2957         ntype = "master"
2958       elif node_i.master_candidate:
2959         ntype = "master candidate"
2960       elif node_i.drained:
2961         ntype = "drained"
2962         n_drained += 1
2963       else:
2964         ntype = "regular"
2965       if verbose:
2966         feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))
2967
2968       msg = all_nvinfo[node_i.uuid].fail_msg
2969       self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
2970                     "while contacting node: %s", msg)
2971       if msg:
2972         nimg.rpc_fail = True
2973         continue
2974
2975       nresult = all_nvinfo[node_i.uuid].payload
2976
2977       nimg.call_ok = self._VerifyNode(node_i, nresult)
2978       self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2979       self._VerifyNodeNetwork(node_i, nresult)
2980       self._VerifyNodeUserScripts(node_i, nresult)
2981       self._VerifyOob(node_i, nresult)
2982       self._VerifyAcceptedFileStoragePaths(node_i, nresult,
2983                                            node_i.uuid == master_node_uuid)
2984       self._VerifyFileStoragePaths(node_i, nresult)
2985       self._VerifySharedFileStoragePaths(node_i, nresult)
2986
2987       if nimg.vm_capable:
2988         self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
2989         self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
2990                              all_drbd_map)
2991
2992         self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
2993         self._UpdateNodeInstances(node_i, nresult, nimg)
2994         self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
2995         self._UpdateNodeOS(node_i, nresult, nimg)
2996
2997         if not nimg.os_fail:
2998           if refos_img is None:
2999             refos_img = nimg
3000           self._VerifyNodeOS(node_i, nimg, refos_img)
3001         self._VerifyNodeBridges(node_i, nresult, bridges)
3002
3003         # Check whether all running instances are primary for the node. (This
3004         # can no longer be done from _VerifyInstance below, since some of the
3005         # wrong instances could be from other node groups.)
3006         non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)
3007
3008         for inst_uuid in non_primary_inst_uuids:
3009           test = inst_uuid in self.all_inst_info
3010           self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
3011                         self.cfg.GetInstanceName(inst_uuid),
3012                         "instance should not run on node %s", node_i.name)
3013           self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
3014                         "node is running unknown instance %s", inst_uuid)
3015
3016     self._VerifyGroupDRBDVersion(all_nvinfo)
3017     self._VerifyGroupLVM(node_image, vg_name)
3018
3019     for node_uuid, result in extra_lv_nvinfo.items():
3020       self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
3021                               node_image[node_uuid], vg_name)
3022
3023     feedback_fn("* Verifying instance status")
3024     for inst_uuid in self.my_inst_uuids:
3025       instance = self.my_inst_info[inst_uuid]
3026       if verbose:
3027         feedback_fn("* Verifying instance %s" % instance.name)
3028       self._VerifyInstance(instance, node_image, instdisk[inst_uuid])
3029
3030       # If the instance is non-redundant we cannot survive losing its primary
3031       # node, so we are not N+1 compliant.
3032       if instance.disk_template not in constants.DTS_MIRRORED:
3033         i_non_redundant.append(instance)
3034
3035       if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
3036         i_non_a_balanced.append(instance)
3037
3038     feedback_fn("* Verifying orphan volumes")
3039     reserved = utils.FieldSet(*cluster.reserved_lvs)
3040
3041     # We will get spurious "unknown volume" warnings if any node of this group
3042     # is secondary for an instance whose primary is in another group. To avoid
3043     # them, we find these instances and add their volumes to node_vol_should.
3044     for instance in self.all_inst_info.values():
3045       for secondary in instance.secondary_nodes:
3046         if (secondary in self.my_node_info
3047             and instance.name not in self.my_inst_info):
3048           instance.MapLVsByNode(node_vol_should)
3049           break
3050
3051     self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
3052
3053     if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
3054       feedback_fn("* Verifying N+1 Memory redundancy")
3055       self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
3056
3057     feedback_fn("* Other Notes")
3058     if i_non_redundant:
3059       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
3060                   % len(i_non_redundant))
3061
3062     if i_non_a_balanced:
3063       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
3064                   % len(i_non_a_balanced))
3065
3066     if i_offline:
3067       feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
3068
3069     if n_offline:
3070       feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
3071
3072     if n_drained:
3073       feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
3074
3075     return not self.bad
3076
3077   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
3078     """Analyze the post-hooks' result
3079
3080     This method analyses the hook result, handles it, and sends some
3081     nicely-formatted feedback back to the user.
3082
3083     @param phase: one of L{constants.HOOKS_PHASE_POST} or
3084         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
3085     @param hooks_results: the results of the multi-node hooks rpc call
3086     @param feedback_fn: function used send feedback back to the caller
3087     @param lu_result: previous Exec result
3088     @return: the new Exec result, based on the previous result
3089         and hook results
3090
3091     """
3092     # We only really run POST phase hooks, only for non-empty groups,
3093     # and are only interested in their results
3094     if not self.my_node_uuids:
3095       # empty node group
3096       pass
3097     elif phase == constants.HOOKS_PHASE_POST:
3098       # Used to change hooks' output to proper indentation
3099       feedback_fn("* Hooks Results")
3100       assert hooks_results, "invalid result from hooks"
3101
3102       for node_name in hooks_results:
3103         res = hooks_results[node_name]
3104         msg = res.fail_msg
3105         test = msg and not res.offline
3106         self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3107                       "Communication failure in hooks execution: %s", msg)
3108         if res.offline or msg:
3109           # No need to investigate payload if node is offline or gave
3110           # an error.
3111           continue
3112         for script, hkr, output in res.payload:
3113           test = hkr == constants.HKR_FAIL
3114           self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3115                         "Script %s failed, output:", script)
3116           if test:
3117             output = self._HOOKS_INDENT_RE.sub("      ", output)
3118             feedback_fn("%s" % output)
3119             lu_result = False
3120
3121     return lu_result
3122
3123
3124 class LUClusterVerifyDisks(NoHooksLU):
3125   """Verifies the cluster disks status.
3126
3127   """
3128   REQ_BGL = False
3129
3130   def ExpandNames(self):
3131     self.share_locks = ShareAll()
3132     self.needed_locks = {
3133       locking.LEVEL_NODEGROUP: locking.ALL_SET,
3134       }
3135
3136   def Exec(self, feedback_fn):
3137     group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
3138
3139     # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
3140     return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
3141                            for group in group_names])