2b93e935f329c8da2a2ffc9c3534560dbb734946
[ganeti-local] / lib / cmdlib / cluster.py
1 #
2 #
3
4 # Copyright (C) 2006, 2007, 2008, 2009, 2010, 2011, 2012, 2013 Google Inc.
5 #
6 # This program is free software; you can redistribute it and/or modify
7 # it under the terms of the GNU General Public License as published by
8 # the Free Software Foundation; either version 2 of the License, or
9 # (at your option) any later version.
10 #
11 # This program is distributed in the hope that it will be useful, but
12 # WITHOUT ANY WARRANTY; without even the implied warranty of
13 # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
14 # General Public License for more details.
15 #
16 # You should have received a copy of the GNU General Public License
17 # along with this program; if not, write to the Free Software
18 # Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
19 # 02110-1301, USA.
20
21
22 """Logical units dealing with the cluster."""
23
24 import OpenSSL
25
26 import copy
27 import itertools
28 import logging
29 import operator
30 import os
31 import re
32 import time
33
34 from ganeti import compat
35 from ganeti import constants
36 from ganeti import errors
37 from ganeti import hypervisor
38 from ganeti import locking
39 from ganeti import masterd
40 from ganeti import netutils
41 from ganeti import objects
42 from ganeti import opcodes
43 from ganeti import pathutils
44 from ganeti import query
45 from ganeti import rpc
46 from ganeti import runtime
47 from ganeti import ssh
48 from ganeti import uidpool
49 from ganeti import utils
50 from ganeti import vcluster
51
52 from ganeti.cmdlib.base import NoHooksLU, QueryBase, LogicalUnit, \
53   ResultWithJobs
54 from ganeti.cmdlib.common import ShareAll, RunPostHook, \
55   ComputeAncillaryFiles, RedistributeAncillaryFiles, UploadHelper, \
56   GetWantedInstances, MergeAndVerifyHvState, MergeAndVerifyDiskState, \
57   GetUpdatedIPolicy, ComputeNewInstanceViolations, GetUpdatedParams, \
58   CheckOSParams, CheckHVParams, AdjustCandidatePool, CheckNodePVs, \
59   ComputeIPolicyInstanceViolation, AnnotateDiskParams, SupportsOob, \
60   CheckIpolicyVsDiskTemplates
61
62 import ganeti.masterd.instance
63
64
65 class LUClusterActivateMasterIp(NoHooksLU):
66   """Activate the master IP on the master node.
67
68   """
69   def Exec(self, feedback_fn):
70     """Activate the master IP.
71
72     """
73     master_params = self.cfg.GetMasterNetworkParameters()
74     ems = self.cfg.GetUseExternalMipScript()
75     result = self.rpc.call_node_activate_master_ip(master_params.uuid,
76                                                    master_params, ems)
77     result.Raise("Could not activate the master IP")
78
79
80 class LUClusterDeactivateMasterIp(NoHooksLU):
81   """Deactivate the master IP on the master node.
82
83   """
84   def Exec(self, feedback_fn):
85     """Deactivate the master IP.
86
87     """
88     master_params = self.cfg.GetMasterNetworkParameters()
89     ems = self.cfg.GetUseExternalMipScript()
90     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
91                                                      master_params, ems)
92     result.Raise("Could not deactivate the master IP")
93
94
95 class LUClusterConfigQuery(NoHooksLU):
96   """Return configuration values.
97
98   """
99   REQ_BGL = False
100
101   def CheckArguments(self):
102     self.cq = ClusterQuery(None, self.op.output_fields, False)
103
104   def ExpandNames(self):
105     self.cq.ExpandNames(self)
106
107   def DeclareLocks(self, level):
108     self.cq.DeclareLocks(self, level)
109
110   def Exec(self, feedback_fn):
111     result = self.cq.OldStyleQuery(self)
112
113     assert len(result) == 1
114
115     return result[0]
116
117
118 class LUClusterDestroy(LogicalUnit):
119   """Logical unit for destroying the cluster.
120
121   """
122   HPATH = "cluster-destroy"
123   HTYPE = constants.HTYPE_CLUSTER
124
125   def BuildHooksEnv(self):
126     """Build hooks env.
127
128     """
129     return {
130       "OP_TARGET": self.cfg.GetClusterName(),
131       }
132
133   def BuildHooksNodes(self):
134     """Build hooks nodes.
135
136     """
137     return ([], [])
138
139   def CheckPrereq(self):
140     """Check prerequisites.
141
142     This checks whether the cluster is empty.
143
144     Any errors are signaled by raising errors.OpPrereqError.
145
146     """
147     master = self.cfg.GetMasterNode()
148
149     nodelist = self.cfg.GetNodeList()
150     if len(nodelist) != 1 or nodelist[0] != master:
151       raise errors.OpPrereqError("There are still %d node(s) in"
152                                  " this cluster." % (len(nodelist) - 1),
153                                  errors.ECODE_INVAL)
154     instancelist = self.cfg.GetInstanceList()
155     if instancelist:
156       raise errors.OpPrereqError("There are still %d instance(s) in"
157                                  " this cluster." % len(instancelist),
158                                  errors.ECODE_INVAL)
159
160   def Exec(self, feedback_fn):
161     """Destroys the cluster.
162
163     """
164     master_params = self.cfg.GetMasterNetworkParameters()
165
166     # Run post hooks on master node before it's removed
167     RunPostHook(self, self.cfg.GetNodeName(master_params.uuid))
168
169     ems = self.cfg.GetUseExternalMipScript()
170     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
171                                                      master_params, ems)
172     result.Warn("Error disabling the master IP address", self.LogWarning)
173     return master_params.uuid
174
175
176 class LUClusterPostInit(LogicalUnit):
177   """Logical unit for running hooks after cluster initialization.
178
179   """
180   HPATH = "cluster-init"
181   HTYPE = constants.HTYPE_CLUSTER
182
183   def BuildHooksEnv(self):
184     """Build hooks env.
185
186     """
187     return {
188       "OP_TARGET": self.cfg.GetClusterName(),
189       }
190
191   def BuildHooksNodes(self):
192     """Build hooks nodes.
193
194     """
195     return ([], [self.cfg.GetMasterNode()])
196
197   def Exec(self, feedback_fn):
198     """Nothing to do.
199
200     """
201     return True
202
203
204 class ClusterQuery(QueryBase):
205   FIELDS = query.CLUSTER_FIELDS
206
207   #: Do not sort (there is only one item)
208   SORT_FIELD = None
209
210   def ExpandNames(self, lu):
211     lu.needed_locks = {}
212
213     # The following variables interact with _QueryBase._GetNames
214     self.wanted = locking.ALL_SET
215     self.do_locking = self.use_locking
216
217     if self.do_locking:
218       raise errors.OpPrereqError("Can not use locking for cluster queries",
219                                  errors.ECODE_INVAL)
220
221   def DeclareLocks(self, lu, level):
222     pass
223
224   def _GetQueryData(self, lu):
225     """Computes the list of nodes and their attributes.
226
227     """
228     # Locking is not used
229     assert not (compat.any(lu.glm.is_owned(level)
230                            for level in locking.LEVELS
231                            if level != locking.LEVEL_CLUSTER) or
232                 self.do_locking or self.use_locking)
233
234     if query.CQ_CONFIG in self.requested_data:
235       cluster = lu.cfg.GetClusterInfo()
236       nodes = lu.cfg.GetAllNodesInfo()
237     else:
238       cluster = NotImplemented
239       nodes = NotImplemented
240
241     if query.CQ_QUEUE_DRAINED in self.requested_data:
242       drain_flag = os.path.exists(pathutils.JOB_QUEUE_DRAIN_FILE)
243     else:
244       drain_flag = NotImplemented
245
246     if query.CQ_WATCHER_PAUSE in self.requested_data:
247       master_node_uuid = lu.cfg.GetMasterNode()
248
249       result = lu.rpc.call_get_watcher_pause(master_node_uuid)
250       result.Raise("Can't retrieve watcher pause from master node '%s'" %
251                    lu.cfg.GetMasterNodeName())
252
253       watcher_pause = result.payload
254     else:
255       watcher_pause = NotImplemented
256
257     return query.ClusterQueryData(cluster, nodes, drain_flag, watcher_pause)
258
259
260 class LUClusterQuery(NoHooksLU):
261   """Query cluster configuration.
262
263   """
264   REQ_BGL = False
265
266   def ExpandNames(self):
267     self.needed_locks = {}
268
269   def Exec(self, feedback_fn):
270     """Return cluster config.
271
272     """
273     cluster = self.cfg.GetClusterInfo()
274     os_hvp = {}
275
276     # Filter just for enabled hypervisors
277     for os_name, hv_dict in cluster.os_hvp.items():
278       os_hvp[os_name] = {}
279       for hv_name, hv_params in hv_dict.items():
280         if hv_name in cluster.enabled_hypervisors:
281           os_hvp[os_name][hv_name] = hv_params
282
283     # Convert ip_family to ip_version
284     primary_ip_version = constants.IP4_VERSION
285     if cluster.primary_ip_family == netutils.IP6Address.family:
286       primary_ip_version = constants.IP6_VERSION
287
288     result = {
289       "software_version": constants.RELEASE_VERSION,
290       "protocol_version": constants.PROTOCOL_VERSION,
291       "config_version": constants.CONFIG_VERSION,
292       "os_api_version": max(constants.OS_API_VERSIONS),
293       "export_version": constants.EXPORT_VERSION,
294       "vcs_version": constants.VCS_VERSION,
295       "architecture": runtime.GetArchInfo(),
296       "name": cluster.cluster_name,
297       "master": self.cfg.GetMasterNodeName(),
298       "default_hypervisor": cluster.primary_hypervisor,
299       "enabled_hypervisors": cluster.enabled_hypervisors,
300       "hvparams": dict([(hypervisor_name, cluster.hvparams[hypervisor_name])
301                         for hypervisor_name in cluster.enabled_hypervisors]),
302       "os_hvp": os_hvp,
303       "beparams": cluster.beparams,
304       "osparams": cluster.osparams,
305       "ipolicy": cluster.ipolicy,
306       "nicparams": cluster.nicparams,
307       "ndparams": cluster.ndparams,
308       "diskparams": cluster.diskparams,
309       "candidate_pool_size": cluster.candidate_pool_size,
310       "master_netdev": cluster.master_netdev,
311       "master_netmask": cluster.master_netmask,
312       "use_external_mip_script": cluster.use_external_mip_script,
313       "volume_group_name": cluster.volume_group_name,
314       "drbd_usermode_helper": cluster.drbd_usermode_helper,
315       "file_storage_dir": cluster.file_storage_dir,
316       "shared_file_storage_dir": cluster.shared_file_storage_dir,
317       "maintain_node_health": cluster.maintain_node_health,
318       "ctime": cluster.ctime,
319       "mtime": cluster.mtime,
320       "uuid": cluster.uuid,
321       "tags": list(cluster.GetTags()),
322       "uid_pool": cluster.uid_pool,
323       "default_iallocator": cluster.default_iallocator,
324       "reserved_lvs": cluster.reserved_lvs,
325       "primary_ip_version": primary_ip_version,
326       "prealloc_wipe_disks": cluster.prealloc_wipe_disks,
327       "hidden_os": cluster.hidden_os,
328       "blacklisted_os": cluster.blacklisted_os,
329       "enabled_disk_templates": cluster.enabled_disk_templates,
330       }
331
332     return result
333
334
335 class LUClusterRedistConf(NoHooksLU):
336   """Force the redistribution of cluster configuration.
337
338   This is a very simple LU.
339
340   """
341   REQ_BGL = False
342
343   def ExpandNames(self):
344     self.needed_locks = {
345       locking.LEVEL_NODE: locking.ALL_SET,
346       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
347     }
348     self.share_locks = ShareAll()
349
350   def Exec(self, feedback_fn):
351     """Redistribute the configuration.
352
353     """
354     self.cfg.Update(self.cfg.GetClusterInfo(), feedback_fn)
355     RedistributeAncillaryFiles(self)
356
357
358 class LUClusterRename(LogicalUnit):
359   """Rename the cluster.
360
361   """
362   HPATH = "cluster-rename"
363   HTYPE = constants.HTYPE_CLUSTER
364
365   def BuildHooksEnv(self):
366     """Build hooks env.
367
368     """
369     return {
370       "OP_TARGET": self.cfg.GetClusterName(),
371       "NEW_NAME": self.op.name,
372       }
373
374   def BuildHooksNodes(self):
375     """Build hooks nodes.
376
377     """
378     return ([self.cfg.GetMasterNode()], self.cfg.GetNodeList())
379
380   def CheckPrereq(self):
381     """Verify that the passed name is a valid one.
382
383     """
384     hostname = netutils.GetHostname(name=self.op.name,
385                                     family=self.cfg.GetPrimaryIPFamily())
386
387     new_name = hostname.name
388     self.ip = new_ip = hostname.ip
389     old_name = self.cfg.GetClusterName()
390     old_ip = self.cfg.GetMasterIP()
391     if new_name == old_name and new_ip == old_ip:
392       raise errors.OpPrereqError("Neither the name nor the IP address of the"
393                                  " cluster has changed",
394                                  errors.ECODE_INVAL)
395     if new_ip != old_ip:
396       if netutils.TcpPing(new_ip, constants.DEFAULT_NODED_PORT):
397         raise errors.OpPrereqError("The given cluster IP address (%s) is"
398                                    " reachable on the network" %
399                                    new_ip, errors.ECODE_NOTUNIQUE)
400
401     self.op.name = new_name
402
403   def Exec(self, feedback_fn):
404     """Rename the cluster.
405
406     """
407     clustername = self.op.name
408     new_ip = self.ip
409
410     # shutdown the master IP
411     master_params = self.cfg.GetMasterNetworkParameters()
412     ems = self.cfg.GetUseExternalMipScript()
413     result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
414                                                      master_params, ems)
415     result.Raise("Could not disable the master role")
416
417     try:
418       cluster = self.cfg.GetClusterInfo()
419       cluster.cluster_name = clustername
420       cluster.master_ip = new_ip
421       self.cfg.Update(cluster, feedback_fn)
422
423       # update the known hosts file
424       ssh.WriteKnownHostsFile(self.cfg, pathutils.SSH_KNOWN_HOSTS_FILE)
425       node_list = self.cfg.GetOnlineNodeList()
426       try:
427         node_list.remove(master_params.uuid)
428       except ValueError:
429         pass
430       UploadHelper(self, node_list, pathutils.SSH_KNOWN_HOSTS_FILE)
431     finally:
432       master_params.ip = new_ip
433       result = self.rpc.call_node_activate_master_ip(master_params.uuid,
434                                                      master_params, ems)
435       result.Warn("Could not re-enable the master role on the master,"
436                   " please restart manually", self.LogWarning)
437
438     return clustername
439
440
441 class LUClusterRepairDiskSizes(NoHooksLU):
442   """Verifies the cluster disks sizes.
443
444   """
445   REQ_BGL = False
446
447   def ExpandNames(self):
448     if self.op.instances:
449       (_, self.wanted_names) = GetWantedInstances(self, self.op.instances)
450       # Not getting the node allocation lock as only a specific set of
451       # instances (and their nodes) is going to be acquired
452       self.needed_locks = {
453         locking.LEVEL_NODE_RES: [],
454         locking.LEVEL_INSTANCE: self.wanted_names,
455         }
456       self.recalculate_locks[locking.LEVEL_NODE_RES] = constants.LOCKS_REPLACE
457     else:
458       self.wanted_names = None
459       self.needed_locks = {
460         locking.LEVEL_NODE_RES: locking.ALL_SET,
461         locking.LEVEL_INSTANCE: locking.ALL_SET,
462
463         # This opcode is acquires the node locks for all instances
464         locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
465         }
466
467     self.share_locks = {
468       locking.LEVEL_NODE_RES: 1,
469       locking.LEVEL_INSTANCE: 0,
470       locking.LEVEL_NODE_ALLOC: 1,
471       }
472
473   def DeclareLocks(self, level):
474     if level == locking.LEVEL_NODE_RES and self.wanted_names is not None:
475       self._LockInstancesNodes(primary_only=True, level=level)
476
477   def CheckPrereq(self):
478     """Check prerequisites.
479
480     This only checks the optional instance list against the existing names.
481
482     """
483     if self.wanted_names is None:
484       self.wanted_names = self.owned_locks(locking.LEVEL_INSTANCE)
485
486     self.wanted_instances = \
487         map(compat.snd, self.cfg.GetMultiInstanceInfoByName(self.wanted_names))
488
489   def _EnsureChildSizes(self, disk):
490     """Ensure children of the disk have the needed disk size.
491
492     This is valid mainly for DRBD8 and fixes an issue where the
493     children have smaller disk size.
494
495     @param disk: an L{ganeti.objects.Disk} object
496
497     """
498     if disk.dev_type == constants.LD_DRBD8:
499       assert disk.children, "Empty children for DRBD8?"
500       fchild = disk.children[0]
501       mismatch = fchild.size < disk.size
502       if mismatch:
503         self.LogInfo("Child disk has size %d, parent %d, fixing",
504                      fchild.size, disk.size)
505         fchild.size = disk.size
506
507       # and we recurse on this child only, not on the metadev
508       return self._EnsureChildSizes(fchild) or mismatch
509     else:
510       return False
511
512   def Exec(self, feedback_fn):
513     """Verify the size of cluster disks.
514
515     """
516     # TODO: check child disks too
517     # TODO: check differences in size between primary/secondary nodes
518     per_node_disks = {}
519     for instance in self.wanted_instances:
520       pnode = instance.primary_node
521       if pnode not in per_node_disks:
522         per_node_disks[pnode] = []
523       for idx, disk in enumerate(instance.disks):
524         per_node_disks[pnode].append((instance, idx, disk))
525
526     assert not (frozenset(per_node_disks.keys()) -
527                 self.owned_locks(locking.LEVEL_NODE_RES)), \
528       "Not owning correct locks"
529     assert not self.owned_locks(locking.LEVEL_NODE)
530
531     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
532                                                per_node_disks.keys())
533
534     changed = []
535     for node_uuid, dskl in per_node_disks.items():
536       newl = [v[2].Copy() for v in dskl]
537       for dsk in newl:
538         self.cfg.SetDiskID(dsk, node_uuid)
539       node_name = self.cfg.GetNodeName(node_uuid)
540       result = self.rpc.call_blockdev_getdimensions(node_uuid, newl)
541       if result.fail_msg:
542         self.LogWarning("Failure in blockdev_getdimensions call to node"
543                         " %s, ignoring", node_name)
544         continue
545       if len(result.payload) != len(dskl):
546         logging.warning("Invalid result from node %s: len(dksl)=%d,"
547                         " result.payload=%s", node_name, len(dskl),
548                         result.payload)
549         self.LogWarning("Invalid result from node %s, ignoring node results",
550                         node_name)
551         continue
552       for ((instance, idx, disk), dimensions) in zip(dskl, result.payload):
553         if dimensions is None:
554           self.LogWarning("Disk %d of instance %s did not return size"
555                           " information, ignoring", idx, instance.name)
556           continue
557         if not isinstance(dimensions, (tuple, list)):
558           self.LogWarning("Disk %d of instance %s did not return valid"
559                           " dimension information, ignoring", idx,
560                           instance.name)
561           continue
562         (size, spindles) = dimensions
563         if not isinstance(size, (int, long)):
564           self.LogWarning("Disk %d of instance %s did not return valid"
565                           " size information, ignoring", idx, instance.name)
566           continue
567         size = size >> 20
568         if size != disk.size:
569           self.LogInfo("Disk %d of instance %s has mismatched size,"
570                        " correcting: recorded %d, actual %d", idx,
571                        instance.name, disk.size, size)
572           disk.size = size
573           self.cfg.Update(instance, feedback_fn)
574           changed.append((instance.name, idx, "size", size))
575         if es_flags[node_uuid]:
576           if spindles is None:
577             self.LogWarning("Disk %d of instance %s did not return valid"
578                             " spindles information, ignoring", idx,
579                             instance.name)
580           elif disk.spindles is None or disk.spindles != spindles:
581             self.LogInfo("Disk %d of instance %s has mismatched spindles,"
582                          " correcting: recorded %s, actual %s",
583                          idx, instance.name, disk.spindles, spindles)
584             disk.spindles = spindles
585             self.cfg.Update(instance, feedback_fn)
586             changed.append((instance.name, idx, "spindles", disk.spindles))
587         if self._EnsureChildSizes(disk):
588           self.cfg.Update(instance, feedback_fn)
589           changed.append((instance.name, idx, "size", disk.size))
590     return changed
591
592
593 def _ValidateNetmask(cfg, netmask):
594   """Checks if a netmask is valid.
595
596   @type cfg: L{config.ConfigWriter}
597   @param cfg: The cluster configuration
598   @type netmask: int
599   @param netmask: the netmask to be verified
600   @raise errors.OpPrereqError: if the validation fails
601
602   """
603   ip_family = cfg.GetPrimaryIPFamily()
604   try:
605     ipcls = netutils.IPAddress.GetClassFromIpFamily(ip_family)
606   except errors.ProgrammerError:
607     raise errors.OpPrereqError("Invalid primary ip family: %s." %
608                                ip_family, errors.ECODE_INVAL)
609   if not ipcls.ValidateNetmask(netmask):
610     raise errors.OpPrereqError("CIDR netmask (%s) not valid" %
611                                (netmask), errors.ECODE_INVAL)
612
613
614 def CheckFileBasedStoragePathVsEnabledDiskTemplates(
615     logging_warn_fn, file_storage_dir, enabled_disk_templates,
616     file_disk_template):
617   """Checks whether the given file-based storage directory is acceptable.
618
619   Note: This function is public, because it is also used in bootstrap.py.
620
621   @type logging_warn_fn: function
622   @param logging_warn_fn: function which accepts a string and logs it
623   @type file_storage_dir: string
624   @param file_storage_dir: the directory to be used for file-based instances
625   @type enabled_disk_templates: list of string
626   @param enabled_disk_templates: the list of enabled disk templates
627   @type file_disk_template: string
628   @param file_disk_template: the file-based disk template for which the
629       path should be checked
630
631   """
632   assert (file_disk_template in
633           utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
634   file_storage_enabled = file_disk_template in enabled_disk_templates
635   if file_storage_dir is not None:
636     if file_storage_dir == "":
637       if file_storage_enabled:
638         raise errors.OpPrereqError(
639             "Unsetting the '%s' storage directory while having '%s' storage"
640             " enabled is not permitted." %
641             (file_disk_template, file_disk_template))
642     else:
643       if not file_storage_enabled:
644         logging_warn_fn(
645             "Specified a %s storage directory, although %s storage is not"
646             " enabled." % (file_disk_template, file_disk_template))
647   else:
648     raise errors.ProgrammerError("Received %s storage dir with value"
649                                  " 'None'." % file_disk_template)
650
651
652 def CheckFileStoragePathVsEnabledDiskTemplates(
653     logging_warn_fn, file_storage_dir, enabled_disk_templates):
654   """Checks whether the given file storage directory is acceptable.
655
656   @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
657
658   """
659   CheckFileBasedStoragePathVsEnabledDiskTemplates(
660       logging_warn_fn, file_storage_dir, enabled_disk_templates,
661       constants.DT_FILE)
662
663
664 def CheckSharedFileStoragePathVsEnabledDiskTemplates(
665     logging_warn_fn, file_storage_dir, enabled_disk_templates):
666   """Checks whether the given shared file storage directory is acceptable.
667
668   @see: C{CheckFileBasedStoragePathVsEnabledDiskTemplates}
669
670   """
671   CheckFileBasedStoragePathVsEnabledDiskTemplates(
672       logging_warn_fn, file_storage_dir, enabled_disk_templates,
673       constants.DT_SHARED_FILE)
674
675
676 class LUClusterSetParams(LogicalUnit):
677   """Change the parameters of the cluster.
678
679   """
680   HPATH = "cluster-modify"
681   HTYPE = constants.HTYPE_CLUSTER
682   REQ_BGL = False
683
684   def CheckArguments(self):
685     """Check parameters
686
687     """
688     if self.op.uid_pool:
689       uidpool.CheckUidPool(self.op.uid_pool)
690
691     if self.op.add_uids:
692       uidpool.CheckUidPool(self.op.add_uids)
693
694     if self.op.remove_uids:
695       uidpool.CheckUidPool(self.op.remove_uids)
696
697     if self.op.master_netmask is not None:
698       _ValidateNetmask(self.cfg, self.op.master_netmask)
699
700     if self.op.diskparams:
701       for dt_params in self.op.diskparams.values():
702         utils.ForceDictType(dt_params, constants.DISK_DT_TYPES)
703       try:
704         utils.VerifyDictOptions(self.op.diskparams, constants.DISK_DT_DEFAULTS)
705       except errors.OpPrereqError, err:
706         raise errors.OpPrereqError("While verify diskparams options: %s" % err,
707                                    errors.ECODE_INVAL)
708
709   def ExpandNames(self):
710     # FIXME: in the future maybe other cluster params won't require checking on
711     # all nodes to be modified.
712     # FIXME: This opcode changes cluster-wide settings. Is acquiring all
713     # resource locks the right thing, shouldn't it be the BGL instead?
714     self.needed_locks = {
715       locking.LEVEL_NODE: locking.ALL_SET,
716       locking.LEVEL_INSTANCE: locking.ALL_SET,
717       locking.LEVEL_NODEGROUP: locking.ALL_SET,
718       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
719     }
720     self.share_locks = ShareAll()
721
722   def BuildHooksEnv(self):
723     """Build hooks env.
724
725     """
726     return {
727       "OP_TARGET": self.cfg.GetClusterName(),
728       "NEW_VG_NAME": self.op.vg_name,
729       }
730
731   def BuildHooksNodes(self):
732     """Build hooks nodes.
733
734     """
735     mn = self.cfg.GetMasterNode()
736     return ([mn], [mn])
737
738   def _CheckVgName(self, node_uuids, enabled_disk_templates,
739                    new_enabled_disk_templates):
740     """Check the consistency of the vg name on all nodes and in case it gets
741        unset whether there are instances still using it.
742
743     """
744     if self.op.vg_name is not None and not self.op.vg_name:
745       if self.cfg.HasAnyDiskOfType(constants.LD_LV):
746         raise errors.OpPrereqError("Cannot disable lvm storage while lvm-based"
747                                    " instances exist", errors.ECODE_INVAL)
748
749     if (self.op.vg_name is not None and
750         utils.IsLvmEnabled(enabled_disk_templates)) or \
751            (self.cfg.GetVGName() is not None and
752             utils.LvmGetsEnabled(enabled_disk_templates,
753                                  new_enabled_disk_templates)):
754       self._CheckVgNameOnNodes(node_uuids)
755
756   def _CheckVgNameOnNodes(self, node_uuids):
757     """Check the status of the volume group on each node.
758
759     """
760     vglist = self.rpc.call_vg_list(node_uuids)
761     for node_uuid in node_uuids:
762       msg = vglist[node_uuid].fail_msg
763       if msg:
764         # ignoring down node
765         self.LogWarning("Error while gathering data on node %s"
766                         " (ignoring node): %s",
767                         self.cfg.GetNodeName(node_uuid), msg)
768         continue
769       vgstatus = utils.CheckVolumeGroupSize(vglist[node_uuid].payload,
770                                             self.op.vg_name,
771                                             constants.MIN_VG_SIZE)
772       if vgstatus:
773         raise errors.OpPrereqError("Error on node '%s': %s" %
774                                    (self.cfg.GetNodeName(node_uuid), vgstatus),
775                                    errors.ECODE_ENVIRON)
776
777   def _GetEnabledDiskTemplates(self, cluster):
778     """Determines the enabled disk templates and the subset of disk templates
779        that are newly enabled by this operation.
780
781     """
782     enabled_disk_templates = None
783     new_enabled_disk_templates = []
784     if self.op.enabled_disk_templates:
785       enabled_disk_templates = self.op.enabled_disk_templates
786       new_enabled_disk_templates = \
787         list(set(enabled_disk_templates)
788              - set(cluster.enabled_disk_templates))
789     else:
790       enabled_disk_templates = cluster.enabled_disk_templates
791     return (enabled_disk_templates, new_enabled_disk_templates)
792
793   def _CheckIpolicy(self, cluster, enabled_disk_templates):
794     """Checks the ipolicy.
795
796     @type cluster: C{objects.Cluster}
797     @param cluster: the cluster's configuration
798     @type enabled_disk_templates: list of string
799     @param enabled_disk_templates: list of (possibly newly) enabled disk
800       templates
801
802     """
803     # FIXME: write unit tests for this
804     if self.op.ipolicy:
805       self.new_ipolicy = GetUpdatedIPolicy(cluster.ipolicy, self.op.ipolicy,
806                                            group_policy=False)
807
808       CheckIpolicyVsDiskTemplates(self.new_ipolicy,
809                                   enabled_disk_templates)
810
811       all_instances = self.cfg.GetAllInstancesInfo().values()
812       violations = set()
813       for group in self.cfg.GetAllNodeGroupsInfo().values():
814         instances = frozenset([inst for inst in all_instances
815                                if compat.any(nuuid in group.members
816                                              for nuuid in inst.all_nodes)])
817         new_ipolicy = objects.FillIPolicy(self.new_ipolicy, group.ipolicy)
818         ipol = masterd.instance.CalculateGroupIPolicy(cluster, group)
819         new = ComputeNewInstanceViolations(ipol, new_ipolicy, instances,
820                                            self.cfg)
821         if new:
822           violations.update(new)
823
824       if violations:
825         self.LogWarning("After the ipolicy change the following instances"
826                         " violate them: %s",
827                         utils.CommaJoin(utils.NiceSort(violations)))
828     else:
829       CheckIpolicyVsDiskTemplates(cluster.ipolicy,
830                                   enabled_disk_templates)
831
832   def CheckPrereq(self):
833     """Check prerequisites.
834
835     This checks whether the given params don't conflict and
836     if the given volume group is valid.
837
838     """
839     if self.op.drbd_helper is not None and not self.op.drbd_helper:
840       if self.cfg.HasAnyDiskOfType(constants.LD_DRBD8):
841         raise errors.OpPrereqError("Cannot disable drbd helper while"
842                                    " drbd-based instances exist",
843                                    errors.ECODE_INVAL)
844
845     node_uuids = self.owned_locks(locking.LEVEL_NODE)
846     self.cluster = cluster = self.cfg.GetClusterInfo()
847
848     vm_capable_node_uuids = [node.uuid
849                              for node in self.cfg.GetAllNodesInfo().values()
850                              if node.uuid in node_uuids and node.vm_capable]
851
852     (enabled_disk_templates, new_enabled_disk_templates) = \
853       self._GetEnabledDiskTemplates(cluster)
854
855     self._CheckVgName(vm_capable_node_uuids, enabled_disk_templates,
856                       new_enabled_disk_templates)
857
858     if self.op.file_storage_dir is not None:
859       CheckFileStoragePathVsEnabledDiskTemplates(
860           self.LogWarning, self.op.file_storage_dir, enabled_disk_templates)
861
862     if self.op.shared_file_storage_dir is not None:
863       CheckSharedFileStoragePathVsEnabledDiskTemplates(
864           self.LogWarning, self.op.shared_file_storage_dir,
865           enabled_disk_templates)
866
867     if self.op.drbd_helper:
868       # checks given drbd helper on all nodes
869       helpers = self.rpc.call_drbd_helper(node_uuids)
870       for (_, ninfo) in self.cfg.GetMultiNodeInfo(node_uuids):
871         if ninfo.offline:
872           self.LogInfo("Not checking drbd helper on offline node %s",
873                        ninfo.name)
874           continue
875         msg = helpers[ninfo.uuid].fail_msg
876         if msg:
877           raise errors.OpPrereqError("Error checking drbd helper on node"
878                                      " '%s': %s" % (ninfo.name, msg),
879                                      errors.ECODE_ENVIRON)
880         node_helper = helpers[ninfo.uuid].payload
881         if node_helper != self.op.drbd_helper:
882           raise errors.OpPrereqError("Error on node '%s': drbd helper is %s" %
883                                      (ninfo.name, node_helper),
884                                      errors.ECODE_ENVIRON)
885
886     # validate params changes
887     if self.op.beparams:
888       objects.UpgradeBeParams(self.op.beparams)
889       utils.ForceDictType(self.op.beparams, constants.BES_PARAMETER_TYPES)
890       self.new_beparams = cluster.SimpleFillBE(self.op.beparams)
891
892     if self.op.ndparams:
893       utils.ForceDictType(self.op.ndparams, constants.NDS_PARAMETER_TYPES)
894       self.new_ndparams = cluster.SimpleFillND(self.op.ndparams)
895
896       # TODO: we need a more general way to handle resetting
897       # cluster-level parameters to default values
898       if self.new_ndparams["oob_program"] == "":
899         self.new_ndparams["oob_program"] = \
900             constants.NDC_DEFAULTS[constants.ND_OOB_PROGRAM]
901
902     if self.op.hv_state:
903       new_hv_state = MergeAndVerifyHvState(self.op.hv_state,
904                                            self.cluster.hv_state_static)
905       self.new_hv_state = dict((hv, cluster.SimpleFillHvState(values))
906                                for hv, values in new_hv_state.items())
907
908     if self.op.disk_state:
909       new_disk_state = MergeAndVerifyDiskState(self.op.disk_state,
910                                                self.cluster.disk_state_static)
911       self.new_disk_state = \
912         dict((storage, dict((name, cluster.SimpleFillDiskState(values))
913                             for name, values in svalues.items()))
914              for storage, svalues in new_disk_state.items())
915
916     self._CheckIpolicy(cluster, enabled_disk_templates)
917
918     if self.op.nicparams:
919       utils.ForceDictType(self.op.nicparams, constants.NICS_PARAMETER_TYPES)
920       self.new_nicparams = cluster.SimpleFillNIC(self.op.nicparams)
921       objects.NIC.CheckParameterSyntax(self.new_nicparams)
922       nic_errors = []
923
924       # check all instances for consistency
925       for instance in self.cfg.GetAllInstancesInfo().values():
926         for nic_idx, nic in enumerate(instance.nics):
927           params_copy = copy.deepcopy(nic.nicparams)
928           params_filled = objects.FillDict(self.new_nicparams, params_copy)
929
930           # check parameter syntax
931           try:
932             objects.NIC.CheckParameterSyntax(params_filled)
933           except errors.ConfigurationError, err:
934             nic_errors.append("Instance %s, nic/%d: %s" %
935                               (instance.name, nic_idx, err))
936
937           # if we're moving instances to routed, check that they have an ip
938           target_mode = params_filled[constants.NIC_MODE]
939           if target_mode == constants.NIC_MODE_ROUTED and not nic.ip:
940             nic_errors.append("Instance %s, nic/%d: routed NIC with no ip"
941                               " address" % (instance.name, nic_idx))
942       if nic_errors:
943         raise errors.OpPrereqError("Cannot apply the change, errors:\n%s" %
944                                    "\n".join(nic_errors), errors.ECODE_INVAL)
945
946     # hypervisor list/parameters
947     self.new_hvparams = new_hvp = objects.FillDict(cluster.hvparams, {})
948     if self.op.hvparams:
949       for hv_name, hv_dict in self.op.hvparams.items():
950         if hv_name not in self.new_hvparams:
951           self.new_hvparams[hv_name] = hv_dict
952         else:
953           self.new_hvparams[hv_name].update(hv_dict)
954
955     # disk template parameters
956     self.new_diskparams = objects.FillDict(cluster.diskparams, {})
957     if self.op.diskparams:
958       for dt_name, dt_params in self.op.diskparams.items():
959         if dt_name not in self.new_diskparams:
960           self.new_diskparams[dt_name] = dt_params
961         else:
962           self.new_diskparams[dt_name].update(dt_params)
963
964     # os hypervisor parameters
965     self.new_os_hvp = objects.FillDict(cluster.os_hvp, {})
966     if self.op.os_hvp:
967       for os_name, hvs in self.op.os_hvp.items():
968         if os_name not in self.new_os_hvp:
969           self.new_os_hvp[os_name] = hvs
970         else:
971           for hv_name, hv_dict in hvs.items():
972             if hv_dict is None:
973               # Delete if it exists
974               self.new_os_hvp[os_name].pop(hv_name, None)
975             elif hv_name not in self.new_os_hvp[os_name]:
976               self.new_os_hvp[os_name][hv_name] = hv_dict
977             else:
978               self.new_os_hvp[os_name][hv_name].update(hv_dict)
979
980     # os parameters
981     self.new_osp = objects.FillDict(cluster.osparams, {})
982     if self.op.osparams:
983       for os_name, osp in self.op.osparams.items():
984         if os_name not in self.new_osp:
985           self.new_osp[os_name] = {}
986
987         self.new_osp[os_name] = GetUpdatedParams(self.new_osp[os_name], osp,
988                                                  use_none=True)
989
990         if not self.new_osp[os_name]:
991           # we removed all parameters
992           del self.new_osp[os_name]
993         else:
994           # check the parameter validity (remote check)
995           CheckOSParams(self, False, [self.cfg.GetMasterNode()],
996                         os_name, self.new_osp[os_name])
997
998     # changes to the hypervisor list
999     if self.op.enabled_hypervisors is not None:
1000       self.hv_list = self.op.enabled_hypervisors
1001       for hv in self.hv_list:
1002         # if the hypervisor doesn't already exist in the cluster
1003         # hvparams, we initialize it to empty, and then (in both
1004         # cases) we make sure to fill the defaults, as we might not
1005         # have a complete defaults list if the hypervisor wasn't
1006         # enabled before
1007         if hv not in new_hvp:
1008           new_hvp[hv] = {}
1009         new_hvp[hv] = objects.FillDict(constants.HVC_DEFAULTS[hv], new_hvp[hv])
1010         utils.ForceDictType(new_hvp[hv], constants.HVS_PARAMETER_TYPES)
1011     else:
1012       self.hv_list = cluster.enabled_hypervisors
1013
1014     if self.op.hvparams or self.op.enabled_hypervisors is not None:
1015       # either the enabled list has changed, or the parameters have, validate
1016       for hv_name, hv_params in self.new_hvparams.items():
1017         if ((self.op.hvparams and hv_name in self.op.hvparams) or
1018             (self.op.enabled_hypervisors and
1019              hv_name in self.op.enabled_hypervisors)):
1020           # either this is a new hypervisor, or its parameters have changed
1021           hv_class = hypervisor.GetHypervisorClass(hv_name)
1022           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1023           hv_class.CheckParameterSyntax(hv_params)
1024           CheckHVParams(self, node_uuids, hv_name, hv_params)
1025
1026     self._CheckDiskTemplateConsistency()
1027
1028     if self.op.os_hvp:
1029       # no need to check any newly-enabled hypervisors, since the
1030       # defaults have already been checked in the above code-block
1031       for os_name, os_hvp in self.new_os_hvp.items():
1032         for hv_name, hv_params in os_hvp.items():
1033           utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1034           # we need to fill in the new os_hvp on top of the actual hv_p
1035           cluster_defaults = self.new_hvparams.get(hv_name, {})
1036           new_osp = objects.FillDict(cluster_defaults, hv_params)
1037           hv_class = hypervisor.GetHypervisorClass(hv_name)
1038           hv_class.CheckParameterSyntax(new_osp)
1039           CheckHVParams(self, node_uuids, hv_name, new_osp)
1040
1041     if self.op.default_iallocator:
1042       alloc_script = utils.FindFile(self.op.default_iallocator,
1043                                     constants.IALLOCATOR_SEARCH_PATH,
1044                                     os.path.isfile)
1045       if alloc_script is None:
1046         raise errors.OpPrereqError("Invalid default iallocator script '%s'"
1047                                    " specified" % self.op.default_iallocator,
1048                                    errors.ECODE_INVAL)
1049
1050   def _CheckDiskTemplateConsistency(self):
1051     """Check whether the disk templates that are going to be disabled
1052        are still in use by some instances.
1053
1054     """
1055     if self.op.enabled_disk_templates:
1056       cluster = self.cfg.GetClusterInfo()
1057       instances = self.cfg.GetAllInstancesInfo()
1058
1059       disk_templates_to_remove = set(cluster.enabled_disk_templates) \
1060         - set(self.op.enabled_disk_templates)
1061       for instance in instances.itervalues():
1062         if instance.disk_template in disk_templates_to_remove:
1063           raise errors.OpPrereqError("Cannot disable disk template '%s',"
1064                                      " because instance '%s' is using it." %
1065                                      (instance.disk_template, instance.name))
1066
1067   def _SetVgName(self, feedback_fn):
1068     """Determines and sets the new volume group name.
1069
1070     """
1071     if self.op.vg_name is not None:
1072       if self.op.vg_name and not \
1073            utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
1074         feedback_fn("Note that you specified a volume group, but did not"
1075                     " enable any lvm disk template.")
1076       new_volume = self.op.vg_name
1077       if not new_volume:
1078         if utils.IsLvmEnabled(self.cluster.enabled_disk_templates):
1079           raise errors.OpPrereqError("Cannot unset volume group if lvm-based"
1080                                      " disk templates are enabled.")
1081         new_volume = None
1082       if new_volume != self.cfg.GetVGName():
1083         self.cfg.SetVGName(new_volume)
1084       else:
1085         feedback_fn("Cluster LVM configuration already in desired"
1086                     " state, not changing")
1087     else:
1088       if utils.IsLvmEnabled(self.cluster.enabled_disk_templates) and \
1089           not self.cfg.GetVGName():
1090         raise errors.OpPrereqError("Please specify a volume group when"
1091                                    " enabling lvm-based disk-templates.")
1092
1093   def _SetFileStorageDir(self, feedback_fn):
1094     """Set the file storage directory.
1095
1096     """
1097     if self.op.file_storage_dir is not None:
1098       if self.cluster.file_storage_dir == self.op.file_storage_dir:
1099         feedback_fn("Global file storage dir already set to value '%s'"
1100                     % self.cluster.file_storage_dir)
1101       else:
1102         self.cluster.file_storage_dir = self.op.file_storage_dir
1103
1104   def Exec(self, feedback_fn):
1105     """Change the parameters of the cluster.
1106
1107     """
1108     if self.op.enabled_disk_templates:
1109       self.cluster.enabled_disk_templates = \
1110         list(set(self.op.enabled_disk_templates))
1111
1112     self._SetVgName(feedback_fn)
1113     self._SetFileStorageDir(feedback_fn)
1114
1115     if self.op.drbd_helper is not None:
1116       if not constants.DT_DRBD8 in self.cluster.enabled_disk_templates:
1117         feedback_fn("Note that you specified a drbd user helper, but did"
1118                     " enabled the drbd disk template.")
1119       new_helper = self.op.drbd_helper
1120       if not new_helper:
1121         new_helper = None
1122       if new_helper != self.cfg.GetDRBDHelper():
1123         self.cfg.SetDRBDHelper(new_helper)
1124       else:
1125         feedback_fn("Cluster DRBD helper already in desired state,"
1126                     " not changing")
1127     if self.op.hvparams:
1128       self.cluster.hvparams = self.new_hvparams
1129     if self.op.os_hvp:
1130       self.cluster.os_hvp = self.new_os_hvp
1131     if self.op.enabled_hypervisors is not None:
1132       self.cluster.hvparams = self.new_hvparams
1133       self.cluster.enabled_hypervisors = self.op.enabled_hypervisors
1134     if self.op.beparams:
1135       self.cluster.beparams[constants.PP_DEFAULT] = self.new_beparams
1136     if self.op.nicparams:
1137       self.cluster.nicparams[constants.PP_DEFAULT] = self.new_nicparams
1138     if self.op.ipolicy:
1139       self.cluster.ipolicy = self.new_ipolicy
1140     if self.op.osparams:
1141       self.cluster.osparams = self.new_osp
1142     if self.op.ndparams:
1143       self.cluster.ndparams = self.new_ndparams
1144     if self.op.diskparams:
1145       self.cluster.diskparams = self.new_diskparams
1146     if self.op.hv_state:
1147       self.cluster.hv_state_static = self.new_hv_state
1148     if self.op.disk_state:
1149       self.cluster.disk_state_static = self.new_disk_state
1150
1151     if self.op.candidate_pool_size is not None:
1152       self.cluster.candidate_pool_size = self.op.candidate_pool_size
1153       # we need to update the pool size here, otherwise the save will fail
1154       AdjustCandidatePool(self, [])
1155
1156     if self.op.maintain_node_health is not None:
1157       if self.op.maintain_node_health and not constants.ENABLE_CONFD:
1158         feedback_fn("Note: CONFD was disabled at build time, node health"
1159                     " maintenance is not useful (still enabling it)")
1160       self.cluster.maintain_node_health = self.op.maintain_node_health
1161
1162     if self.op.modify_etc_hosts is not None:
1163       self.cluster.modify_etc_hosts = self.op.modify_etc_hosts
1164
1165     if self.op.prealloc_wipe_disks is not None:
1166       self.cluster.prealloc_wipe_disks = self.op.prealloc_wipe_disks
1167
1168     if self.op.add_uids is not None:
1169       uidpool.AddToUidPool(self.cluster.uid_pool, self.op.add_uids)
1170
1171     if self.op.remove_uids is not None:
1172       uidpool.RemoveFromUidPool(self.cluster.uid_pool, self.op.remove_uids)
1173
1174     if self.op.uid_pool is not None:
1175       self.cluster.uid_pool = self.op.uid_pool
1176
1177     if self.op.default_iallocator is not None:
1178       self.cluster.default_iallocator = self.op.default_iallocator
1179
1180     if self.op.reserved_lvs is not None:
1181       self.cluster.reserved_lvs = self.op.reserved_lvs
1182
1183     if self.op.use_external_mip_script is not None:
1184       self.cluster.use_external_mip_script = self.op.use_external_mip_script
1185
1186     def helper_os(aname, mods, desc):
1187       desc += " OS list"
1188       lst = getattr(self.cluster, aname)
1189       for key, val in mods:
1190         if key == constants.DDM_ADD:
1191           if val in lst:
1192             feedback_fn("OS %s already in %s, ignoring" % (val, desc))
1193           else:
1194             lst.append(val)
1195         elif key == constants.DDM_REMOVE:
1196           if val in lst:
1197             lst.remove(val)
1198           else:
1199             feedback_fn("OS %s not found in %s, ignoring" % (val, desc))
1200         else:
1201           raise errors.ProgrammerError("Invalid modification '%s'" % key)
1202
1203     if self.op.hidden_os:
1204       helper_os("hidden_os", self.op.hidden_os, "hidden")
1205
1206     if self.op.blacklisted_os:
1207       helper_os("blacklisted_os", self.op.blacklisted_os, "blacklisted")
1208
1209     if self.op.master_netdev:
1210       master_params = self.cfg.GetMasterNetworkParameters()
1211       ems = self.cfg.GetUseExternalMipScript()
1212       feedback_fn("Shutting down master ip on the current netdev (%s)" %
1213                   self.cluster.master_netdev)
1214       result = self.rpc.call_node_deactivate_master_ip(master_params.uuid,
1215                                                        master_params, ems)
1216       if not self.op.force:
1217         result.Raise("Could not disable the master ip")
1218       else:
1219         if result.fail_msg:
1220           msg = ("Could not disable the master ip (continuing anyway): %s" %
1221                  result.fail_msg)
1222           feedback_fn(msg)
1223       feedback_fn("Changing master_netdev from %s to %s" %
1224                   (master_params.netdev, self.op.master_netdev))
1225       self.cluster.master_netdev = self.op.master_netdev
1226
1227     if self.op.master_netmask:
1228       master_params = self.cfg.GetMasterNetworkParameters()
1229       feedback_fn("Changing master IP netmask to %s" % self.op.master_netmask)
1230       result = self.rpc.call_node_change_master_netmask(
1231                  master_params.uuid, master_params.netmask,
1232                  self.op.master_netmask, master_params.ip,
1233                  master_params.netdev)
1234       result.Warn("Could not change the master IP netmask", feedback_fn)
1235       self.cluster.master_netmask = self.op.master_netmask
1236
1237     self.cfg.Update(self.cluster, feedback_fn)
1238
1239     if self.op.master_netdev:
1240       master_params = self.cfg.GetMasterNetworkParameters()
1241       feedback_fn("Starting the master ip on the new master netdev (%s)" %
1242                   self.op.master_netdev)
1243       ems = self.cfg.GetUseExternalMipScript()
1244       result = self.rpc.call_node_activate_master_ip(master_params.uuid,
1245                                                      master_params, ems)
1246       result.Warn("Could not re-enable the master ip on the master,"
1247                   " please restart manually", self.LogWarning)
1248
1249
1250 class LUClusterVerify(NoHooksLU):
1251   """Submits all jobs necessary to verify the cluster.
1252
1253   """
1254   REQ_BGL = False
1255
1256   def ExpandNames(self):
1257     self.needed_locks = {}
1258
1259   def Exec(self, feedback_fn):
1260     jobs = []
1261
1262     if self.op.group_name:
1263       groups = [self.op.group_name]
1264       depends_fn = lambda: None
1265     else:
1266       groups = self.cfg.GetNodeGroupList()
1267
1268       # Verify global configuration
1269       jobs.append([
1270         opcodes.OpClusterVerifyConfig(ignore_errors=self.op.ignore_errors),
1271         ])
1272
1273       # Always depend on global verification
1274       depends_fn = lambda: [(-len(jobs), [])]
1275
1276     jobs.extend(
1277       [opcodes.OpClusterVerifyGroup(group_name=group,
1278                                     ignore_errors=self.op.ignore_errors,
1279                                     depends=depends_fn())]
1280       for group in groups)
1281
1282     # Fix up all parameters
1283     for op in itertools.chain(*jobs): # pylint: disable=W0142
1284       op.debug_simulate_errors = self.op.debug_simulate_errors
1285       op.verbose = self.op.verbose
1286       op.error_codes = self.op.error_codes
1287       try:
1288         op.skip_checks = self.op.skip_checks
1289       except AttributeError:
1290         assert not isinstance(op, opcodes.OpClusterVerifyGroup)
1291
1292     return ResultWithJobs(jobs)
1293
1294
1295 class _VerifyErrors(object):
1296   """Mix-in for cluster/group verify LUs.
1297
1298   It provides _Error and _ErrorIf, and updates the self.bad boolean. (Expects
1299   self.op and self._feedback_fn to be available.)
1300
1301   """
1302
1303   ETYPE_FIELD = "code"
1304   ETYPE_ERROR = "ERROR"
1305   ETYPE_WARNING = "WARNING"
1306
1307   def _Error(self, ecode, item, msg, *args, **kwargs):
1308     """Format an error message.
1309
1310     Based on the opcode's error_codes parameter, either format a
1311     parseable error code, or a simpler error string.
1312
1313     This must be called only from Exec and functions called from Exec.
1314
1315     """
1316     ltype = kwargs.get(self.ETYPE_FIELD, self.ETYPE_ERROR)
1317     itype, etxt, _ = ecode
1318     # If the error code is in the list of ignored errors, demote the error to a
1319     # warning
1320     if etxt in self.op.ignore_errors:     # pylint: disable=E1101
1321       ltype = self.ETYPE_WARNING
1322     # first complete the msg
1323     if args:
1324       msg = msg % args
1325     # then format the whole message
1326     if self.op.error_codes: # This is a mix-in. pylint: disable=E1101
1327       msg = "%s:%s:%s:%s:%s" % (ltype, etxt, itype, item, msg)
1328     else:
1329       if item:
1330         item = " " + item
1331       else:
1332         item = ""
1333       msg = "%s: %s%s: %s" % (ltype, itype, item, msg)
1334     # and finally report it via the feedback_fn
1335     self._feedback_fn("  - %s" % msg) # Mix-in. pylint: disable=E1101
1336     # do not mark the operation as failed for WARN cases only
1337     if ltype == self.ETYPE_ERROR:
1338       self.bad = True
1339
1340   def _ErrorIf(self, cond, *args, **kwargs):
1341     """Log an error message if the passed condition is True.
1342
1343     """
1344     if (bool(cond)
1345         or self.op.debug_simulate_errors): # pylint: disable=E1101
1346       self._Error(*args, **kwargs)
1347
1348
1349 def _VerifyCertificate(filename):
1350   """Verifies a certificate for L{LUClusterVerifyConfig}.
1351
1352   @type filename: string
1353   @param filename: Path to PEM file
1354
1355   """
1356   try:
1357     cert = OpenSSL.crypto.load_certificate(OpenSSL.crypto.FILETYPE_PEM,
1358                                            utils.ReadFile(filename))
1359   except Exception, err: # pylint: disable=W0703
1360     return (LUClusterVerifyConfig.ETYPE_ERROR,
1361             "Failed to load X509 certificate %s: %s" % (filename, err))
1362
1363   (errcode, msg) = \
1364     utils.VerifyX509Certificate(cert, constants.SSL_CERT_EXPIRATION_WARN,
1365                                 constants.SSL_CERT_EXPIRATION_ERROR)
1366
1367   if msg:
1368     fnamemsg = "While verifying %s: %s" % (filename, msg)
1369   else:
1370     fnamemsg = None
1371
1372   if errcode is None:
1373     return (None, fnamemsg)
1374   elif errcode == utils.CERT_WARNING:
1375     return (LUClusterVerifyConfig.ETYPE_WARNING, fnamemsg)
1376   elif errcode == utils.CERT_ERROR:
1377     return (LUClusterVerifyConfig.ETYPE_ERROR, fnamemsg)
1378
1379   raise errors.ProgrammerError("Unhandled certificate error code %r" % errcode)
1380
1381
1382 def _GetAllHypervisorParameters(cluster, instances):
1383   """Compute the set of all hypervisor parameters.
1384
1385   @type cluster: L{objects.Cluster}
1386   @param cluster: the cluster object
1387   @param instances: list of L{objects.Instance}
1388   @param instances: additional instances from which to obtain parameters
1389   @rtype: list of (origin, hypervisor, parameters)
1390   @return: a list with all parameters found, indicating the hypervisor they
1391        apply to, and the origin (can be "cluster", "os X", or "instance Y")
1392
1393   """
1394   hvp_data = []
1395
1396   for hv_name in cluster.enabled_hypervisors:
1397     hvp_data.append(("cluster", hv_name, cluster.GetHVDefaults(hv_name)))
1398
1399   for os_name, os_hvp in cluster.os_hvp.items():
1400     for hv_name, hv_params in os_hvp.items():
1401       if hv_params:
1402         full_params = cluster.GetHVDefaults(hv_name, os_name=os_name)
1403         hvp_data.append(("os %s" % os_name, hv_name, full_params))
1404
1405   # TODO: collapse identical parameter values in a single one
1406   for instance in instances:
1407     if instance.hvparams:
1408       hvp_data.append(("instance %s" % instance.name, instance.hypervisor,
1409                        cluster.FillHV(instance)))
1410
1411   return hvp_data
1412
1413
1414 class LUClusterVerifyConfig(NoHooksLU, _VerifyErrors):
1415   """Verifies the cluster config.
1416
1417   """
1418   REQ_BGL = False
1419
1420   def _VerifyHVP(self, hvp_data):
1421     """Verifies locally the syntax of the hypervisor parameters.
1422
1423     """
1424     for item, hv_name, hv_params in hvp_data:
1425       msg = ("hypervisor %s parameters syntax check (source %s): %%s" %
1426              (item, hv_name))
1427       try:
1428         hv_class = hypervisor.GetHypervisorClass(hv_name)
1429         utils.ForceDictType(hv_params, constants.HVS_PARAMETER_TYPES)
1430         hv_class.CheckParameterSyntax(hv_params)
1431       except errors.GenericError, err:
1432         self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg % str(err))
1433
1434   def ExpandNames(self):
1435     self.needed_locks = dict.fromkeys(locking.LEVELS, locking.ALL_SET)
1436     self.share_locks = ShareAll()
1437
1438   def CheckPrereq(self):
1439     """Check prerequisites.
1440
1441     """
1442     # Retrieve all information
1443     self.all_group_info = self.cfg.GetAllNodeGroupsInfo()
1444     self.all_node_info = self.cfg.GetAllNodesInfo()
1445     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1446
1447   def Exec(self, feedback_fn):
1448     """Verify integrity of cluster, performing various test on nodes.
1449
1450     """
1451     self.bad = False
1452     self._feedback_fn = feedback_fn
1453
1454     feedback_fn("* Verifying cluster config")
1455
1456     for msg in self.cfg.VerifyConfig():
1457       self._ErrorIf(True, constants.CV_ECLUSTERCFG, None, msg)
1458
1459     feedback_fn("* Verifying cluster certificate files")
1460
1461     for cert_filename in pathutils.ALL_CERT_FILES:
1462       (errcode, msg) = _VerifyCertificate(cert_filename)
1463       self._ErrorIf(errcode, constants.CV_ECLUSTERCERT, None, msg, code=errcode)
1464
1465     self._ErrorIf(not utils.CanRead(constants.LUXID_USER,
1466                                     pathutils.NODED_CERT_FILE),
1467                   constants.CV_ECLUSTERCERT,
1468                   None,
1469                   pathutils.NODED_CERT_FILE + " must be accessible by the " +
1470                     constants.LUXID_USER + " user")
1471
1472     feedback_fn("* Verifying hypervisor parameters")
1473
1474     self._VerifyHVP(_GetAllHypervisorParameters(self.cfg.GetClusterInfo(),
1475                                                 self.all_inst_info.values()))
1476
1477     feedback_fn("* Verifying all nodes belong to an existing group")
1478
1479     # We do this verification here because, should this bogus circumstance
1480     # occur, it would never be caught by VerifyGroup, which only acts on
1481     # nodes/instances reachable from existing node groups.
1482
1483     dangling_nodes = set(node for node in self.all_node_info.values()
1484                          if node.group not in self.all_group_info)
1485
1486     dangling_instances = {}
1487     no_node_instances = []
1488
1489     for inst in self.all_inst_info.values():
1490       if inst.primary_node in [node.uuid for node in dangling_nodes]:
1491         dangling_instances.setdefault(inst.primary_node, []).append(inst)
1492       elif inst.primary_node not in self.all_node_info:
1493         no_node_instances.append(inst)
1494
1495     pretty_dangling = [
1496         "%s (%s)" %
1497         (node.name,
1498          utils.CommaJoin(inst.name for
1499                          inst in dangling_instances.get(node.uuid, [])))
1500         for node in dangling_nodes]
1501
1502     self._ErrorIf(bool(dangling_nodes), constants.CV_ECLUSTERDANGLINGNODES,
1503                   None,
1504                   "the following nodes (and their instances) belong to a non"
1505                   " existing group: %s", utils.CommaJoin(pretty_dangling))
1506
1507     self._ErrorIf(bool(no_node_instances), constants.CV_ECLUSTERDANGLINGINST,
1508                   None,
1509                   "the following instances have a non-existing primary-node:"
1510                   " %s", utils.CommaJoin(inst.name for
1511                                          inst in no_node_instances))
1512
1513     return not self.bad
1514
1515
1516 class LUClusterVerifyGroup(LogicalUnit, _VerifyErrors):
1517   """Verifies the status of a node group.
1518
1519   """
1520   HPATH = "cluster-verify"
1521   HTYPE = constants.HTYPE_CLUSTER
1522   REQ_BGL = False
1523
1524   _HOOKS_INDENT_RE = re.compile("^", re.M)
1525
1526   class NodeImage(object):
1527     """A class representing the logical and physical status of a node.
1528
1529     @type uuid: string
1530     @ivar uuid: the node UUID to which this object refers
1531     @ivar volumes: a structure as returned from
1532         L{ganeti.backend.GetVolumeList} (runtime)
1533     @ivar instances: a list of running instances (runtime)
1534     @ivar pinst: list of configured primary instances (config)
1535     @ivar sinst: list of configured secondary instances (config)
1536     @ivar sbp: dictionary of {primary-node: list of instances} for all
1537         instances for which this node is secondary (config)
1538     @ivar mfree: free memory, as reported by hypervisor (runtime)
1539     @ivar dfree: free disk, as reported by the node (runtime)
1540     @ivar offline: the offline status (config)
1541     @type rpc_fail: boolean
1542     @ivar rpc_fail: whether the RPC verify call was successfull (overall,
1543         not whether the individual keys were correct) (runtime)
1544     @type lvm_fail: boolean
1545     @ivar lvm_fail: whether the RPC call didn't return valid LVM data
1546     @type hyp_fail: boolean
1547     @ivar hyp_fail: whether the RPC call didn't return the instance list
1548     @type ghost: boolean
1549     @ivar ghost: whether this is a known node or not (config)
1550     @type os_fail: boolean
1551     @ivar os_fail: whether the RPC call didn't return valid OS data
1552     @type oslist: list
1553     @ivar oslist: list of OSes as diagnosed by DiagnoseOS
1554     @type vm_capable: boolean
1555     @ivar vm_capable: whether the node can host instances
1556     @type pv_min: float
1557     @ivar pv_min: size in MiB of the smallest PVs
1558     @type pv_max: float
1559     @ivar pv_max: size in MiB of the biggest PVs
1560
1561     """
1562     def __init__(self, offline=False, uuid=None, vm_capable=True):
1563       self.uuid = uuid
1564       self.volumes = {}
1565       self.instances = []
1566       self.pinst = []
1567       self.sinst = []
1568       self.sbp = {}
1569       self.mfree = 0
1570       self.dfree = 0
1571       self.offline = offline
1572       self.vm_capable = vm_capable
1573       self.rpc_fail = False
1574       self.lvm_fail = False
1575       self.hyp_fail = False
1576       self.ghost = False
1577       self.os_fail = False
1578       self.oslist = {}
1579       self.pv_min = None
1580       self.pv_max = None
1581
1582   def ExpandNames(self):
1583     # This raises errors.OpPrereqError on its own:
1584     self.group_uuid = self.cfg.LookupNodeGroup(self.op.group_name)
1585
1586     # Get instances in node group; this is unsafe and needs verification later
1587     inst_uuids = \
1588       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1589
1590     self.needed_locks = {
1591       locking.LEVEL_INSTANCE: self.cfg.GetInstanceNames(inst_uuids),
1592       locking.LEVEL_NODEGROUP: [self.group_uuid],
1593       locking.LEVEL_NODE: [],
1594
1595       # This opcode is run by watcher every five minutes and acquires all nodes
1596       # for a group. It doesn't run for a long time, so it's better to acquire
1597       # the node allocation lock as well.
1598       locking.LEVEL_NODE_ALLOC: locking.ALL_SET,
1599       }
1600
1601     self.share_locks = ShareAll()
1602
1603   def DeclareLocks(self, level):
1604     if level == locking.LEVEL_NODE:
1605       # Get members of node group; this is unsafe and needs verification later
1606       nodes = set(self.cfg.GetNodeGroup(self.group_uuid).members)
1607
1608       # In Exec(), we warn about mirrored instances that have primary and
1609       # secondary living in separate node groups. To fully verify that
1610       # volumes for these instances are healthy, we will need to do an
1611       # extra call to their secondaries. We ensure here those nodes will
1612       # be locked.
1613       for inst_name in self.owned_locks(locking.LEVEL_INSTANCE):
1614         # Important: access only the instances whose lock is owned
1615         instance = self.cfg.GetInstanceInfoByName(inst_name)
1616         if instance.disk_template in constants.DTS_INT_MIRROR:
1617           nodes.update(instance.secondary_nodes)
1618
1619       self.needed_locks[locking.LEVEL_NODE] = nodes
1620
1621   def CheckPrereq(self):
1622     assert self.group_uuid in self.owned_locks(locking.LEVEL_NODEGROUP)
1623     self.group_info = self.cfg.GetNodeGroup(self.group_uuid)
1624
1625     group_node_uuids = set(self.group_info.members)
1626     group_inst_uuids = \
1627       self.cfg.GetNodeGroupInstances(self.group_uuid, primary_only=True)
1628
1629     unlocked_node_uuids = \
1630         group_node_uuids.difference(self.owned_locks(locking.LEVEL_NODE))
1631
1632     unlocked_inst_uuids = \
1633         group_inst_uuids.difference(
1634           [self.cfg.GetInstanceInfoByName(name).uuid
1635            for name in self.owned_locks(locking.LEVEL_INSTANCE)])
1636
1637     if unlocked_node_uuids:
1638       raise errors.OpPrereqError(
1639         "Missing lock for nodes: %s" %
1640         utils.CommaJoin(self.cfg.GetNodeNames(unlocked_node_uuids)),
1641         errors.ECODE_STATE)
1642
1643     if unlocked_inst_uuids:
1644       raise errors.OpPrereqError(
1645         "Missing lock for instances: %s" %
1646         utils.CommaJoin(self.cfg.GetInstanceNames(unlocked_inst_uuids)),
1647         errors.ECODE_STATE)
1648
1649     self.all_node_info = self.cfg.GetAllNodesInfo()
1650     self.all_inst_info = self.cfg.GetAllInstancesInfo()
1651
1652     self.my_node_uuids = group_node_uuids
1653     self.my_node_info = dict((node_uuid, self.all_node_info[node_uuid])
1654                              for node_uuid in group_node_uuids)
1655
1656     self.my_inst_uuids = group_inst_uuids
1657     self.my_inst_info = dict((inst_uuid, self.all_inst_info[inst_uuid])
1658                              for inst_uuid in group_inst_uuids)
1659
1660     # We detect here the nodes that will need the extra RPC calls for verifying
1661     # split LV volumes; they should be locked.
1662     extra_lv_nodes = set()
1663
1664     for inst in self.my_inst_info.values():
1665       if inst.disk_template in constants.DTS_INT_MIRROR:
1666         for nuuid in inst.all_nodes:
1667           if self.all_node_info[nuuid].group != self.group_uuid:
1668             extra_lv_nodes.add(nuuid)
1669
1670     unlocked_lv_nodes = \
1671         extra_lv_nodes.difference(self.owned_locks(locking.LEVEL_NODE))
1672
1673     if unlocked_lv_nodes:
1674       raise errors.OpPrereqError("Missing node locks for LV check: %s" %
1675                                  utils.CommaJoin(unlocked_lv_nodes),
1676                                  errors.ECODE_STATE)
1677     self.extra_lv_nodes = list(extra_lv_nodes)
1678
1679   def _VerifyNode(self, ninfo, nresult):
1680     """Perform some basic validation on data returned from a node.
1681
1682       - check the result data structure is well formed and has all the
1683         mandatory fields
1684       - check ganeti version
1685
1686     @type ninfo: L{objects.Node}
1687     @param ninfo: the node to check
1688     @param nresult: the results from the node
1689     @rtype: boolean
1690     @return: whether overall this call was successful (and we can expect
1691          reasonable values in the respose)
1692
1693     """
1694     # main result, nresult should be a non-empty dict
1695     test = not nresult or not isinstance(nresult, dict)
1696     self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1697                   "unable to verify node: no data returned")
1698     if test:
1699       return False
1700
1701     # compares ganeti version
1702     local_version = constants.PROTOCOL_VERSION
1703     remote_version = nresult.get("version", None)
1704     test = not (remote_version and
1705                 isinstance(remote_version, (list, tuple)) and
1706                 len(remote_version) == 2)
1707     self._ErrorIf(test, constants.CV_ENODERPC, ninfo.name,
1708                   "connection to node returned invalid data")
1709     if test:
1710       return False
1711
1712     test = local_version != remote_version[0]
1713     self._ErrorIf(test, constants.CV_ENODEVERSION, ninfo.name,
1714                   "incompatible protocol versions: master %s,"
1715                   " node %s", local_version, remote_version[0])
1716     if test:
1717       return False
1718
1719     # node seems compatible, we can actually try to look into its results
1720
1721     # full package version
1722     self._ErrorIf(constants.RELEASE_VERSION != remote_version[1],
1723                   constants.CV_ENODEVERSION, ninfo.name,
1724                   "software version mismatch: master %s, node %s",
1725                   constants.RELEASE_VERSION, remote_version[1],
1726                   code=self.ETYPE_WARNING)
1727
1728     hyp_result = nresult.get(constants.NV_HYPERVISOR, None)
1729     if ninfo.vm_capable and isinstance(hyp_result, dict):
1730       for hv_name, hv_result in hyp_result.iteritems():
1731         test = hv_result is not None
1732         self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
1733                       "hypervisor %s verify failure: '%s'", hv_name, hv_result)
1734
1735     hvp_result = nresult.get(constants.NV_HVPARAMS, None)
1736     if ninfo.vm_capable and isinstance(hvp_result, list):
1737       for item, hv_name, hv_result in hvp_result:
1738         self._ErrorIf(True, constants.CV_ENODEHV, ninfo.name,
1739                       "hypervisor %s parameter verify failure (source %s): %s",
1740                       hv_name, item, hv_result)
1741
1742     test = nresult.get(constants.NV_NODESETUP,
1743                        ["Missing NODESETUP results"])
1744     self._ErrorIf(test, constants.CV_ENODESETUP, ninfo.name,
1745                   "node setup error: %s", "; ".join(test))
1746
1747     return True
1748
1749   def _VerifyNodeTime(self, ninfo, nresult,
1750                       nvinfo_starttime, nvinfo_endtime):
1751     """Check the node time.
1752
1753     @type ninfo: L{objects.Node}
1754     @param ninfo: the node to check
1755     @param nresult: the remote results for the node
1756     @param nvinfo_starttime: the start time of the RPC call
1757     @param nvinfo_endtime: the end time of the RPC call
1758
1759     """
1760     ntime = nresult.get(constants.NV_TIME, None)
1761     try:
1762       ntime_merged = utils.MergeTime(ntime)
1763     except (ValueError, TypeError):
1764       self._ErrorIf(True, constants.CV_ENODETIME, ninfo.name,
1765                     "Node returned invalid time")
1766       return
1767
1768     if ntime_merged < (nvinfo_starttime - constants.NODE_MAX_CLOCK_SKEW):
1769       ntime_diff = "%.01fs" % abs(nvinfo_starttime - ntime_merged)
1770     elif ntime_merged > (nvinfo_endtime + constants.NODE_MAX_CLOCK_SKEW):
1771       ntime_diff = "%.01fs" % abs(ntime_merged - nvinfo_endtime)
1772     else:
1773       ntime_diff = None
1774
1775     self._ErrorIf(ntime_diff is not None, constants.CV_ENODETIME, ninfo.name,
1776                   "Node time diverges by at least %s from master node time",
1777                   ntime_diff)
1778
1779   def _UpdateVerifyNodeLVM(self, ninfo, nresult, vg_name, nimg):
1780     """Check the node LVM results and update info for cross-node checks.
1781
1782     @type ninfo: L{objects.Node}
1783     @param ninfo: the node to check
1784     @param nresult: the remote results for the node
1785     @param vg_name: the configured VG name
1786     @type nimg: L{NodeImage}
1787     @param nimg: node image
1788
1789     """
1790     if vg_name is None:
1791       return
1792
1793     # checks vg existence and size > 20G
1794     vglist = nresult.get(constants.NV_VGLIST, None)
1795     test = not vglist
1796     self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
1797                   "unable to check volume groups")
1798     if not test:
1799       vgstatus = utils.CheckVolumeGroupSize(vglist, vg_name,
1800                                             constants.MIN_VG_SIZE)
1801       self._ErrorIf(vgstatus, constants.CV_ENODELVM, ninfo.name, vgstatus)
1802
1803     # Check PVs
1804     (errmsgs, pvminmax) = CheckNodePVs(nresult, self._exclusive_storage)
1805     for em in errmsgs:
1806       self._Error(constants.CV_ENODELVM, ninfo.name, em)
1807     if pvminmax is not None:
1808       (nimg.pv_min, nimg.pv_max) = pvminmax
1809
1810   def _VerifyGroupDRBDVersion(self, node_verify_infos):
1811     """Check cross-node DRBD version consistency.
1812
1813     @type node_verify_infos: dict
1814     @param node_verify_infos: infos about nodes as returned from the
1815       node_verify call.
1816
1817     """
1818     node_versions = {}
1819     for node_uuid, ndata in node_verify_infos.items():
1820       nresult = ndata.payload
1821       version = nresult.get(constants.NV_DRBDVERSION, "Missing DRBD version")
1822       node_versions[node_uuid] = version
1823
1824     if len(set(node_versions.values())) > 1:
1825       for node_uuid, version in sorted(node_versions.items()):
1826         msg = "DRBD version mismatch: %s" % version
1827         self._Error(constants.CV_ENODEDRBDHELPER, node_uuid, msg,
1828                     code=self.ETYPE_WARNING)
1829
1830   def _VerifyGroupLVM(self, node_image, vg_name):
1831     """Check cross-node consistency in LVM.
1832
1833     @type node_image: dict
1834     @param node_image: info about nodes, mapping from node to names to
1835       L{NodeImage} objects
1836     @param vg_name: the configured VG name
1837
1838     """
1839     if vg_name is None:
1840       return
1841
1842     # Only exclusive storage needs this kind of checks
1843     if not self._exclusive_storage:
1844       return
1845
1846     # exclusive_storage wants all PVs to have the same size (approximately),
1847     # if the smallest and the biggest ones are okay, everything is fine.
1848     # pv_min is None iff pv_max is None
1849     vals = filter((lambda ni: ni.pv_min is not None), node_image.values())
1850     if not vals:
1851       return
1852     (pvmin, minnode_uuid) = min((ni.pv_min, ni.uuid) for ni in vals)
1853     (pvmax, maxnode_uuid) = max((ni.pv_max, ni.uuid) for ni in vals)
1854     bad = utils.LvmExclusiveTestBadPvSizes(pvmin, pvmax)
1855     self._ErrorIf(bad, constants.CV_EGROUPDIFFERENTPVSIZE, self.group_info.name,
1856                   "PV sizes differ too much in the group; smallest (%s MB) is"
1857                   " on %s, biggest (%s MB) is on %s",
1858                   pvmin, self.cfg.GetNodeName(minnode_uuid),
1859                   pvmax, self.cfg.GetNodeName(maxnode_uuid))
1860
1861   def _VerifyNodeBridges(self, ninfo, nresult, bridges):
1862     """Check the node bridges.
1863
1864     @type ninfo: L{objects.Node}
1865     @param ninfo: the node to check
1866     @param nresult: the remote results for the node
1867     @param bridges: the expected list of bridges
1868
1869     """
1870     if not bridges:
1871       return
1872
1873     missing = nresult.get(constants.NV_BRIDGES, None)
1874     test = not isinstance(missing, list)
1875     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1876                   "did not return valid bridge information")
1877     if not test:
1878       self._ErrorIf(bool(missing), constants.CV_ENODENET, ninfo.name,
1879                     "missing bridges: %s" % utils.CommaJoin(sorted(missing)))
1880
1881   def _VerifyNodeUserScripts(self, ninfo, nresult):
1882     """Check the results of user scripts presence and executability on the node
1883
1884     @type ninfo: L{objects.Node}
1885     @param ninfo: the node to check
1886     @param nresult: the remote results for the node
1887
1888     """
1889     test = not constants.NV_USERSCRIPTS in nresult
1890     self._ErrorIf(test, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1891                   "did not return user scripts information")
1892
1893     broken_scripts = nresult.get(constants.NV_USERSCRIPTS, None)
1894     if not test:
1895       self._ErrorIf(broken_scripts, constants.CV_ENODEUSERSCRIPTS, ninfo.name,
1896                     "user scripts not present or not executable: %s" %
1897                     utils.CommaJoin(sorted(broken_scripts)))
1898
1899   def _VerifyNodeNetwork(self, ninfo, nresult):
1900     """Check the node network connectivity results.
1901
1902     @type ninfo: L{objects.Node}
1903     @param ninfo: the node to check
1904     @param nresult: the remote results for the node
1905
1906     """
1907     test = constants.NV_NODELIST not in nresult
1908     self._ErrorIf(test, constants.CV_ENODESSH, ninfo.name,
1909                   "node hasn't returned node ssh connectivity data")
1910     if not test:
1911       if nresult[constants.NV_NODELIST]:
1912         for a_node, a_msg in nresult[constants.NV_NODELIST].items():
1913           self._ErrorIf(True, constants.CV_ENODESSH, ninfo.name,
1914                         "ssh communication with node '%s': %s", a_node, a_msg)
1915
1916     test = constants.NV_NODENETTEST not in nresult
1917     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1918                   "node hasn't returned node tcp connectivity data")
1919     if not test:
1920       if nresult[constants.NV_NODENETTEST]:
1921         nlist = utils.NiceSort(nresult[constants.NV_NODENETTEST].keys())
1922         for anode in nlist:
1923           self._ErrorIf(True, constants.CV_ENODENET, ninfo.name,
1924                         "tcp communication with node '%s': %s",
1925                         anode, nresult[constants.NV_NODENETTEST][anode])
1926
1927     test = constants.NV_MASTERIP not in nresult
1928     self._ErrorIf(test, constants.CV_ENODENET, ninfo.name,
1929                   "node hasn't returned node master IP reachability data")
1930     if not test:
1931       if not nresult[constants.NV_MASTERIP]:
1932         if ninfo.uuid == self.master_node:
1933           msg = "the master node cannot reach the master IP (not configured?)"
1934         else:
1935           msg = "cannot reach the master IP"
1936         self._ErrorIf(True, constants.CV_ENODENET, ninfo.name, msg)
1937
1938   def _VerifyInstance(self, instance, node_image, diskstatus):
1939     """Verify an instance.
1940
1941     This function checks to see if the required block devices are
1942     available on the instance's node, and that the nodes are in the correct
1943     state.
1944
1945     """
1946     pnode_uuid = instance.primary_node
1947     pnode_img = node_image[pnode_uuid]
1948     groupinfo = self.cfg.GetAllNodeGroupsInfo()
1949
1950     node_vol_should = {}
1951     instance.MapLVsByNode(node_vol_should)
1952
1953     cluster = self.cfg.GetClusterInfo()
1954     ipolicy = ganeti.masterd.instance.CalculateGroupIPolicy(cluster,
1955                                                             self.group_info)
1956     err = ComputeIPolicyInstanceViolation(ipolicy, instance, self.cfg)
1957     self._ErrorIf(err, constants.CV_EINSTANCEPOLICY, instance.name,
1958                   utils.CommaJoin(err), code=self.ETYPE_WARNING)
1959
1960     for node_uuid in node_vol_should:
1961       n_img = node_image[node_uuid]
1962       if n_img.offline or n_img.rpc_fail or n_img.lvm_fail:
1963         # ignore missing volumes on offline or broken nodes
1964         continue
1965       for volume in node_vol_should[node_uuid]:
1966         test = volume not in n_img.volumes
1967         self._ErrorIf(test, constants.CV_EINSTANCEMISSINGDISK, instance.name,
1968                       "volume %s missing on node %s", volume,
1969                       self.cfg.GetNodeName(node_uuid))
1970
1971     if instance.admin_state == constants.ADMINST_UP:
1972       test = instance.uuid not in pnode_img.instances and not pnode_img.offline
1973       self._ErrorIf(test, constants.CV_EINSTANCEDOWN, instance.name,
1974                     "instance not running on its primary node %s",
1975                      self.cfg.GetNodeName(pnode_uuid))
1976       self._ErrorIf(pnode_img.offline, constants.CV_EINSTANCEBADNODE,
1977                     instance.name, "instance is marked as running and lives on"
1978                     " offline node %s", self.cfg.GetNodeName(pnode_uuid))
1979
1980     diskdata = [(nname, success, status, idx)
1981                 for (nname, disks) in diskstatus.items()
1982                 for idx, (success, status) in enumerate(disks)]
1983
1984     for nname, success, bdev_status, idx in diskdata:
1985       # the 'ghost node' construction in Exec() ensures that we have a
1986       # node here
1987       snode = node_image[nname]
1988       bad_snode = snode.ghost or snode.offline
1989       self._ErrorIf(instance.disks_active and
1990                     not success and not bad_snode,
1991                     constants.CV_EINSTANCEFAULTYDISK, instance.name,
1992                     "couldn't retrieve status for disk/%s on %s: %s",
1993                     idx, self.cfg.GetNodeName(nname), bdev_status)
1994
1995       if instance.disks_active and success and \
1996          (bdev_status.is_degraded or
1997           bdev_status.ldisk_status != constants.LDS_OKAY):
1998         msg = "disk/%s on %s" % (idx, self.cfg.GetNodeName(nname))
1999         if bdev_status.is_degraded:
2000           msg += " is degraded"
2001         if bdev_status.ldisk_status != constants.LDS_OKAY:
2002           msg += "; state is '%s'" % \
2003                  constants.LDS_NAMES[bdev_status.ldisk_status]
2004
2005         self._Error(constants.CV_EINSTANCEFAULTYDISK, instance.name, msg)
2006
2007     self._ErrorIf(pnode_img.rpc_fail and not pnode_img.offline,
2008                   constants.CV_ENODERPC, self.cfg.GetNodeName(pnode_uuid),
2009                   "instance %s, connection to primary node failed",
2010                   instance.name)
2011
2012     self._ErrorIf(len(instance.secondary_nodes) > 1,
2013                   constants.CV_EINSTANCELAYOUT, instance.name,
2014                   "instance has multiple secondary nodes: %s",
2015                   utils.CommaJoin(instance.secondary_nodes),
2016                   code=self.ETYPE_WARNING)
2017
2018     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg, instance.all_nodes)
2019     if any(es_flags.values()):
2020       if instance.disk_template not in constants.DTS_EXCL_STORAGE:
2021         # Disk template not compatible with exclusive_storage: no instance
2022         # node should have the flag set
2023         es_nodes = [n
2024                     for (n, es) in es_flags.items()
2025                     if es]
2026         self._Error(constants.CV_EINSTANCEUNSUITABLENODE, instance.name,
2027                     "instance has template %s, which is not supported on nodes"
2028                     " that have exclusive storage set: %s",
2029                     instance.disk_template,
2030                     utils.CommaJoin(self.cfg.GetNodeNames(es_nodes)))
2031       for (idx, disk) in enumerate(instance.disks):
2032         self._ErrorIf(disk.spindles is None,
2033                       constants.CV_EINSTANCEMISSINGCFGPARAMETER, instance.name,
2034                       "number of spindles not configured for disk %s while"
2035                       " exclusive storage is enabled, try running"
2036                       " gnt-cluster repair-disk-sizes", idx)
2037
2038     if instance.disk_template in constants.DTS_INT_MIRROR:
2039       instance_nodes = utils.NiceSort(instance.all_nodes)
2040       instance_groups = {}
2041
2042       for node_uuid in instance_nodes:
2043         instance_groups.setdefault(self.all_node_info[node_uuid].group,
2044                                    []).append(node_uuid)
2045
2046       pretty_list = [
2047         "%s (group %s)" % (utils.CommaJoin(self.cfg.GetNodeNames(nodes)),
2048                            groupinfo[group].name)
2049         # Sort so that we always list the primary node first.
2050         for group, nodes in sorted(instance_groups.items(),
2051                                    key=lambda (_, nodes): pnode_uuid in nodes,
2052                                    reverse=True)]
2053
2054       self._ErrorIf(len(instance_groups) > 1,
2055                     constants.CV_EINSTANCESPLITGROUPS,
2056                     instance.name, "instance has primary and secondary nodes in"
2057                     " different groups: %s", utils.CommaJoin(pretty_list),
2058                     code=self.ETYPE_WARNING)
2059
2060     inst_nodes_offline = []
2061     for snode in instance.secondary_nodes:
2062       s_img = node_image[snode]
2063       self._ErrorIf(s_img.rpc_fail and not s_img.offline, constants.CV_ENODERPC,
2064                     self.cfg.GetNodeName(snode),
2065                     "instance %s, connection to secondary node failed",
2066                     instance.name)
2067
2068       if s_img.offline:
2069         inst_nodes_offline.append(snode)
2070
2071     # warn that the instance lives on offline nodes
2072     self._ErrorIf(inst_nodes_offline, constants.CV_EINSTANCEBADNODE,
2073                   instance.name, "instance has offline secondary node(s) %s",
2074                   utils.CommaJoin(self.cfg.GetNodeNames(inst_nodes_offline)))
2075     # ... or ghost/non-vm_capable nodes
2076     for node_uuid in instance.all_nodes:
2077       self._ErrorIf(node_image[node_uuid].ghost, constants.CV_EINSTANCEBADNODE,
2078                     instance.name, "instance lives on ghost node %s",
2079                     self.cfg.GetNodeName(node_uuid))
2080       self._ErrorIf(not node_image[node_uuid].vm_capable,
2081                     constants.CV_EINSTANCEBADNODE, instance.name,
2082                     "instance lives on non-vm_capable node %s",
2083                     self.cfg.GetNodeName(node_uuid))
2084
2085   def _VerifyOrphanVolumes(self, node_vol_should, node_image, reserved):
2086     """Verify if there are any unknown volumes in the cluster.
2087
2088     The .os, .swap and backup volumes are ignored. All other volumes are
2089     reported as unknown.
2090
2091     @type reserved: L{ganeti.utils.FieldSet}
2092     @param reserved: a FieldSet of reserved volume names
2093
2094     """
2095     for node_uuid, n_img in node_image.items():
2096       if (n_img.offline or n_img.rpc_fail or n_img.lvm_fail or
2097           self.all_node_info[node_uuid].group != self.group_uuid):
2098         # skip non-healthy nodes
2099         continue
2100       for volume in n_img.volumes:
2101         test = ((node_uuid not in node_vol_should or
2102                 volume not in node_vol_should[node_uuid]) and
2103                 not reserved.Matches(volume))
2104         self._ErrorIf(test, constants.CV_ENODEORPHANLV,
2105                       self.cfg.GetNodeName(node_uuid),
2106                       "volume %s is unknown", volume)
2107
2108   def _VerifyNPlusOneMemory(self, node_image, all_insts):
2109     """Verify N+1 Memory Resilience.
2110
2111     Check that if one single node dies we can still start all the
2112     instances it was primary for.
2113
2114     """
2115     cluster_info = self.cfg.GetClusterInfo()
2116     for node_uuid, n_img in node_image.items():
2117       # This code checks that every node which is now listed as
2118       # secondary has enough memory to host all instances it is
2119       # supposed to should a single other node in the cluster fail.
2120       # FIXME: not ready for failover to an arbitrary node
2121       # FIXME: does not support file-backed instances
2122       # WARNING: we currently take into account down instances as well
2123       # as up ones, considering that even if they're down someone
2124       # might want to start them even in the event of a node failure.
2125       if n_img.offline or \
2126          self.all_node_info[node_uuid].group != self.group_uuid:
2127         # we're skipping nodes marked offline and nodes in other groups from
2128         # the N+1 warning, since most likely we don't have good memory
2129         # infromation from them; we already list instances living on such
2130         # nodes, and that's enough warning
2131         continue
2132       #TODO(dynmem): also consider ballooning out other instances
2133       for prinode, inst_uuids in n_img.sbp.items():
2134         needed_mem = 0
2135         for inst_uuid in inst_uuids:
2136           bep = cluster_info.FillBE(all_insts[inst_uuid])
2137           if bep[constants.BE_AUTO_BALANCE]:
2138             needed_mem += bep[constants.BE_MINMEM]
2139         test = n_img.mfree < needed_mem
2140         self._ErrorIf(test, constants.CV_ENODEN1,
2141                       self.cfg.GetNodeName(node_uuid),
2142                       "not enough memory to accomodate instance failovers"
2143                       " should node %s fail (%dMiB needed, %dMiB available)",
2144                       self.cfg.GetNodeName(prinode), needed_mem, n_img.mfree)
2145
2146   def _VerifyFiles(self, nodes, master_node_uuid, all_nvinfo,
2147                    (files_all, files_opt, files_mc, files_vm)):
2148     """Verifies file checksums collected from all nodes.
2149
2150     @param nodes: List of L{objects.Node} objects
2151     @param master_node_uuid: UUID of master node
2152     @param all_nvinfo: RPC results
2153
2154     """
2155     # Define functions determining which nodes to consider for a file
2156     files2nodefn = [
2157       (files_all, None),
2158       (files_mc, lambda node: (node.master_candidate or
2159                                node.uuid == master_node_uuid)),
2160       (files_vm, lambda node: node.vm_capable),
2161       ]
2162
2163     # Build mapping from filename to list of nodes which should have the file
2164     nodefiles = {}
2165     for (files, fn) in files2nodefn:
2166       if fn is None:
2167         filenodes = nodes
2168       else:
2169         filenodes = filter(fn, nodes)
2170       nodefiles.update((filename,
2171                         frozenset(map(operator.attrgetter("uuid"), filenodes)))
2172                        for filename in files)
2173
2174     assert set(nodefiles) == (files_all | files_mc | files_vm)
2175
2176     fileinfo = dict((filename, {}) for filename in nodefiles)
2177     ignore_nodes = set()
2178
2179     for node in nodes:
2180       if node.offline:
2181         ignore_nodes.add(node.uuid)
2182         continue
2183
2184       nresult = all_nvinfo[node.uuid]
2185
2186       if nresult.fail_msg or not nresult.payload:
2187         node_files = None
2188       else:
2189         fingerprints = nresult.payload.get(constants.NV_FILELIST, None)
2190         node_files = dict((vcluster.LocalizeVirtualPath(key), value)
2191                           for (key, value) in fingerprints.items())
2192         del fingerprints
2193
2194       test = not (node_files and isinstance(node_files, dict))
2195       self._ErrorIf(test, constants.CV_ENODEFILECHECK, node.name,
2196                     "Node did not return file checksum data")
2197       if test:
2198         ignore_nodes.add(node.uuid)
2199         continue
2200
2201       # Build per-checksum mapping from filename to nodes having it
2202       for (filename, checksum) in node_files.items():
2203         assert filename in nodefiles
2204         fileinfo[filename].setdefault(checksum, set()).add(node.uuid)
2205
2206     for (filename, checksums) in fileinfo.items():
2207       assert compat.all(len(i) > 10 for i in checksums), "Invalid checksum"
2208
2209       # Nodes having the file
2210       with_file = frozenset(node_uuid
2211                             for node_uuids in fileinfo[filename].values()
2212                             for node_uuid in node_uuids) - ignore_nodes
2213
2214       expected_nodes = nodefiles[filename] - ignore_nodes
2215
2216       # Nodes missing file
2217       missing_file = expected_nodes - with_file
2218
2219       if filename in files_opt:
2220         # All or no nodes
2221         self._ErrorIf(missing_file and missing_file != expected_nodes,
2222                       constants.CV_ECLUSTERFILECHECK, None,
2223                       "File %s is optional, but it must exist on all or no"
2224                       " nodes (not found on %s)",
2225                       filename,
2226                       utils.CommaJoin(
2227                         utils.NiceSort(
2228                           map(self.cfg.GetNodeName, missing_file))))
2229       else:
2230         self._ErrorIf(missing_file, constants.CV_ECLUSTERFILECHECK, None,
2231                       "File %s is missing from node(s) %s", filename,
2232                       utils.CommaJoin(
2233                         utils.NiceSort(
2234                           map(self.cfg.GetNodeName, missing_file))))
2235
2236         # Warn if a node has a file it shouldn't
2237         unexpected = with_file - expected_nodes
2238         self._ErrorIf(unexpected,
2239                       constants.CV_ECLUSTERFILECHECK, None,
2240                       "File %s should not exist on node(s) %s",
2241                       filename, utils.CommaJoin(
2242                         utils.NiceSort(map(self.cfg.GetNodeName, unexpected))))
2243
2244       # See if there are multiple versions of the file
2245       test = len(checksums) > 1
2246       if test:
2247         variants = ["variant %s on %s" %
2248                     (idx + 1,
2249                      utils.CommaJoin(utils.NiceSort(
2250                        map(self.cfg.GetNodeName, node_uuids))))
2251                     for (idx, (checksum, node_uuids)) in
2252                       enumerate(sorted(checksums.items()))]
2253       else:
2254         variants = []
2255
2256       self._ErrorIf(test, constants.CV_ECLUSTERFILECHECK, None,
2257                     "File %s found with %s different checksums (%s)",
2258                     filename, len(checksums), "; ".join(variants))
2259
2260   def _VerifyNodeDrbd(self, ninfo, nresult, instanceinfo, drbd_helper,
2261                       drbd_map):
2262     """Verifies and the node DRBD status.
2263
2264     @type ninfo: L{objects.Node}
2265     @param ninfo: the node to check
2266     @param nresult: the remote results for the node
2267     @param instanceinfo: the dict of instances
2268     @param drbd_helper: the configured DRBD usermode helper
2269     @param drbd_map: the DRBD map as returned by
2270         L{ganeti.config.ConfigWriter.ComputeDRBDMap}
2271
2272     """
2273     if drbd_helper:
2274       helper_result = nresult.get(constants.NV_DRBDHELPER, None)
2275       test = (helper_result is None)
2276       self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2277                     "no drbd usermode helper returned")
2278       if helper_result:
2279         status, payload = helper_result
2280         test = not status
2281         self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2282                       "drbd usermode helper check unsuccessful: %s", payload)
2283         test = status and (payload != drbd_helper)
2284         self._ErrorIf(test, constants.CV_ENODEDRBDHELPER, ninfo.name,
2285                       "wrong drbd usermode helper: %s", payload)
2286
2287     # compute the DRBD minors
2288     node_drbd = {}
2289     for minor, inst_uuid in drbd_map[ninfo.uuid].items():
2290       test = inst_uuid not in instanceinfo
2291       self._ErrorIf(test, constants.CV_ECLUSTERCFG, None,
2292                     "ghost instance '%s' in temporary DRBD map", inst_uuid)
2293         # ghost instance should not be running, but otherwise we
2294         # don't give double warnings (both ghost instance and
2295         # unallocated minor in use)
2296       if test:
2297         node_drbd[minor] = (inst_uuid, False)
2298       else:
2299         instance = instanceinfo[inst_uuid]
2300         node_drbd[minor] = (inst_uuid, instance.disks_active)
2301
2302     # and now check them
2303     used_minors = nresult.get(constants.NV_DRBDLIST, [])
2304     test = not isinstance(used_minors, (tuple, list))
2305     self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2306                   "cannot parse drbd status file: %s", str(used_minors))
2307     if test:
2308       # we cannot check drbd status
2309       return
2310
2311     for minor, (inst_uuid, must_exist) in node_drbd.items():
2312       test = minor not in used_minors and must_exist
2313       self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2314                     "drbd minor %d of instance %s is not active", minor,
2315                     self.cfg.GetInstanceName(inst_uuid))
2316     for minor in used_minors:
2317       test = minor not in node_drbd
2318       self._ErrorIf(test, constants.CV_ENODEDRBD, ninfo.name,
2319                     "unallocated drbd minor %d is in use", minor)
2320
2321   def _UpdateNodeOS(self, ninfo, nresult, nimg):
2322     """Builds the node OS structures.
2323
2324     @type ninfo: L{objects.Node}
2325     @param ninfo: the node to check
2326     @param nresult: the remote results for the node
2327     @param nimg: the node image object
2328
2329     """
2330     remote_os = nresult.get(constants.NV_OSLIST, None)
2331     test = (not isinstance(remote_os, list) or
2332             not compat.all(isinstance(v, list) and len(v) == 7
2333                            for v in remote_os))
2334
2335     self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2336                   "node hasn't returned valid OS data")
2337
2338     nimg.os_fail = test
2339
2340     if test:
2341       return
2342
2343     os_dict = {}
2344
2345     for (name, os_path, status, diagnose,
2346          variants, parameters, api_ver) in nresult[constants.NV_OSLIST]:
2347
2348       if name not in os_dict:
2349         os_dict[name] = []
2350
2351       # parameters is a list of lists instead of list of tuples due to
2352       # JSON lacking a real tuple type, fix it:
2353       parameters = [tuple(v) for v in parameters]
2354       os_dict[name].append((os_path, status, diagnose,
2355                             set(variants), set(parameters), set(api_ver)))
2356
2357     nimg.oslist = os_dict
2358
2359   def _VerifyNodeOS(self, ninfo, nimg, base):
2360     """Verifies the node OS list.
2361
2362     @type ninfo: L{objects.Node}
2363     @param ninfo: the node to check
2364     @param nimg: the node image object
2365     @param base: the 'template' node we match against (e.g. from the master)
2366
2367     """
2368     assert not nimg.os_fail, "Entered _VerifyNodeOS with failed OS rpc?"
2369
2370     beautify_params = lambda l: ["%s: %s" % (k, v) for (k, v) in l]
2371     for os_name, os_data in nimg.oslist.items():
2372       assert os_data, "Empty OS status for OS %s?!" % os_name
2373       f_path, f_status, f_diag, f_var, f_param, f_api = os_data[0]
2374       self._ErrorIf(not f_status, constants.CV_ENODEOS, ninfo.name,
2375                     "Invalid OS %s (located at %s): %s",
2376                     os_name, f_path, f_diag)
2377       self._ErrorIf(len(os_data) > 1, constants.CV_ENODEOS, ninfo.name,
2378                     "OS '%s' has multiple entries"
2379                     " (first one shadows the rest): %s",
2380                     os_name, utils.CommaJoin([v[0] for v in os_data]))
2381       # comparisons with the 'base' image
2382       test = os_name not in base.oslist
2383       self._ErrorIf(test, constants.CV_ENODEOS, ninfo.name,
2384                     "Extra OS %s not present on reference node (%s)",
2385                     os_name, self.cfg.GetNodeName(base.uuid))
2386       if test:
2387         continue
2388       assert base.oslist[os_name], "Base node has empty OS status?"
2389       _, b_status, _, b_var, b_param, b_api = base.oslist[os_name][0]
2390       if not b_status:
2391         # base OS is invalid, skipping
2392         continue
2393       for kind, a, b in [("API version", f_api, b_api),
2394                          ("variants list", f_var, b_var),
2395                          ("parameters", beautify_params(f_param),
2396                           beautify_params(b_param))]:
2397         self._ErrorIf(a != b, constants.CV_ENODEOS, ninfo.name,
2398                       "OS %s for %s differs from reference node %s:"
2399                       " [%s] vs. [%s]", kind, os_name,
2400                       self.cfg.GetNodeName(base.uuid),
2401                       utils.CommaJoin(sorted(a)), utils.CommaJoin(sorted(b)))
2402
2403     # check any missing OSes
2404     missing = set(base.oslist.keys()).difference(nimg.oslist.keys())
2405     self._ErrorIf(missing, constants.CV_ENODEOS, ninfo.name,
2406                   "OSes present on reference node %s"
2407                   " but missing on this node: %s",
2408                   self.cfg.GetNodeName(base.uuid), utils.CommaJoin(missing))
2409
2410   def _VerifyAcceptedFileStoragePaths(self, ninfo, nresult, is_master):
2411     """Verifies paths in L{pathutils.FILE_STORAGE_PATHS_FILE}.
2412
2413     @type ninfo: L{objects.Node}
2414     @param ninfo: the node to check
2415     @param nresult: the remote results for the node
2416     @type is_master: bool
2417     @param is_master: Whether node is the master node
2418
2419     """
2420     cluster = self.cfg.GetClusterInfo()
2421     if (is_master and
2422         (cluster.IsFileStorageEnabled() or
2423          cluster.IsSharedFileStorageEnabled())):
2424       try:
2425         fspaths = nresult[constants.NV_ACCEPTED_STORAGE_PATHS]
2426       except KeyError:
2427         # This should never happen
2428         self._ErrorIf(True, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2429                       "Node did not return forbidden file storage paths")
2430       else:
2431         self._ErrorIf(fspaths, constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2432                       "Found forbidden file storage paths: %s",
2433                       utils.CommaJoin(fspaths))
2434     else:
2435       self._ErrorIf(constants.NV_ACCEPTED_STORAGE_PATHS in nresult,
2436                     constants.CV_ENODEFILESTORAGEPATHS, ninfo.name,
2437                     "Node should not have returned forbidden file storage"
2438                     " paths")
2439
2440   def _VerifyStoragePaths(self, ninfo, nresult, file_disk_template,
2441                           verify_key, error_key):
2442     """Verifies (file) storage paths.
2443
2444     @type ninfo: L{objects.Node}
2445     @param ninfo: the node to check
2446     @param nresult: the remote results for the node
2447     @type file_disk_template: string
2448     @param file_disk_template: file-based disk template, whose directory
2449         is supposed to be verified
2450     @type verify_key: string
2451     @param verify_key: key for the verification map of this file
2452         verification step
2453     @param error_key: error key to be added to the verification results
2454         in case something goes wrong in this verification step
2455
2456     """
2457     assert (file_disk_template in
2458             utils.storage.GetDiskTemplatesOfStorageType(constants.ST_FILE))
2459     cluster = self.cfg.GetClusterInfo()
2460     if cluster.IsDiskTemplateEnabled(file_disk_template):
2461       self._ErrorIf(
2462           verify_key in nresult,
2463           error_key, ninfo.name,
2464           "The configured %s storage path is unusable: %s" %
2465           (file_disk_template, nresult.get(verify_key)))
2466
2467   def _VerifyFileStoragePaths(self, ninfo, nresult):
2468     """Verifies (file) storage paths.
2469
2470     @see: C{_VerifyStoragePaths}
2471
2472     """
2473     self._VerifyStoragePaths(
2474         ninfo, nresult, constants.DT_FILE,
2475         constants.NV_FILE_STORAGE_PATH,
2476         constants.CV_ENODEFILESTORAGEPATHUNUSABLE)
2477
2478   def _VerifySharedFileStoragePaths(self, ninfo, nresult):
2479     """Verifies (file) storage paths.
2480
2481     @see: C{_VerifyStoragePaths}
2482
2483     """
2484     self._VerifyStoragePaths(
2485         ninfo, nresult, constants.DT_SHARED_FILE,
2486         constants.NV_SHARED_FILE_STORAGE_PATH,
2487         constants.CV_ENODESHAREDFILESTORAGEPATHUNUSABLE)
2488
2489   def _VerifyOob(self, ninfo, nresult):
2490     """Verifies out of band functionality of a node.
2491
2492     @type ninfo: L{objects.Node}
2493     @param ninfo: the node to check
2494     @param nresult: the remote results for the node
2495
2496     """
2497     # We just have to verify the paths on master and/or master candidates
2498     # as the oob helper is invoked on the master
2499     if ((ninfo.master_candidate or ninfo.master_capable) and
2500         constants.NV_OOB_PATHS in nresult):
2501       for path_result in nresult[constants.NV_OOB_PATHS]:
2502         self._ErrorIf(path_result, constants.CV_ENODEOOBPATH,
2503                       ninfo.name, path_result)
2504
2505   def _UpdateNodeVolumes(self, ninfo, nresult, nimg, vg_name):
2506     """Verifies and updates the node volume data.
2507
2508     This function will update a L{NodeImage}'s internal structures
2509     with data from the remote call.
2510
2511     @type ninfo: L{objects.Node}
2512     @param ninfo: the node to check
2513     @param nresult: the remote results for the node
2514     @param nimg: the node image object
2515     @param vg_name: the configured VG name
2516
2517     """
2518     nimg.lvm_fail = True
2519     lvdata = nresult.get(constants.NV_LVLIST, "Missing LV data")
2520     if vg_name is None:
2521       pass
2522     elif isinstance(lvdata, basestring):
2523       self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2524                     "LVM problem on node: %s", utils.SafeEncode(lvdata))
2525     elif not isinstance(lvdata, dict):
2526       self._ErrorIf(True, constants.CV_ENODELVM, ninfo.name,
2527                     "rpc call to node failed (lvlist)")
2528     else:
2529       nimg.volumes = lvdata
2530       nimg.lvm_fail = False
2531
2532   def _UpdateNodeInstances(self, ninfo, nresult, nimg):
2533     """Verifies and updates the node instance list.
2534
2535     If the listing was successful, then updates this node's instance
2536     list. Otherwise, it marks the RPC call as failed for the instance
2537     list key.
2538
2539     @type ninfo: L{objects.Node}
2540     @param ninfo: the node to check
2541     @param nresult: the remote results for the node
2542     @param nimg: the node image object
2543
2544     """
2545     idata = nresult.get(constants.NV_INSTANCELIST, None)
2546     test = not isinstance(idata, list)
2547     self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2548                   "rpc call to node failed (instancelist): %s",
2549                   utils.SafeEncode(str(idata)))
2550     if test:
2551       nimg.hyp_fail = True
2552     else:
2553       nimg.instances = [inst.uuid for (_, inst) in
2554                         self.cfg.GetMultiInstanceInfoByName(idata)]
2555
2556   def _UpdateNodeInfo(self, ninfo, nresult, nimg, vg_name):
2557     """Verifies and computes a node information map
2558
2559     @type ninfo: L{objects.Node}
2560     @param ninfo: the node to check
2561     @param nresult: the remote results for the node
2562     @param nimg: the node image object
2563     @param vg_name: the configured VG name
2564
2565     """
2566     # try to read free memory (from the hypervisor)
2567     hv_info = nresult.get(constants.NV_HVINFO, None)
2568     test = not isinstance(hv_info, dict) or "memory_free" not in hv_info
2569     self._ErrorIf(test, constants.CV_ENODEHV, ninfo.name,
2570                   "rpc call to node failed (hvinfo)")
2571     if not test:
2572       try:
2573         nimg.mfree = int(hv_info["memory_free"])
2574       except (ValueError, TypeError):
2575         self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2576                       "node returned invalid nodeinfo, check hypervisor")
2577
2578     # FIXME: devise a free space model for file based instances as well
2579     if vg_name is not None:
2580       test = (constants.NV_VGLIST not in nresult or
2581               vg_name not in nresult[constants.NV_VGLIST])
2582       self._ErrorIf(test, constants.CV_ENODELVM, ninfo.name,
2583                     "node didn't return data for the volume group '%s'"
2584                     " - it is either missing or broken", vg_name)
2585       if not test:
2586         try:
2587           nimg.dfree = int(nresult[constants.NV_VGLIST][vg_name])
2588         except (ValueError, TypeError):
2589           self._ErrorIf(True, constants.CV_ENODERPC, ninfo.name,
2590                         "node returned invalid LVM info, check LVM status")
2591
2592   def _CollectDiskInfo(self, node_uuids, node_image, instanceinfo):
2593     """Gets per-disk status information for all instances.
2594
2595     @type node_uuids: list of strings
2596     @param node_uuids: Node UUIDs
2597     @type node_image: dict of (UUID, L{objects.Node})
2598     @param node_image: Node objects
2599     @type instanceinfo: dict of (UUID, L{objects.Instance})
2600     @param instanceinfo: Instance objects
2601     @rtype: {instance: {node: [(succes, payload)]}}
2602     @return: a dictionary of per-instance dictionaries with nodes as
2603         keys and disk information as values; the disk information is a
2604         list of tuples (success, payload)
2605
2606     """
2607     node_disks = {}
2608     node_disks_devonly = {}
2609     diskless_instances = set()
2610     diskless = constants.DT_DISKLESS
2611
2612     for nuuid in node_uuids:
2613       node_inst_uuids = list(itertools.chain(node_image[nuuid].pinst,
2614                                              node_image[nuuid].sinst))
2615       diskless_instances.update(uuid for uuid in node_inst_uuids
2616                                 if instanceinfo[uuid].disk_template == diskless)
2617       disks = [(inst_uuid, disk)
2618                for inst_uuid in node_inst_uuids
2619                for disk in instanceinfo[inst_uuid].disks]
2620
2621       if not disks:
2622         # No need to collect data
2623         continue
2624
2625       node_disks[nuuid] = disks
2626
2627       # _AnnotateDiskParams makes already copies of the disks
2628       devonly = []
2629       for (inst_uuid, dev) in disks:
2630         (anno_disk,) = AnnotateDiskParams(instanceinfo[inst_uuid], [dev],
2631                                           self.cfg)
2632         self.cfg.SetDiskID(anno_disk, nuuid)
2633         devonly.append(anno_disk)
2634
2635       node_disks_devonly[nuuid] = devonly
2636
2637     assert len(node_disks) == len(node_disks_devonly)
2638
2639     # Collect data from all nodes with disks
2640     result = self.rpc.call_blockdev_getmirrorstatus_multi(node_disks.keys(),
2641                                                           node_disks_devonly)
2642
2643     assert len(result) == len(node_disks)
2644
2645     instdisk = {}
2646
2647     for (nuuid, nres) in result.items():
2648       node = self.cfg.GetNodeInfo(nuuid)
2649       disks = node_disks[node.uuid]
2650
2651       if nres.offline:
2652         # No data from this node
2653         data = len(disks) * [(False, "node offline")]
2654       else:
2655         msg = nres.fail_msg
2656         self._ErrorIf(msg, constants.CV_ENODERPC, node.name,
2657                       "while getting disk information: %s", msg)
2658         if msg:
2659           # No data from this node
2660           data = len(disks) * [(False, msg)]
2661         else:
2662           data = []
2663           for idx, i in enumerate(nres.payload):
2664             if isinstance(i, (tuple, list)) and len(i) == 2:
2665               data.append(i)
2666             else:
2667               logging.warning("Invalid result from node %s, entry %d: %s",
2668                               node.name, idx, i)
2669               data.append((False, "Invalid result from the remote node"))
2670
2671       for ((inst_uuid, _), status) in zip(disks, data):
2672         instdisk.setdefault(inst_uuid, {}).setdefault(node.uuid, []) \
2673           .append(status)
2674
2675     # Add empty entries for diskless instances.
2676     for inst_uuid in diskless_instances:
2677       assert inst_uuid not in instdisk
2678       instdisk[inst_uuid] = {}
2679
2680     assert compat.all(len(statuses) == len(instanceinfo[inst].disks) and
2681                       len(nuuids) <= len(instanceinfo[inst].all_nodes) and
2682                       compat.all(isinstance(s, (tuple, list)) and
2683                                  len(s) == 2 for s in statuses)
2684                       for inst, nuuids in instdisk.items()
2685                       for nuuid, statuses in nuuids.items())
2686     if __debug__:
2687       instdisk_keys = set(instdisk)
2688       instanceinfo_keys = set(instanceinfo)
2689       assert instdisk_keys == instanceinfo_keys, \
2690         ("instdisk keys (%s) do not match instanceinfo keys (%s)" %
2691          (instdisk_keys, instanceinfo_keys))
2692
2693     return instdisk
2694
2695   @staticmethod
2696   def _SshNodeSelector(group_uuid, all_nodes):
2697     """Create endless iterators for all potential SSH check hosts.
2698
2699     """
2700     nodes = [node for node in all_nodes
2701              if (node.group != group_uuid and
2702                  not node.offline)]
2703     keyfunc = operator.attrgetter("group")
2704
2705     return map(itertools.cycle,
2706                [sorted(map(operator.attrgetter("name"), names))
2707                 for _, names in itertools.groupby(sorted(nodes, key=keyfunc),
2708                                                   keyfunc)])
2709
2710   @classmethod
2711   def _SelectSshCheckNodes(cls, group_nodes, group_uuid, all_nodes):
2712     """Choose which nodes should talk to which other nodes.
2713
2714     We will make nodes contact all nodes in their group, and one node from
2715     every other group.
2716
2717     @warning: This algorithm has a known issue if one node group is much
2718       smaller than others (e.g. just one node). In such a case all other
2719       nodes will talk to the single node.
2720
2721     """
2722     online_nodes = sorted(node.name for node in group_nodes if not node.offline)
2723     sel = cls._SshNodeSelector(group_uuid, all_nodes)
2724
2725     return (online_nodes,
2726             dict((name, sorted([i.next() for i in sel]))
2727                  for name in online_nodes))
2728
2729   def BuildHooksEnv(self):
2730     """Build hooks env.
2731
2732     Cluster-Verify hooks just ran in the post phase and their failure makes
2733     the output be logged in the verify output and the verification to fail.
2734
2735     """
2736     env = {
2737       "CLUSTER_TAGS": " ".join(self.cfg.GetClusterInfo().GetTags()),
2738       }
2739
2740     env.update(("NODE_TAGS_%s" % node.name, " ".join(node.GetTags()))
2741                for node in self.my_node_info.values())
2742
2743     return env
2744
2745   def BuildHooksNodes(self):
2746     """Build hooks nodes.
2747
2748     """
2749     return ([], list(self.my_node_info.keys()))
2750
2751   def Exec(self, feedback_fn):
2752     """Verify integrity of the node group, performing various test on nodes.
2753
2754     """
2755     # This method has too many local variables. pylint: disable=R0914
2756     feedback_fn("* Verifying group '%s'" % self.group_info.name)
2757
2758     if not self.my_node_uuids:
2759       # empty node group
2760       feedback_fn("* Empty node group, skipping verification")
2761       return True
2762
2763     self.bad = False
2764     verbose = self.op.verbose
2765     self._feedback_fn = feedback_fn
2766
2767     vg_name = self.cfg.GetVGName()
2768     drbd_helper = self.cfg.GetDRBDHelper()
2769     cluster = self.cfg.GetClusterInfo()
2770     hypervisors = cluster.enabled_hypervisors
2771     node_data_list = self.my_node_info.values()
2772
2773     i_non_redundant = [] # Non redundant instances
2774     i_non_a_balanced = [] # Non auto-balanced instances
2775     i_offline = 0 # Count of offline instances
2776     n_offline = 0 # Count of offline nodes
2777     n_drained = 0 # Count of nodes being drained
2778     node_vol_should = {}
2779
2780     # FIXME: verify OS list
2781
2782     # File verification
2783     filemap = ComputeAncillaryFiles(cluster, False)
2784
2785     # do local checksums
2786     master_node_uuid = self.master_node = self.cfg.GetMasterNode()
2787     master_ip = self.cfg.GetMasterIP()
2788
2789     feedback_fn("* Gathering data (%d nodes)" % len(self.my_node_uuids))
2790
2791     user_scripts = []
2792     if self.cfg.GetUseExternalMipScript():
2793       user_scripts.append(pathutils.EXTERNAL_MASTER_SETUP_SCRIPT)
2794
2795     node_verify_param = {
2796       constants.NV_FILELIST:
2797         map(vcluster.MakeVirtualPath,
2798             utils.UniqueSequence(filename
2799                                  for files in filemap
2800                                  for filename in files)),
2801       constants.NV_NODELIST:
2802         self._SelectSshCheckNodes(node_data_list, self.group_uuid,
2803                                   self.all_node_info.values()),
2804       constants.NV_HYPERVISOR: hypervisors,
2805       constants.NV_HVPARAMS:
2806         _GetAllHypervisorParameters(cluster, self.all_inst_info.values()),
2807       constants.NV_NODENETTEST: [(node.name, node.primary_ip, node.secondary_ip)
2808                                  for node in node_data_list
2809                                  if not node.offline],
2810       constants.NV_INSTANCELIST: hypervisors,
2811       constants.NV_VERSION: None,
2812       constants.NV_HVINFO: self.cfg.GetHypervisorType(),
2813       constants.NV_NODESETUP: None,
2814       constants.NV_TIME: None,
2815       constants.NV_MASTERIP: (self.cfg.GetMasterNodeName(), master_ip),
2816       constants.NV_OSLIST: None,
2817       constants.NV_VMNODES: self.cfg.GetNonVmCapableNodeList(),
2818       constants.NV_USERSCRIPTS: user_scripts,
2819       }
2820
2821     if vg_name is not None:
2822       node_verify_param[constants.NV_VGLIST] = None
2823       node_verify_param[constants.NV_LVLIST] = vg_name
2824       node_verify_param[constants.NV_PVLIST] = [vg_name]
2825
2826     if drbd_helper:
2827       node_verify_param[constants.NV_DRBDVERSION] = None
2828       node_verify_param[constants.NV_DRBDLIST] = None
2829       node_verify_param[constants.NV_DRBDHELPER] = drbd_helper
2830
2831     if cluster.IsFileStorageEnabled() or \
2832         cluster.IsSharedFileStorageEnabled():
2833       # Load file storage paths only from master node
2834       node_verify_param[constants.NV_ACCEPTED_STORAGE_PATHS] = \
2835         self.cfg.GetMasterNodeName()
2836       if cluster.IsFileStorageEnabled():
2837         node_verify_param[constants.NV_FILE_STORAGE_PATH] = \
2838           cluster.file_storage_dir
2839
2840     # bridge checks
2841     # FIXME: this needs to be changed per node-group, not cluster-wide
2842     bridges = set()
2843     default_nicpp = cluster.nicparams[constants.PP_DEFAULT]
2844     if default_nicpp[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2845       bridges.add(default_nicpp[constants.NIC_LINK])
2846     for inst_uuid in self.my_inst_info.values():
2847       for nic in inst_uuid.nics:
2848         full_nic = cluster.SimpleFillNIC(nic.nicparams)
2849         if full_nic[constants.NIC_MODE] == constants.NIC_MODE_BRIDGED:
2850           bridges.add(full_nic[constants.NIC_LINK])
2851
2852     if bridges:
2853       node_verify_param[constants.NV_BRIDGES] = list(bridges)
2854
2855     # Build our expected cluster state
2856     node_image = dict((node.uuid, self.NodeImage(offline=node.offline,
2857                                                  uuid=node.uuid,
2858                                                  vm_capable=node.vm_capable))
2859                       for node in node_data_list)
2860
2861     # Gather OOB paths
2862     oob_paths = []
2863     for node in self.all_node_info.values():
2864       path = SupportsOob(self.cfg, node)
2865       if path and path not in oob_paths:
2866         oob_paths.append(path)
2867
2868     if oob_paths:
2869       node_verify_param[constants.NV_OOB_PATHS] = oob_paths
2870
2871     for inst_uuid in self.my_inst_uuids:
2872       instance = self.my_inst_info[inst_uuid]
2873       if instance.admin_state == constants.ADMINST_OFFLINE:
2874         i_offline += 1
2875
2876       for nuuid in instance.all_nodes:
2877         if nuuid not in node_image:
2878           gnode = self.NodeImage(uuid=nuuid)
2879           gnode.ghost = (nuuid not in self.all_node_info)
2880           node_image[nuuid] = gnode
2881
2882       instance.MapLVsByNode(node_vol_should)
2883
2884       pnode = instance.primary_node
2885       node_image[pnode].pinst.append(instance.uuid)
2886
2887       for snode in instance.secondary_nodes:
2888         nimg = node_image[snode]
2889         nimg.sinst.append(instance.uuid)
2890         if pnode not in nimg.sbp:
2891           nimg.sbp[pnode] = []
2892         nimg.sbp[pnode].append(instance.uuid)
2893
2894     es_flags = rpc.GetExclusiveStorageForNodes(self.cfg,
2895                                                self.my_node_info.keys())
2896     # The value of exclusive_storage should be the same across the group, so if
2897     # it's True for at least a node, we act as if it were set for all the nodes
2898     self._exclusive_storage = compat.any(es_flags.values())
2899     if self._exclusive_storage:
2900       node_verify_param[constants.NV_EXCLUSIVEPVS] = True
2901
2902     # At this point, we have the in-memory data structures complete,
2903     # except for the runtime information, which we'll gather next
2904
2905     # Due to the way our RPC system works, exact response times cannot be
2906     # guaranteed (e.g. a broken node could run into a timeout). By keeping the
2907     # time before and after executing the request, we can at least have a time
2908     # window.
2909     nvinfo_starttime = time.time()
2910     all_nvinfo = self.rpc.call_node_verify(self.my_node_uuids,
2911                                            node_verify_param,
2912                                            self.cfg.GetClusterName(),
2913                                            self.cfg.GetClusterInfo().hvparams)
2914     nvinfo_endtime = time.time()
2915
2916     if self.extra_lv_nodes and vg_name is not None:
2917       extra_lv_nvinfo = \
2918           self.rpc.call_node_verify(self.extra_lv_nodes,
2919                                     {constants.NV_LVLIST: vg_name},
2920                                     self.cfg.GetClusterName(),
2921                                     self.cfg.GetClusterInfo().hvparams)
2922     else:
2923       extra_lv_nvinfo = {}
2924
2925     all_drbd_map = self.cfg.ComputeDRBDMap()
2926
2927     feedback_fn("* Gathering disk information (%s nodes)" %
2928                 len(self.my_node_uuids))
2929     instdisk = self._CollectDiskInfo(self.my_node_info.keys(), node_image,
2930                                      self.my_inst_info)
2931
2932     feedback_fn("* Verifying configuration file consistency")
2933
2934     # If not all nodes are being checked, we need to make sure the master node
2935     # and a non-checked vm_capable node are in the list.
2936     absent_node_uuids = set(self.all_node_info).difference(self.my_node_info)
2937     if absent_node_uuids:
2938       vf_nvinfo = all_nvinfo.copy()
2939       vf_node_info = list(self.my_node_info.values())
2940       additional_node_uuids = []
2941       if master_node_uuid not in self.my_node_info:
2942         additional_node_uuids.append(master_node_uuid)
2943         vf_node_info.append(self.all_node_info[master_node_uuid])
2944       # Add the first vm_capable node we find which is not included,
2945       # excluding the master node (which we already have)
2946       for node_uuid in absent_node_uuids:
2947         nodeinfo = self.all_node_info[node_uuid]
2948         if (nodeinfo.vm_capable and not nodeinfo.offline and
2949             node_uuid != master_node_uuid):
2950           additional_node_uuids.append(node_uuid)
2951           vf_node_info.append(self.all_node_info[node_uuid])
2952           break
2953       key = constants.NV_FILELIST
2954       vf_nvinfo.update(self.rpc.call_node_verify(
2955          additional_node_uuids, {key: node_verify_param[key]},
2956          self.cfg.GetClusterName(), self.cfg.GetClusterInfo().hvparams))
2957     else:
2958       vf_nvinfo = all_nvinfo
2959       vf_node_info = self.my_node_info.values()
2960
2961     self._VerifyFiles(vf_node_info, master_node_uuid, vf_nvinfo, filemap)
2962
2963     feedback_fn("* Verifying node status")
2964
2965     refos_img = None
2966
2967     for node_i in node_data_list:
2968       nimg = node_image[node_i.uuid]
2969
2970       if node_i.offline:
2971         if verbose:
2972           feedback_fn("* Skipping offline node %s" % (node_i.name,))
2973         n_offline += 1
2974         continue
2975
2976       if node_i.uuid == master_node_uuid:
2977         ntype = "master"
2978       elif node_i.master_candidate:
2979         ntype = "master candidate"
2980       elif node_i.drained:
2981         ntype = "drained"
2982         n_drained += 1
2983       else:
2984         ntype = "regular"
2985       if verbose:
2986         feedback_fn("* Verifying node %s (%s)" % (node_i.name, ntype))
2987
2988       msg = all_nvinfo[node_i.uuid].fail_msg
2989       self._ErrorIf(msg, constants.CV_ENODERPC, node_i.name,
2990                     "while contacting node: %s", msg)
2991       if msg:
2992         nimg.rpc_fail = True
2993         continue
2994
2995       nresult = all_nvinfo[node_i.uuid].payload
2996
2997       nimg.call_ok = self._VerifyNode(node_i, nresult)
2998       self._VerifyNodeTime(node_i, nresult, nvinfo_starttime, nvinfo_endtime)
2999       self._VerifyNodeNetwork(node_i, nresult)
3000       self._VerifyNodeUserScripts(node_i, nresult)
3001       self._VerifyOob(node_i, nresult)
3002       self._VerifyAcceptedFileStoragePaths(node_i, nresult,
3003                                            node_i.uuid == master_node_uuid)
3004       self._VerifyFileStoragePaths(node_i, nresult)
3005       self._VerifySharedFileStoragePaths(node_i, nresult)
3006
3007       if nimg.vm_capable:
3008         self._UpdateVerifyNodeLVM(node_i, nresult, vg_name, nimg)
3009         self._VerifyNodeDrbd(node_i, nresult, self.all_inst_info, drbd_helper,
3010                              all_drbd_map)
3011
3012         self._UpdateNodeVolumes(node_i, nresult, nimg, vg_name)
3013         self._UpdateNodeInstances(node_i, nresult, nimg)
3014         self._UpdateNodeInfo(node_i, nresult, nimg, vg_name)
3015         self._UpdateNodeOS(node_i, nresult, nimg)
3016
3017         if not nimg.os_fail:
3018           if refos_img is None:
3019             refos_img = nimg
3020           self._VerifyNodeOS(node_i, nimg, refos_img)
3021         self._VerifyNodeBridges(node_i, nresult, bridges)
3022
3023         # Check whether all running instances are primary for the node. (This
3024         # can no longer be done from _VerifyInstance below, since some of the
3025         # wrong instances could be from other node groups.)
3026         non_primary_inst_uuids = set(nimg.instances).difference(nimg.pinst)
3027
3028         for inst_uuid in non_primary_inst_uuids:
3029           test = inst_uuid in self.all_inst_info
3030           self._ErrorIf(test, constants.CV_EINSTANCEWRONGNODE,
3031                         self.cfg.GetInstanceName(inst_uuid),
3032                         "instance should not run on node %s", node_i.name)
3033           self._ErrorIf(not test, constants.CV_ENODEORPHANINSTANCE, node_i.name,
3034                         "node is running unknown instance %s", inst_uuid)
3035
3036     self._VerifyGroupDRBDVersion(all_nvinfo)
3037     self._VerifyGroupLVM(node_image, vg_name)
3038
3039     for node_uuid, result in extra_lv_nvinfo.items():
3040       self._UpdateNodeVolumes(self.all_node_info[node_uuid], result.payload,
3041                               node_image[node_uuid], vg_name)
3042
3043     feedback_fn("* Verifying instance status")
3044     for inst_uuid in self.my_inst_uuids:
3045       instance = self.my_inst_info[inst_uuid]
3046       if verbose:
3047         feedback_fn("* Verifying instance %s" % instance.name)
3048       self._VerifyInstance(instance, node_image, instdisk[inst_uuid])
3049
3050       # If the instance is non-redundant we cannot survive losing its primary
3051       # node, so we are not N+1 compliant.
3052       if instance.disk_template not in constants.DTS_MIRRORED:
3053         i_non_redundant.append(instance)
3054
3055       if not cluster.FillBE(instance)[constants.BE_AUTO_BALANCE]:
3056         i_non_a_balanced.append(instance)
3057
3058     feedback_fn("* Verifying orphan volumes")
3059     reserved = utils.FieldSet(*cluster.reserved_lvs)
3060
3061     # We will get spurious "unknown volume" warnings if any node of this group
3062     # is secondary for an instance whose primary is in another group. To avoid
3063     # them, we find these instances and add their volumes to node_vol_should.
3064     for instance in self.all_inst_info.values():
3065       for secondary in instance.secondary_nodes:
3066         if (secondary in self.my_node_info
3067             and instance.name not in self.my_inst_info):
3068           instance.MapLVsByNode(node_vol_should)
3069           break
3070
3071     self._VerifyOrphanVolumes(node_vol_should, node_image, reserved)
3072
3073     if constants.VERIFY_NPLUSONE_MEM not in self.op.skip_checks:
3074       feedback_fn("* Verifying N+1 Memory redundancy")
3075       self._VerifyNPlusOneMemory(node_image, self.my_inst_info)
3076
3077     feedback_fn("* Other Notes")
3078     if i_non_redundant:
3079       feedback_fn("  - NOTICE: %d non-redundant instance(s) found."
3080                   % len(i_non_redundant))
3081
3082     if i_non_a_balanced:
3083       feedback_fn("  - NOTICE: %d non-auto-balanced instance(s) found."
3084                   % len(i_non_a_balanced))
3085
3086     if i_offline:
3087       feedback_fn("  - NOTICE: %d offline instance(s) found." % i_offline)
3088
3089     if n_offline:
3090       feedback_fn("  - NOTICE: %d offline node(s) found." % n_offline)
3091
3092     if n_drained:
3093       feedback_fn("  - NOTICE: %d drained node(s) found." % n_drained)
3094
3095     return not self.bad
3096
3097   def HooksCallBack(self, phase, hooks_results, feedback_fn, lu_result):
3098     """Analyze the post-hooks' result
3099
3100     This method analyses the hook result, handles it, and sends some
3101     nicely-formatted feedback back to the user.
3102
3103     @param phase: one of L{constants.HOOKS_PHASE_POST} or
3104         L{constants.HOOKS_PHASE_PRE}; it denotes the hooks phase
3105     @param hooks_results: the results of the multi-node hooks rpc call
3106     @param feedback_fn: function used send feedback back to the caller
3107     @param lu_result: previous Exec result
3108     @return: the new Exec result, based on the previous result
3109         and hook results
3110
3111     """
3112     # We only really run POST phase hooks, only for non-empty groups,
3113     # and are only interested in their results
3114     if not self.my_node_uuids:
3115       # empty node group
3116       pass
3117     elif phase == constants.HOOKS_PHASE_POST:
3118       # Used to change hooks' output to proper indentation
3119       feedback_fn("* Hooks Results")
3120       assert hooks_results, "invalid result from hooks"
3121
3122       for node_name in hooks_results:
3123         res = hooks_results[node_name]
3124         msg = res.fail_msg
3125         test = msg and not res.offline
3126         self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3127                       "Communication failure in hooks execution: %s", msg)
3128         if res.offline or msg:
3129           # No need to investigate payload if node is offline or gave
3130           # an error.
3131           continue
3132         for script, hkr, output in res.payload:
3133           test = hkr == constants.HKR_FAIL
3134           self._ErrorIf(test, constants.CV_ENODEHOOKS, node_name,
3135                         "Script %s failed, output:", script)
3136           if test:
3137             output = self._HOOKS_INDENT_RE.sub("      ", output)
3138             feedback_fn("%s" % output)
3139             lu_result = False
3140
3141     return lu_result
3142
3143
3144 class LUClusterVerifyDisks(NoHooksLU):
3145   """Verifies the cluster disks status.
3146
3147   """
3148   REQ_BGL = False
3149
3150   def ExpandNames(self):
3151     self.share_locks = ShareAll()
3152     self.needed_locks = {
3153       locking.LEVEL_NODEGROUP: locking.ALL_SET,
3154       }
3155
3156   def Exec(self, feedback_fn):
3157     group_names = self.owned_locks(locking.LEVEL_NODEGROUP)
3158
3159     # Submit one instance of L{opcodes.OpGroupVerifyDisks} per node group
3160     return ResultWithJobs([[opcodes.OpGroupVerifyDisks(group_name=group)]
3161                            for group in group_names])